Как настроить многоузловой кластер Kafka с помощью KCraft

0 Акции
0
0
0
0

Введение

Apache Kafka — это распределённая платформа обработки событий и потоков данных с открытым исходным кодом, написанная на Java и предназначенная для обработки потоков данных в режиме реального времени. Она обладает масштабируемостью, высокой пропускной способностью и доступностью. Она рассчитана на поддержку сотен узлов в кластере.

В этом руководстве вы создадите кластер Kafka, использующий протокол консенсуса KRaft. Вы узнаете, как настроить узлы для работы в кластере и как тематические разделы назначаются различным узлам. Вы также узнаете, как назначать темы конкретным брокерам в кластере.

Предпосылки
  • Три устройства с не менее чем 4 ГБ ОЗУ и 2 процессорами
  • Полностью зарегистрированное доменное имя с тремя поддоменами, указывающими на три точки.
  • Apache Kafka установлен и настроен на ваших Droplets.

Шаг 1 — Настройка узлов Kafka

На этом этапе вы настроите три сервера Kafka, созданные вами в рамках предварительных требований, для включения их в один кластер KRaft. С помощью KRaft узлы сами организуют и выполняют административные задачи без дополнительных накладных расходов и зависимости от Apache ZooKeeper.

Настройка первого узла

Начнём с настройки первого узла. Для начала остановите службу на первом Droplet, выполнив следующую команду:

sudo systemctl stop kafka

Как пользователь Kafka, перейдите в каталог, где находится Kafka, и откройте его файл конфигурации для редактирования, выполнив следующую команду:

vi /config/kraft/server.properties

Найдите следующие строки:

...
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9092
...

Эти три параметра настраивают узел Kafka на работу одновременно в качестве брокера и контроллера, то есть он получает и потребляет данные (брокер) и выполняет административные задачи (контроллер). Такое разделение полезно в крупных развертываниях, где контроллеры можно разместить отдельно для повышения эффективности и избыточности.

node.id указывает идентификатор узла в кластере. Это первый узел, поэтому его значение должно быть равно 1. Все узлы должны иметь уникальный идентификатор, поэтому идентификаторы второго и третьего узлов будут равны 2 и 3 соответственно.

controller.quorum.voters сопоставляет идентификаторы узлов с соответствующими им адресами и портами для связи. Здесь вы указываете адреса всех узлов кластера, чтобы каждый узел знал о других узлах. Измените строку следующим образом:

...
[email protected]_domain:9093,[email protected]_domain:9093,[email protected]_domain:9093
...

Здесь вы перечисляете все три узла кластера с их идентификаторами. Не забудьте заменить your_domain на адрес вашего домена, указанный вами при выполнении предварительных требований.

Далее найдите в файле следующие строки:

...
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
...

Параметр listeners определяет адреса, которые прослушивает узел Kafka, а advertised.listeners — адреса, отправляемые клиентам для подключения к узлу. Это позволяет указать подмножество фактических адресов, которые должны использовать клиенты.

Измените строки так, заменив your_domain на ваше фактическое доменное имя:

...
listeners=PLAINTEXT://kafka1.your_domain:9092,CONTROLLER://kafka1.your_domain:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kafka1.your_domain:9092
...

Поскольку этот узел будет находиться в кластере, вы явно указали адреса дроплета, на котором он будет работать.

Затем найдите параметр num.partitions:

...
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
...

Как указано в комментарии, это количество разделов по умолчанию, настроенное для каждого нового потока. Поскольку у вас три узла, установите его кратным двум:

...
num.partitions=6
...

Значение 6 здесь гарантирует, что каждый узел по умолчанию поддерживает два тематических раздела.

Затем настройте коэффициент итерации для внутренних потоков, которые поддерживают смещения потребителей и статус транзакций. Найдите следующие строки:

...
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
...

Установите для них следующие значения:

...
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
...

Здесь вы указываете, что как минимум два узла должны быть синхронизированы по внутренним метаданным. После этого сохраните и закройте файл.

После установки номера раздела по умолчанию необходимо перенастроить хранилище журналов. Для начала удалите существующие файлы журналов, выполнив команду:

rm -rf /home/kafka/kafka-logs/*

Затем создайте новый идентификатор кластера и сохраните его в переменной среды:

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

Покажите это в терминале:

echo $KAFKA_CLUSTER_ID

Выходной идентификатор будет следующим:

OutputMjj4bch9Q3-B0TEXv8_zPg

Запишите это значение, оно понадобится вам для настройки второго и третьего узлов.

Наконец, выполните следующую команду, чтобы создать репозиторий отчетов:

./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

Вывод будет примерно таким:

Output...
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.
Настройка второго и третьего узлов

Настройка остальных узлов очень похожа на настройку первого узла. Обратите внимание, что вы также обновляете node.id:

...
node.id=node_number
...

Соответствующие значения для второго и третьего узлов — 2 и 3 соответственно, устанавливающие соответствующие адреса для прослушивателей и рекламируемых прослушивателей.

При перестройке хранилища журналов повторно используйте идентификатор кластера с первого узла:

KAFKA_CLUSTER_ID="your_cluster_id"

После завершения запустите службу Kafka на всех трех узлах, выполнив команду:

sudo systemctl start kafka

На этом этапе вы настроили три узла Kafka для работы в кластере KRaft. Вы создаёте тему, а также отправляете и потребляете сообщения в своём кластере.

Шаг 2 — Подключение к кластеру

На этом этапе вы подключитесь к кластеру Kafka с помощью скриптов оболочки, поставляемых вместе с Kafka. Вы также создадите тему и попробуете производить и использовать данные из кластера. Затем вы отключите один из узлов и посмотрите, как Kafka снижает потери.

Kafka предоставляет скрипт kafka-metadata-quorum.sh, который отображает информацию о кластере и его участниках. Чтобы запустить его, выполните следующую команду:

./bin/kafka-metadata-quorum.sh --bootstrap-controller kafka1.your_domain:9093 describe --status

Вы подключитесь к одному из узлов через порт 9093, который является конечной точкой контроллера (но не брокера). Не забудьте заменить kafka1.your_domain на домен, указывающий на один из ваших узлов Kafka.

Вывод будет примерно таким:

OutputClusterId: G3TeIZoeTSCvG2YOWvPE2w
LeaderId: 3
LeaderEpoch: 2
HighWatermark: 383
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 55
CurrentVoters: [1,2,3]
CurrentObservers: []

Скрипт выводит начальную информацию о состоянии кластера. В показанном выводе видно, что узел 3 выбран лидером, и все три узла ([1,2,3]) находятся в наборе для голосования и согласны с этим решением.

Создайте тему под названием «Первая тема», выполнив следующее:

./bin/kafka-topics.sh --create --topic first-topic --bootstrap-server kafka1.your_domain:9092 --replication-factor 2

Результат будет следующим:

Created topic first-topic.

Затем запустите скрипт kafka-topics.sh, чтобы увидеть, как распределены разделы на узлах:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

Установка коэффициента повторения 2 гарантирует, что тема будет доступна как минимум на двух узлах.

Вывод будет примерно таким:

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: first-topic Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1

Видно, что у каждого раздела есть лидер, две реплики и два синхронизированных набора реплик (ISR). Лидер раздела — это серверный узел, который предоставляет данные раздела клиентам, в то время как реплики содержат только копии. Узел-реплика считается ISR, если по умолчанию он синхронизирован с лидером в течение последних десяти секунд. Этот интервал настраивается для каждой темы отдельно.

Теперь, когда тема создана, вы будете генерировать её сообщения с помощью скрипта kafka-console-producer.sh. Для запуска продюсера выполните следующую команду:

./bin/kafka-console-producer.sh --topic first-topic --bootstrap-server kafka1.your_domain:9092

Вы увидите пустое уведомление:

>

Провайдер ждёт вашего SMS. Введите тест и нажмите ENTER. Уведомление будет выглядеть так:

>Hello World!
>

Теперь производитель ожидает следующее сообщение, что означает, что предыдущее сообщение успешно доставлено в Kafka. Вы можете ввести любое количество сообщений для тестирования. Чтобы выйти из производителя, нажмите CTRL+C.

Вам нужен потребитель для перечитывания сообщений темы. В Kafka есть простой потребитель — kafka-console-consumer.sh. Запустите его, выполнив:

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

Вы увидите прочитанные сообщения из темы:

OutputHello World!
...

Моделирование отказа узла

На третьем узле Kafka остановите службу, выполнив следующее:

sudo systemctl stop kafka

Затем объясните тему с помощью практики:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

Вывод будет примерно таким:

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 3,1 Isr: 1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1
Topic: first-topic Partition: 4 Leader: 2 Replicas: 3,2 Isr: 2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1

Хотя узел 3 указан как реплика, он отсутствует в наборах ISR, поскольку недоступен. При повторном присоединении к кластеру он синхронизируется с другими узлами и пытается восстановить прежнее положение.

Попробуйте еще раз прочитать сообщения в первой теме:

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

Вы увидите, что они доступны как обычно:

OutputHello World!
...

Благодаря наличию реплик первые два узла берут на себя управление и обслуживают потребителя. Теперь можно запустить Kafka на третьем сервере:

sudo systemctl start kafka

На этом этапе вы увидели, как Kafka справляется с недоступностью узла в кластере. Теперь вы узнаете, как удалить узел из кластера.

Шаг 3 — Передача данных между узлами

На этом этапе вы узнаете, как перемещать темы между узлами в кластере Kafka. При добавлении узла в существующий кластер с темами Kafka не перемещает в него автоматически никакие разделы, что может быть полезно. Это также полезно при удалении узлов, поскольку существующие разделы не переносятся автоматически на оставшиеся узлы.

Kafka предоставляет скрипт kafka-reassign-partitions.sh, который может генерировать, выполнять и проверять планы миграции. Вы будете использовать его для создания плана миграции разделов первой темы на первые два узла.

Сначала нужно указать, какие темы следует перенести. Скрипт принимает JSON-файл с определениями тем, поэтому создайте и откройте его для редактирования:

vi topics-to-move.json

Добавьте следующие строки:

{
"topics": [
{
"topic": "first-topic"
}
],
"version": 1
}

В разделе «Заголовки» определите объект, ссылающийся на первую тему. После этого сохраните и закройте файл.

Выполните следующую команду для создания плана миграции, заменив kafka1.your_domain на домен, указывающий на один из ваших узлов Kafka:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

Вы передаете “1,2” в –broker-list, который представляет собой идентификатор целевых брокеров.

Вывод будет примерно таким:

OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

Этот скрипт создал два плана, описывающих текущую и предлагаемую структуру разделов соответственно. Первый план предоставляется на случай, если вы захотите отменить изменения позже. Обратите внимание на второй план, который вы сохраните в отдельном файле с именем migration-plan.json. Создайте его и откройте для редактирования:

vi migration-plan.json

Добавьте второй исполняемый файл:

{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

Сохраните и закройте файл. Затем выполните следующую команду для его запуска:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --execute

Результат будет следующим:

OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-topic-0,first-topic-1,first-topic-2,first-topic-3,first-topic-4,first-topic-5

Скрипт сообщил о начале миграции. Чтобы увидеть ход выполнения, введите --verify:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --verify

Через некоторое время вывод будет выглядеть так:

OutputStatus of partition reassignment:
Reassignment of partition first-topic-0 is completed.
Reassignment of partition first-topic-1 is completed.
Reassignment of partition first-topic-2 is completed.
Reassignment of partition first-topic-3 is completed.
Reassignment of partition first-topic-4 is completed.
Reassignment of partition first-topic-5 is completed.
Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic first-topic

Теперь можно объяснить первую проблему, чтобы проверить отсутствие разделов на Брокере 3:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

Результат будет следующим:

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 5 Leader: 1 Replicas: 1,2 Isr: 2,1

Только брокеры 1 и 2 присутствуют в качестве реплик и ISR, что означает, что миграция прошла успешно.

На этом этапе вы создали план миграции для перемещения первой темы от брокера 3 к оставшимся темам и узнали, как проверить, что миграция прошла гладко.

Результат

Теперь у вас есть кластер Kafka, состоящий из трёх узлов, взаимодействующих по протоколу KRaft. Вы также научились проверять кластер и структуру разделов. Вы проверили избыточность кластера, отключив узел и прочитав данные из топика. Наконец, вы научились переключать топики на узлы кластера.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Вам также может понравиться