Kafka APIs
Kafka provides five core Java APIs:

- The Producer API
- The Consumer API
- The Admin Client API
- The Kafka Connect API
- The Kafka Streams API

Use these APIs to implement your own components that consume and produce data to Kafka, manage Kafka brokers and topics, and perform stream processing.
Producer API
Producers publish (write) a stream of events to one or more Kafka topics. Use the Producer API to create your own Kafka producers.
To use the Producer API, add the following Maven dependency to your project:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>
Following is an implementation of a simple Kafka producer. This example defines some of the producer configuration properties and sends a record asynchronously using the send() method.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<HOST>:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

final String message = "some-message";
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("my-topic", message), (metadata, exception) -> {
        if (exception != null) {
            logger.error("Trouble producing", exception);
        } else {
            logger.debug("Produced record ({}) at offset {} to topic {}", message,
                metadata.offset(), metadata.topic());
        }
    });
}
For more information, see the Producer API Javadoc.
Confluent Tip
For a tutorial on how to build a Kafka producer, see How to build your first Apache Kafka Producer application on the Confluent Developer site.
Consumer API
Consumers subscribe to (read) one or more topics and process the stream of events produced to them.
Use the Consumer API to create your own Kafka consumers.
To use the Consumer API, add the following Maven dependency to your project:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>
Following is a simple implementation of a Kafka consumer. This example defines some consumer configuration properties and subscribes to the topic my-topic.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<HOST>:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("my-topic"));
    while (keepConsuming) {
        final ConsumerRecords<String, String> consumerRecords =
            consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : consumerRecords) {
            // process the record
        }
    }
}
For more information, see the Consumer API Javadoc.
Confluent Tip
For a tutorial on how to build a Kafka consumer, see How to build your first Apache Kafka Consumer application on the Confluent Developer site.
Admin Client API
The Admin Client API enables you to create your own clients to manage and inspect topics, brokers, and other Kafka objects.
You can use the Admin Client API by adding the following Maven dependency to your project:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>
The following code shows an example of how to use the AdminClient to create a topic:
final Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "<HOST>:9092");

try (final AdminClient client = AdminClient.create(config)) {
    final List<NewTopic> topics = new ArrayList<>();
    // Topic name, number of partitions, and replication factor (a short)
    topics.add(new NewTopic("my-topic", 3, (short) 3));
    client.createTopics(topics);
}
To learn more, read the Admin API Javadoc.
Connect API
The Kafka Connect API enables you to build and run reusable data import/export connectors that consume (read) or produce (write) streams of events from and to external systems and applications that integrate with Kafka. For example, a connector to a relational database like PostgreSQL might capture every change to a set of tables.
Typically you do not need to implement your own connectors because the Kafka community provides hundreds of ready-to-use connectors.
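If you do need to build a custom connector, the following is a minimal sketch of what a source connector might look like. The MySourceConnector and MySourceTask class names are hypothetical, and a real implementation would read records from an actual external system in poll():

public class MySourceConnector extends SourceConnector {
    private Map<String, String> config;

    @Override
    public void start(Map<String, String> props) {
        // Keep the connector configuration so it can be handed to tasks.
        this.config = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return MySourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Give every task the same configuration; a real connector would
        // partition the work (for example, database tables) across tasks here.
        return Collections.nCopies(maxTasks, config);
    }

    @Override
    public void stop() { }

    @Override
    public ConfigDef config() {
        // Declare the connector's configuration options here.
        return new ConfigDef();
    }

    @Override
    public String version() {
        return "1.0";
    }
}

class MySourceTask extends SourceTask {
    @Override
    public void start(Map<String, String> props) { }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // A real task reads from the external system and returns SourceRecords;
        // returning null tells the Connect framework there is nothing yet.
        Thread.sleep(1000);
        return null;
    }

    @Override
    public void stop() { }

    @Override
    public String version() {
        return "1.0";
    }
}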
Confluent Tip
Confluent offers several connectors for use with Kafka, whether on-premises or in the cloud.
- For a filterable list of ready-to-use connectors for Kafka and Confluent Platform, see Confluent Hub and Self-managed Connectors for Confluent Platform in the Confluent Platform documentation.
- For a list of ready-to-use connectors for use with Confluent Cloud, see Connect to External Systems in Confluent Cloud.
To learn more about the Connect API, see the Kafka Connect API Javadoc.
Kafka Streams API
Use the Kafka Streams API to implement stream processing applications and microservices. A Streams application reads input from one or more topics and transforms it into output streams that it writes to one or more topics.
The Kafka Streams API provides high-level operations through the Kafka Streams Domain Specific Language (DSL) and lower-level processing through the Processor API.
You can use Kafka Streams by adding the following Maven dependencies to your project. Note that the kafka-streams-scala dependency is only required if you are writing Kafka Streams applications with Scala.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>
<!-- Optionally include Kafka Streams DSL for Scala for Scala 2.13 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-scala_2.13</artifactId>
    <version>3.4.0</version>
</dependency>
Kafka Streams DSL
A high-level API that provides the most common data transformation operations such as map, filter, join, and aggregations out of the box. The DSL is recommended if you are new to developing with Kafka Streams, and it should cover most use cases and stream processing requirements. If you are using Scala, you can use the Kafka Streams DSL for Scala library, which removes much of the Java/Scala interoperability boilerplate compared with the Java DSL.
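For example, the following sketch builds a small DSL topology that reads from an input topic, filters and transforms each record, and writes the result to an output topic. The application ID and the topic names my-topic and my-output-topic are placeholders:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<HOST>:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("my-topic")
    .filter((key, value) -> value != null)     // drop records with null values
    .mapValues(value -> value.toUpperCase())   // transform each value
    .to("my-output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Close the topology cleanly when the JVM shuts down.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));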
To learn more, see the Kafka Streams DSL Developer Guide.
Confluent Tip
For a tutorial on how to build a Kafka Streams application using the DSL, see How to build your first Apache Kafka Streams application.
Processor API
A low-level API that enables you to add and connect processors, and to interact directly with state stores. The Processor API gives you more flexibility than the DSL but requires more code.
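As a sketch, the following topology wires a source node, a custom processor, and a sink node together by hand. The node names and the my-topic and my-output-topic topics are placeholders:

Topology topology = new Topology();

topology.addSource("Source",
    Serdes.String().deserializer(), Serdes.String().deserializer(), "my-topic");

topology.addProcessor("Uppercase",
    () -> new Processor<String, String, String, String>() {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            // Forward a transformed copy of each record downstream.
            context.forward(record.withValue(record.value().toUpperCase()));
        }
    },
    "Source");

topology.addSink("Sink", "my-output-topic",
    Serdes.String().serializer(), Serdes.String().serializer(), "Uppercase");

// Run the topology the same way as a DSL application:
// new KafkaStreams(topology, props).start();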
To learn more, see the Processor API Developer Guide.