Введение в Кафку

0 Акции
0
0
0
0

Введение

Apache Kafka — это распределённая платформа обработки событий и потоков данных с открытым исходным кодом, написанная на Java и предназначенная для обработки потоков данных в режиме реального времени. Она обладает изначальной масштабируемостью, высокой пропускной способностью и доступностью. Разработанная Apache Software Foundation, платформа Kafka получила широкое распространение благодаря своей надёжности, простоте использования и отказоустойчивости. Крупнейшие организации мира используют её для эффективного и распределённого управления большими объёмами данных.

В этом руководстве вы скачаете и настроите Apache Kafka. Вы научитесь создавать и удалять темы, а также отправлять и получать события с помощью предоставленных скриптов. Вы также узнаете о похожих проектах с той же целью и о том, чем Kafka отличается от других.

Предпосылки
  • Устройство с объемом оперативной памяти не менее 4 ГБ и двумя процессорами.
  • На вашем Droplet или локальном компьютере установлена Java 8 или выше.

Шаг 1 — Загрузка и настройка Apache Kafka

В этом разделе вы скачаете и распакуете Apache Kafka на свой компьютер. Для дополнительной безопасности вы настроите его под своей учётной записью. Затем вы настроите и запустите его с помощью KRaft.

Сначала создайте отдельного пользователя, под которым будет работать Kafka. Выполнив следующую команду, вы создадите пользователя с именем Кафка Создавать:

sudo adduser kafka

Вам будет предложено ввести пароль вашей учётной записи. Введите надёжный пароль и нажмите Enter. ВХОДИТЬ Для каждого поля пропустите заполнение дополнительной информации.

Наконец, переключитесь на конкретного пользователя Kafka:

su kafka

Далее вам нужно скачать релизный пакет Kafka с официальной страницы загрузок. На момент написания статьи последняя версия — 3.6.1. Если вы используете macOS или Linux, вы можете скачать Kafka с помощью curl.

Используйте эту команду, чтобы загрузить Kafka и установить его в /tmp Место:

curl -o /tmp/kafka.tgz https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz

У вас есть версия ниже ~/кафка, вы сохраните его в основном каталоге. Создайте его, выполнив:

mkdir ~/kafka

Затем, запустив его, ~/кафка Извлекать:

tar -xzf /tmp/kafka.tgz -C ~/kafka --strip-components=1

Поскольку загруженный вами архив содержит корневую папку с тем же именем, что и версия Kafka, –strip-components=1 пропустит ее и извлечет все ее содержимое.

На момент написания статьи Kafka 3 был последним крупным релизом, поддерживающим две системы управления метаданными: Apache ZooKeeper и Kafka KRaft (сокращение от Kafka Raft). ZooKeeper — это проект с открытым исходным кодом, предоставляющий стандартный способ координации распределённых данных для приложений, также разработанный Apache Software Foundation.

Однако, начиная с версии Kafka 3.3, появилась поддержка KRaft. KRaft — это специализированная система для координации только экземпляров Kafka, упрощающая процесс установки и обеспечивающая значительно большую масштабируемость. С KRaft Kafka сама берёт на себя полную ответственность за данные, а не хранит административные метаданные вне системы.

Хотя ZooKeeper всё ещё доступен, ожидается, что его поддержка будет прекращена в Kafka 4 и более поздних версиях. В этом руководстве вы настроите Kafka с помощью KRaft.

Вам необходимо создать уникальный идентификатор для нового кластера Kafka. Пока он будет состоять всего из одного узла. Перейдите в каталог, где в данный момент находится Kafka:

cd ~/kafka

Кафка с KRaft настраивается в config/kraft/server.properties сохраняет, в то время как файл конфигурации ZooKeeper config/server.properties Это.

Перед первым запуском необходимо изменить некоторые настройки по умолчанию. Откройте файл для редактирования, выполнив команду:

nano config/kraft/server.properties
...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
...

Настройки log.dirs Указывает, где Kafka хранит свои файлы журналов. По умолчанию они хранятся в /tmp/kafka-logs сохраняет их, поскольку они гарантированно доступны для записи, пусть и временно. Замените значение на указанный путь:

...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/home/kafka/kafka-logs
...

Поскольку вы создали отдельного пользователя для Kafka, путь к каталогу журнала будет проложен в его домашнем каталоге. Если каталог не существует, Kafka его создаст. После завершения сохраните и закройте файл.

Теперь, когда вы настроили Kafka, выполните следующую команду, чтобы сгенерировать случайный идентификатор кластера:

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

Затем создайте место для хранения файлов журнала, выполнив следующую команду и введя идентификатор:

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

Результат будет следующим:

Output
Formatting /home/kafka/kafka-logs with metadata.version 3.6-IV2.

Наконец, вы можете впервые запустить сервер Kafka:

bin/kafka-server-start.sh config/kraft/server.properties

Конечный вывод будет примерно таким:

Output
...
[2024-02-26 10:38:26,889] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2024-02-26 10:38:26,890] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2024-02-26 10:38:26,891] INFO Kafka version: 3.6.1 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka commitId: 5e3c2b738d253ff5 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,891] INFO Kafka startTimeMs: 1708943906890 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-26 10:38:26,892] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)

Вывод показывает, что Kafka успешно инициализировался с помощью KRaft и создает соединения в 0.0.0.0:9092 Принимает.

Когда CTRL + C Нажмите кнопку , и процесс завершится. Поскольку запуск Kafka в открытом сеансе нежелателен, на следующем шаге вы создадите службу для запуска Kafka в фоновом режиме.

Шаг 2 — Создание службы systemd для Kafka

В этом разделе вы создадите службу systemd для постоянной работы Kafka в фоновом режиме. Службы systemd можно запускать, останавливать и перезапускать непрерывно.

Поместите конфигурацию службы в файл с именем code-server.service В списке /lib/systemd/system Вы экономите, где systemd Он хранит ваши услуги. Создайте его с помощью текстового редактора:

sudo nano /etc/systemd/system/kafka.service

Добавьте следующие строки:

[Unit]
Description=kafka-server
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target

Здесь вы сначала указываете описание услуги. Затем в поле [Услуга] Вы определяете тип службы (простой означает, что команда должна быть запущена просто) и указываете команду, которая будет запущена. Вы также указываете пользователя, от имени которого она будет запущена. Кафка и служба должна автоматически перезапуститься, если Kafka завершит работу.

Раздел [установить] сообщает системе о необходимости запуска этой службы, когда станет возможным вход на ваш сервер. После завершения сохраните и закройте файл.

Запустите службу Kafka, выполнив следующую команду:

sudo systemctl start kafka

Проверьте правильность запуска, просмотрев его статус:

sudo systemctl status kafka

Вы увидите вывод, подобный следующему:

Output
● kafka.service - kafka-server
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; preset: enabled)
Active: active (running) since Mon 2024-02-26 11:17:30 UTC; 2min 40s ago
Main PID: 1061 (sh)
Tasks: 94 (limit: 4646)
Memory: 409.2M
CPU: 10.491s
CGroup: /system.slice/kafka.service
├─1061 /bin/sh -c "/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/kraft/server.properties > /home/kafka/kafka/kafka.log 2>&1"
└─1062 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true "-Xlog:gc*:file=/home/kafka/kafka/bin/../logs/kaf>
Feb 26 11:17:30 kafka-test1 systemd[1]: Started kafka.service - kafka-server.

Для автоматического запуска Kafka после перезапуска сервера включите его службу, выполнив следующую команду:

sudo systemctl enable kafka

На этом этапе вы создали и включили службу systemd для Kafka, которая запускается при каждой загрузке сервера. Далее вы узнаете, как создавать и удалять темы в Kafka, а также как создавать и получать текстовые сообщения с помощью доступных скриптов.

Шаг 3 – Создание и использование тематических сообщений

Теперь, когда вы настроили сервер Kafka, вы познакомитесь с темами и научитесь управлять ими с помощью предоставленных скриптов. Вы также научитесь отправлять и получать сообщения из темы. Как объясняется в статье «Поток событий», публикация и получение сообщений связаны с темами. Тема может быть связана с категорией, к которой принадлежит сообщение.

Из предоставленного сценария kafka-topics.sh Вы можете управлять темами в Kafka через CLI Используется для создания темы под названием первая тема Выполните следующую команду:

bin/kafka-topics.sh --create --topic first-topic --bootstrap-server localhost:9092

Все предоставленные скрипты Kafka требуют, чтобы адрес сервера был указан с помощью --bootstrap-сервер Указать.

Результат будет следующим:

Output
Created topic first-topic.

Чтобы перечислить все доступные темы, вместо --создавать В --список Отправлять:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Вы видите созданную вами тему:

Output
first-topic

Подробную информацию и статистику по теме вы можете найти, перейдя по ссылке --описывать Получать:

bin/kafka-topics.sh --describe --topic first-topic --bootstrap-server localhost:9092

Вывод будет выглядеть так:

Output
Topic: first-topic TopicId: VtjiMIUtRUulwzxJL5qVjg PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1

В первой строке указаны имя темы, идентификатор и фактор повторения, который равен 1, поскольку тема существует только на текущем компьютере. Вторая строка намеренно смещена и содержит информацию о первом (и единственном) разделе темы. Kafka позволяет разбить тему на разделы, то есть различные её части могут быть распределены по разным серверам, что повышает масштабируемость. В данном случае раздел только один.

Теперь, когда вы создали тему, вы будете генерировать для неё сообщения с помощью скрипта kafka-console-producer.sh. Выполните следующую команду, чтобы запустить продюсер:

bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092

Вы увидите пустое уведомление:

>

Продюсер ждёт вашего СМС. Пройдите тест и ВХОДИТЬ Нажмите . Уведомление будет выглядеть так:

>test
>

Теперь производитель ожидает следующее сообщение, что означает, что предыдущее сообщение успешно доставлено в Kafka. Вы можете ввести любое количество сообщений для тестирования. Чтобы выйти из производителя, CTRL+C Нажимать .

Для извлечения сообщений из темы вам нужен потребитель. Kafka предоставляет простой потребитель в виде kafka-console-consumer.sh Он предлагает. Запустите его, выполнив:

bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092

Однако вывода не будет. Это связано с тем, что потребитель ведёт потоковую передачу данных из темы, и в данный момент ничего не создаётся и не отправляется. Чтобы получить сообщения, созданные до начала работы потребителя, необходимо прочитать тему с самого начала, выполнив:

bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server localhost:9092

Потребитель воспроизводит все события темы и извлекает сообщения:

Outputtest
...

Как строитель, чтобы выйти, CTRL+C Нажимать .

Чтобы убедиться, что потребитель действительно транслирует данные, откройте его в отдельном сеансе терминала. Откройте дополнительный сеанс SSH и запустите потребитель в конфигурации по умолчанию:

bin/kafka-console-consumer.sh --topic first-topic --bootstrap-server localhost:9092

В первом сеансе запустите конструктор:

bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092

Затем введите желаемые сообщения:

>second test
>third test
>

Вы сразу увидите, что они получены потребителем:

Output
second test
third test

После завершения тестирования остановите как производителя, так и потребителя.

Чтобы удалить первую тему, --удалить к kafka-topics.sh Передача:

bin/kafka-topics.sh --delete --topic first-topic --bootstrap-server localhost:9092

Вывода не будет. Вы можете просмотреть список тем, чтобы убедиться, что они действительно были удалены:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Результат будет следующим:

Output
__consumer_offsets

__Consumer_Equivalent — это внутренняя тема Kafka, которая хранит время, потраченное потребителем на чтение темы. На этом этапе вы создали тему Kafka и создали в ней сообщения. Затем вы получили сообщения с помощью предоставленного скрипта и, наконец, получили их в режиме реального времени. На следующем этапе вы узнаете, чем Kafka отличается от других брокеров событий и аналогичного программного обеспечения.

Сравнение с аналогичными архитектурами

Apache Kafka считается эффективным решением для потоковых систем событий. Однако Apache Pulsar и RabbitMQ также широко используются и выделяются как универсальные решения, хотя и с разным подходом. Основное различие между очередями сообщений и потоками событий заключается в том, что главная задача первых — максимально быстрая доставка сообщений клиентам, независимо от их порядка. Такие системы обычно хранят сообщения в памяти до тех пор, пока они не будут подтверждены потребителями. Фильтрация и маршрутизация сообщений играют важную роль, поскольку потребители могут проявлять интерес к определённым категориям данных. RabbitMQ — яркий пример традиционной системы обмена сообщениями, где несколько потребителей могут подписываться на тему и получать несколько копий сообщения. Потоковая передача событий, с другой стороны, ориентирована на сохранение. События должны архивироваться, поддерживаться в аккуратном виде и обрабатываться один раз. Их маршрутизация к конкретным потребителям не важна, поскольку идея заключается в том, что все потребители обрабатывают события одинаково. Apache Pulsar — это система обмена сообщениями с открытым исходным кодом, разработанная Apache Software Foundation и поддерживающая потоковую передачу событий. В отличие от Kafka, на основе которого он был создан с нуля, Pulsar изначально создавался как традиционное решение для организации очередей сообщений, а затем обзавёлся возможностями потоковой передачи событий. Поэтому Pulsar полезен, когда требуется комбинация обоих методов без необходимости развертывания отдельных приложений.

Результат

Теперь Apache Kafka безопасно работает в фоновом режиме на вашем сервере, настроенном как системная служба. Вы также научились работать с темами из командной строки, а также создавать и получать сообщения. Однако главное преимущество Kafka — широкий выбор клиентов для интеграции в ваши приложения.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Вам также может понравиться