Cómo configurar un clúster de Kafka de varios nodos con KCraft

0 acciones
0
0
0
0

Introducción

Apache Kafka es una plataforma distribuida de código abierto para el procesamiento de eventos y flujos, escrita en Java y diseñada para procesar feeds de datos en tiempo real. Es inherentemente escalable, con alto rendimiento y disponibilidad. Está diseñada para soportar cientos de nodos por clúster.

En este tutorial, creará un clúster de Kafka que utiliza el protocolo de consenso KRaft. Aprenderá a configurar nodos para que formen parte de un clúster y a ver cómo se asignan las particiones de temas a diferentes nodos. También aprenderá a asignar temas a agentes específicos del clúster.

Requisitos previos
  • Tres gotas con al menos 4 GB de RAM y 2 CPU
  • Un nombre de dominio completamente registrado con tres subdominios que apuntan a los tres puntos.
  • Apache Kafka está instalado y configurado en sus Droplets.

Paso 1: Configurar los nodos de Kafka

En este paso, configurará los tres servidores Kafka que creó como parte de los prerrequisitos para que formen parte del mismo clúster de KRaft. Con KRaft, los propios nodos pueden organizar y realizar tareas administrativas sin la sobrecarga adicional ni la dependencia de Apache ZooKeeper.

Configurando el primer nodo

Comenzará configurando el primer nodo. Primero, detenga el servicio en el primer Droplet ejecutando lo siguiente:

sudo systemctl stop kafka

Como usuario de Kafka, navegue al directorio donde se encuentra Kafka y abra su archivo de configuración para editarlo ejecutando lo siguiente:

vi /config/kraft/server.properties

Encuentra las siguientes líneas:

...
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9092
...

Estos tres parámetros configuran el nodo de Kafka para que actúe como intermediario y controlador, lo que significa que recibe y consume datos (intermediario) y realiza tareas administrativas (controlador). Esta separación es útil en implementaciones grandes, donde los controladores pueden mantenerse separados para mayor eficiencia y redundancia.

node.id especifica el ID del nodo en el clúster. Este es el primer nodo, por lo que debe permanecer en 1. Todos los nodos deben tener un ID único; por lo tanto, el segundo y el tercer nodo tendrán los ID 2 y 3, respectivamente.

controller.quorum.voters asigna los ID de nodo a sus direcciones y puertos de comunicación correspondientes. Aquí se especifican las direcciones de todos los nodos del clúster para que cada nodo conozca a los demás. Modifique la línea para que se vea así:

...
[email protected]_domain:9093,[email protected]_domain:9093,[email protected]_domain:9093
...

Aquí, se enumeran los tres nodos del clúster con sus respectivos ID. Recuerde reemplazar "your_domain" por la dirección de dominio que configuró durante los prerrequisitos.

A continuación, busque las siguientes líneas en el archivo:

...
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
...

listeners define las direcciones que el nodo de Kafka escucha, mientras que advertised.listeners especifica las direcciones que se envían a los clientes para conectarse al nodo. Esto permite especificar un subconjunto de las direcciones reales que los clientes deben usar.

Cambie las líneas para que se vean así, reemplazando your_domain con su nombre de dominio real:

...
listeners=PLAINTEXT://kafka1.your_domain:9092,CONTROLLER://kafka1.your_domain:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kafka1.your_domain:9092
...

Dado que este nodo estará en un clúster, ha apuntado explícitamente las direcciones a la gota en la que se ejecutará.

Luego, busque la configuración num.partitions:

...
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
...

Como indica el comentario, este es el número predeterminado de particiones configuradas para cada nuevo subproceso. Dado que tiene tres nodos, configúrelo como un múltiplo de dos:

...
num.partitions=6
...

El valor 6 aquí garantiza que cada nodo mantenga dos particiones de tema de forma predeterminada.

A continuación, configure el factor de iteración para los subprocesos internos, que mantienen las compensaciones de los consumidores y el estado de las transacciones. Busque las siguientes líneas:

...
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
...

Establezca los siguientes valores:

...
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
...

Aquí se especifica que al menos dos nodos deben estar sincronizados con respecto a los metadatos internos. Al terminar, guarde y cierre el archivo.

Después de configurar el número de partición predeterminado, debe reconfigurar el almacenamiento de registros. Primero, elimine los archivos de registro existentes ejecutando:

rm -rf /home/kafka/kafka-logs/*

A continuación, cree un nuevo ID de clúster y guárdelo en una variable de entorno:

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

Mostrarlo en la terminal:

echo $KAFKA_CLUSTER_ID

El ID de salida será:

OutputMjj4bch9Q3-B0TEXv8_zPg

Tome nota de este valor; lo necesitará para configurar el segundo y tercer nodo.

Por último, ejecute el siguiente comando para crear el repositorio de informes:

./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

La salida será similar a esto:

Output...
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.
Configuración del segundo y tercer nodo

La configuración de los demás nodos es muy similar a la del primer nodo. Tenga en cuenta que también debe actualizar el node.id:

...
node.id=node_number
...

Los valores apropiados para el segundo y tercer nodo son 2 y 3 respectivamente, estableciendo las direcciones apropiadas para los oyentes y los anunciados.listeners.

Al reconstruir el almacenamiento de registros, reutilice el ID del clúster del primer nodo:

KAFKA_CLUSTER_ID="your_cluster_id"

Cuando haya terminado, inicie el servicio Kafka en los tres nodos ejecutando:

sudo systemctl start kafka

En este punto, ha configurado tres nodos de Kafka para que formen parte de un clúster de KRaft. Crea un tema y genera y consume mensajes en su clúster.

Paso 2: Conexión al clúster

En este paso, se conectará al clúster de Kafka mediante los scripts de shell incluidos. También creará un tema e intentará generar y consumir datos del clúster. Después, desactivará uno de los nodos y observará cómo Kafka reduce la pérdida.

Kafka proporciona un script llamado kafka-metadata-quorum.sh que muestra información sobre el clúster y sus miembros. Ejecute el siguiente comando para ejecutarlo:

./bin/kafka-metadata-quorum.sh --bootstrap-controller kafka1.your_domain:9093 describe --status

Se conectará a uno de los nodos en el puerto 9093, que es el punto final del controlador (pero no del bróker). Recuerde reemplazar kafka1.your_domain por el dominio que apunta a uno de sus nodos de Kafka.

La salida será similar a esto:

OutputClusterId: G3TeIZoeTSCvG2YOWvPE2w
LeaderId: 3
LeaderEpoch: 2
HighWatermark: 383
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 55
CurrentVoters: [1,2,3]
CurrentObservers: []

El script muestra la información inicial sobre el estado del clúster. En la salida mostrada, se puede ver que el nodo 3 es elegido líder y que los tres nodos ([1,2,3]) están en el conjunto de votación y acuerdan esa decisión.

Cree un tema llamado Primer tema ejecutando lo siguiente:

./bin/kafka-topics.sh --create --topic first-topic --bootstrap-server kafka1.your_domain:9092 --replication-factor 2

La salida será:

Created topic first-topic.

Luego ejecute el script kafka-topics.sh para ver cómo están organizadas las particiones en los nodos:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

Establecer el factor de repetición en 2 garantiza que el tema estará disponible en al menos dos nodos.

La salida será similar a esto:

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: first-topic Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1

Puede ver que cada partición tiene un líder, dos réplicas y dos conjuntos de réplicas sincronizadas (ISR). El líder de la partición es un nodo servidor que sirve los datos de la partición a los clientes, mientras que las réplicas solo almacenan copias. Un nodo de réplica se considera ISR si ha estado sincronizado con el líder durante los últimos diez segundos de forma predeterminada. Este intervalo se puede configurar por tema.

Ahora que ha creado un tema, generará sus mensajes con el script kafka-console-producer.sh. Ejecute el siguiente comando para iniciar el productor:

./bin/kafka-console-producer.sh --topic first-topic --bootstrap-server kafka1.your_domain:9092

Verás una notificación en blanco:

>

El proveedor espera tu SMS. Accede a la prueba y pulsa ENTER. La notificación se verá así:

>Hello World!
>

El productor está esperando el siguiente mensaje, lo que significa que el mensaje anterior se envió correctamente a Kafka. Puede introducir cualquier número de mensajes para probar. Para salir del productor, pulse Ctrl+C.

Necesita un consumidor para releer los mensajes del tema. Kafka ofrece un consumidor simple llamado kafka-console-consumer.sh. Ejecútelo con:

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

Verás mensajes que se leen desde el tema:

OutputHello World!
...

Simulación de fallo de nodo

En el tercer nodo de Kafka, detenga el servicio ejecutando lo siguiente:

sudo systemctl stop kafka

Luego, explica el tema con la práctica:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

La salida será similar a esto:

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 3,1 Isr: 1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1
Topic: first-topic Partition: 4 Leader: 2 Replicas: 3,2 Isr: 2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1

Aunque el nodo 3 aparece como réplica, no figura en los conjuntos de ISR porque no está disponible. Al reincorporarse al clúster, se sincroniza con los demás nodos e intenta recuperar su posición anterior.

Intente leer nuevamente los mensajes del primer tema:

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

Verás que están accesibles como siempre:

OutputHello World!
...

Gracias a la existencia de réplicas, los dos primeros nodos toman el control y sirven al consumidor. Ahora puede iniciar Kafka en el tercer servidor:

sudo systemctl start kafka

Hasta ahora, has visto cómo Kafka mitiga la indisponibilidad de un nodo en el clúster. Ahora aprenderás a eliminar un nodo del clúster.

Paso 3 – Transferir datos entre nodos

En este paso, aprenderá a mover temas entre nodos de un clúster de Kafka. Al añadir un nodo a un clúster existente con temas, Kafka no mueve automáticamente ninguna partición, algo que podría ser conveniente. Esto también es útil para eliminar nodos, ya que las particiones existentes no se mueven automáticamente a los nodos restantes.

Kafka proporciona un script llamado kafka-reassign-partitions.sh que puede generar, ejecutar y verificar planes de migración. Lo usará para crear un plan para migrar las particiones del primer tema a los dos primeros nodos.

Primero, debe especificar qué temas se deben mover. El script acepta un archivo JSON con las definiciones de los temas, así que créelo y ábralo para editarlo:

vi topics-to-move.json

Añade las siguientes líneas:

{
"topics": [
{
"topic": "first-topic"
}
],
"version": 1
}

En Títulos, define un objeto que haga referencia al primer tema. Al terminar, guarda y cierra el archivo.

Ejecute el siguiente comando para generar el plan de migración, reemplazando kafka1.your_domain con el dominio que apunta a uno de sus nodos de Kafka:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

Pasa “1,2” a –broker-list que representa el ID de los corredores de destino.

La salida será similar a esto:

OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

Este script creó dos planes en total, que describen los diseños de partición actuales y propuestos, respectivamente. El primer plan se proporciona por si desea revertir los cambios posteriormente. Preste atención al segundo plan, que guardará en un archivo separado llamado migration-plan.json. Créelo y ábralo para editarlo:

vi migration-plan.json

Añade el segundo ejecutable:

{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

Guarde y cierre el archivo. Luego, ejecute el siguiente comando para ejecutarlo:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --execute

La salida será:

OutputCurrent partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-topic-0,first-topic-1,first-topic-2,first-topic-3,first-topic-4,first-topic-5

El script indicó que la migración había comenzado. Para ver el progreso de la migración, introduzca --verify:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --verify

Después de un tiempo, el resultado se verá así:

OutputStatus of partition reassignment:
Reassignment of partition first-topic-0 is completed.
Reassignment of partition first-topic-1 is completed.
Reassignment of partition first-topic-2 is completed.
Reassignment of partition first-topic-3 is completed.
Reassignment of partition first-topic-4 is completed.
Reassignment of partition first-topic-5 is completed.
Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic first-topic

Ahora puedes explicar el primer problema para comprobar que no hay particiones en Broker 3:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

El resultado será el siguiente:

OutputTopic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 5 Leader: 1 Replicas: 1,2 Isr: 2,1

Solo los corredores 1 y 2 están presentes como réplicas e ISR, lo que significa que la migración fue exitosa.

En este punto, ha creado un plan de migración para mover el primer tema del agente 3 a los temas restantes y aprendió cómo verificar que la migración se realizó sin problemas.

Resultado

Ahora tiene un clúster de Kafka compuesto por tres nodos que se comunican mediante el protocolo KRaft. También ha aprendido a inspeccionar el clúster y la disposición de las particiones. Ha probado la redundancia del clúster desactivando un nodo y leyendo desde un tema. Finalmente, ha aprendido a cambiar de temas a nodos del clúster.

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

También te puede gustar