How to set up a multi-node Kafka cluster using KRaft


Introduction

Apache Kafka is an open source distributed event and stream processing platform written in Java and built for processing real-time data feeds. It is inherently scalable, with high throughput and availability. It is designed to support hundreds of nodes per cluster.

In this tutorial, you will create a Kafka cluster that uses the KRaft consensus protocol. You will learn how to configure nodes to be part of a cluster and see how topic partitions are assigned to different nodes. You will also learn how to assign topics to specific brokers in the cluster.

Prerequisites
  • Three Droplets with at least 4 GB of RAM and 2 CPUs each.
  • A fully registered domain name with three subdomains, each pointing to one of the Droplets.
  • Apache Kafka installed and configured on each Droplet.

Step 1 – Configure Kafka Nodes

In this step, you will configure the three Kafka servers you created as part of the prerequisites to be part of the same KRaft cluster. With KRaft, the nodes themselves can organize and perform administrative tasks without the additional overhead and dependency on Apache ZooKeeper.

First node configuration

You will start by configuring the first node. First, stop the service on the first Droplet by running the following:

sudo systemctl stop kafka

As the Kafka user, navigate to the directory where Kafka lives and open its configuration file for editing by running the following:

vi config/kraft/server.properties

Find the following lines:

...
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9092
...

These three parameters configure the Kafka node to act as both a broker and a controller, meaning it will receive and serve data (broker) as well as perform administrative tasks (controller). Separating these roles is useful in large deployments, where dedicated controllers can be kept apart for increased efficiency and redundancy.

node.id specifies the node ID in the cluster. This is the first node, so it should remain at 1. All nodes must have a unique node ID, so the second and third nodes will have IDs of 2 and 3, respectively.

controller.quorum.voters maps the node IDs to their corresponding addresses and ports for communication. This is where you specify the addresses of all the cluster nodes so that each node is aware of the other nodes. Change the line to look like this:

...
controller.quorum.voters=1@kafka1.your_domain:9093,2@kafka2.your_domain:9093,3@kafka3.your_domain:9093
...

Here, you list all three nodes in the cluster with their respective IDs. Remember to replace your_domain with your domain address that you set during the prerequisites.

Next, find the following lines in the file:

...
listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
...

listeners defines the addresses that the Kafka node listens to, while advertised.listeners specifies the addresses that are sent to clients to connect to the node. This allows you to specify a subset of the actual addresses that clients should use.

Change the lines to look like this, replacing your_domain with your actual domain name:

...
listeners=PLAINTEXT://kafka1.your_domain:9092,CONTROLLER://kafka1.your_domain:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kafka1.your_domain:9092
...

Since this node will be part of a cluster, you have explicitly pointed both addresses at the Droplet it runs on.

Then, find the num.partitions setting:

...
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
...

As the comment states, this is the default number of partitions for each newly created topic. Since you have three nodes, set it to a multiple of three so the partitions can be spread evenly:

...
num.partitions=6
...

The value of 6 here ensures that each node maintains two topic partitions by default.

Next, you will configure the replication factor for the internal topics that store consumer offsets and transaction state. Find the following lines:

...
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
...

Set them to the following values:

...
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
...

Here, you specify that this internal metadata will be replicated across two nodes. When you are done, save and close the file.
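For reference, after these edits the key settings on the first node should look roughly like the following consolidated sketch (kafka1.your_domain stands in for the subdomain you created in the prerequisites):

process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka1.your_domain:9093,2@kafka2.your_domain:9093,3@kafka3.your_domain:9093
listeners=PLAINTEXT://kafka1.your_domain:9092,CONTROLLER://kafka1.your_domain:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka1.your_domain:9092
num.partitions=6
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2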

After setting the default partition number, you need to reconfigure the log storage. First, delete the existing log files by running:

rm -rf /home/kafka/kafka-logs/*

Next, create a new cluster ID and store it in an environment variable:

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)""

Show it in the terminal:

echo $KAFKA_CLUSTER_ID

The output will be a unique ID similar to this:

Output
Mjj4bch9Q3-B0TEXv8_zPg

Take note of this value; you will need it to configure the second and third nodes.

Finally, run the following command to format the log storage:

./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

The output will be similar to this:

Output
...
Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4.
Configuring the second and third nodes

Configuring the other nodes is very similar to what you did for the first node. Note that you also update the node.id:

... node.id=node_number ...

Use 2 for the second node and 3 for the third, and set listeners and advertised.listeners to the addresses of the respective Droplet, as shown in the sketch below.
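For example, on the second node the relevant lines would look roughly like this (kafka2.your_domain is assumed to be the subdomain pointing to that Droplet):

node.id=2
listeners=PLAINTEXT://kafka2.your_domain:9092,CONTROLLER://kafka2.your_domain:9093
advertised.listeners=PLAINTEXT://kafka2.your_domain:9092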

When rebuilding the log storage, reuse the cluster ID from the first node:

KAFKA_CLUSTER_ID="your_cluster_id""

When you are done, start the Kafka service on all three nodes by running:

sudo systemctl start kafka

At this point, you have configured three Kafka nodes to be part of a KRaft cluster. Next, you will create a topic and produce and consume messages in your cluster.

Step 2 – Connecting to the Cluster

In this step, you will connect to the Kafka cluster using the shell scripts that come with Kafka. You will also create a topic and try to produce and consume data from the cluster. Then, you will take down one of the nodes and see how Kafka mitigates the loss.

Kafka provides a script called kafka-metadata-quorum.sh that displays information about the cluster and its members. Run it with the following command:

./bin/kafka-metadata-quorum.sh --bootstrap-controller kafka1.your_domain:9093 describe --status

You will connect to one of the nodes on port 9093, which is the controller endpoint (rather than the broker endpoint). Remember to replace kafka1.your_domain with the domain that points to one of your Kafka nodes.

The output will be similar to this:

Output
ClusterId:              G3TeIZoeTSCvG2YOWvPE2w
LeaderId:               3
LeaderEpoch:            2
HighWatermark:          383
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   55
CurrentVoters:          [1,2,3]
CurrentObservers:       []

The script lists basic information about the cluster state. In the output shown, you can see that node 3 has been elected as the leader and that all three nodes ([1,2,3]) are in the voter set and agree on that decision.
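If you want more detail about the metadata quorum itself, the same script also offers a describe --replication subcommand, which lists each voter's log end offset and lag. This is an optional check and is shown here only as an example:

./bin/kafka-metadata-quorum.sh --bootstrap-controller kafka1.your_domain:9093 describe --replication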

Create a topic called first-topic by running the following:

./bin/kafka-topics.sh --create --topic first-topic --bootstrap-server kafka1.your_domain:9092 --replication-factor 2

The output will be:

Created topic first-topic.

Setting the replication factor to 2 ensures that each partition of the topic is stored on two nodes.

Then, run the kafka-topics.sh script to see how the partitions are arranged across the nodes:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

The output will be similar to this:

Output
Topic: first-topic  TopicId: 4kVImoFNTQeyk3r2zQbdvw  PartitionCount: 6  ReplicationFactor: 2  Configs: segment.bytes=1073741824
    Topic: first-topic  Partition: 0  Leader: 3  Replicas: 3,1  Isr: 3,1
    Topic: first-topic  Partition: 1  Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: first-topic  Partition: 2  Leader: 2  Replicas: 2,3  Isr: 2,3
    Topic: first-topic  Partition: 3  Leader: 1  Replicas: 1,3  Isr: 1,3
    Topic: first-topic  Partition: 4  Leader: 3  Replicas: 3,2  Isr: 3,2
    Topic: first-topic  Partition: 5  Leader: 2  Replicas: 2,1  Isr: 2,1

You can see that each partition has a leader, two replicas, and an in-sync replica (ISR) set. The partition leader is the broker node that serves the partition's data to clients, while the replicas only hold copies. A replica is considered in sync if it has kept up with the leader within the last ten seconds by default. This interval is configurable.

Now that you have created a topic, you will produce messages to it using the kafka-console-producer.sh script. Run the following command to start the producer:

./bin/kafka-console-producer.sh --topic first-topic --bootstrap-server kafka1.your_domain:9092

You will see an empty prompt:

>

The producer is waiting for your message. Type some text and press ENTER. The prompt will then look like this:

>Hello World!
>

The producer is now waiting for the next message, meaning the previous message has been successfully delivered to Kafka. You can enter any number of messages for testing. To exit the producer, press CTRL+C.
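Incidentally, the console producer and consumer (like other Kafka clients) accept a comma-separated list of bootstrap servers, so you can point them at several brokers and still connect even if the first one listed is down. A sketch, assuming all three subdomains from the prerequisites:

./bin/kafka-console-producer.sh --topic first-topic --bootstrap-server kafka1.your_domain:9092,kafka2.your_domain:9092,kafka3.your_domain:9092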

To read the messages back from the topic, you need a consumer. Kafka provides a simple one in kafka-console-consumer.sh. Run it with the following command:

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

You will see messages being read from the topic:

Output
Hello World!
...

Node failure simulation

On the third Kafka node, stop the service by running the following:

sudo systemctl stop kafka

Then, describe the topic again:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

The output will be similar to this:

Output
Topic: first-topic  TopicId: 4kVImoFNTQeyk3r2zQbdvw  PartitionCount: 6  ReplicationFactor: 2  Configs: segment.bytes=1073741824
    Topic: first-topic  Partition: 0  Leader: 1  Replicas: 3,1  Isr: 1
    Topic: first-topic  Partition: 1  Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: first-topic  Partition: 2  Leader: 2  Replicas: 2,3  Isr: 2
    Topic: first-topic  Partition: 3  Leader: 1  Replicas: 1,3  Isr: 1
    Topic: first-topic  Partition: 4  Leader: 2  Replicas: 3,2  Isr: 2
    Topic: first-topic  Partition: 5  Leader: 2  Replicas: 2,1  Isr: 2,1

Although node 3 is listed as a replica, it is not in the ISR sets because it is unavailable. When it rejoins the cluster, it synchronizes with the other nodes and attempts to regain its previous position.
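If you want a quicker way to spot partitions whose ISR has shrunk, kafka-topics.sh can filter for them with its --under-replicated-partitions flag. A sketch of that check:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --under-replicated-partitions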

Try reading the messages in the first topic again:

./bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server kafka1.your_domain:9092

You will see that they are accessible as usual:

Output
Hello World!
...

Thanks to the replicas, the first two nodes take over and continue serving the consumer. Now you can start Kafka on the third server again:

sudo systemctl start kafka
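After a short while, node 3 should catch up and reappear in the ISR sets. You can confirm this by describing the topic once more with the same command used earlier:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic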

At this point, you have seen how Kafka mitigates the unavailability of a node in the cluster. Now you will learn how to remove a node from the cluster.

Step 3 – Transferring Data Between Nodes

In this step, you will learn how to move topics between nodes in a Kafka cluster. When you add a node to an existing cluster with topics, Kafka does not automatically move any partitions to it, so you may want to rebalance them yourself. Likewise, when removing a node, existing partitions are not automatically moved to the remaining nodes.

Kafka provides a script called kafka-reassign-partitions.sh that can generate, execute, and verify migration plans. You will use it to create a plan to migrate the partitions of the first topic to the first two nodes.

First, you need to specify which topics should be moved. The script accepts a JSON file with the topic definitions, so create one and open it for editing:

vi topics-to-move.json

Add the following lines:

{ "topics": [ { "topic": "first-topic" } ], "version": 1 }

Under topics, you define an object that references first-topic. When you're done, save and close the file.

Run the following command to generate the migration plan, replacing kafka1.your_domain with the domain that points to one of your Kafka nodes:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

You pass "1,2" to --broker-list, which represents the IDs of the target brokers.

The output will be similar to this:

Output
Current partition replica assignment

{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

This script created two plans in total, describing the current and proposed partition layouts, respectively. The first plan is provided if you want to revert the changes later. Pay attention to the second plan, which you will save in a separate file called migration-plan.json. Create and open it for editing:

vi migration-plan.json

Add the proposed reassignment plan (the second JSON object from the output):

{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":[" any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition" :4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

Save and close the file. Then, run the following command to execute the plan:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --execute

The output will be:

Output
Current partition replica assignment

{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-topic-0,first-topic-1,first-topic-2,first-topic-3,first-topic-4,first-topic-5

The script indicates that the migration has started. To check its progress, pass --verify instead:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file migration-plan.json --verify

After a while, the output will look like this:

Output
Status of partition reassignment:
Reassignment of partition first-topic-0 is completed.
Reassignment of partition first-topic-1 is completed.
Reassignment of partition first-topic-2 is completed.
Reassignment of partition first-topic-3 is completed.
Reassignment of partition first-topic-4 is completed.
Reassignment of partition first-topic-5 is completed.

Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic first-topic

Now you can describe first-topic to check that no partitions remain on broker 3:

./bin/kafka-topics.sh --describe --bootstrap-server kafka1.your_domain:9092 --topic first-topic

The output will be as follows:

Output
Topic: first-topic  TopicId: 4kVImoFNTQeyk3r2zQbdvw  PartitionCount: 6  ReplicationFactor: 2  Configs: segment.bytes=1073741824
    Topic: first-topic  Partition: 0  Leader: 2  Replicas: 2,1  Isr: 1,2
    Topic: first-topic  Partition: 1  Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: first-topic  Partition: 2  Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: first-topic  Partition: 3  Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: first-topic  Partition: 4  Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: first-topic  Partition: 5  Leader: 1  Replicas: 1,2  Isr: 2,1

Only brokers 1 and 2 are present as replicas and ISRs, meaning the migration was successful.
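If you ever need to undo the move, the "Current partition replica assignment" JSON printed by the script can be saved to a file (for example, a hypothetical rollback.json, a name chosen here only for illustration) and executed with the same script to restore the original layout:

./bin/kafka-reassign-partitions.sh --bootstrap-server kafka1.your_domain:9092 --reassignment-json-file rollback.json --execute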

At this point, you have created a migration plan to move first-topic from broker 3 to the remaining brokers and learned how to verify that the migration went smoothly.

Conclusion

You now have a Kafka cluster consisting of three nodes that communicate using the KRaft protocol. You have also learned how to inspect the cluster and the partition layout, and you tested the cluster's redundancy by bringing down a node and reading from a topic. Finally, you learned how to move topics between cluster nodes.
