Kafka Java Client¶
Confluent Platform includes the Java producer and consumer shipped with Apache Kafka®.
Java Client installation¶
All JARs included in the packages are also available in the Confluent Maven repository. Here’s a sample POM file showing how to add this repository:
<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
  <!-- further repository entries here -->
</repositories>
The Confluent Maven repository includes compiled versions of Kafka.
To reference the Kafka version 2.5 that is included with Confluent Platform 5.5.15, use the following in your pom.xml:
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>5.5.15-ccs</version>
  </dependency>
  <!-- further dependency entries here -->
</dependencies>
Note
Version names of Apache Kafka vs. Kafka in Confluent Platform: Confluent always contributes patches back to the Apache Kafka® open source project. However, the exact versions (and version names) being included in Confluent Platform may differ from the Apache artifacts when Confluent Platform and Kafka releases do not align. If they are different, Confluent keeps the groupId and artifactId identical, but appends the suffix -ccs to the version identifier of the Confluent Platform version to distinguish these from the Apache artifacts.
You can reference artifacts for all Java libraries that are included with Confluent Platform. For example, to use the Avro serializer you can include the following in your pom.xml:
<dependencies>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <!-- For Confluent Platform 5.5.15 -->
    <version>5.5.15</version>
  </dependency>
  <!-- further dependency entries here -->
</dependencies>
Tip
You can also specify kafka-protobuf-serializer or kafka-jsonschema-serializer serializers. For more information, see Schema Formats, Serializers, and Deserializers.
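For example, a pom.xml entry for the Protobuf serializer follows the same pattern as the Avro example above. This is an illustrative sketch; match the version to your Confluent Platform release:
<dependencies>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-protobuf-serializer</artifactId>
    <!-- For Confluent Platform 5.5.15 -->
    <version>5.5.15</version>
  </dependency>
  <!-- further dependency entries here -->
</dependencies>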
Java Client example code¶
For Hello World examples of Kafka clients in Java, see Java. All examples include a producer and consumer that can connect to any Kafka cluster running on-premises or in Confluent Cloud. They also include examples of how to produce and consume Avro data with Schema Registry.
Kafka Producer¶
Initialization¶
The Java producer is constructed with a standard Properties file.
Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("bootstrap.servers", "host1:9092,host2:9092");
config.put("acks", "all");
// key.serializer and value.serializer must also be set (or passed to the constructor) to match K and V
new KafkaProducer<K, V>(config);
Configuration errors will result in a KafkaException raised from the constructor of KafkaProducer.
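A minimal sketch of surfacing such an error at startup (the logging and rethrow are illustrative, not part of the client API):
Producer<K, V> producer;
try {
  producer = new KafkaProducer<>(config);
} catch (KafkaException e) {
  // for example, a missing serializer class or an invalid configuration value
  log.error("Failed to construct Kafka producer", e);
  throw e;
}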
Asynchronous writes¶
The Java producer includes a send() API which returns a future that can be polled to get the result of the send.
final ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, value);
Future<RecordMetadata> future = producer.send(record);
If you want to invoke some code after the write has completed, you can also provide a callback. In Java this is implemented as a Callback object:
final ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, value);
producer.send(record, new Callback() {
  public void onCompletion(RecordMetadata metadata, Exception e) {
    if (e != null)
      log.debug("Send failed for record {}", record, e);
  }
});
In the Java implementation you should avoid doing any expensive work in this callback since it is executed in the producer’s IO thread.
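If completion handling is expensive, one option is to hand the result off to an application-owned thread pool so the I/O thread stays responsive. A sketch, assuming a callbackExecutor and a handleResult method that are not part of the client API:
producer.send(record, new Callback() {
  public void onCompletion(RecordMetadata metadata, Exception e) {
    // keep this fast: immediately hand off to an application thread pool
    callbackExecutor.submit(() -> handleResult(record, metadata, e));
  }
});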
Synchronous writes¶
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
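Calling get() blocks until the broker has acknowledged the write, and any send failure is rethrown wrapped in an ExecutionException. A sketch of handling both outcomes, using the record and producer from the earlier examples:
try {
  RecordMetadata metadata = producer.send(record).get();
  log.trace("Wrote record to partition {} at offset {}", metadata.partition(), metadata.offset());
} catch (ExecutionException e) {
  // the send itself failed; the cause carries the underlying error
  log.error("Send failed", e.getCause());
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
}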
Kafka Consumer¶
Initialization¶
The Java consumer is constructed with a standard Properties file.
Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("group.id", "foo");
config.put("bootstrap.servers", "host1:9092,host2:9092");
// key.deserializer and value.deserializer must also be set (or passed to the constructor) to match K and V
new KafkaConsumer<K, V>(config);
Configuration errors will result in a KafkaException raised from the constructor of KafkaConsumer.
Basic usage¶
The Java client is designed around an event loop which is driven by
the poll()
API. This design is motivated by the UNIX select
and poll
system calls. A basic consumption loop with the Java API
usually takes the following form:
while (running) {
  ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
  process(records); // application-specific processing
  consumer.commitSync();
}
There is no background thread in the Java consumer. The API depends on
calls to poll()
to drive all of its IO including:
- Joining the consumer group and handling partition rebalances.
- Sending periodic heartbeats if part of an active generation.
- Sending periodic offset commits (if autocommit is enabled).
- Sending and receiving fetch requests for assigned partitions.
Due to this single-threaded model, no heartbeats can be sent while the application is handling the records returned from a call to poll().
This means that the consumer will fall out of the consumer group if either the event loop
terminates or if a delay in record processing causes the session
timeout to expire before the next iteration of the loop. This is
actually by design. One of the problems that the Java client attempts
to solve is ensuring the liveness of consumers in the group. As long
as the consumer is assigned partitions, no other members in the group
can consume from the same partitions, so it is important to ensure
that it is actually making progress and has not become a zombie.
This feature protects your application from a large class of failures,
but the downside is that it puts the burden on you to tune the session
timeout so that the consumer does not exceed it in its normal record
processing. The max.poll.records configuration option places an upper bound on the number of records returned from each call. You should use poll() with a fairly high session timeout (e.g. 30 to 60 seconds) and use max.poll.records to keep the number of records processed on each iteration bounded, so that worst-case behavior is predictable.
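As a rough sketch, the relevant settings might look like the following (the values are illustrative, not recommendations):
config.put("session.timeout.ms", "30000"); // allow up to 30 seconds between calls to poll()
config.put("max.poll.records", "500");     // bound the work done on each loop iteration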
If you fail to tune these settings appropriately, the consequence is
typically a CommitFailedException
raised from the call to commit
offsets for the processed records. If you are using the automatic
commit policy, then you might not even notice when this happens since
the consumer silently ignores commit failures internally (unless it’s
occurring often enough to impact lag metrics). You can catch this
exception and either ignore it or perform any needed rollback logic.
while (running) {
  ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
  process(records); // application-specific processing

  try {
    consumer.commitSync();
  } catch (CommitFailedException e) {
    // application-specific rollback of processed records
  }
}
Java Client code examples¶
Basic poll loop¶
The consumer API is centered around the poll()
method, which is
used to retrieve records from the brokers. The subscribe()
method
controls which topics will be fetched in poll. Typically, consumer
usage involves an initial call to subscribe()
to set up the topics
of interest and then a loop which calls poll()
until the
application is shut down.
The consumer intentionally avoids a specific threading model. It is
not safe for multi-threaded access and it has no background threads of
its own. In particular, this means that all IO occurs in the thread
calling poll(). In the consumer example below, the poll loop is wrapped in a Runnable which makes it easy to use with an ExecutorService.
public abstract class BasicConsumeLoop<K, V> implements Runnable {
  private final KafkaConsumer<K, V> consumer;
  private final List<String> topics;
  private final AtomicBoolean shutdown;
  private final CountDownLatch shutdownLatch;

  public BasicConsumeLoop(Properties config, List<String> topics) {
    this.consumer = new KafkaConsumer<>(config);
    this.topics = topics;
    this.shutdown = new AtomicBoolean(false);
    this.shutdownLatch = new CountDownLatch(1);
  }

  public abstract void process(ConsumerRecord<K, V> record);

  public void run() {
    try {
      consumer.subscribe(topics);

      while (!shutdown.get()) {
        ConsumerRecords<K, V> records = consumer.poll(500);
        records.forEach(record -> process(record));
      }
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }

  public void shutdown() throws InterruptedException {
    shutdown.set(true);
    shutdownLatch.await();
  }
}
The poll timeout is hard-coded to 500 milliseconds. If no records
are received before this timeout expires, then poll()
will return
an empty record set. It’s not a bad idea to add a shortcut check for
this case if your message processing involves any setup overhead.
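A sketch of such a check inside the loop:
ConsumerRecords<K, V> records = consumer.poll(500);
if (!records.isEmpty()) {
  // only pay the per-batch setup cost when there is actually work to do
  records.forEach(record -> process(record));
}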
To shut down the consumer, a flag is added which is checked on each
loop iteration. After shutdown is triggered, the consumer will wait at
most 500 milliseconds (plus the message processing time) before
shutting down, since shutdown might be triggered while the consumer is blocked in poll(). A better approach is provided in the next example.
Note that you should always call close()
after you are finished
using the consumer. Doing so will ensure that active sockets are
closed and internal state is cleaned up. It will also trigger a group
rebalance immediately which ensures that any partitions owned by the
consumer are re-assigned to another member in the group. If not closed
properly, the broker will trigger the rebalance only after the session
timeout has expired. A latch is added to this example to ensure that the consumer has time to finish closing before shutdown completes.
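As mentioned above, wrapping the loop in a Runnable makes it easy to drive with an ExecutorService. A hypothetical usage sketch (the concrete key/value types and configuration are assumptions):
ExecutorService executor = Executors.newSingleThreadExecutor();

// config is assumed to contain matching String deserializers
BasicConsumeLoop<String, String> loop = new BasicConsumeLoop<String, String>(config, topics) {
  @Override
  public void process(ConsumerRecord<String, String> record) {
    // application-specific processing
  }
};
executor.submit(loop);

// later, during application shutdown; shutdown() blocks until the consumer has closed
loop.shutdown();     // may throw InterruptedException
executor.shutdown();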
Shutdown with wakeup¶
An alternative pattern for the poll loop in the Java consumer is to
use Long.MAX_VALUE
for the timeout. To break from the loop, you can
use the consumer’s wakeup()
method from a separate thread. This
will raise a WakeupException
from the thread blocking in
poll(). If the thread is not currently blocking, then this will wake up the next poll invocation.
public abstract class ConsumeLoop<K, V> implements Runnable {
  private final KafkaConsumer<K, V> consumer;
  private final List<String> topics;
  private final CountDownLatch shutdownLatch;

  public ConsumeLoop(KafkaConsumer<K, V> consumer, List<String> topics) {
    this.consumer = consumer;
    this.topics = topics;
    this.shutdownLatch = new CountDownLatch(1);
  }

  public abstract void process(ConsumerRecord<K, V> record);

  public void run() {
    try {
      consumer.subscribe(topics);

      while (true) {
        ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
        records.forEach(record -> process(record));
      }
    } catch (WakeupException e) {
      // ignore, we're closing
    } catch (Exception e) {
      log.error("Unexpected error", e);
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }

  public void shutdown() throws InterruptedException {
    consumer.wakeup();
    shutdownLatch.await();
  }
}
Synchronous commits¶
The simplest and most
reliable way to manually commit offsets is using a synchronous commit
with commitSync(). As its name suggests, this method blocks until the commit has completed successfully.
private void doCommitSync() {
  try {
    consumer.commitSync();
  } catch (WakeupException e) {
    // we're shutting down, but finish the commit first and then
    // rethrow the exception so that the main loop can exit
    doCommitSync();
    throw e;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
  }
}
public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      doCommitSync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}
In this example, a try/catch block is added around the call to
commitSync
. The CommitFailedException
is thrown when the
commit cannot be completed because the group has been rebalanced. This
is the main thing to be careful of when using the Java
client. Since all network IO (including heartbeating) and message
processing is done in the foreground, it is possible for the session
timeout to expire while a batch of messages is being processed. To
handle this, you have two choices.
First you can adjust the session.timeout.ms
setting to ensure that
the handler has enough time to finish processing messages. You can
then tune max.partition.fetch.bytes
to limit the amount of data
returned in a single batch, though you will have to consider how many
partitions are in the subscribed topics.
The second option is to do message processing in a separate thread,
but you will have to manage flow control to ensure that the threads
can keep up. For example, just pushing messages into a blocking queue
would probably not be sufficient unless the rate of processing can
keep up with the rate of delivery (in which case you might not need a
separate thread anyway). It may even exacerbate the problem if the poll
loop is stuck blocking on a call to offer()
while the background
thread is handling an even larger batch of messages. The Java API
offers a pause()
method to help in these situations.
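A rough sketch of that approach, assuming an application-owned backgroundExecutor and an AtomicBoolean named processingDone (rebalance handling is omitted for brevity):
ConsumerRecords<K, V> records = consumer.poll(100);
if (!records.isEmpty()) {
  consumer.pause(consumer.assignment());      // keep polling (and heartbeating) without fetching more
  backgroundExecutor.submit(() -> {
    records.forEach(record -> process(record));
    processingDone.set(true);                 // signal the polling thread that the batch is finished
  });
}
if (processingDone.getAndSet(false)) {
  consumer.resume(consumer.assignment());     // start fetching again now that the batch is handled
}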
For now, you should set session.timeout.ms
large enough that
commit failures from rebalances are rare. As mentioned above, the only
drawback to this is a longer delay before partitions can be
re-assigned in the event of a hard failure (where the consumer cannot
be cleanly shut down with close()
). This should be rare in
practice.
You should be careful in this example since
the wakeup()
might be triggered while the commit is pending. The
recursive call is safe since the wakeup will only be triggered once.
Delivery guarantees¶
In the previous example, you get “at least once”
delivery since
the commit follows the message processing. By changing the order,
however, you can get “at most once” delivery. But you must be a little careful with commit failures, so you should change doCommitSync
to return whether or not the commit succeeded. There’s also no longer
any need to catch the WakeupException
in the synchronous commit.
private boolean doCommitSync() {
  try {
    consumer.commitSync();
    return true;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
    return false;
  }
}
public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      if (doCommitSync())
        records.forEach(record -> process(record));
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}
Correct offset management is crucial because it affects delivery semantics.
Asynchronous commits¶
public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}
The API gives you a callback which is invoked when the commit either succeeds or fails:
public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));

      consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
          if (exception != null)
            log.debug("Commit failed for offsets {}", offsets, exception);
        }
      });
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}
In the example below, synchronous commits are incorporated on rebalances and on close.
For this, the subscribe() method has a variant which accepts a ConsumerRebalanceListener, which has two methods to hook into rebalance behavior.
private void doCommitSync() {
  try {
    consumer.commitSync();
  } catch (WakeupException e) {
    // we're shutting down, but finish the commit first and then
    // rethrow the exception so that the main loop can exit
    doCommitSync();
    throw e;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
  }
}
public void run() {
  try {
    consumer.subscribe(topics, new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        doCommitSync();
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
    });

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    try {
      doCommitSync();
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }
}
Suggested Reading¶
Blog post: Multi-Threaded Message Consumption with the Apache Kafka Consumer