Введение
Apache Kafka — это распределённая платформа обработки событий и потоков данных с открытым исходным кодом, написанная на Java и предназначенная для обработки потоков данных в режиме реального времени. Она обладает масштабируемостью, высокой пропускной способностью и доступностью. Она рассчитана на поддержку сотен узлов в кластере.
В этом руководстве вы создадите кластер Kafka, использующий протокол консенсуса KRaft. Вы узнаете, как настроить узлы для работы в кластере и как тематические разделы назначаются различным узлам. Вы также узнаете, как назначать темы конкретным брокерам в кластере.
Предпосылки
- Три устройства с не менее чем 4 ГБ ОЗУ и 2 процессорами
- Полностью зарегистрированное доменное имя с тремя поддоменами, указывающими на три точки.
- Apache Kafka установлен и настроен на ваших Droplets.
Шаг 1 — Настройка узлов Kafka
На этом этапе вы настроите три сервера Kafka, созданные вами в рамках предварительных требований, для включения их в один кластер KRaft. С помощью KRaft узлы сами организуют и выполняют административные задачи без дополнительных накладных расходов и зависимости от Apache ZooKeeper.
Настройка первого узла
Начнём с настройки первого узла. Для начала остановите службу на первом Droplet, выполнив следующую команду:
sudo systemctl stop kafkaКак пользователь Kafka, перейдите в каталог, где находится Kafka, и откройте его файл конфигурации для редактирования, выполнив следующую команду:
vi /config/kraft/server.propertiesНайдите следующие строки:
...
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9092
...Эти три параметра настраивают узел Kafka на работу одновременно в качестве брокера и контроллера, то есть он получает и потребляет данные (брокер) и выполняет административные задачи (контроллер). Такое разделение полезно в крупных развертываниях, где контроллеры можно разместить отдельно для повышения эффективности и избыточности.
node.id указывает идентификатор узла в кластере. Это первый узел, поэтому его значение должно быть равно 1. Все узлы должны иметь уникальный идентификатор, поэтому идентификаторы второго и третьего узлов будут равны 2 и 3 соответственно.
controller.quorum.voters сопоставляет идентификаторы узлов с соответствующими им адресами и портами для связи. Здесь вы указываете адреса всех узлов кластера, чтобы каждый узел знал о других узлах. Измените строку следующим образом:
...
[email protected]_domain:9093,[email protected]_domain:9093,[email protected]_domain:9093
...Здесь вы перечисляете все три узла кластера с их идентификаторами. Не забудьте заменить your_domain на адрес вашего домена, указанный вами при выполнении предварительных требований.
Далее найдите в файле следующие строки:
...
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
...Параметр listeners определяет адреса, которые прослушивает узел Kafka, а advertised.listeners — адреса, отправляемые клиентам для подключения к узлу. Это позволяет указать подмножество фактических адресов, которые должны использовать клиенты.
Измените строки так, заменив your_domain на ваше фактическое доменное имя:
...
listeners=PLAINTEXT://kafka1.your_domain:9092,CONTROLLER://kafka1.your_domain:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kafka1.your_domain:9092
...Поскольку этот узел будет находиться в кластере, вы явно указали адреса дроплета, на котором он будет работать.
Затем найдите параметр num.partitions:
...
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
...Как указано в комментарии, это количество разделов по умолчанию, настроенное для каждого нового потока. Поскольку у вас три узла, установите его кратным двум:
...
num.partitions=6
...Значение 6 здесь гарантирует, что каждый узел по умолчанию поддерживает два тематических раздела.
Затем настройте коэффициент итерации для внутренних потоков, которые поддерживают смещения потребителей и статус транзакций. Найдите следующие строки:
...
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
...Установите для них следующие значения:
... offsets.topic.replication.factor=2 transaction.state.log.replication.factor=2 ...
Здесь вы указываете, что как минимум два узла должны быть синхронизированы по внутренним метаданным. После этого сохраните и закройте файл.
После установки номера раздела по умолчанию необходимо перенастроить хранилище журналов. Для начала удалите существующие файлы журналов, выполнив команду:
rm -rf /home/kafka/kafka-logs/*
Затем создайте новый идентификатор кластера и сохраните его в переменной среды:
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"Покажите это в терминале:
echo $KAFKA_CLUSTER_IDВыходной идентификатор будет следующим:
OutputMjj4bch9Q3-B0TEXv8_zPgЗапишите это значение, оно понадобится вам для настройки второго и третьего узлов.
Наконец, выполните следующую команду, чтобы создать репозиторий отчетов:
./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.propertiesВывод будет примерно таким:
Output...
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.Настройка второго и третьего узлов
Настройка остальных узлов очень похожа на настройку первого узла. Обратите внимание, что вы также обновляете node.id:
...
node.id=node_number
...Соответствующие значения для второго и третьего узлов — 2 и 3 соответственно, устанавливающие соответствующие адреса для прослушивателей и рекламируемых прослушивателей.
При перестройке хранилища журналов повторно используйте идентификатор кластера с первого узла:
KAFKA_CLUSTER_ID="your_cluster_id"После завершения запустите службу Kafka на всех трех узлах, выполнив команду:
sudo systemctl start kafkaНа этом этапе вы настроили три узла Kafka для работы в кластере KRaft. Вы создаёте тему, а также отправляете и потребляете сообщения в своём кластере.
Шаг 2 — Подключение к кластеру
На этом этапе вы подключитесь к кластеру Kafka с помощью скриптов оболочки, поставляемых вместе с Kafka. Вы также создадите тему и попробуете производить и использовать данные из кластера. Затем вы отключите один из узлов и посмотрите, как Kafka снижает потери.
Kafka предоставляет скрипт kafka-metadata-quorum.sh, который отображает информацию о кластере и его участниках. Чтобы запустить его, выполните следующую команду:
./bin/kafka-metadata-quorum.sh --bootstrap-controller kafka1.your_domain:9093 describe --statusВы подключитесь к одному из узлов через порт 9093, который является конечной точкой контроллера (но не брокера). Не забудьте заменить kafka1.your_domain на домен, указывающий на один из ваших узлов Kafka.
Вывод будет примерно таким:
OutputClusterId: G3TeIZoeTSCvG2YOWvPE2w
LeaderId: 3
LeaderEpoch: 2
HighWatermark: 383
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 55
CurrentVoters: [1,2,3]
CurrentObservers: []Скрипт выводит начальную информацию о состоянии кластера. В показанном выводе видно, что узел 3 выбран лидером, и все три узла ([1,2,3]) находятся в наборе для голосования и согласны с этим решением.
Создайте тему под названием «Первая тема», выполнив следующее:
./bin/kafka-topics.sh --create --topic first-topic --bootstrap-server kafka1.your_domain:9092 --replication-factor 2Результат будет следующим:
Created topic first-topic.Затем запустите скрипт kafka-topics.sh, чтобы увидеть, как распределены разделы на узлах:
./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topicУстановка коэффициента повторения 2 гарантирует, что тема будет доступна как минимум на двух узлах.
Вывод будет примерно таким:
OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: first-topic Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1Видно, что у каждого раздела есть лидер, две реплики и два синхронизированных набора реплик (ISR). Лидер раздела — это серверный узел, который предоставляет данные раздела клиентам, в то время как реплики содержат только копии. Узел-реплика считается ISR, если по умолчанию он синхронизирован с лидером в течение последних десяти секунд. Этот интервал настраивается для каждой темы отдельно.
Теперь, когда тема создана, вы будете генерировать её сообщения с помощью скрипта kafka-console-producer.sh. Для запуска продюсера выполните следующую команду:
./bin/kafka-console-producer.sh --topic first-topic --bootstrap-server kafka1.your_domain:9092Вы увидите пустое уведомление:
>Провайдер ждёт вашего SMS. Введите тест и нажмите ENTER. Уведомление будет выглядеть так:
>Hello World!
>Теперь производитель ожидает следующее сообщение, что означает, что предыдущее сообщение успешно доставлено в Kafka. Вы можете ввести любое количество сообщений для тестирования. Чтобы выйти из производителя, нажмите CTRL+C.
Вам нужен потребитель для перечитывания сообщений темы. В Kafka есть простой потребитель — kafka-console-consumer.sh. Запустите его, выполнив:
./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092Вы увидите прочитанные сообщения из темы:
OutputHello World!
...Моделирование отказа узла
На третьем узле Kafka остановите службу, выполнив следующее:
sudo systemctl stop kafkaЗатем объясните тему с помощью практики:
./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topicВывод будет примерно таким:
OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 3,1 Isr: 1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1
Topic: first-topic Partition: 4 Leader: 2 Replicas: 3,2 Isr: 2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1Хотя узел 3 указан как реплика, он отсутствует в наборах ISR, поскольку недоступен. При повторном присоединении к кластеру он синхронизируется с другими узлами и пытается восстановить прежнее положение.
Попробуйте еще раз прочитать сообщения в первой теме:
./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092Вы увидите, что они доступны как обычно:
OutputHello World!
...Благодаря наличию реплик первые два узла берут на себя управление и обслуживают потребителя. Теперь можно запустить Kafka на третьем сервере:
sudo systemctl start kafkaНа этом этапе вы увидели, как Kafka справляется с недоступностью узла в кластере. Теперь вы узнаете, как удалить узел из кластера.
Шаг 3 — Передача данных между узлами
На этом этапе вы узнаете, как перемещать темы между узлами в кластере Kafka. При добавлении узла в существующий кластер с темами Kafka не перемещает в него автоматически никакие разделы, что может быть полезно. Это также полезно при удалении узлов, поскольку существующие разделы не переносятся автоматически на оставшиеся узлы.
Kafka предоставляет скрипт kafka-reassign-partitions.sh, который может генерировать, выполнять и проверять планы миграции. Вы будете использовать его для создания плана миграции разделов первой темы на первые два узла.
Сначала нужно указать, какие темы следует перенести. Скрипт принимает JSON-файл с определениями тем, поэтому создайте и откройте его для редактирования:
vi topics-to-move.jsonДобавьте следующие строки:
{
"topics": [
{
"topic": "first-topic"
}
],
"version": 1
}В разделе «Заголовки» определите объект, ссылающийся на первую тему. После этого сохраните и закройте файл.
Выполните следующую команду для создания плана миграции, заменив kafka1.your_domain на домен, указывающий на один из ваших узлов Kafka:
./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generateВы передаете “1,2” в –broker-list, который представляет собой идентификатор целевых брокеров.
Вывод будет примерно таким:
OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}Этот скрипт создал два плана, описывающих текущую и предлагаемую структуру разделов соответственно. Первый план предоставляется на случай, если вы захотите отменить изменения позже. Обратите внимание на второй план, который вы сохраните в отдельном файле с именем migration-plan.json. Создайте его и откройте для редактирования:
vi migration-plan.jsonДобавьте второй исполняемый файл:
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}Сохраните и закройте файл. Затем выполните следующую команду для его запуска:
./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --executeРезультат будет следующим:
OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-topic-0,first-topic-1,first-topic-2,first-topic-3,first-topic-4,first-topic-5Скрипт сообщил о начале миграции. Чтобы увидеть ход выполнения, введите --verify:
./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --verifyЧерез некоторое время вывод будет выглядеть так:
OutputStatus of partition reassignment:
Reassignment of partition first-topic-0 is completed.
Reassignment of partition first-topic-1 is completed.
Reassignment of partition first-topic-2 is completed.
Reassignment of partition first-topic-3 is completed.
Reassignment of partition first-topic-4 is completed.
Reassignment of partition first-topic-5 is completed.
Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic first-topicТеперь можно объяснить первую проблему, чтобы проверить отсутствие разделов на Брокере 3:
./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topicРезультат будет следующим:
OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 5 Leader: 1 Replicas: 1,2 Isr: 2,1Только брокеры 1 и 2 присутствуют в качестве реплик и ISR, что означает, что миграция прошла успешно.
На этом этапе вы создали план миграции для перемещения первой темы от брокера 3 к оставшимся темам и узнали, как проверить, что миграция прошла гладко.
Результат
Теперь у вас есть кластер Kafka, состоящий из трёх узлов, взаимодействующих по протоколу KRaft. Вы также научились проверять кластер и структуру разделов. Вы проверили избыточность кластера, отключив узел и прочитав данные из топика. Наконец, вы научились переключать топики на узлы кластера.









