مقدمة
أباتشي كافكا منصة مفتوحة المصدر لمعالجة الأحداث والتدفقات الموزعة، مكتوبة بلغة جافا، ومُصممة لمعالجة تدفقات البيانات في الوقت الفعلي. تتميز بقابليتها للتوسع، مع إنتاجية عالية وتوافر عالٍ. وهي مصممة لدعم مئات العقد في كل مجموعة.
في هذا البرنامج التعليمي، ستُنشئ مجموعة Kafka باستخدام بروتوكول إجماع KRaft. ستتعلم كيفية تكوين العُقد لتكون جزءًا من مجموعة، وكيفية تخصيص أقسام المواضيع لكل عُقدة. ستتعلم أيضًا كيفية تخصيص المواضيع لوسطاء مُحددين في المجموعة.
المتطلبات الأساسية
- ثلاث قطرات مع ما لا يقل عن 4 جيجابايت من ذاكرة الوصول العشوائي و 2 وحدة معالجة مركزية
- اسم نطاق مسجل بالكامل مع ثلاثة نطاقات فرعية تشير إلى القطرات الثلاث.
- تم تثبيت Apache Kafka وتكوينه على Droplets الخاص بك.
الخطوة 1 - تكوين عقد Kafka
في هذه الخطوة، ستقوم بتكوين خوادم Kafka الثلاثة التي أنشأتها كجزء من المتطلبات الأساسية لتكون جزءًا من مجموعة KRaft نفسها. باستخدام KRaft، يمكن للعقد نفسها تنظيم وتنفيذ المهام الإدارية دون تكلفة إضافية أو اعتماد على Apache ZooKeeper.
تكوين العقدة الأولى
ستبدأ بتكوين العقدة الأولى. أولًا، أوقف الخدمة على أول نقطة وصول بتنفيذ الأمر التالي:
sudo systemctl stop kafkaباعتبارك مستخدم Kafka، انتقل إلى الدليل الذي يوجد فيه Kafka وافتح ملف التكوين الخاص به للتحرير عن طريق تشغيل الأمر التالي:
vi /config/kraft/server.propertiesابحث عن الأسطر التالية:
...
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9092
...تُهيئ هذه المعلمات الثلاثة عقدة كافكا للعمل كوسيط ووحدة تحكم في آنٍ واحد، أي أنها تستقبل البيانات وتستهلكها (الوسيط) وتُنفّذ المهام الإدارية (وحدة التحكم). يُعدّ هذا الفصل مفيدًا في عمليات النشر الكبيرة، حيث يُمكن فصل وحدات التحكم لزيادة الكفاءة والتكرار.
يُحدد مُعرّف العقدة في المجموعة. هذه هي العقدة الأولى، لذا يجب أن تبقى على القيمة 1. يجب أن يكون لجميع العقد مُعرّف فريد، لذا سيكون مُعرّفا العقدتين الثانية والثالثة 2 و3 على التوالي.
يقوم ملف controller.quorum.voters بربط مُعرِّفات العُقد بعناوينها ومنافذها المُقابلة للاتصال. هنا، تُحدِّد عناوين جميع عُقد المجموعة بحيث تكون كل عُقدة على دراية بالعُقد الأخرى. غيِّر السطر ليصبح كما يلي:
...
[email protected]_domain:9093,[email protected]_domain:9093,[email protected]_domain:9093
...هنا، يمكنك إدراج جميع العقد الثلاث في المجموعة مع مُعرِّفاتها. تذكر استبدال your_domain بعنوان نطاقك الذي حددته أثناء المتطلبات الأساسية.
بعد ذلك، ابحث عن الأسطر التالية في الملف:
...
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
...يُحدد المستمعون العناوين التي تستمع إليها عقدة كافكا، بينما يُحدد المستمعون المُعلنون العناوين المُرسلة إلى العملاء للاتصال بالعقدة. يتيح لك هذا تحديد مجموعة فرعية من العناوين الفعلية التي يجب على العملاء استخدامها.
قم بتغيير الخطوط لتبدو بهذا الشكل، واستبدال your_domain باسم نطاقك الفعلي:
...
listeners=PLAINTEXT://kafka1.your_domain:9092,CONTROLLER://kafka1.your_domain:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kafka1.your_domain:9092
...نظرًا لأن هذه العقدة ستكون في مجموعة، فقد قمت بتوجيه العناوين صراحةً إلى القطرة التي سيتم تشغيلها عليها.
بعد ذلك، ابحث عن إعداد num.partitions:
...
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
...كما هو موضح في التعليق، هذا هو العدد الافتراضي للأقسام المُهيأة لكل سلسلة جديدة. بما أن لديك ثلاث عقد، فاضبطه على مضاعف اثنين:
...
num.partitions=6
...القيمة 6 هنا تضمن أن كل عقدة تحتفظ بقسمين للموضوع بشكل افتراضي.
بعد ذلك، يمكنك تكوين عامل التكرار للخيوط الداخلية، والذي يحافظ على إزاحات المستهلك وحالة المعاملة. ابحث عن الأسطر التالية:
...
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
...قم بتعيينها على القيم التالية:
... offsets.topic.replication.factor=2 transaction.state.log.replication.factor=2 ...
هنا، يجب أن تكون عقدتان على الأقل متزامنتين فيما يتعلق بالبيانات الوصفية الداخلية. عند الانتهاء، احفظ الملف وأغلقه.
بعد ضبط رقم القسم الافتراضي، عليك إعادة تهيئة تخزين السجلات. أولًا، احذف ملفات السجلات الحالية بتشغيل الأمر التالي:
rm -rf /home/kafka/kafka-logs/*
بعد ذلك، قم بإنشاء معرف مجموعة جديد وقم بتخزينه في متغير بيئي:
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"أظهره في المحطة:
echo $KAFKA_CLUSTER_IDسيكون معرف الإخراج:
OutputMjj4bch9Q3-B0TEXv8_zPgخذ هذه القيمة بعين الاعتبار؛ فسوف تحتاج إليها لتكوين العقدة الثانية والثالثة.
أخيرًا، قم بتشغيل الأمر التالي لإنشاء مستودع التقارير:
./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.propertiesسيكون الناتج مشابهًا لهذا:
Output...
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.تكوين العقدة الثانية والثالثة
إعداد العقد الأخرى مشابه جدًا لما فعلته للعقدة الأولى. لاحظ أنك تُحدّث أيضًا مُعرّف العقدة:
...
node.id=node_number
...القيم المناسبة للعقد الثانية والثالثة هي 2 و 3 على التوالي، مما يؤدي إلى تحديد العناوين المناسبة للمستمعين والمستمعين المعلنين.
عند إعادة بناء تخزين السجل، أعد استخدام معرف المجموعة من العقدة الأولى:
KAFKA_CLUSTER_ID="your_cluster_id"عند الانتهاء، ابدأ تشغيل خدمة Kafka على جميع العقد الثلاث عن طريق تشغيل:
sudo systemctl start kafkaفي هذه المرحلة، تكون قد هيأت ثلاث عقد Kafka لتكون جزءًا من مجموعة KRaft. أنشئ موضوعًا، وأنتج رسائل واستقبلها في مجموعتك.
الخطوة 2 – الاتصال بالمجموعة
في هذه الخطوة، ستتصل بمجموعة كافكا باستخدام نصوص الشل المرفقة بها. ستنشئ أيضًا موضوعًا وتحاول إنتاج البيانات واستهلاكها من المجموعة. بعد ذلك، ستُعطل إحدى العقد لترى كيف يُقلل كافكا من الخسائر.
يوفر كافكا نصًا برمجيًا يُسمى kafka-metadata-quorum.sh يعرض معلومات حول المجموعة وأعضائها. نفّذ الأمر التالي لتشغيله:
./bin/kafka-metadata-quorum.sh --bootstrap-controller kafka1.your_domain:9093 describe --statusستتصل بإحدى العقد على المنفذ 9093، وهو نقطة النهاية لوحدة التحكم (ولكن ليس للوسيط). تذكر استبدال kafka1.your_domain بالنطاق الذي يشير إلى إحدى عقد Kafka لديك.
سيكون الناتج مشابهًا لهذا:
OutputClusterId: G3TeIZoeTSCvG2YOWvPE2w
LeaderId: 3
LeaderEpoch: 2
HighWatermark: 383
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 55
CurrentVoters: [1,2,3]
CurrentObservers: []يسرد البرنامج النصي المعلومات الأولية عن حالة المجموعة. في المخرجات المعروضة، يمكنك رؤية أن العقدة 3 قد انتُخبت قائدةً، وأن جميع العقد الثلاث ([1،2،3]) موجودة في مجموعة التصويت، وتتفق على هذا القرار.
قم بإنشاء موضوع يسمى الموضوع الأول عن طريق تشغيل الأمر التالي:
./bin/kafka-topics.sh --create --topic first-topic --bootstrap-server kafka1.your_domain:9092 --replication-factor 2سيكون الناتج:
Created topic first-topic.ثم قم بتشغيل البرنامج النصي kafka-topics.sh لمعرفة كيفية ترتيب الأقسام على العقد:
./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topicيؤدي تعيين عامل التكرار إلى 2 إلى ضمان توفر الموضوع على عقدتين على الأقل.
سيكون الناتج مشابهًا لهذا:
OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: first-topic Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1يمكنك ملاحظة أن كل قسم يحتوي على قائد، ونسختين متماثلتين، ومجموعتي نسخ متماثلة متزامنتين (ISR). قائد القسم هو عقدة خادم تُقدم بيانات القسم للعملاء، بينما النسخ المتماثلة تحتفظ بنسخ فقط. تُعتبر عقدة النسخة المتماثلة ISR إذا كانت متزامنة مع القائد خلال الثواني العشر الماضية افتراضيًا. هذه الفترة قابلة للتخصيص لكل موضوع.
بعد إنشاء موضوع، يمكنك إنتاج رسائله باستخدام البرنامج النصي kafka-console-producer.sh. نفّذ الأمر التالي لبدء تشغيل المُنتِج:
./bin/kafka-console-producer.sh --topic first-topic --bootstrap-server kafka1.your_domain:9092سوف ترى إشعارًا فارغًا:
>ينتظر مزود الخدمة رسالتك النصية. أدخل بيانات الاختبار واضغط على زر الإدخال. سيظهر الإشعار كما يلي:
>Hello World!
>ينتظر المُنتِج الآن الرسالة التالية، مما يعني أن الرسالة السابقة قد وصلت بنجاح إلى كافكا. يمكنك إدخال أي عدد من الرسائل للاختبار. للخروج من المُنتِج، اضغط CTRL+C.
تحتاج إلى مستهلك لإعادة قراءة رسائل الموضوع. يوفر كافكا مستهلكًا بسيطًا باسم kafka-console-consumer.sh. شغّله بتنفيذ:
./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092ستشاهد الرسائل التي يتم قراءتها من الموضوع:
OutputHello World!
...محاكاة فشل العقدة
في عقدة Kafka الثالثة، أوقف الخدمة عن طريق تشغيل الأمر التالي:
sudo systemctl stop kafkaثم اشرح الموضوع مع التدريب:
./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topicسيكون الناتج مشابهًا لهذا:
OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 3,1 Isr: 1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1
Topic: first-topic Partition: 4 Leader: 2 Replicas: 3,2 Isr: 2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1على الرغم من إدراج العقدة 3 كنسخة طبق الأصل، إلا أنها غير موجودة في مجموعات ISR لعدم توفرها. عند انضمامها مجددًا إلى المجموعة، تُزامن مع العقد الأخرى وتحاول استعادة موقعها السابق.
حاول قراءة الرسائل في الموضوع الأول مرة أخرى:
./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092سوف ترى أنه يمكن الوصول إليها كالمعتاد:
OutputHello World!
...بفضل وجود النسخ المتماثلة، تتولى العقدتان الأوليتان التحكم وتخدمان المستخدم. الآن، يمكنك تشغيل كافكا على الخادم الثالث:
sudo systemctl start kafkaفي هذه المرحلة، تكون قد رأيت كيف يُخفف كافكا من مشكلة عدم توفر عقدة في المجموعة. ستتعلم الآن كيفية إزالة عقدة من المجموعة.
الخطوة 3 - نقل البيانات بين العقد
في هذه الخطوة، ستتعلم كيفية نقل المواضيع بين العقد في مجموعة Kafka. عند إضافة عقدة إلى مجموعة حالية تحتوي على مواضيع، لا ينقل Kafka أي أقسام إليها تلقائيًا، وهو أمر قد ترغب في القيام به. هذا مفيد أيضًا لإزالة العقد، حيث لا تُنقل الأقسام الموجودة تلقائيًا إلى العقد المتبقية.
يوفر كافكا برنامجًا نصيًا يُسمى kafka-reassign-partitions.sh، يُمكّن من إنشاء خطط الترحيل وتنفيذها والتحقق منها. ستستخدمه لإنشاء خطة لترحيل أقسام الموضوع الأول إلى أول عقدتين.
أولاً، عليك تحديد المواضيع المراد نقلها. يقبل البرنامج النصي ملف JSON يحتوي على تعريفات المواضيع، لذا أنشئه وافتحه للتعديل:
vi topics-to-move.jsonأضف الأسطر التالية:
{
"topics": [
{
"topic": "first-topic"
}
],
"version": 1
}تحت العناوين، يمكنك تحديد كائن يشير إلى الموضوع الأول. عند الانتهاء، احفظ الملف وأغلقه.
قم بتشغيل الأمر التالي لتوليد خطة الترحيل، واستبدال kafka1.your_domain بالنطاق الذي يشير إلى إحدى عقد Kafka الخاصة بك:
./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generateتقوم بإرسال "1,2" إلى –broker-list الذي يمثل معرف الوسطاء المستهدفين.
سيكون الناتج مشابهًا لهذا:
OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}أنشأ هذا البرنامج النصي خطتين إجمالاً، تصفان تخطيطي التقسيم الحالي والمقترح على التوالي. تُتاح الخطة الأولى للرجوع عن التغييرات لاحقًا. انتبه للخطة الثانية، التي ستحفظها في ملف منفصل باسم migration-plan.json. أنشئها وافتحها للتعديل:
vi migration-plan.jsonأضف الملف القابل للتنفيذ الثاني:
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}احفظ الملف وأغلقه. ثم شغّل الأمر التالي لتشغيله:
./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --executeسيكون الناتج:
OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-topic-0,first-topic-1,first-topic-2,first-topic-3,first-topic-4,first-topic-5أشار النص إلى بدء عملية الترحيل. للاطلاع على تقدم عملية الترحيل، استخدم الأمر --verify:
./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --verifyبعد فترة من الوقت، سوف يبدو الناتج مثل هذا:
OutputStatus of partition reassignment:
Reassignment of partition first-topic-0 is completed.
Reassignment of partition first-topic-1 is completed.
Reassignment of partition first-topic-2 is completed.
Reassignment of partition first-topic-3 is completed.
Reassignment of partition first-topic-4 is completed.
Reassignment of partition first-topic-5 is completed.
Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic first-topicالآن يمكنك شرح المشكلة الأولى للتأكد من عدم وجود أقسام على Broker 3:
./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topicوسيكون الناتج على النحو التالي:
OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 5 Leader: 1 Replicas: 1,2 Isr: 2,1يتواجد فقط الوسيطان 1 و2 كنسخ متماثلة وISRs، مما يعني أن الهجرة كانت ناجحة.
في هذه المرحلة، قمت بإنشاء خطة هجرة لنقل الموضوع الأول من الوسيط 3 إلى الموضوعات المتبقية وتعلمت كيفية التحقق من أن الهجرة تمت بسلاسة.
نتيجة
لديك الآن مجموعة كافكا تتكون من ثلاث عقد تتواصل باستخدام بروتوكول KRaft. تعلمتَ أيضًا كيفية فحص المجموعة وتخطيط التقسيم. اختبرتَ تكرار المجموعة بتعطيل عقدة والقراءة من موضوع. وأخيرًا، تعلمتَ كيفية تحويل المواضيع إلى عقد المجموعة.









