مقدمة
أباتشي كافكا منصة مفتوحة المصدر لمعالجة الأحداث والتدفقات الموزعة، مكتوبة بلغة جافا، ومُصممة لمعالجة تدفقات البيانات في الوقت الفعلي. تتميز بقابليتها للتوسع، مع إنتاجية عالية وتوافر عالٍ. طورتها مؤسسة أباتشي للبرمجيات، وحظيت كافكا باعتماد واسع النطاق لموثوقيتها وسهولة استخدامها وقدرتها على تحمل الأخطاء. تستخدمها أكبر المؤسسات العالمية لإدارة كميات كبيرة من البيانات بطريقة موزعة وفعالة.
في هذا البرنامج التعليمي، ستقوم بتنزيل وإعداد Apache Kafka. ستتعلم كيفية إنشاء المواضيع وحذفها، بالإضافة إلى إرسال واستقبال الأحداث باستخدام البرامج النصية المُرفقة. ستتعرف أيضًا على مشاريع مشابهة لها نفس الهدف، وكيفية مُقارنتها بـ Kafka.
المتطلبات الأساسية
- جهاز مزود بذاكرة وصول عشوائي (RAM) بسعة 4 غيغابايت على الأقل ومعالجَي معالجة مركزية (CPU). في حالة خادم أوبونتو
- تم تثبيت Java 8 أو أعلى على Droplet أو الجهاز المحلي لديك.
الخطوة 1 - تنزيل وتكوين Apache Kafka
في هذا القسم، ستقوم بتنزيل Apache Kafka وفك ضغطه على جهازك. لمزيد من الأمان، ستقوم بإعداده باستخدام حساب مستخدم خاص بك. بعد ذلك، ستقوم بتكوينه وتشغيله باستخدام KRaft.
أولاً، أنشئ مستخدمًا منفصلًا لتشغيل كافكا. أنشئ مستخدمًا باسم كافكا بتنفيذ الأمر التالي:
sudo adduser kafkaسيُطلب منك إدخال كلمة مرور حسابك. أدخل كلمة مرور قوية، وتخطَّ إدخال أي معلومات إضافية بالضغط على زر الإدخال (ENTER) في كل حقل.
أخيرًا، قم بالتبديل إلى مستخدم Kafka المحدد:
su kafkaبعد ذلك، ستُنزّل حزمة إصدار Kafka من صفحة التنزيلات الرسمية. وقت كتابة هذا التقرير، كان الإصدار الأحدث هو 3.7.0، وهو مُصمّم لـ Scala 2.13. إذا كنت تستخدم نظام macOS أو Linux، يُمكنك تنزيل Kafka باستخدام curl.
استخدم هذا الأمر لتنزيل Kafka ووضعه في /tmp:
curl -o /tmp/kafka.tgz https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgzستُخزَّن النسخة في ~/kafka، في المجلد الرئيسي. أنشئها بتشغيل الأمر:
mkdir ~/kafkaثم قم باستخراجه إلى ~/kafka عن طريق تشغيل:
tar -xzf /tmp/kafka.tgz -C ~/kafka --strip-components=1نظرًا لأن الأرشيف الذي قمت بتنزيله يحتوي على مجلد جذر يحمل نفس اسم إصدار Kafka، فإن الأمر –strip-components=1 سوف يتخطاه ويستخرج كل شيء فيه.
في وقت كتابة هذا التقرير، كان كافكا 3 آخر إصدار رئيسي يدعم نظامين لإدارة البيانات الوصفية: Apache ZooKeeper وKafka KRaft (اختصارًا لـ Kafka Raft). ZooKeeper هو مشروع مفتوح المصدر يوفر طريقة قياسية لتنسيق البيانات الموزعة للتطبيقات، وهو أيضًا من تطوير مؤسسة Apache Software Foundation.
مع ذلك، بدءًا من إصدار Kafka 3.3، تم دعم KRaft. KRaft هو نظام مصمم خصيصًا لتنسيق نسخ Kafka فقط، مما يُبسط عملية التثبيت ويسمح بتوسعة أكبر بكثير. مع KRaft، يتحمل Kafka المسؤولية الكاملة عن البيانات بدلاً من الاحتفاظ بالبيانات الوصفية الإدارية خارجيًا.
مع استمرار توفر دعم ZooKeeper، من المتوقع إزالته من Kafka 4 وما بعده. في هذا البرنامج التعليمي، ستُثبّت Kafka باستخدام KRaft.
يجب عليك إنشاء مُعرِّف فريد لمجموعة كافكا الجديدة. حاليًا، تتكون المجموعة من عقدة واحدة فقط. انتقل إلى المجلد الذي يوجد فيه كافكا الآن:
cd ~/kafkaيخزن Kafka مع KRaft تكوينه في config/kraft/server.properties، بينما ملف تكوين ZooKeeper هو config/server.properties.
قبل تشغيله لأول مرة، عليك تجاوز بعض الإعدادات الافتراضية. افتح الملف للتحرير بتشغيل الأمر التالي:
nano config/kraft/server.propertiesابحث عن الأسطر التالية:
...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
...يُحدد إعداد log.dirs مكان حفظ ملفات سجلات كافكا. افتراضيًا، يُخزّنها في /tmp/kafka-logs لضمان إمكانية الكتابة إليها، ولو مؤقتًا. استبدل القيمة بالمسار المُحدد:
... ############################# Log Basics ############################# # A comma separated list of directories under which to store log files log.dirs=/home/kafka/kafka-logs ...
بما أنك أنشأتَ مستخدمًا منفصلًا لكافكا، فستضع مسار مجلد السجل ضمن مجلد المستخدم الرئيسي. إذا لم يكن موجودًا، فسينشئه كافكا. عند الانتهاء، احفظ الملف وأغلقه.
الآن بعد أن قمت بتكوين Kafka، قم بتشغيل الأمر التالي لإنشاء معرف مجموعة عشوائي:
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"ثم قم بإنشاء مساحة تخزين لملفات السجل عن طريق تشغيل الأمر التالي وإدخال المعرف:
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.propertiesسيكون الناتج:
Output
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.أخيرًا، يمكنك بدء تشغيل خادم Kafka للمرة الأولى:
bin/kafka-server-start.sh config/kraft/server.propertiesسيكون الناتج النهائي مشابهًا لهذا:
Output
...
[2024-02-26 10:38:26,889] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2024-02-26 10:38:26,891] INFO Kafka version: 3.7.0 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka commitId: 5e3c2b738d253ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka startTimeMs: 1708943906890 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,892] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)يُظهر الإخراج أن Kafka تم تهيئة البرنامج بنجاح باستخدام KRaft ويقبل الاتصالات على 0.0.0.0:9092.
تخرج العملية عند الضغط على CTRL + C. نظرًا لأنه ليس من المفضل تشغيل Kafka مع وجود جلسة مفتوحة، ففي الخطوة التالية ستقوم بإنشاء خدمة لتشغيل Kafka في الخلفية.
الخطوة 2 - إنشاء خدمة systemd لـ Kafka
في هذا القسم، ستنشئ خدمة Systemd لتشغيل Kafka في الخلفية باستمرار. يمكن تشغيل خدمات Systemd وإيقافها وإعادة تشغيلها باستمرار.
يمكنك تخزين إعدادات الخدمة في ملف يُسمى code-server.service في مجلد /lib/systemd/system، حيث يخزن systemd خدماته. أنشئه باستخدام محرر النصوص الخاص بك:
sudo nano /etc/systemd/system/kafka.serviceأضف الأسطر التالية:
[Unit]
Description=kafka-server
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.targetهنا، حدد وصف الخدمة أولاً. ثم في حقل [الخدمة]، حدد نوع الخدمة (يعني بسيط أن الأمر يجب تنفيذه ببساطة) وأدخل الأمر الذي سيتم تنفيذه. حدد أيضًا أن المستخدم الذي سيتم تشغيل الخدمة به هو kafka، وأنه يجب إعادة تشغيل الخدمة تلقائيًا عند إغلاق Kafka.
يُرشد قسم [التثبيت] النظام لبدء هذه الخدمة عند تسجيل الدخول إلى الخادم. عند الانتهاء، احفظ الملف وأغلقه.
ابدأ خدمة Kafka عن طريق تشغيل الأمر التالي:
sudo systemctl start kafkaتأكد من بدء التشغيل بشكل صحيح من خلال عرض حالته:
sudo systemctl status kafkaسوف ترى إخراجًا مشابهًا لما يلي:
Output
● kafka.service - kafka-server
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; preset: enabled)
Active: active (running) since Mon 2024-02-26 11:17:30 UTC; 2min 40s ago
Main PID: 1061 (sh)
Tasks: 94 (limit: 4646)
Memory: 409.2M
CPU: 10.491s
CGroup: /system.slice/kafka.service
├─1061 /bin/sh -c "/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1"
└─1062 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true "-Xlog:gc*:file=/home/kafka/kafka/bin/../logs/kaf>
Feb 26 11:17:30 kafka-test1 systemd[1]: Started kafka.service - kafka-server.لبدء تشغيل Kafka تلقائيًا بعد إعادة تشغيل الخادم، قم بتمكين الخدمة عن طريق تشغيل الأمر التالي:
sudo systemctl enable kafkaفي هذه المرحلة، تكون قد أنشأتَ وفعّلتَ خدمة systemd لـ Kafka، بحيث تبدأ مع كل تشغيل للخادم. بعد ذلك، ستتعلم كيفية إنشاء المواضيع وحذفها في Kafka، بالإضافة إلى كيفية إنشاء الرسائل النصية ومعالجتها باستخدام البرامج النصية المتاحة.
الخطوة 3 - إنتاج الرسائل الموضوعية واستهلاكها
بعد إعداد خادم كافكا، ستتعرف على المواضيع وكيفية إدارتها باستخدام البرامج النصية المُرفقة. ستتعلم أيضًا كيفية إرسال واستقبال الرسائل من موضوع معين.
كما هو موضح في مقالة "تدفق الأحداث"، يرتبط نشر الرسائل واستلامها بالموضوعات. يمكن ربط الموضوع بالفئة التي تنتمي إليها الرسالة.
يمكن استخدام البرنامج النصي kafka-topics.sh لإدارة المواضيع في كافكا عبر سطر الأوامر. لإنشاء موضوع باسم first-topic، شغّل الأمر التالي:
bin/kafka-topics.sh --create --topic first-topic --bootstrap-server localhost:9092تتطلب جميع نصوص Kafka المقدمة منك تحديد عنوان الخادم باستخدام --bootstrap-server.
سيكون الناتج:
Output
Created topic first-topic.لإدراج جميع المواضيع الموجودة، استبدل –create بـ –list:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092ترى الموضوع الذي قمت بإنشائه:
Output
first-topicيمكنك الحصول على معلومات وإحصائيات مفصلة حول الموضوع عن طريق تمرير --describe:
bin/kafka-topics.sh --describe --topic first-topic --bootstrap-server localhost:9092سيكون الناتج مثل هذا:
Output
Topic: first-topic TopicId: VtjiMIUtRUulwzxJL5qVjg PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1يُحدد السطر الأول اسم الموضوع ومعرّفه ومعامل التكرار، وهو 1 لأن الموضوع موجود فقط على الجهاز الحالي. أما السطر الثاني، فقد تم تقصيره عمدًا، ويعرض معلومات حول القسم الأول (والوحيد) من الموضوع. يتيح لك كافكا تقسيم الموضوع، مما يعني إمكانية توزيع أجزاء مختلفة منه على خوادم مختلفة، مما يزيد من قابلية التوسع. هنا، يوجد قسم واحد فقط.
بعد إنشاء موضوع، يمكنك إنشاء رسائل له باستخدام البرنامج النصي kafka-console-producer.sh. نفّذ الأمر التالي لبدء تشغيل المُنتِج:
bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092سوف ترى إشعارًا فارغًا:
>ينتظر مزود الخدمة رسالتك النصية. أدخل بيانات الاختبار واضغط على زر الإدخال. سيظهر الإشعار كما يلي:
>test
>ينتظر المُنتِج الآن الرسالة التالية، مما يعني أن الرسالة السابقة قد وصلت بنجاح إلى كافكا. يمكنك إدخال أي عدد من الرسائل للاختبار. للخروج من المُنتِج، اضغط CTRL+C.
لاسترجاع رسائل المواضيع، ستحتاج إلى مستهلك. يوفر كافكا مستهلكًا بسيطًا بصيغة kafka-console-consumer.sh. شغّله بتنفيذ:
bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092مع ذلك، لن يكون هناك أي إخراج. هذا لأن المستخدم يُرسل بيانات من الموضوع، ولا يتم إنتاج أو إرسال أي شيء في الوقت الحالي. لقراءة الرسائل التي أنتجتها قبل بدء المستخدم، عليك قراءة الموضوع من البداية بتشغيل الأمر التالي:
bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server localhost:9092يقوم المستهلك بإعادة تشغيل جميع أحداث الموضوع وجلب الرسائل:
Outputtest
...كما هو الحال مع المنشئ، اضغط على CTRL+C للخروج.
للتحقق من أن المستهلك يُدخِل البيانات بالفعل، افتحه في جلسة طرفية منفصلة. افتح جلسة SSH ثانوية وشغّل المستهلك بالتكوين الافتراضي:
bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092في الجلسة الأولية، قم بتشغيل المنشئ:
bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092ثم أدخل الرسائل المطلوبة:
>second test
>third test
>سوف ترى على الفور أنهم تلقوها من قبل المستهلك:
Output
second test
third testبعد اكتمال الاختبار، قم بإنهاء كل من المنتج والمستهلك.
لحذف الموضوع الأول، مرر --delete إلى kafka-topics.sh:
bin/kafka-topics.sh --delete --topic first-topic --bootstrap-server localhost:9092لن يكون هناك أي نتيجة. يمكنك سرد المواضيع للتحقق من حذفها:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092سيكون الناتج:
Output
__consumer_offsets__Consumer_Equivalent هو موضوع داخلي لـ Kafka يخزن مقدار الوقت الذي قرأ فيه المستهلك موضوعًا ما.
في هذه المرحلة، أنشأتَ موضوعًا في Kafka وأنشأتَ رسائل فيه. بعد ذلك، استهلكتَ الرسائل باستخدام البرنامج النصي المُرفق، واستلمتَها فورًا. بعد ذلك، ستتعرف على كيفية مُقارنة Kafka بوسطاء الأحداث الآخرين والبرامج المُشابهة.
مقارنة مع هياكل معمارية مماثلة
يُعتبر Apache Kafka الحل الأمثل لحالات استخدام بث الأحداث. ومع ذلك، يُستخدم Apache Pulsar وRabbitMQ أيضًا على نطاق واسع، ويتميّزان بكونهما خيارين متعددي الاستخدامات، وإن كانا مختلفين في نهجهما.
الفرق الرئيسي بين قائمة انتظار الرسائل وتدفق الأحداث هو أن المهمة الأساسية للأولى هي إيصال الرسائل إلى المستخدمين بأسرع ما يمكن، بغض النظر عن ترتيبها. عادةً ما تخزن هذه الأنظمة الرسائل في الذاكرة حتى يتم تأكيد استلامها من المستخدمين. يُعدّ تصفية الرسائل وتوجيهها جانبين مهمين، إذ يُمكن للمستخدمين إظهار اهتمامهم بفئات محددة من البيانات. يُعدّ RabbitMQ مثالاً واضحاً على نظام مراسلة تقليدي، حيث يُمكن لعدة مستخدمين الاشتراك في موضوع واحد واستلام نسخ متعددة من الرسالة.
من ناحية أخرى، يُركز بث الأحداث على الاستمرارية. يجب أرشفة الأحداث وصيانتها ومعالجتها مرة واحدة. ليس من المهم توجيهها إلى مستهلكين محددين، فالفكرة هي أن جميع المستهلكين يُعالجون الأحداث بنفس الطريقة.
أباتشي بولسار هو نظام مراسلة مفتوح المصدر، طورته مؤسسة أباتشي للبرمجيات، ويدعم بث الأحداث. بخلاف كافكا، الذي بُني عليه من الصفر، بدأ بولسار كحل تقليدي لطوابير انتظار الرسائل، ثم اكتسب لاحقًا قدرات بث الأحداث. لذا، يُعد بولسار مفيدًا عند الحاجة إلى الجمع بين النهجين، دون الحاجة إلى نشر تطبيقات منفصلة.
نتيجة
لديك الآن Apache Kafka يعمل بأمان في خلفية خادمك، مُهيأً كخدمة نظام. كما تعلمتَ كيفية التعامل مع المواضيع من سطر الأوامر، بالإضافة إلى إنشاء الرسائل ومعالجتها. لكن الميزة الرئيسية لـ Kafka تكمن في تنوع برامج العملاء التي تُمكّنك من دمجه في تطبيقاتك.









