مقدمة
أباتشي كافكا منصة مفتوحة المصدر لمعالجة الأحداث والتدفقات الموزعة، مكتوبة بلغة جافا، ومُصممة لمعالجة تدفقات البيانات في الوقت الفعلي. تتميز بقابليتها للتوسع، مع إنتاجية عالية وتوافر عالٍ. طورتها مؤسسة أباتشي للبرمجيات، وحظيت كافكا باعتماد واسع النطاق لموثوقيتها وسهولة استخدامها وقدرتها على تحمل الأخطاء. تستخدمها أكبر المؤسسات العالمية لإدارة كميات كبيرة من البيانات بطريقة موزعة وفعالة.
في هذا البرنامج التعليمي، ستقوم بتنزيل وإعداد Apache Kafka. ستتعلم كيفية إنشاء المواضيع وحذفها، بالإضافة إلى إرسال واستقبال الأحداث باستخدام البرامج النصية المُرفقة. ستتعرف أيضًا على مشاريع مشابهة لها نفس الهدف، وكيفية مُقارنتها بـ Kafka.
المتطلبات الأساسية
- جهاز يحتوي على 4 جيجابايت على الأقل من ذاكرة الوصول العشوائي و2 وحدة معالجة مركزية.
- تم تثبيت Java 8 أو أعلى على Droplet أو الجهاز المحلي لديك.
الخطوة 1 - تنزيل وتكوين Apache Kafka
في هذا القسم، ستقوم بتنزيل Apache Kafka وفك ضغطه على جهازك. لمزيد من الأمان، ستقوم بإعداده باستخدام حساب مستخدم خاص بك. بعد ذلك، ستقوم بتكوينه وتشغيله باستخدام KRaft.
أولاً، أنشئ مستخدمًا منفصلًا لتشغيل كافكا. بتنفيذ الأمر التالي، ستنشئ مستخدمًا باسم كافكا يخلق:
sudo adduser kafkaسيُطلب منك إدخال كلمة مرور حسابك. أدخل كلمة مرور قوية واضغط على Enter. يدخل بالنسبة لكل حقل، تخطي ملء المعلومات الإضافية.
أخيرًا، قم بالتبديل إلى مستخدم Kafka المحدد:
su kafkaبعد ذلك، ستُنزّل حزمة إصدار Kafka من صفحة التنزيلات الرسمية. الإصدار الأحدث حاليًا هو 3.6.1. إذا كنت تستخدم نظام macOS أو Linux، يُمكنك تنزيل Kafka باستخدام curl.
استخدم هذا الأمر لتنزيل Kafka وتثبيته في /tmp مكان:
curl -o /tmp/kafka.tgz https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgzلديك الإصدار أدناه ~/كافكاستحفظه في المجلد الرئيسي. أنشئه بتشغيل:
mkdir ~/kafkaثم، عن طريق تشغيله، ~/كافكا يستخرج:
tar -xzf /tmp/kafka.tgz -C ~/kafka --strip-components=1نظرًا لأن الأرشيف الذي قمت بتنزيله يحتوي على مجلد جذر يحمل نفس اسم إصدار Kafka، فإن الأمر –strip-components=1 سوف يتخطاه ويستخرج كل شيء فيه.
في وقت كتابة هذا التقرير، كان كافكا 3 آخر إصدار رئيسي يدعم نظامين لإدارة البيانات الوصفية: Apache ZooKeeper وKafka KRaft (اختصارًا لـ Kafka Raft). ZooKeeper هو مشروع مفتوح المصدر يوفر طريقة قياسية لتنسيق البيانات الموزعة للتطبيقات، وهو أيضًا من تطوير مؤسسة Apache Software Foundation.
مع ذلك، بدءًا من إصدار Kafka 3.3، تم دعم KRaft. KRaft هو نظام مصمم خصيصًا لتنسيق نسخ Kafka فقط، مما يُبسط عملية التثبيت ويسمح بتوسعة أكبر بكثير. مع KRaft، يتحمل Kafka المسؤولية الكاملة عن البيانات بدلاً من الاحتفاظ بالبيانات الوصفية الإدارية خارجيًا.
مع استمرار توفر دعم ZooKeeper، من المتوقع إزالته من Kafka 4 وما بعده. في هذا البرنامج التعليمي، ستُثبّت Kafka باستخدام KRaft.
يجب عليك إنشاء مُعرِّف فريد لمجموعة كافكا الجديدة. حاليًا، ستتكون من عقدة واحدة فقط. انتقل إلى المجلد الذي يتواجد فيه كافكا حاليًا:
cd ~/kafkaكافكا مع كرافت يكوّن نفسه في config/kraft/server.properties يحفظ، بينما ملف التكوين تكوين ZooKeeper/خصائص الخادم إنها.
قبل تشغيله لأول مرة، عليك تجاوز بعض الإعدادات الافتراضية. افتح الملف للتحرير بتشغيل الأمر التالي:
nano config/kraft/server.properties...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
...إعدادات سجلات السجل يحدد مكان حفظ ملفات سجل كافكا. افتراضيًا، يتم تخزينها في /tmp/kafka-logs يحفظها، إذ يُضمن إمكانية كتابتها، ولو مؤقتًا. استبدل القيمة بالمسار المحدد:
...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/home/kafka/kafka-logs
...بما أنك أنشأتَ مستخدمًا منفصلًا لكافكا، فستضع مسار مجلد السجل ضمن مجلد المستخدم الرئيسي. إذا لم يكن موجودًا، فسينشئه كافكا. عند الانتهاء، احفظ الملف وأغلقه.
الآن بعد أن قمت بتكوين Kafka، قم بتشغيل الأمر التالي لإنشاء معرف مجموعة عشوائي:
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"ثم قم بإنشاء مساحة تخزين لملفات السجل عن طريق تشغيل الأمر التالي وإدخال المعرف:
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.propertiesسيكون الناتج:
Output
Formatting /home/kafka/kafka-logs with metadata.version 3.6-IV2.أخيرًا، يمكنك بدء تشغيل خادم Kafka للمرة الأولى:
bin/kafka-server-start.sh config/kraft/server.propertiesسيكون الناتج النهائي مشابهًا لهذا:
Output
...
[2024-02-26 10:38:26,889] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2024-02-26 10:38:26,891] INFO Kafka version: 3.6.1 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka commitId: 5e3c2b738d253ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka startTimeMs: 1708943906890 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,892] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)يُظهر الإخراج أن Kafka قد نجح في التهيئة باستخدام KRaft ويقوم بإنشاء اتصالات في 0.0.0.0:9092 يقبل.
متى CTRL + C اضغط على "،" لإنهاء العملية. بما أنه من غير المُفضّل تشغيل Kafka أثناء فتح جلسة، في الخطوة التالية، ستُنشئ خدمة لتشغيل Kafka في الخلفية.
الخطوة 2 - إنشاء خدمة systemd لـ Kafka
في هذا القسم، ستنشئ خدمة Systemd لتشغيل Kafka في الخلفية باستمرار. يمكن تشغيل خدمات Systemd وإيقافها وإعادة تشغيلها باستمرار.
ضع تكوين الخدمة في ملف يسمى خدمة خادم الكود في القائمة /lib/systemd/system أنت تحفظ، حيث نظام دي يُخزّن خدماتك. أنشئه باستخدام مُحرّر النصوص الخاص بك:
sudo nano /etc/systemd/system/kafka.serviceأضف الأسطر التالية:
[Unit]
Description=kafka-server
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.targetهنا، عليك أولاً تحديد وصف الخدمة. ثم في [خدمة] تُحدد نوع الخدمة (يعني بسيط أن الأمر يجب أن يُشغّل ببساطة) وتُقدّم الأمر الذي سيتم تشغيله. كما تُحدّد المستخدم الذي سيتم تشغيله كـ كافكا هو، ويجب إعادة تشغيل الخدمة تلقائيًا في حالة خروج Kafka.
قسم [ثَبَّتَ] يُخبر النظام ببدء هذه الخدمة عند إمكانية تسجيل الدخول إلى خادمك. عند الانتهاء، احفظ الملف وأغلقه.
ابدأ خدمة Kafka عن طريق تشغيل الأمر التالي:
sudo systemctl start kafkaتأكد من بدء التشغيل بشكل صحيح من خلال عرض حالته:
sudo systemctl status kafkaسوف ترى إخراجًا مشابهًا لما يلي:
Output
● kafka.service - kafka-server
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; preset: enabled)
Active: active (running) since Mon 2024-02-26 11:17:30 UTC; 2min 40s ago
Main PID: 1061 (sh)
Tasks: 94 (limit: 4646)
Memory: 409.2M
CPU: 10.491s
CGroup: /system.slice/kafka.service
├─1061 /bin/sh -c "/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1"
└─1062 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true "-Xlog:gc*:file=/home/kafka/kafka/bin/../logs/kaf>
Feb 26 11:17:30 kafka-test1 systemd[1]: Started kafka.service - kafka-server.لبدء تشغيل Kafka تلقائيًا بعد إعادة تشغيل الخادم، قم بتمكين الخدمة عن طريق تشغيل الأمر التالي:
sudo systemctl enable kafkaفي هذه المرحلة، تكون قد أنشأتَ وفعّلتَ خدمة systemd لـ Kafka، بحيث تبدأ مع كل تشغيل للخادم. بعد ذلك، ستتعلم كيفية إنشاء المواضيع وحذفها في Kafka، بالإضافة إلى كيفية إنشاء الرسائل النصية ومعالجتها باستخدام البرامج النصية المتاحة.
الخطوة 3 - إنتاج الرسائل الموضوعية واستهلاكها
بعد إعداد خادم كافكا، ستتعرف على المواضيع وكيفية إدارتها باستخدام البرامج النصية المُرفقة. ستتعلم أيضًا كيفية إرسال واستقبال الرسائل من موضوع معين. كما هو موضح في مقال "تدفق الأحداث"، يرتبط نشر الرسائل واستقبالها بالمواضيع. يمكن ربط الموضوع بفئة تنتمي إليها الرسالة.
من النص المقدم kafka-topics.sh يمكنك إدارة المواضيع في كافكا عبر واجهة سطر الأوامر يستخدم لإنشاء موضوع يسمى الموضوع الأول قم بتشغيل الأمر التالي:
bin/kafka-topics.sh --create --topic first-topic --bootstrap-server localhost:9092تتطلب جميع نصوص Kafka المقدمة تحديد عنوان الخادم باستخدام --bootstrap-server حدد.
سيكون الناتج:
Output
Created topic first-topic.لإدراج جميع المواضيع المتاحة، بدلاً من --يخلق في --قائمة يرسل:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092ترى الموضوع الذي قمت بإنشائه:
Output first-topic
يمكنك العثور على معلومات وإحصائيات مفصلة حول الموضوع بالذهاب إلى --يصف يحصل:
bin/kafka-topics.sh --describe --topic first-topic --bootstrap-server localhost:9092سيكون الناتج مثل هذا:
Output
Topic: first-topic TopicId: VtjiMIUtRUulwzxJL5qVjg PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
يُحدد السطر الأول اسم الموضوع ومعرّفه ومعامل التكرار، وهو 1 لأن الموضوع موجود فقط على الجهاز الحالي. أما السطر الثاني، فقد تم تقصيره عمدًا، ويعرض معلومات حول القسم الأول (والوحيد) من الموضوع. يتيح لك كافكا تقسيم الموضوع، مما يعني إمكانية توزيع أجزاء مختلفة منه على خوادم مختلفة، مما يزيد من قابلية التوسع. هنا، يوجد قسم واحد فقط.
بعد إنشاء موضوع، يمكنك إنشاء رسائل له باستخدام البرنامج النصي kafka-console-producer.sh. نفّذ الأمر التالي لبدء تشغيل المُنتِج:
bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092سوف ترى إشعارًا فارغًا:
>المنتج ينتظر رسالتك النصية. أدخل الاختبار و يدخل اضغط على . سيظهر الإشعار كما يلي:
>test
>ينتظر المُنتِج الآن الرسالة التالية، مما يعني أن الرسالة السابقة قد وصلت بنجاح إلى كافكا. يمكنك إدخال أي عدد من الرسائل للاختبار. للخروج من المُنتِج، CTRL+C يضعط .
لاسترجاع الرسائل من موضوع ما، تحتاج إلى مستهلك. يوفر كافكا مستهلكًا بسيطًا بالشكل kafka-console-consumer.sh إنه يقدم. قم بتشغيله عن طريق تشغيل:
bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092مع ذلك، لن يكون هناك أي إخراج. هذا لأن المستخدم يُرسل بيانات من الموضوع، ولا يتم إنتاج أو إرسال أي شيء في الوقت الحالي. لقراءة الرسائل التي أنتجتها قبل بدء المستخدم، عليك قراءة الموضوع من البداية بتشغيل الأمر التالي:
bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server localhost:9092يقوم المستهلك بإعادة تشغيل جميع أحداث الموضوع وجلب الرسائل:
Outputtest
...مثل البناء، للخروج، CTRL+C يضعط .
للتحقق من أن المستهلك يُدخِل البيانات بالفعل، افتحه في جلسة طرفية منفصلة. افتح جلسة SSH ثانوية وشغّل المستهلك بالتكوين الافتراضي:
bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092في الجلسة الأولية، قم بتشغيل المنشئ:
bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092ثم أدخل الرسائل المطلوبة:
>second test
>third test
>ستشاهد على الفور أنه تم استلامها من قبل المستهلك:
Output
second test
third test
بعد اكتمال الاختبار، قم بإنهاء كل من المنتج والمستهلك.
لحذف الموضوع الأول، --يمسح ل kafka-topics.sh تحويل:
bin/kafka-topics.sh --delete --topic first-topic --bootstrap-server localhost:9092لن يكون هناك أي نتيجة. يمكنك سرد المواضيع للتحقق من حذفها:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092سيكون الناتج:
Output
__consumer_offsets
__Consumer_Equivalent هو موضوع كافكا داخلي يُخزّن مدة قراءة المستخدم للموضوع. في هذه المرحلة، تكون قد أنشأت موضوع كافكا وأنشأت رسائل فيه. بعد ذلك، استهلكت الرسائل باستخدام البرنامج النصي المُقدّم، واستلمتها أخيرًا في الوقت الفعلي. في الخطوة التالية، ستتعرف على كيفية مقارنة كافكا بوسطاء الأحداث الآخرين والبرامج المشابهة.
مقارنة مع هياكل معمارية مماثلة
يُعتبر Apache Kafka حلاً فعالاً لحالات استخدام بث الأحداث. ومع ذلك، يُستخدم Apache Pulsar وRabbitMQ على نطاق واسع أيضاً، ويبرزان كخيارين متعددي الاستخدامات، وإن كانا يختلفان في نهجهما. يتمثل الاختلاف الرئيسي بين قوائم انتظار الرسائل وتدفقات الأحداث في أن المهمة الرئيسية للأولى هي توصيل الرسائل إلى العملاء بأسرع ما يمكن، بغض النظر عن ترتيبها. عادةً ما تخزن هذه الأنظمة الرسائل في الذاكرة حتى يتم تأكيد استلامها من قِبل المستخدمين. تُعد تصفية الرسائل وتوجيهها جوانب مهمة، حيث يمكن للمستخدمين إظهار اهتمامهم بفئات محددة من البيانات. يُعد RabbitMQ مثالاً واضحاً على نظام مراسلة تقليدي، حيث يمكن لعدة مستخدمين الاشتراك في موضوع ما وتلقي نسخ متعددة من الرسالة. من ناحية أخرى، يركز بث الأحداث على الاستمرارية. يجب أرشفة الأحداث، والحفاظ عليها بشكل منظم، ومعالجتها مرة واحدة. توجيهها إلى مستخدمين محددين ليس مهماً، فالفكرة هي أن جميع المستخدمين يعالجون الأحداث بنفس الطريقة. Apache Pulsar هو نظام مراسلة مفتوح المصدر طورته مؤسسة Apache Software Foundation ويدعم بث الأحداث. بخلاف كافكا، الذي بُني عليه من الصفر، بدأ بولسار كحل تقليدي لطوابير انتظار الرسائل، ثم اكتسب لاحقًا قدرات بث الأحداث. لذا، يُعد بولسار مفيدًا عند الحاجة إلى الجمع بين الطريقتين، دون الحاجة إلى نشر تطبيقات منفصلة.
نتيجة
لديك الآن Apache Kafka يعمل بأمان في خلفية خادمك، مُهيأً كخدمة نظام. كما تعلمتَ كيفية التعامل مع المواضيع من سطر الأوامر، بالإضافة إلى إنشاء الرسائل ومعالجتها. لكن الميزة الرئيسية لـ Kafka تكمن في تنوع برامج العملاء التي تُمكّنك من دمجه في تطبيقاتك.









