How to set up a Kafka producer for a data source via CLI

Introduction

Apache Kafka provides shell scripts for producing and consuming basic text messages to and from a Kafka cluster. While they are useful for exploration and experimentation, real-world applications access Kafka programmatically. To this end, Kafka provides many client libraries for popular programming languages and environments. In this tutorial, you create a Java application that produces data to a Kafka topic. You create a Java project using Apache Maven, a tool for building and packaging Java projects, and add the Kafka client library as a dependency. Then, you implement a class that uses the Kafka client to produce messages and retrieve in-cluster metadata about them.

Prerequisites
  • A device with at least 4 GB of RAM and 2 CPUs
  • Java Development Kit (JDK) 8 or higher installed on your Droplet or local machine
  • Apache Kafka installed and configured on your Droplet or local machine
  • Familiarity with the standard directory layout of Java projects

Step 1 – Create a Maven Project

In this step, you will install Apache Maven and use it to create a project that you will use to communicate with Kafka. On Ubuntu, Maven is readily available in the official repositories.

First, update the package index by running:

sudo apt update

To install it, run the following command:

sudo apt install maven

Verify that it is installed by reading its version number:

mvn --version

The output will be similar to the following, depending on your Java version and platform:

Output
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.22, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: en, platform encoding: UTF-8
OS name: "linux", version: "5.15.0-100-generic", arch: "amd64", family: "unix"

Next, create a directory where you will store your Java projects for working with Kafka:

mkdir ~/kafka-projects

Go to the newly created directory:

cd ~/kafka-projects

Then generate an empty Java project by running the following:

mvn archetype:generate \
-DgroupId=com.dokafka \
-DartifactId=dokafka \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.4 \
-DinteractiveMode=false

Here, you instruct Maven to generate a new project named dokafka with a group ID of com.dokafka. The group ID uniquely identifies this project across the ecosystem. The project is based on the maven-archetype-quickstart archetype, which is how Maven refers to its project templates.

There will be a lot of output, especially if this is the first time Maven is run. The end of the output will look like this:

Output
...
[INFO] Generating project in Batch mode
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: packageInPathFormat, Value: com/dokafka
[INFO] Parameter: package, Value: com.dokafka
[INFO] Parameter: groupId, Value: com.dokafka
[INFO] Parameter: artifactId, Value: dokafka
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Project created from Archetype in dir: /root/kafka-projects/dokafka
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.537 s
[INFO] Finished at: 2024-03-10T10:43:07Z
[INFO] ------------------------------------------------------------------------

Maven downloads the necessary Java packages from its central repository and creates the dokafka project using the maven-archetype-quickstart archetype.

Go to the project directory by running:

cd dokafka

The project structure is as follows:

├── pom.xml
└── src
    ├── main
    │   └── java
    │       └── com
    │           └── dokafka
    │               └── App.java
    └── test
        └── java
            └── com
                └── dokafka
                    └── AppTest.java

As part of the prerequisites, you learned about the standard Maven project structure, which you see here. The src/main/java directory holds the project source code, src/test/java contains the test sources, and pom.xml, at the root of the project, is the main Maven configuration file.

This project contains only one source file, App.java. Show its contents to see what Maven generated:

cat src/main/java/com/dokafka/App.java

The output will be:

package com.dokafka;

/**
 * Hello world!
 *
 */
public class App
{
    public static void main( String[] args )
    {
        System.out.println( "Hello World!" );
    }
}

To run this code, you must first build the project by running:

mvn package

Maven will compile the code and package it into a runnable JAR file. The end of the output should look like this, signaling that the build is complete:

Output
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.830 s
[INFO] Finished at: 2024-03-10T10:41:24Z
[INFO] ------------------------------------------------------------------------

Maven placed the resulting JAR file under the target directory. To run the App class you just built, run the following command, passing in the fully qualified class name:

java -cp target/dokafka-1.0-SNAPSHOT.jar com.dokafka.App

The output will be:

Output
Hello World!

You have installed Maven and created an empty Java project. Next, you will add the necessary dependencies for Kafka.

Step 2 – Add Maven Dependencies

Now you will add the Kafka Java client, as well as dependencies for logging, to your project, and configure Maven to include them during packaging. First, add the Kafka client library. Go to the Maven repository page for the Java client in your browser, select the latest available version, and copy the provided XML snippet for Maven. At the time of writing, the latest version of the Java client library was 3.7.0.

You will add the dependencies to pom.xml, located at the root of your project. Open it for editing:

nano pom.xml

Find the <dependencies> section and add the dependency definition:

...
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.0</version>
  </dependency>
  ...
</dependencies>

This will provide the Kafka client library to your project. However, the library itself requires two other dependencies that you need to add manually. They come from SLF4J, a logging facade that supports many logging backends and gives the developer flexibility over how log messages are processed. The two dependencies you need to add, shown in the snippet after this list, are:

  • slf4j-api, the logging facade itself
  • slf4j-simple, which processes log messages and outputs them to the terminal
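The build output later in this step shows version 2.0.12 of both artifacts being copied, so, assuming that version, the definitions you would add next to the kafka-clients dependency look like this (a sketch; check the Maven repository for the latest release):

  <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>2.0.12</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>2.0.12</version>
  </dependency>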

Once you have defined the dependencies, you need to make them available alongside the final JAR that is built. Find the <build> section of pom.xml and add the highlighted lines:

...
<build>
  <pluginManagement>
    <plugins>
      ...
    </plugins>
  </pluginManagement>
  <plugins>
    <plugin>
      <artifactId>maven-dependency-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>copy-dependencies</goal>
          </goals>
          <configuration>
            <outputDirectory>${project.build.directory}/lib</outputDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
...

Here, you configure the maven-dependency-plugin to copy all dependencies to target/lib when the project is packaged. Note that you should not change the existing <plugins> section under <pluginManagement>; add the new one separately. When you're done, save and close the file.

Build the project to make sure everything is configured correctly:

mvn package

The end of the output should look like this:

Output
...
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ dokafka ---
[INFO] Building jar: /root/kafka-projects/dokafka/target/dokafka-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-dependency-plugin:2.8:copy-dependencies (default) @ dokafka ---
[INFO] Copying junit-4.11.jar to /root/kafka-projects/dokafka/target/lib/junit-4.11.jar
[INFO] Copying slf4j-simple-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-simple-2.0.12.jar
[INFO] Copying snappy-java-1.1.10.5.jar to /root/kafka-projects/dokafka/target/lib/snappy-java-1.1.10.5.jar
[INFO] Copying zstd-jni-1.5.5-6.jar to /root/kafka-projects/dokafka/target/lib/zstd-jni-1.5.5-6.jar
[INFO] Copying hamcrest-core-1.3.jar to /root/kafka-projects/dokafka/target/lib/hamcrest-core-1.3.jar
[INFO] Copying lz4-java-1.8.0.jar to /root/kafka-projects/dokafka/target/lib/lz4-java-1.8.0.jar
[INFO] Copying slf4j-api-2.0.12.jar to /root/kafka-projects/dokafka/target/lib/slf4j-api-2.0.12.jar
[INFO] Copying kafka-clients-3.7.0.jar to /root/kafka-projects/dokafka/target/lib/kafka-clients-3.7.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.205 s
[INFO] Finished at: 2024-03-12T06:36:34Z
[INFO] ------------------------------------------------------------------------

You can list the files under target/lib to make sure the dependencies were actually copied:

ls target/lib

Output
hamcrest-core-1.3.jar  kafka-clients-3.7.0.jar  slf4j-api-2.0.12.jar     snappy-java-1.1.10.5.jar
junit-4.11.jar         lz4-java-1.8.0.jar       slf4j-simple-2.0.12.jar  zstd-jni-1.5.5-6.jar

You have added the necessary dependencies to your Maven project. Next, you will connect to Kafka and produce messages programmatically.

Step 3 – Create a Kafka Producer in Java

In this step, you set up a Kafka producer in Java and write messages to a topic.

Per the standard project structure, the source code is stored under src/main/java/com/dokafka. Since App.java is not needed for the rest of the tutorial, remove it by running:

rm src/main/java/com/dokafka/App.java

You will store the producer code in a class called ProducerDemo. Create and open the accompanying file for editing:

nano src/main/java/com/dokafka/ProducerDemo.java

Add the following lines:

package com.dokafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerDemo {
    private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);

    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String topicName = "java_demo";

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        ProducerRecord<String, String> producerRecord =
            new ProducerRecord<>(topicName, "Hello World!");

        producer.send(producerRecord);

        producer.flush();
        producer.close();
    }
}

You first define the ProducerDemo class, import the classes it uses, and create a Logger for it. In the main method, you declare the address of the Kafka cluster (bootstrapServers) and the name of the topic to produce messages to (topicName).

Then, you instantiate a Properties object, which is similar to a key-value dictionary and holds the configuration of your Kafka producer. You set the BOOTSTRAP_SERVERS_CONFIG property to the Kafka cluster address. You also set the KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG entries to StringSerializer.class.getName().

These properties specify which serializers should be used to process the keys and values of the produced messages. Serializers are classes that accept input and return an array of bytes as output, ready to be transmitted over the network. Deserializers do the opposite, reconstructing the original object from the stream of bytes. Here, both the key and the value are serialized as strings using the built-in StringSerializer.
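To make the round trip concrete, here is a minimal, self-contained sketch (the SerializerDemo class is illustrative only and not part of the tutorial project) that serializes a string to bytes and reconstructs it with the matching deserializer:

package com.dokafka;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SerializerDemo {
    public static void main(String[] args) {
        try (StringSerializer serializer = new StringSerializer();
             StringDeserializer deserializer = new StringDeserializer()) {
            // The serializer turns the value into bytes, ready for the network.
            // The topic name is passed in because serializers may vary behavior per topic.
            byte[] bytes = serializer.serialize("java_demo", "Hello World!");
            System.out.println(bytes.length + " bytes on the wire");

            // The deserializer performs the opposite operation.
            String original = deserializer.deserialize("java_demo", bytes);
            System.out.println(original);
        }
    }
}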

Next, you declare and instantiate a KafkaProducer:

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

The constructor accepts keys and values of type String, along with the properties you just configured. To send a message to a topic, the KafkaProducer accepts a ProducerRecord, which you construct here with the topic name and the message itself, Hello World!. Note that the producer itself is not tied to a specific topic.
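ProducerRecord has other constructors as well. For instance, a sketch of a record with an explicit key (the key greeting is an arbitrary example), which the default partitioner hashes to decide which partition the record lands in:

// Hypothetical variation: a record with an explicit key. Records with the
// same key are routed to the same partition by the default partitioner.
ProducerRecord<String, String> keyedRecord =
    new ProducerRecord<>(topicName, "greeting", "Hello World!");
producer.send(keyedRecord);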

After sending the message, you flush and close the producer. The call to producer.send() is asynchronous, meaning that control flow returns to the main method while the message is being sent on another thread. Since this sample program exits right afterward, you force the producer to send anything left in its buffers by flushing it. Then, you close() it, signaling to Kafka that the producer is being destroyed.
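As an aside, send() also returns a Future<RecordMetadata> that you could block on instead of relying on flush(). A minimal sketch, assuming the same producer and producerRecord as above (it additionally requires importing java.util.concurrent.ExecutionException):

// Hypothetical variation, not part of ProducerDemo: block until the broker
// acknowledges the record by waiting on the Future that send() returns.
try {
    RecordMetadata metadata = producer.send(producerRecord).get();
    log.info("Record stored in partition " + metadata.partition()
        + " at offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    log.error("Sending failed or was interrupted", e);
}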

Next, you will create a script that will handle the building and running of ProducerDemo. You will save it in a file called run-producer.sh. Create and open it for editing:

nano run-producer.sh

Add the following lines:

#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.ProducerDemo

When you're done, save and close the file. The target/lib/* entry on the classpath points Java at the dependency JARs you copied during packaging.

Then mark it as executable:

chmod +x run-producer.sh

Finally, try generating a Hello World! message by running it:

./run-producer.sh

The output will be long and should end up like this:

Output
...
[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Instantiated an idempotent producer.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1710176327832
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {java_demo=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: Z-4Gf_p6T2ygtb6461nKRA
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 12 with epoch 0
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered

The logs show that the KafkaProducer was successfully created, the message was produced, and the producer was then closed and unregistered. The message is now written to the java_demo topic, and you can retrieve it using the kafka-console-consumer.sh script.

In a separate shell, go to your Kafka installation directory and run the following command to read the topic:

bin/kafka-console-consumer.sh --topic java_demo --from-beginning --bootstrap-server localhost:9092

The output will be:

OutputHello World!

You can exit the consumer by pressing CTRL+C.

At this point, you have programmatically produced a message to the java_demo topic and read it back using the console consumer script that ships with Kafka. Next, you will learn how to use the information Kafka returns after a message is successfully sent.

Step 4 – Retrieve Metadata Using Callbacks

The send() method of KafkaProducer accepts callbacks, which allow you to act on events as they occur, such as when a record is accepted. This is useful for retrieving information about how the cluster is handling records.

To extend the send() call with a callback, first open ProducerDemo for editing:

nano src/main/java/com/dokafka/ProducerDemo.java

Change the code to the following:

...
producer.send(producerRecord, new Callback() {
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            log.error("An error occurred!", e);
            return;
        }
        log.info(String.format("Timestamp: %s, partition: %s; offset: %s",
            recordMetadata.timestamp(),
            recordMetadata.partition(),
            recordMetadata.offset()));
    }
});
...

Here, you pass an implementation of the Callback interface to the send() method, implementing onCompletion(), which receives a RecordMetadata and, optionally, an Exception. If an error occurred, you log it. Otherwise, you log the timestamp, partition number, and offset of the record as stored in the cluster. Since sending the message is asynchronous, your code is called when the cluster accepts the record, without you having to explicitly wait for it to happen.
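Since Callback declares a single method, you could equivalently pass a lambda instead of the anonymous class. A sketch of the same logic:

// Equivalent sketch using a lambda in place of the anonymous Callback class.
producer.send(producerRecord, (recordMetadata, e) -> {
    if (e != null) {
        log.error("An error occurred!", e);
        return;
    }
    log.info(String.format("Timestamp: %s, partition: %s; offset: %s",
        recordMetadata.timestamp(),
        recordMetadata.partition(),
        recordMetadata.offset()));
});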

When you're done, save and close the file, then run the producer:

./run-producer.sh

Notice a new message at the end of the output:

Output
...
[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181715303, partition: 0; offset: 2
...

The newly generated message was accepted by the cluster and stored in partition 0.

If you run it again, you will notice that the offset is one larger, indicating the location of the message in the partition:

Output
[kafka-producer-network-thread | producer-1] INFO com.dokafka.ProducerDemo - Timestamp: 1710181831814, partition: 0; offset: 3

Conclusion

In this article, you created a Java project using Maven and equipped it with dependencies to communicate with Kafka. Then, you created a class that generates messages to your Kafka cluster and extended it to retrieve the sent record information.
