Введение
Apache Kafka — это распределённая платформа обработки событий и потоков данных с открытым исходным кодом, написанная на Java и предназначенная для обработки потоков данных в режиме реального времени. Она обладает изначальной масштабируемостью, высокой пропускной способностью и доступностью. Разработанная Apache Software Foundation, платформа Kafka получила широкое распространение благодаря своей надёжности, простоте использования и отказоустойчивости. Крупнейшие организации мира используют её для эффективного и распределённого управления большими объёмами данных.
В этом руководстве вы скачаете и настроите Apache Kafka. Вы научитесь создавать и удалять темы, а также отправлять и получать события с помощью предоставленных скриптов. Вы также узнаете о похожих проектах с той же целью и о том, чем Kafka отличается от других.
Предпосылки
- Машина с не менее чем 4 ГБ оперативной памяти и двумя процессорами. В случае Ubuntu Server
- На вашем Droplet или локальном компьютере установлена Java 8 или выше.
Шаг 1 — Загрузка и настройка Apache Kafka
В этом разделе вы скачаете и распакуете Apache Kafka на свой компьютер. Для дополнительной безопасности вы настроите его под своей учётной записью. Затем вы настроите и запустите его с помощью KRaft.
Сначала создайте отдельного пользователя, под которым будет работать Kafka. Создайте пользователя с именем kafka, выполнив следующую команду:
sudo adduser kafkaВам будет предложено ввести пароль вашей учётной записи. Введите надёжный пароль и пропустите заполнение дополнительной информации, нажимая клавишу ENTER в каждом поле.
Наконец, переключитесь на конкретного пользователя Kafka:
su kafkaДалее вам нужно скачать релизный пакет Kafka с официальной страницы загрузок. На момент написания статьи последней версией была 3.7.0, собранная для Scala 2.13. Если вы используете macOS или Linux, вы можете скачать Kafka с помощью curl.
Используйте эту команду для загрузки Kafka и помещения его в /tmp:
curl -o /tmp/kafka.tgz https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgzВерсию нужно сохранить в папке ~/kafka в основном каталоге. Создайте её, выполнив:
mkdir ~/kafkaЗатем извлеките его в ~/kafka, выполнив:
tar -xzf /tmp/kafka.tgz -C ~/kafka --strip-components=1Поскольку загруженный вами архив содержит корневую папку с тем же именем, что и версия Kafka, –strip-components=1 пропустит ее и извлечет все ее содержимое.
На момент написания статьи Kafka 3 был последним крупным релизом, поддерживающим две системы управления метаданными: Apache ZooKeeper и Kafka KRaft (сокращение от Kafka Raft). ZooKeeper — это проект с открытым исходным кодом, предоставляющий стандартный способ координации распределённых данных для приложений, также разработанный Apache Software Foundation.
Однако, начиная с версии Kafka 3.3, появилась поддержка KRaft. KRaft — это специализированная система для координации только экземпляров Kafka, упрощающая процесс установки и обеспечивающая значительно большую масштабируемость. С KRaft Kafka сама берёт на себя полную ответственность за данные, а не хранит административные метаданные вне системы.
Хотя ZooKeeper всё ещё доступен, ожидается, что его поддержка будет прекращена в Kafka 4 и более поздних версиях. В этом руководстве вы настроите Kafka с помощью KRaft.
Вам необходимо создать уникальный идентификатор для нового кластера Kafka. В настоящее время он состоит только из одного узла. Перейдите в каталог, где сейчас находится Kafka:
cd ~/kafkaKafka с KRaft хранит свою конфигурацию в файле config/kraft/server.properties, тогда как файл конфигурации ZooKeeper — это config/server.properties.
Перед первым запуском необходимо изменить некоторые настройки по умолчанию. Откройте файл для редактирования, выполнив команду:
nano config/kraft/server.propertiesНайдите следующие строки:
...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
...Параметр log.dirs определяет, где Kafka хранит свои файлы журналов. По умолчанию они хранятся в /tmp/kafka-logs, поскольку они гарантированно доступны для записи, пусть и временно. Замените значение на указанный путь:
... ############################# Log Basics ############################# # A comma separated list of directories under which to store log files log.dirs=/home/kafka/kafka-logs ...
Поскольку вы создали отдельного пользователя для Kafka, путь к каталогу журнала будет проложен в его домашнем каталоге. Если каталог не существует, Kafka его создаст. После завершения сохраните и закройте файл.
Теперь, когда вы настроили Kafka, выполните следующую команду, чтобы сгенерировать случайный идентификатор кластера:
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"Затем создайте место для хранения файлов журнала, выполнив следующую команду и введя идентификатор:
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.propertiesРезультат будет следующим:
Output
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.Наконец, вы можете впервые запустить сервер Kafka:
bin/kafka-server-start.sh config/kraft/server.propertiesКонечный вывод будет примерно таким:
Output
...
[2024-02-26 10:38:26,889] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2024-02-26 10:38:26,891] INFO Kafka version: 3.7.0 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka commitId: 5e3c2b738d253ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka startTimeMs: 1708943906890 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,892] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)Выходные данные показывают, что Kafka успешно инициализировался с помощью KRaft и принимает соединения по адресу 0.0.0.0:9092.
Процесс завершается при нажатии клавиш CTRL + C. Поскольку запускать Kafka при открытом сеансе нежелательно, на следующем шаге вы создадите службу для запуска Kafka в фоновом режиме.
Шаг 2 — Создание службы systemd для Kafka
В этом разделе вы создадите службу systemd для постоянной работы Kafka в фоновом режиме. Службы systemd можно запускать, останавливать и перезапускать непрерывно.
Конфигурация службы хранится в файле с именем code-server.service в каталоге /lib/systemd/system, где systemd хранит свои службы. Создайте его с помощью текстового редактора:
sudo nano /etc/systemd/system/kafka.serviceДобавьте следующие строки:
[Unit]
Description=kafka-server
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.targetЗдесь сначала указывается описание службы. Затем в поле [Служба] указывается тип службы (простая означает, что команда должна быть выполнена просто) и указывается команда, которая будет выполнена. Также указывается, что пользователь, от имени которого будет запущена служба, — kafka, и что служба должна автоматически перезапускаться при завершении работы Kafka.
Раздел [Install] предписывает системе запустить эту службу, как только вы сможете войти на сервер. После завершения сохраните и закройте файл.
Запустите службу Kafka, выполнив следующую команду:
sudo systemctl start kafkaПроверьте правильность запуска, просмотрев его статус:
sudo systemctl status kafkaВы увидите вывод, подобный следующему:
Output
● kafka.service - kafka-server
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; preset: enabled)
Active: active (running) since Mon 2024-02-26 11:17:30 UTC; 2min 40s ago
Main PID: 1061 (sh)
Tasks: 94 (limit: 4646)
Memory: 409.2M
CPU: 10.491s
CGroup: /system.slice/kafka.service
├─1061 /bin/sh -c "/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1"
└─1062 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true "-Xlog:gc*:file=/home/kafka/kafka/bin/../logs/kaf>
Feb 26 11:17:30 kafka-test1 systemd[1]: Started kafka.service - kafka-server.Для автоматического запуска Kafka после перезапуска сервера включите его службу, выполнив следующую команду:
sudo systemctl enable kafkaНа этом этапе вы создали и включили службу systemd для Kafka, которая запускается при каждой загрузке сервера. Далее вы узнаете, как создавать и удалять темы в Kafka, а также как создавать и получать текстовые сообщения с помощью доступных скриптов.
Шаг 3 – Создание и использование тематических сообщений
Теперь, когда вы настроили сервер Kafka, вы познакомитесь с темами и научитесь управлять ими с помощью предоставленных скриптов. Вы также научитесь отправлять и получать сообщения из темы.
Как объясняется в статье «Поток событий», публикация и получение сообщений связаны с темами. Тема может быть связана с категорией, к которой принадлежит сообщение.
Предоставленный скрипт kafka-topics.sh можно использовать для управления темами в Kafka через CLI. Чтобы создать тему с именем first-topic, выполните следующую команду:
bin/kafka-topics.sh --create --topic first-topic --bootstrap-server localhost:9092Все предоставленные скрипты Kafka требуют указания адреса сервера с помощью --bootstrap-server.
Результат будет следующим:
Output
Created topic first-topic.Чтобы вывести список всех существующих тем, замените –create на –list:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092Вы видите созданную вами тему:
Output
first-topicПодробную информацию и статистику по теме можно получить, передав --describe:
bin/kafka-topics.sh --describe --topic first-topic --bootstrap-server localhost:9092Вывод будет выглядеть так:
Output
Topic: first-topic TopicId: VtjiMIUtRUulwzxJL5qVjg PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1В первой строке указаны имя темы, идентификатор и фактор повторения, который равен 1, поскольку тема существует только на текущем компьютере. Вторая строка намеренно смещена и содержит информацию о первом (и единственном) разделе темы. Kafka позволяет разбить тему на разделы, то есть различные её части могут быть распределены по разным серверам, что повышает масштабируемость. В данном случае раздел только один.
Теперь, когда вы создали тему, вы будете генерировать для неё сообщения с помощью скрипта kafka-console-producer.sh. Выполните следующую команду, чтобы запустить продюсер:
bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092Вы увидите пустое уведомление:
>Провайдер ждёт вашего SMS. Введите тест и нажмите ENTER. Уведомление будет выглядеть так:
>test
>Теперь производитель ожидает следующее сообщение, что означает, что предыдущее сообщение успешно доставлено в Kafka. Вы можете ввести любое количество сообщений для тестирования. Чтобы выйти из производителя, нажмите CTRL+C.
Для получения сообщений темы вам понадобится потребитель. Kafka предоставляет простой потребитель в виде kafka-console-consumer.sh. Запустите его, выполнив:
bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092Однако вывода не будет. Это связано с тем, что потребитель ведёт потоковую передачу данных из темы, и в данный момент ничего не создаётся и не отправляется. Чтобы получить сообщения, созданные до начала работы потребителя, необходимо прочитать тему с самого начала, выполнив:
bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server localhost:9092Потребитель воспроизводит все события темы и извлекает сообщения:
Outputtest
...Как и в случае с конструктором, для выхода нажмите CTRL+C.
Чтобы убедиться, что потребитель действительно транслирует данные, откройте его в отдельном сеансе терминала. Откройте дополнительный сеанс SSH и запустите потребитель в конфигурации по умолчанию:
bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092В первом сеансе запустите конструктор:
bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092Затем введите желаемые сообщения:
>second test
>third test
>Вы сразу увидите их получение потребителем:
Output
second test
third testПосле завершения тестирования остановите как производителя, так и потребителя.
Чтобы удалить первую тему, передайте --delete в kafka-topics.sh:
bin/kafka-topics.sh --delete --topic first-topic --bootstrap-server localhost:9092Вывода не будет. Вы можете просмотреть список тем, чтобы убедиться, что они действительно были удалены:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092Результат будет следующим:
Output
__consumer_offsets__Consumer_Equivalent — это внутренняя тема Kafka, в которой хранится количество времени, в течение которого потребитель читал тему.
На этом этапе вы создали тему Kafka и создали в ней сообщения. Затем вы обработали сообщения с помощью предоставленного скрипта и получили их в режиме реального времени. Далее вы узнаете, чем Kafka отличается от других брокеров событий и аналогичного программного обеспечения.
Сравнение с аналогичными архитектурами
Apache Kafka считается фактическим решением для потоковой передачи событий. Однако Apache Pulsar и RabbitMQ также широко используются и выделяются как универсальные решения, хотя и имеют различия в подходе.
Основное различие между очередью сообщений и потоком событий заключается в том, что основная задача первой — максимально быстрая доставка сообщений получателям, независимо от их порядка. Такие системы обычно хранят сообщения в памяти до тех пор, пока получатели не подтвердят их получение. Фильтрация и маршрутизация сообщений играют важную роль, поскольку получатели могут проявлять интерес к определённым категориям данных. RabbitMQ — яркий пример традиционной системы обмена сообщениями, в которой несколько получателей могут подписаться на тему и получать несколько копий сообщения.
С другой стороны, потоковая передача событий ориентирована на сохранение. События должны архивироваться, поддерживаться и обрабатываться один раз. Маршрутизация их конкретным получателям не важна, поскольку идея заключается в том, что все получатели обрабатывают события одинаково.
Apache Pulsar — это система обмена сообщениями с открытым исходным кодом, разработанная Apache Software Foundation и поддерживающая потоковую передачу событий. В отличие от Kafka, на основе которой она была разработана с нуля, Pulsar изначально создавался как традиционное решение для организации очередей сообщений, а затем обзавёлся возможностями потоковой передачи событий. Таким образом, Pulsar полезен, когда требуется сочетание обоих подходов без необходимости развертывания отдельных приложений.
Результат
Теперь Apache Kafka безопасно работает в фоновом режиме на вашем сервере, настроенном как системная служба. Вы также научились работать с темами из командной строки, а также создавать и получать сообщения. Однако главное преимущество Kafka — широкий выбор клиентов для интеграции в ваши приложения.









