Kafka APIs

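Kafka provides five core Java APIs: the Producer API, the Consumer API, the Admin Client API, the Connect API, and the Kafka Streams API.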

Use these APIs to implement your own components that consume data from and produce data to Kafka, manage Kafka brokers and topics, and perform stream processing.

Producer API

Producers publish (write) a stream of events to one or more Kafka topics. Use the Producer API to create your own Kafka producers.

To use the Producer API, add the following Maven dependency to your project:

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>3.4.0</version>
</dependency>

The following is an implementation of a simple Kafka producer. This example defines some of the producer configuration properties and sends a record asynchronously using the send method.

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<HOST>:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

final String message = "some-message";

try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    // send() returns immediately; the callback runs once the send succeeds or fails
    producer.send(new ProducerRecord<>("my-topic", message), (metadata, exception) -> {
        if (exception != null) {
            logger.error("Trouble producing", exception);
        } else {
            logger.debug(String.format("Produced record (%s) at offset %d to topic %s",
                message, metadata.offset(), metadata.topic()));
        }
    });
}
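
Producer settings do not have to be hard-coded. The following is a minimal sketch, assuming you keep the same settings as plain-text properties and parse them with java.util.Properties. ProducerSettings and its load method are hypothetical names, not part of the Kafka client library. The keys are the string names that the ProducerConfig constants stand for.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client library.
final class ProducerSettings {

    // The same settings as the producer example, written as properties text.
    // <HOST> is a placeholder for a broker host name.
    static final String DEFAULTS = String.join("\n",
        "bootstrap.servers=<HOST>:9092",
        "key.serializer=org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer=org.apache.kafka.common.serialization.StringSerializer");

    private ProducerSettings() { }

    // Parses properties text into a Properties object.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Reading from an in-memory string should not fail; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
        return props;
    }
}
```

The object returned by ProducerSettings.load(ProducerSettings.DEFAULTS) can be passed to new KafkaProducer<>(props) in place of the hand-built props, because the producer accepts serializer settings as class names as well as Class objects.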

For more information, see the Producer API Javadoc.

Confluent Tip

For a tutorial on how to build a Kafka producer, see How to build your first Apache Kafka Producer application on the Confluent Developer site.

Consumer API

Consumers subscribe to (read) one or more topics and process the stream of events produced to them.

Use the Consumer API to create your own Kafka consumers.

To use the Consumer API, add the following Maven dependency to your project.

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>3.4.0</version>
</dependency>

Following is a simple implementation of a Kafka consumer. This example defines some consumer configuration properties and subscribes to the topic my-topic.

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<HOST>:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
// Deserializers convert the record bytes back into String keys and values
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("my-topic"));
    // keepConsuming is a boolean flag that the application sets to false to stop the loop
    while (keepConsuming) {
        final ConsumerRecords<String, String> consumerRecords =
            consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : consumerRecords) {
            // process the record
        }
    }
}
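
The consumer example leaves the keepConsuming flag undefined. The following is a minimal sketch of one way to supply it, assuming a thread-safe flag that another thread can clear. PollLoop is a hypothetical helper, not part of the Kafka client library, and the Supplier stands in for a call such as consumer.poll(Duration.ofSeconds(1)) so that the sketch runs without a broker.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper: a poll loop with a stop flag.
final class PollLoop<T> {
    private final AtomicBoolean keepConsuming = new AtomicBoolean(true);

    // Safe to call from another thread, for example a JVM shutdown hook.
    void stop() {
        keepConsuming.set(false);
    }

    // Polls until stop() is called; returns the number of records handled.
    // The flag is checked once per batch, so the current batch always finishes.
    int run(Supplier<List<T>> poll, Consumer<T> handler) {
        int handled = 0;
        while (keepConsuming.get()) {
            for (T record : poll.get()) {
                handler.accept(record);
                handled++;
            }
        }
        return handled;
    }
}
```

An application might register Runtime.getRuntime().addShutdownHook(new Thread(loop::stop)) so that the loop ends after the current batch. With a real KafkaConsumer, calling consumer.wakeup() from the other thread is an alternative that also interrupts a poll that is still blocking.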

For more information, see the Consumer API Javadoc.

Confluent Tip

For a tutorial on how to build a Kafka consumer, see How to build your first Apache Kafka Consumer application on the Confluent Developer site.

Admin Client API

The Admin Client API enables you to create your own clients to manage and inspect topics, brokers, and other Kafka objects.

You can use the Admin API by adding the following Maven dependency to your project:

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>3.4.0</version>
</dependency>

The following code shows an example of how to use the AdminClient to create a topic:

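// config holds the client settings, for example bootstrap.servers
try (final AdminClient client = AdminClient.create(config)) {
    final List<NewTopic> topics = new ArrayList<>();
    // Arguments: topic name, number of partitions, replication factor (a short)
    topics.add(new NewTopic("my-topic", 3, (short) 3));
    // createTopics is asynchronous and returns a CreateTopicsResult
    client.createTopics(topics);
}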

To learn more, read the Admin API Javadoc.

Connect API

The Kafka Connect API enables you to build and run reusable data import/export connectors that consume (read) or produce (write) streams of events from and to external systems and applications that integrate with Kafka. For example, a connector to a relational database like PostgreSQL might capture every change to a set of tables.

Typically, you do not need to implement your own connectors because the Kafka community provides hundreds of ready-to-use connectors.

Confluent Tip

Confluent offers several connectors for use with Kafka, whether on-premises or in the cloud.

To learn more about the Connect API, see Kafka Connect API Javadoc.

Kafka Streams API

Use the Kafka Streams API to implement stream processing applications and microservices. A Kafka Streams application reads input from one or more topics and writes output to one or more topics, transforming the input streams into output streams.

The Kafka Streams API provides higher-level functions with the Kafka Streams Domain Specific Language (DSL) and lower-level processing with the Processor API.

You can use Kafka Streams by adding the following Maven dependencies to your project. Note that the kafka-streams-scala dependency is only required if you are writing Kafka Streams applications with Scala.

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-streams</artifactId>
   <version>3.4.0</version>
</dependency>
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>3.4.0</version>
</dependency>
<!-- Optionally include Kafka Streams DSL for Scala for Scala 2.13 -->
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-streams-scala_2.13</artifactId>
   <version>3.4.0</version>
</dependency>

Kafka Streams DSL

A high-level API that provides the most common data transformation operations such as map, filter, join, and aggregations out of the box. The DSL is recommended if you are new to developing with Kafka Streams, and it should cover most use cases and stream processing requirements. If you are using Scala, you can use the Kafka Streams DSL for Scala library, which removes much of the Java/Scala interoperability boilerplate compared with the Java DSL.
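
As an illustration of the filter and map operations mentioned here, the following is a minimal sketch of a DSL topology, assuming the kafka-streams dependency is on the classpath. UppercaseTopology and the topic names input-topic and output-topic are hypothetical placeholders. The sketch only builds and describes the topology; it does not connect to a broker.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical example class; the topic names are placeholders.
final class UppercaseTopology {

    private UppercaseTopology() { }

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Read String keys and values from the input topic.
        KStream<String, String> input =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        input
            // Drop records whose value is missing or empty.
            .filter((key, value) -> value != null && !value.isEmpty())
            // Transform each remaining value; keys are left unchanged.
            .mapValues(value -> value.toUpperCase())
            // Write the result to the output topic.
            .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```

To run the topology, an application would pass it to new KafkaStreams(topology, props) together with at least the application.id and bootstrap.servers settings, and then call start(). Printing UppercaseTopology.build().describe() shows the source, processor, and sink nodes that the DSL calls generated.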

To learn more, see the Kafka Streams DSL Developer Guide.

Confluent Tip

For a tutorial on how to build a Kafka Streams application using the DSL, see How to build your first Apache Kafka Streams application.

Processor API

A low-level API that enables you to add and connect processors as well as interact directly with state stores. The Processor API gives you more flexibility than the DSL but requires more code.

To learn more, see the Processor API Developer Guide.