Apache Kafka Java Client

Confluent Platform includes the Apache Kafka® Java Client producer and consumer. An overview of Kafka producers and consumers for the Java Client is provided below.

A producer sends records to Kafka topics. Key components of a Java producer are listed below:

  • ProducerRecord: Represents a record or a message to be sent to Kafka. It requires a topic name to send the record, and optionally, you can also specify a key and a partition.
  • KafkaProducer: Responsible for sending records to their respective topics in Kafka.
  • Serializer: Converts the records into bytes to be sent to Kafka. Kafka provides serializers for common types, and you can also write custom serializers.

A consumer reads data from Kafka topics. Key components of a Java consumer are listed below:

  • ConsumerRecord: Represents a record or a message retrieved from Kafka.
  • KafkaConsumer: Polls records from Kafka.
  • Deserializer: Converts bytes fetched from Kafka back into objects. As with serializers, Kafka provides deserializers for common types, and you can also create custom ones.
  • ConsumerGroup: If multiple consumers share the same group ID, they will jointly consume messages from one or more topics, with each consumer in the group consuming a unique set of partitions.
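
As a rough illustration of how a group divides partitions so that each one has exactly one owner, the following toy sketch deals partition numbers out to group members in round-robin order. It is not the algorithm Kafka uses (assignment is decided by the group's configured assignor); the class and method names are hypothetical, and it uses only the JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: NOT Kafka's assignor, just an illustration of
// "each partition is owned by exactly one consumer in the group".
final class GroupAssignmentSketch {

  // Deals partitions 0..partitionCount-1 to the consumers in round-robin order.
  static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
    if (consumers.isEmpty()) {
      throw new IllegalArgumentException("the group needs at least one consumer");
    }
    Map<String, List<Integer>> assignment = new TreeMap<>();
    for (String consumer : consumers) {
      assignment.put(consumer, new ArrayList<>());
    }
    for (int partition = 0; partition < partitionCount; partition++) {
      String owner = consumers.get(partition % consumers.size());
      assignment.get(owner).add(partition);
    }
    return assignment;
  }

  public static void main(String[] args) {
    // Two members sharing five partitions, then a third member joining.
    System.out.println(assign(Arrays.asList("consumer-a", "consumer-b"), 5));
    System.out.println(assign(Arrays.asList("consumer-a", "consumer-b", "consumer-c"), 5));
  }
}
```

Adding a member changes who owns which partitions, which is the effect a rebalance has in a real group.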

For a step-by-step guide on building a Java client application for Kafka, see Getting Started with Apache Kafka and Java.

Java Client installation

All JARs included in the packages are also available in the Confluent Maven repository. Here’s a sample POM file showing how to add this repository:

<repositories>

  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>

  <!-- further repository entries here -->

</repositories>

The Confluent Maven repository includes compiled versions of Kafka.

For example, to reference the Kafka version that is included with Confluent Platform 7.0.1, use the following in your pom.xml:

<dependencies>

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>7.0.1-ccs</version>
  </dependency>

  <!-- further dependency entries here -->

</dependencies>

Note

Version names of Apache Kafka vs. Kafka in Confluent Platform: Confluent always contributes patches back to the Apache Kafka® open source project. However, the exact versions (and version names) being included in Confluent Platform may differ from the Apache artifacts when Confluent Platform and Kafka releases do not align. If they are different, Confluent keeps the groupId and artifactId identical, but appends the suffix -ccs to the version identifier of the Confluent Platform version to distinguish these from the Apache artifacts.

You can reference artifacts for all Java libraries that are included with Confluent Platform. For example, to use the Avro serializer you can include the following in your pom.xml:

<dependencies>

  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <!-- For Confluent Platform 7.0.1 -->
    <version>7.0.1</version>
  </dependency>

  <!-- further dependency entries here -->

</dependencies>

Tip

You can also specify kafka-protobuf-serializer or kafka-jsonschema-serializer serializers. For more information, see Schema Formats, Serializers, and Deserializers.

Java Client example code

For a step-by-step tutorial using the Java client, including code samples for the producer and consumer, see this guide.

There are additional examples including how to produce and consume Avro data with Schema Registry.

Kafka Producer

Initialization

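The Java producer is constructed from a standard Properties object.

Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("bootstrap.servers", "host1:9092,host2:9092");
config.put("acks", "all");
// key.serializer and value.serializer must also be set, or serializer
// instances passed to the constructor; K and V are placeholder types
KafkaProducer<K, V> producer = new KafkaProducer<>(config);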

Configuration errors cause the KafkaProducer constructor to throw a KafkaException.
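
A producer also needs key and value serializers, supplied either as configuration entries or as instances passed to the constructor. The following is a minimal sketch of a fuller configuration, assuming String keys and values. The class name, method name, and client ID are illustrative, and the code uses only java.util.Properties so it runs without Kafka on the classpath. The resulting object is what you would pass to new KafkaProducer<String, String>(config).

```java
import java.util.Properties;

// Illustrative helper (not part of the Kafka API): assembles producer settings.
final class ProducerConfigSketch {

  static Properties producerConfig(String bootstrapServers, String clientId) {
    Properties config = new Properties();
    config.put("bootstrap.servers", bootstrapServers);
    config.put("client.id", clientId);
    config.put("acks", "all");
    // Assumption: keys and values are Strings, so the built-in
    // StringSerializer is named for both.
    config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return config;
  }

  public static void main(String[] args) {
    Properties config = producerConfig("host1:9092,host2:9092", "example-producer");
    config.forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```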

Asynchronous writes

The Java producer includes a send() API that returns a future, which you can poll or block on to get the result of the send.

final ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, value);
Future<RecordMetadata> future = producer.send(record);

To invoke some code after the write has completed, you can also provide a callback. In Java this is implemented as a Callback object:

final ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, value);
producer.send(record, new Callback() {
  public void onCompletion(RecordMetadata metadata, Exception e) {
    if (e != null)
      log.debug("Send failed for record {}", record, e);
  }
});

In the Java implementation you should avoid doing any expensive work in this callback since it is executed in the producer’s IO thread.

Synchronous writes

Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
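
Calling get() on the returned future blocks until the send has completed, which turns the asynchronous send into a synchronous one. If the send failed, get() throws an ExecutionException that wraps the underlying error. The sketch below shows one way to wait with a bounded timeout and unwrap that cause. The class and method names are hypothetical, and a CompletableFuture stands in for the Future<RecordMetadata> returned by producer.send() so the example runs with only the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of the Kafka API).
final class SyncSendSketch {

  // Waits up to timeoutMs for the future and rethrows the underlying cause
  // of a failed send instead of the wrapping ExecutionException.
  // A TimeoutException or InterruptedException propagates unchanged.
  static <T> T awaitResult(Future<T> future, long timeoutMs) throws Exception {
    try {
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof Exception) {
        throw (Exception) cause;
      }
      throw e;
    }
  }

  public static void main(String[] args) throws Exception {
    // Stand-in for a send that was acknowledged.
    System.out.println(awaitResult(CompletableFuture.completedFuture("acknowledged"), 1000));

    // Stand-in for a send that failed.
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("broker unavailable"));
    try {
      awaitResult(failed, 1000);
    } catch (IllegalStateException e) {
      System.out.println("send failed: " + e.getMessage());
    }
  }
}
```

Blocking on every send limits throughput to one in-flight record per calling thread, so this pattern fits cases where each write must be confirmed before continuing.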

Kafka Consumer

Initialization

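The Java consumer is constructed from a standard Properties object.

Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("group.id", "foo");
config.put("bootstrap.servers", "host1:9092,host2:9092");
// key.deserializer and value.deserializer must also be set, or deserializer
// instances passed to the constructor; K and V are placeholder types
KafkaConsumer<K, V> consumer = new KafkaConsumer<>(config);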

Configuration errors cause the KafkaConsumer constructor to throw a KafkaException.

Basic usage

The Java client is designed around an event loop which is driven by the poll() API. This design is motivated by the UNIX select and poll system calls. A basic consumption loop with the Java API usually takes the following form:

while (running) {
  ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
  process(records); // application-specific processing
  consumer.commitSync();
}

There is no background thread in the Java consumer. The API depends on calls to poll() to drive all of its IO including:

  • Joining the consumer group and handling partition rebalances.
  • Sending periodic heartbeats if part of an active generation.
  • Sending periodic offset commits (if autocommit is enabled).
  • Sending and receiving fetch requests for assigned partitions.

Due to this single-threaded model, no heartbeats can be sent while the application is handling the records returned from a call to poll(). This means that the consumer will fall out of the consumer group if either the event loop terminates or if a delay in record processing causes the session timeout to expire before the next iteration of the loop. This is actually by design. One of the problems that the Java client attempts to solve is ensuring the liveness of consumers in the group. As long as the consumer is assigned partitions, no other members in the group can consume from the same partitions, so it is important to ensure that it is actually making progress and has not become a zombie.

This feature protects your application from a large class of failures, but the downside is that it puts the burden on you to tune the session timeout so that the consumer does not exceed it in its normal record processing. The max.poll.records configuration option places an upper bound on the number of records returned from each call to poll(). You should combine max.poll.records with a fairly high session timeout (for example, 30 to 60 seconds), keeping the number of records processed on each iteration bounded so that worst-case behavior is predictable.
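
One way to reason about these settings is to estimate the worst-case time needed to process a full batch and compare it with the timeout. The sketch below does that with illustrative numbers. The class name, helper names, and the per-record cost are assumptions, not recommendations. Client versions 0.10.1 and later also expose max.poll.interval.ms, which bounds the time allowed between calls to poll(); it is included on the assumption that such a client is in use. The code uses only java.util.Properties so it runs without Kafka on the classpath.

```java
import java.util.Properties;

// Illustrative helper (not part of the Kafka API).
final class ConsumerTuningSketch {

  static Properties consumerConfig(int maxPollRecords, int sessionTimeoutMs, int maxPollIntervalMs) {
    Properties config = new Properties();
    config.put("bootstrap.servers", "host1:9092,host2:9092");
    config.put("group.id", "foo");
    // Upper bound on the records returned by a single poll()
    config.put("max.poll.records", Integer.toString(maxPollRecords));
    config.put("session.timeout.ms", Integer.toString(sessionTimeoutMs));
    // Assumption: the client is recent enough to support this setting
    config.put("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
    return config;
  }

  // Rough worst-case processing time for one full batch.
  static long worstCaseBatchMs(int maxPollRecords, long perRecordMs) {
    return maxPollRecords * perRecordMs;
  }

  public static void main(String[] args) {
    int maxPollRecords = 100;
    int timeoutMs = 45_000;
    long perRecordMs = 50; // assumed worst-case cost of processing one record
    Properties config = consumerConfig(maxPollRecords, timeoutMs, timeoutMs);
    long worstCase = worstCaseBatchMs(maxPollRecords, perRecordMs);
    System.out.println("worst-case batch time (ms): " + worstCase);
    System.out.println("fits within timeout: " + (worstCase < timeoutMs));
    System.out.println(config);
  }
}
```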

If you fail to tune these settings appropriately, the consequence is typically a CommitFailedException raised from the call to commit offsets for the processed records. If you are using the automatic commit policy, then you might not even notice when this happens since the consumer silently ignores commit failures internally (unless it’s occurring often enough to impact lag metrics). You can catch this exception and either ignore it or perform any needed rollback logic.

while (running) {
  ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
  process(records); // application-specific processing
  try {
    consumer.commitSync();
  } catch (CommitFailedException e) {
    // application-specific rollback of processed records
  }
}

Java Client code examples

Basic poll loop

The consumer API is centered around the poll() method, which is used to retrieve records from the brokers. The subscribe() method controls which topics will be fetched in poll(). Typically, consumer usage involves an initial call to subscribe() to set up the topics of interest and then a loop which calls poll() until the application is shut down.

The consumer intentionally avoids a specific threading model. It is not safe for multi-threaded access and it has no background threads of its own. In particular, this means that all IO occurs in the thread calling poll(). In the consumer example below, the poll loop is wrapped in a Runnable which makes it easy to use with an ExecutorService.

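import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// K and V are the key and value types of the consumed records
public abstract class BasicConsumeLoop<K, V> implements Runnable {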
  private final KafkaConsumer<K, V> consumer;
  private final List<String> topics;
  private final AtomicBoolean shutdown;
  private final CountDownLatch shutdownLatch;

  public BasicConsumeLoop(Properties config, List<String> topics) {
    this.consumer = new KafkaConsumer<>(config);
    this.topics = topics;
    this.shutdown = new AtomicBoolean(false);
    this.shutdownLatch = new CountDownLatch(1);
  }

  public abstract void process(ConsumerRecord<K, V> record);

  public void run() {
    try {
      consumer.subscribe(topics);

      while (!shutdown.get()) {
        ConsumerRecords<K, V> records = consumer.poll(500);
        records.forEach(record -> process(record));
      }
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }

  public void shutdown() throws InterruptedException {
    shutdown.set(true);
    shutdownLatch.await();
  }
}

The poll timeout is hard-coded to 500 milliseconds. If no records are received before this timeout expires, then poll() will return an empty record set. It’s not a bad idea to add a shortcut check for this case if your message processing involves any setup overhead.

To shut down the consumer, a flag is added which is checked on each loop iteration. After shutdown is triggered, the consumer will wait at most 500 milliseconds (plus the message processing time) before shutting down since it might be triggered while it is in poll(). A better approach is provided in the next example.

Note that you should always call close() after you are finished using the consumer. Doing so ensures that active sockets are closed and internal state is cleaned up. It also triggers a group rebalance immediately, which ensures that any partitions owned by the consumer are reassigned to another member of the group. If the consumer is not closed properly, the broker triggers the rebalance only after the session timeout has expired. A latch is added to this example to ensure that the consumer has time to finish closing before shutdown completes.

Shutdown with wakeup

An alternative pattern for the poll loop in the Java consumer is to use Long.MAX_VALUE for the timeout. To break from the loop, you can call the consumer’s wakeup() method from a separate thread. This raises a WakeupException in the thread blocking in poll(). If the thread is not currently blocking, the exception is raised on the next poll() invocation.

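import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

// K and V are the key and value types of the consumed records
public abstract class ConsumeLoop<K, V> implements Runnable {
  private final KafkaConsumer<K, V> consumer;
  private final List<String> topics;
  private final CountDownLatch shutdownLatch;

  public ConsumeLoop(KafkaConsumer<K, V> consumer, List<String> topics) {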
    this.consumer = consumer;
    this.topics = topics;
    this.shutdownLatch = new CountDownLatch(1);
  }

  public abstract void process(ConsumerRecord<K, V> record);

  public void run() {
    try {
      consumer.subscribe(topics);

      while (true) {
        ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
        records.forEach(record -> process(record));
      }
    } catch (WakeupException e) {
      // ignore, we're closing
    } catch (Exception e) {
      log.error("Unexpected error", e);
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }

  public void shutdown() throws InterruptedException {
    consumer.wakeup();
    shutdownLatch.await();
  }
}

Synchronous commits

The simplest and most reliable way to manually commit offsets is using a synchronous commit with commitSync(). As its name suggests, this method blocks until the commit has completed successfully or has failed with an unrecoverable error.

private void doCommitSync() {
  try {
    consumer.commitSync();
  } catch (WakeupException e) {
    // we're shutting down, but finish the commit first and then
    // rethrow the exception so that the main loop can exit
    doCommitSync();
    throw e;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
  }
}

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      doCommitSync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

In this example, a try/catch block is added around the call to commitSync. The CommitFailedException is thrown when the commit cannot be completed because the group has been rebalanced. This is the main thing to be careful of when using the Java client. Since all network IO (including heartbeating) and message processing is done in the foreground, it is possible for the session timeout to expire while a batch of messages is being processed. To handle this, you have two choices.

First, you can adjust the session.timeout.ms setting to ensure that the handler has enough time to finish processing messages. You can then tune max.partition.fetch.bytes to limit the amount of data returned in a single batch, though you will have to consider how many partitions are in the subscribed topics.

The second option is to do message processing in a separate thread, but you will have to manage flow control to ensure that the threads can keep up. For example, just pushing messages into a blocking queue would probably not be sufficient unless the rate of processing can keep up with the rate of delivery (in which case you might not need a separate thread). It may even exacerbate the problem if the poll loop is stuck blocking on a call to offer() while the background thread is handling an even larger batch of messages. The Java API offers a pause() method to help in these situations.
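
The flow-control idea can be sketched without Kafka as a bounded queue that the poll loop never blocks on. In the sketch below, the poll loop hands each batch to a hypothetical FlowControlSketch helper, which reports whether fetching should be paused because the worker has fallen behind. In a real consumer you would respond to true by calling consumer.pause(consumer.assignment()) and to false by calling consumer.resume(consumer.paused()), while continuing to call poll() on every iteration. The class and method names are assumptions made for illustration, and the sketch relies on paused partitions returning no records.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper (not part of the Kafka API): hands batches from the
// poll loop to a worker thread through a bounded queue without blocking.
final class FlowControlSketch<T> {
  private final BlockingQueue<T> queue;
  private T pending; // batch that did not fit in the queue yet (poll thread only)

  FlowControlSketch(int capacity) {
    this.queue = new ArrayBlockingQueue<>(capacity);
  }

  // Call once per poll loop iteration with the batch just fetched, or null
  // if nothing was fetched. Returns true while fetching should stay paused.
  boolean onPoll(T batch) {
    if (batch != null) {
      if (pending != null) {
        throw new IllegalStateException("received a batch while paused");
      }
      pending = batch;
    }
    // offer() returns immediately, so the poll loop is never stuck here
    if (pending != null && queue.offer(pending)) {
      pending = null;
    }
    return pending != null;
  }

  // Called by the worker thread; returns null when no batch is waiting.
  T nextBatch() {
    return queue.poll();
  }

  public static void main(String[] args) {
    FlowControlSketch<String> handoff = new FlowControlSketch<>(1);
    System.out.println("pause after batch-1: " + handoff.onPoll("batch-1"));
    System.out.println("pause after batch-2: " + handoff.onPoll("batch-2"));
    System.out.println("worker takes: " + handoff.nextBatch());
    System.out.println("pause after empty poll: " + handoff.onPoll(null));
    System.out.println("worker takes: " + handoff.nextBatch());
  }
}
```

Because the handoff uses offer() rather than a blocking put(), the poll loop keeps iterating even when the worker is slow, which avoids the stuck-loop problem described above.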

For now, you should set session.timeout.ms large enough that commit failures from rebalances are rare. As mentioned above, the only drawback to this is a longer delay before partitions can be re-assigned in the event of a hard failure (where the consumer cannot be cleanly shut down with close()). This should be rare in practice.

You should be careful in this example since the wakeup() might be triggered while the commit is pending. The recursive call is safe since the wakeup will only be triggered once.

Delivery guarantees

In the previous example, you get “at least once” delivery since the commit follows the message processing. By changing the order, however, you can get “at most once” delivery. But you must be a little careful with the commit failure, so you should change doCommitSync to return whether or not the commit succeeded. There’s also no longer any need to catch the WakeupException in the synchronous commit.

private boolean doCommitSync() {
  try {
    consumer.commitSync();
    return true;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
    return false;
  }
}

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      if (doCommitSync())
        records.forEach(record -> process(record));
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

Correct offset management is crucial because it affects delivery semantics.
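
The difference between the two orderings shows up when the consumer fails between the two steps. The following toy simulation, which uses only the JDK and does not talk to Kafka, replays a single crash under each ordering and records which offsets the application ends up processing. The class name, method name, and parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation (no Kafka involved) of one crash between "process" and "commit".
final class DeliverySemanticsSketch {

  // Consumes offsets [0, total) in batches of batchSize. The consumer crashes
  // once, between the two steps, while handling the batch that starts at
  // crashBatchStart, and then restarts from the last committed offset.
  static List<Integer> run(int total, int batchSize, int crashBatchStart, boolean commitBeforeProcess) {
    List<Integer> processed = new ArrayList<>();
    int committed = 0;       // next offset to read after a restart
    boolean crashed = false; // the crash happens only once
    while (committed < total) {
      int start = committed;
      int end = Math.min(start + batchSize, total);
      boolean crashNow = !crashed && start == crashBatchStart;
      if (commitBeforeProcess) {
        committed = end; // commit first
        if (crashNow) { crashed = true; continue; } // batch is never processed
        for (int offset = start; offset < end; offset++) processed.add(offset);
      } else {
        for (int offset = start; offset < end; offset++) processed.add(offset);
        if (crashNow) { crashed = true; continue; } // batch is read again
        committed = end; // commit last
      }
    }
    return processed;
  }

  public static void main(String[] args) {
    System.out.println("process then commit: " + run(6, 2, 2, false));
    System.out.println("commit then process: " + run(6, 2, 2, true));
  }
}
```

Processing before committing repeats the interrupted batch after the restart, while committing before processing skips it.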

Asynchronous commits

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

The API gives you a callback which is invoked when the commit either succeeds or fails:

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
          if (exception != null)
            log.debug("Commit failed for offsets {}", offsets, exception);
        }
      });
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

In the example below, synchronous commits are incorporated on rebalances and on close. For this, the subscribe() method has a variant which accepts a ConsumerRebalanceListener, which has two methods to hook into rebalance behavior.

private void doCommitSync() {
  try {
    consumer.commitSync();
  } catch (WakeupException e) {
    // we're shutting down, but finish the commit first and then
    // rethrow the exception so that the main loop can exit
    doCommitSync();
    throw e;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
  }
}

public void run() {
  try {
    consumer.subscribe(topics, new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        doCommitSync();
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
    });

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    try {
      doCommitSync();
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }
}

API documentation

See the Java Client API documentation.

Limitations

Confluent Cloud does not support the following methods: