Kafka APIs

Apache Kafka® provides five core Java APIs to enable cluster and client management. This article explores the different Kafka APIs and how they can be used to build and manage powerful streaming applications.

Use these APIs to implement your own components that consume and produce data to Kafka, manage Kafka brokers and topics, and perform stream processing.

Ready to get started?

Producer API

Producers publish (write) a stream of events to one or more Kafka topics. The Producer API enables developers to create their own producers that write data to Kafka topics. The API provides several options for configuring the behavior of the producer, such as setting the number of acknowledgments required before considering a message as sent, or setting compression options to reduce the size of messages.

To use the Producer API, add the following Maven dependency to your project:

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>3.4.0</version>
</dependency>

Following is an implementation of a simple Kafka producer. This example defines some of the producer configuration properties and sends a record asynchronously using the Send method.

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<HOST>:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

 try (Producer<String, String> producer = new KafkaProducer<>(props)) {

   producer.send(new ProducerRecord<>("my-topic", "some-message"), (metadata, exception) -> {
             if (exception != null) {
                 logger.error("Trouble producing", exception);
             } else {
                 logger.debug("Produced record (%s) at offset %d to topic %s %n", message,
                  metadata.offset(), metadata.topic());
                    }
            });
   }

For more information, see Kafka Producer Design and the Producer API Javadoc .

Confluent Tip

For a tutorial on how to build a Kafka producer, see How to build your first Apache Kafka Producer application on the Confluent Developer site.

Consumer API

Consumers subscribe to (read) one or more topics and to process the stream of events produced to them. The Consumer API enables developers to create their own consumers to read data from Kafka topics. It provides several options for configuring the behavior of the consumer, such as setting the position to start consuming from, or setting the number of records to fetch at a time.

To use the Consumer API, add the following Maven dependency to your project.

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>3.4.0</version>
</dependency>

Following is a simple implementation of a Kafka consumer. This example defines some consumer configuration properties and subscribes to the topic my-topic.

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<HOST>:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
props.put("key.deserializer", StringSerializer.class);
props.put("value.deserializer", StringSerializer.class);

 try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
   consumer.subscribe(Collections.singletonList("my-topic"));
           while (keepConsuming) {
             final ConsumerRecords<String, String> consumerRecords =
                                     consumer.poll(Duration.ofSeconds(1));
             for (ConsumerRecord<String, String> record : consumerRecords) {

         //process the record
             }
           }
 }

For more information, see Kafka Consumer Design and the Consumer API Javadocs.

Confluent Tip

For a tutorial on how to build a Kafka consumer, see How to build your first Apache Kafka Consumer application on the Confluent Developer site.

Admin Client API

The Admin Client API is a Kafka API that enables developers to manage and administer Kafka clusters programmatically. It provides a set of operations that can be used to create, delete, describe, and modify Kafka resources such as topics, brokers, and ACLs (Access Control Lists).

The Admin Client API can be used to automate common administrative tasks, such as creating and deleting topics, and to integrate Kafka administration into larger systems and workflows.

You can use the Admin API by adding the following Maven dependency to your project:

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>3.4.0</version>
</dependency>

The following code shows an example of how to use the AdminClient to create a topic:

try (final AdminClient client = AdminClient.create(config)) {

    final List<NewTopic> topics = new ArrayList<>();

        topics.add(new NewTopic("my-topic", 3, 3);
         client.createTopics(topics);

  }

To learn more, read the Admin API Javadoc.

Connect API

The Kafka Connect API enables you to build and run reusable data import/export connectors that consume (read) or produce (write) streams of events from and to external systems and applications that integrate with Kafka. For example, a connector to a relational database like PostgreSQL might capture every change to a set of tables.

Typically you do not need to implement your own connectors because the Kafka community provides hundreds of ready-to-use connectors.

Confluent Tip

Confluent offers several connectors for use with Kafka, whether on-premises or in the cloud.

To learn more about the Connect API, see Kafka Connect API Javadoc.

Kafka Streams API

Use the Kafka Streams API to implement applications and microservices that perform stream processing operations on data in Kafka. Input is read from one or more topics to generate output to one or more topics, transforming the input streams to output streams.

The Kafka Streams API provides higher-level functions with the Kafka Streams Domain Specific Language (DSL) and lower level processing with the Processor API.

You can use Kafka Streams by adding the following Maven dependencies to your project. Note that the kafka-streams-scala dependency is only required if you are writing Kafka Streams applications with Scala.

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-streams</artifactId>
   <version>3.4.0</version>
</dependency>
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>3.4.0</version>
</dependency>
<!-- Optionally include Kafka Streams DSL for Scala for Scala 2.13 -->
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-streams-scala_2.13</artifactId>
   <version>3.4.0</version>
</dependency>

Kafka Streams DSL

A high-level API that provides the most common data transformation operations such as map, filter, join, and aggregations out of the box. The DSL is recommended if you are new to developing with Kafka Streams, and it should cover most use cases and stream processing requirements. If you are using Scala, you can use the Kafka Streams DSL for Scala library, which removes much of the Java/Scala interoperability boilerplate compared with the Java DSL.

To learn more, see the Kafka Streams DSL Developer Guide.

Confluent Tip

For a tutorial on how to build a Kafka streams application using the DSL API, see How to build your first Apache Kafka Streams application.

Processor API

A low-level API that enables you to add and connect processors as well as interact directly with state stores. The Processor API provides you more flexibility than the DSL API but requires more code.

To learn more, see the Processor API Developer Guide.