Important Warning

Update July 2016: This Tech Preview documentation from March 2016 is outdated and deprecated. Please use the latest Confluent Platform documentation instead.

Kafka Consumers

The 0.9.0.0 release of Kafka introduces the new Kafka consumer, which is a complete rewrite in Java of the older Scala consumer. It combines the functionality of the 0.8 simple and high-level consumers into a single clean API. In addition to the consolidated API, it brings several important advantages to the table:

  • It supports Kafka 0.9.0.0 security extensions (SSL/SASL).
  • It uses a redesigned group management protocol on top of Kafka, which allows the number of consumer groups in the cluster to scale with the number of brokers.
  • Its library has a much smaller footprint with no unneeded dependencies. In particular, it does not depend on Kafka core.

The Confluent Platform also ships with librdkafka, which is a high-performance C/C++ client library with support for the new consumer group management protocol.

Concepts

A consumer group is a set of consumers which cooperate to consume data from some topics. The partitions of all the topics are divided among the consumers in the group. As new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This is known as rebalancing the group.
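To make the idea of a proportional share concrete, here is a minimal sketch that spreads partitions over group members in round-robin order. The class and method names (AssignmentSketch, assign) are hypothetical, and this is not the assignment strategy Kafka itself uses; it only illustrates how each member's share changes when the membership changes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; this is not the assignment strategy Kafka uses.
class AssignmentSketch {
  // Spread partitions 0..numPartitions-1 over the members in round-robin order.
  static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
    Map<String, List<Integer>> assignment = new LinkedHashMap<>();
    for (String member : members)
      assignment.put(member, new ArrayList<>());
    if (members.isEmpty())
      return assignment;
    for (int p = 0; p < numPartitions; p++)
      assignment.get(members.get(p % members.size())).add(p);
    return assignment;
  }

  public static void main(String[] args) {
    // Two members share six partitions; a third member joining shrinks each share.
    System.out.println(assign(Arrays.asList("c1", "c2"), 6));
    System.out.println(assign(Arrays.asList("c1", "c2", "c3"), 6));
  }
}
```

With two members each one owns three of the six partitions; once a third member joins, recomputing the assignment leaves every member with two.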

The main difference between the older “high-level” consumer and the new consumer is that the former depended on Zookeeper for group management, while the latter uses Kafka itself. For group management with Kafka, one of the brokers serves as the group’s coordinator. The coordinator maintains the current members of the group as well as their partition assignments. It is responsible for detecting when new members have joined and old members have left.

All of the groups managed by Kafka are divided roughly equally across all the brokers in the cluster. In other words, each broker may be the coordinator for some subset of the groups. This allows the number of groups to scale by increasing the number of brokers.
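One simple way to picture how groups could be spread over brokers is to hash the group id onto the broker list. The sketch below assumes exactly that; the names CoordinatorSketch and coordinatorFor are hypothetical, and the mechanism Kafka actually uses to pick a coordinator may differ in its details.

```java
import java.util.List;

// Hypothetical illustration: map each group id to one broker by hashing it.
class CoordinatorSketch {
  static String coordinatorFor(String groupId, List<String> brokers) {
    // floorMod keeps the index non-negative even when hashCode() is negative
    int index = Math.floorMod(groupId.hashCode(), brokers.size());
    return brokers.get(index);
  }
}
```

Because the mapping depends only on the group id and the broker list, every client that looks up the same group arrives at the same broker, and adding brokers gives the groups more places to land.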

When the consumer starts up, it finds the coordinator for its group and sends a request to join the group. The coordinator then rebalances the partitions between all current members of the group and the new member. Every rebalance results in a new generation of the group.

Each member in the group must send heartbeats to the coordinator in order to remain a member of the group. If no heartbeat is received before expiration of the configured session timeout, then the coordinator will kick the member out of the group and reassign its partitions to another member.
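The bookkeeping behind session expiration can be sketched in a few lines. The class below is a hypothetical stand-in for what a coordinator might track, not Kafka's implementation: it records the time of each member's last heartbeat and reports the members whose session timeout has elapsed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of session tracking; not the broker's actual code.
class SessionTrackerSketch {
  private final long sessionTimeoutMs;
  private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

  SessionTrackerSketch(long sessionTimeoutMs) {
    this.sessionTimeoutMs = sessionTimeoutMs;
  }

  // Record a heartbeat from the given member at time nowMs.
  void heartbeat(String memberId, long nowMs) {
    lastHeartbeatMs.put(memberId, nowMs);
  }

  // Remove and return the members whose last heartbeat is older than the timeout.
  List<String> expire(long nowMs) {
    List<String> expired = new ArrayList<>();
    lastHeartbeatMs.entrySet().removeIf(entry -> {
      boolean timedOut = nowMs - entry.getValue() > sessionTimeoutMs;
      if (timedOut)
        expired.add(entry.getKey());
      return timedOut;
    });
    return expired;
  }
}
```

Any member returned by expire() would be dropped from the group, and its partitions would be handed to the remaining members in the next rebalance.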

Offset Management: After the consumer receives its assignment from the coordinator, it must determine the initial position for each assigned partition. Initially, before any messages have been read in the group, the position is set according to a configurable offset reset policy. Typically, consumption starts either at the earliest offset or the latest offset.

As a consumer in the group reads messages from the partitions assigned by the coordinator, it must commit the offsets corresponding to the messages it has read. If the consumer crashes or is shut down, its partitions will be re-assigned to another member, which will begin consumption from the last committed offset of each partition. If the consumer crashes before any offset has been committed, then the consumer which takes over its partitions will use the reset policy.
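The rule for choosing a starting position can be summarized as a small decision function. This is a sketch under stated assumptions: StartPositionSketch and its parameters are hypothetical names, and the policy strings simply mirror the earliest and latest choices described above.

```java
// Hypothetical illustration of how a starting position could be chosen.
class StartPositionSketch {
  // committed == null means no offset has been committed for the partition yet.
  static long startPosition(Long committed, String resetPolicy, long earliest, long latest) {
    if (committed != null)
      return committed;              // resume where the group left off
    switch (resetPolicy) {
      case "earliest":
        return earliest;             // start from the oldest available message
      case "latest":
        return latest;               // start from messages that arrive from now on
      default:
        // any other policy: the application has to pick a position itself
        throw new IllegalStateException("No committed offset and reset policy is " + resetPolicy);
    }
  }
}
```

A member that takes over a partition with a committed offset of 42 resumes at 42 regardless of the policy; the policy only matters when nothing has been committed.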

The offset commit policy is crucial to providing the message delivery guarantees needed by your application. By default, the consumer is configured to use an automatic commit policy, which triggers a commit on a periodic interval. The consumer also supports a commit API which can be used for manual offset management. Below, we show several detailed examples of the commit API and discuss the tradeoffs in terms of performance and reliability.

Configuration

The full list of configuration settings is available in the Kafka documentation. Below we highlight several of the key configuration settings and how they affect the consumer’s behavior.

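Core Configuration: The only required setting is bootstrap.servers (the Java consumer additionally needs key and value deserializers, set through key.deserializer and value.deserializer or passed to its constructor), but we recommend always setting a client.id since this allows you to easily correlate requests on the broker with the client instance which made them. It also helps when using the consumer group utility shown below.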

Group Configuration: You should always configure group.id unless you are using the simple assignment API and you don’t need to store offsets in Kafka.

You can control the session timeout by overriding the session.timeout.ms value. The default is 30 seconds, but you can safely increase it to avoid excessive rebalances if you find that your application needs more time to process messages. This concern is mainly relevant if you are using the Java consumer and handling messages in the same thread. The main drawback to using a larger session timeout is that it will take longer for the coordinator to detect when a consumer instance has crashed, which means it will also take longer for another consumer in the group to take over its partitions. For normal shutdowns, however, the consumer sends an explicit request to the coordinator to leave the group which triggers an immediate rebalance.

The other setting which affects rebalance behavior is heartbeat.interval.ms. This controls how often the consumer will send heartbeats to the coordinator. It is also the way that the consumer detects when a rebalance is needed, so a lower heartbeat interval will generally mean faster rebalances. The default setting is three seconds. For larger groups, it may be wise to increase this setting.

Offset Management: The two main settings affecting offset management are whether auto-commit is enabled and the offset reset policy. First, if you set enable.auto.commit (which is the default), then the consumer will automatically commit offsets periodically at the interval set by auto.commit.interval.ms. The default is 5 seconds.

Second, use auto.offset.reset to define the behavior of the consumer when there is no committed position (which would be the case when the group is first initialized) or when an offset is out of range. You can choose either to reset the position to the “earliest” offset or the “latest” offset (the default). You can also select “none” if you would rather set the initial offset yourself.
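For reference, the settings discussed in this section can be collected into a single Properties object. The sketch below spells out the default values mentioned above explicitly; the class name ConsumerConfigSketch, the host names, and the client.id value are placeholders rather than recommendations.

```java
import java.util.Properties;

// Hypothetical helper that gathers the settings discussed in this section.
class ConsumerConfigSketch {
  static Properties build() {
    Properties config = new Properties();
    config.put("bootstrap.servers", "host1:9092,host2:9092"); // placeholder hosts
    config.put("client.id", "example-client");                // placeholder id
    config.put("group.id", "foo");
    config.put("session.timeout.ms", "30000");      // 30 seconds, the stated default
    config.put("heartbeat.interval.ms", "3000");    // three seconds, the stated default
    config.put("enable.auto.commit", "true");       // auto-commit is on by default
    config.put("auto.commit.interval.ms", "5000");  // 5 seconds, the stated default
    config.put("auto.offset.reset", "latest");      // "earliest", "latest" or "none"
    return config;
  }
}
```

Writing the defaults out is not required, but it makes the values that govern rebalance and commit behavior visible next to the settings you do override.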

Administration

The 0.9.0 version of Kafka includes an admin utility for viewing the status of consumer groups.

List Groups

To get a list of the active groups in the cluster, you can use the kafka-consumer-groups utility included in the Kafka distribution. On a large cluster, this may take a little time since we need to collect the list by inspecting each broker in the cluster.

$ bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --list

Describe Group

The utility kafka-consumer-groups can also be used to collect information on a current group. For example, to see the current assignments for the foo group, use the following command:

$ bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --describe --group foo

If you happen to invoke this while a rebalance is in progress, the command will report an error. Retry and you should see the assignments for all the members in the current generation.

Examples

Below we provide detailed examples of the consumer API with special attention paid to offset management and delivery semantics. These examples are intended to be usable.

Initial Setup

The Java consumer is constructed with a standard Properties object.

Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("group.id", "foo");
config.put("bootstrap.servers", "host1:9092,host2:9092");
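// key.deserializer and value.deserializer must also be set in config,
// unless deserializers are passed to the KafkaConsumer constructor
KafkaConsumer<K, V> consumer = new KafkaConsumer<>(config);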

Configuration errors will result in a KafkaException raised from the constructor of KafkaConsumer. The librdkafka configuration is similar, but we need to handle configuration errors directly when setting properties:

char hostname[128];
char errstr[512];

rd_kafka_conf_t *conf = rd_kafka_conf_new();

if (gethostname(hostname, sizeof(hostname))) {
  fprintf(stderr, "%% Failed to lookup hostname\n");
  exit(1);
}

if (rd_kafka_conf_set(conf, "client.id", hostname,
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%% %s\n", errstr);
  exit(1);
}

if (rd_kafka_conf_set(conf, "group.id", "foo",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%% %s\n", errstr);
  exit(1);
}

if (rd_kafka_conf_set(conf, "bootstrap.servers", "host1:9092,host2:9092",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%% %s\n", errstr);
  exit(1);
}

/* Create Kafka consumer handle */
rd_kafka_t *rk;
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                        errstr, sizeof(errstr)))) {
  fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
  exit(1);
}

Basic Poll Loop

The consumer API is centered around the poll() method, which is used to retrieve records from the brokers. The subscribe() method controls which topics will be fetched in poll. Typically, consumer usage involves an initial call to subscribe() to set up the topics of interest and then a loop which calls poll() until the application is shut down.

The consumer intentionally avoids a specific threading model. It is not safe for multi-threaded access and it has no background threads of its own. In particular, this means that all IO occurs in the thread calling poll(). In the example below, we wrap the poll loop in a Runnable which makes it easy to use with an ExecutorService.

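import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// K and V are the key and value types produced by the configured deserializers
public abstract class BasicConsumeLoop<K, V> implements Runnable {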
  private final KafkaConsumer<K, V> consumer;
  private final List<String> topics;
  private final AtomicBoolean shutdown;
  private final CountDownLatch shutdownLatch;

  public BasicConsumeLoop(Properties config, List<String> topics) {
    this.consumer = new KafkaConsumer<>(config);
    this.topics = topics;
    this.shutdown = new AtomicBoolean(false);
    this.shutdownLatch = new CountDownLatch(1);
  }

  public abstract void process(ConsumerRecord<K, V> record);

  public void run() {
    try {
      consumer.subscribe(topics);

      while (!shutdown.get()) {
        ConsumerRecords<K, V> records = consumer.poll(500);
        records.forEach(record -> process(record));
      }
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }

  public void shutdown() throws InterruptedException {
    shutdown.set(true);
    shutdownLatch.await();
  }
}

We’ve hard-coded the poll timeout to 500 milliseconds. If no records are received before this timeout expires, then poll() will return an empty record set. It’s not a bad idea to add a shortcut check for this case if your message processing involves any setup overhead.

To shut down the consumer, we’ve added a flag which is checked on each loop iteration. After shutdown is triggered, the consumer will wait at most 500 milliseconds (plus the message processing time) before shutting down since it might be triggered while we are in poll(). A better approach is provided in the next example.

Note that you should always call close() after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If the consumer is not closed properly, the broker will trigger the rebalance only after the session timeout has expired. We’ve added a latch to this example to ensure that the consumer has time to finish closing before finishing shutdown.

The same example looks similar in librdkafka:

static int shutdown = 0;
static void msg_process(rd_kafka_message_t *message);

void basic_consume_loop(rd_kafka_t *rk,
                        rd_kafka_topic_partition_list_t *topics) {
  rd_kafka_resp_err_t err;

  if ((err = rd_kafka_subscribe(rk, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
    exit(1);
  }

  while (!shutdown) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
    if (rkmessage) {
      msg_process(rkmessage);
      rd_kafka_message_destroy(rkmessage);
    }
  }

  err = rd_kafka_consumer_close(rk);
  if (err)
    fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
  else
    fprintf(stderr, "%% Consumer closed\n");
}

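/* named differently from the flag: C does not allow a function and a
   variable to share the identifier "shutdown" in the same scope */
void trigger_shutdown(void) {
  shutdown = 1;
}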

Although the APIs are similar, librdkafka uses a different approach beneath the surface. While the Java consumer does all IO and processing in the foreground thread, librdkafka uses a background thread. The main consequence of this is that calling rd_kafka_consumer_poll is totally safe when used from multiple threads. You can use this to parallelize message handling in multiple threads. From a high level, poll is taking messages off of a queue which is filled in the background.

Another consequence of using a background thread is that all heartbeats and rebalances are executed in the background. The benefit of this is that you don’t need to worry about message handling causing the consumer to “miss” a rebalance. The drawback, however, is that the background thread will continue heartbeating even if your message processor dies. If this happens, then the consumer will continue to hold onto its partitions and the read lag will continue to build until the process is shut down.

Although the two clients have taken a different approach internally, they are not as far apart as they seem. To provide the same abstraction in the Java client, you could place a queue in between the poll loop and the message processors. The poll loop would fill the queue and the processors would pull messages off of it.
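The queue-based hand-off described above can be sketched with the standard library alone. In the example below a list of strings stands in for the records returned by poll(), and the names QueueHandoffSketch, run and STOP are hypothetical; a real application would also have to decide when offsets are committed relative to processing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration: a poll loop fills a bounded queue, workers drain it.
class QueueHandoffSketch {
  private static final String STOP = "__stop__"; // hypothetical end-of-stream marker

  static List<String> run(List<String> polled, int workers) {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
    List<String> processed = Collections.synchronizedList(new ArrayList<>());
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < workers; i++) {
      Thread worker = new Thread(() -> {
        try {
          while (true) {
            String message = queue.take();     // blocks until a message is available
            if (STOP.equals(message))
              break;
            processed.add(message);            // stands in for real message handling
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      worker.start();
      threads.add(worker);
    }
    try {
      for (String message : polled)
        queue.put(message);                    // blocks while the queue is full
      for (int i = 0; i < workers; i++)
        queue.put(STOP);                       // one marker per worker
      for (Thread worker : threads)
        worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while handing off messages", e);
    }
    return processed;
  }
}
```

The bounded queue gives a crude form of back-pressure: when the workers fall behind, put() blocks the filling thread instead of letting messages pile up in memory.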

Shutdown with Wakeup

An alternative pattern for the poll loop in the Java consumer is to use Long.MAX_VALUE for the timeout. To break from the loop, we can use the consumer’s wakeup() method from a separate thread. This will raise a WakeupException from the thread blocking in poll(). If the thread is not currently blocking, then this will wake up the next poll invocation.

public abstract class ConsumeLoop<K, V> implements Runnable {
  private final KafkaConsumer<K, V> consumer;
  private final List<String> topics;
  private final CountDownLatch shutdownLatch;

  public ConsumeLoop(KafkaConsumer<K, V> consumer, List<String> topics) {
    this.consumer = consumer;
    this.topics = topics;
    this.shutdownLatch = new CountDownLatch(1);
  }

  public abstract void process(ConsumerRecord<K, V> record);

  public void run() {
    try {
      consumer.subscribe(topics);

      while (true) {
        ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
        records.forEach(record -> process(record));
      }
    } catch (WakeupException e) {
      // ignore, we're closing
    } catch (Exception e) {
      log.error("Unexpected error", e);
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }

  public void shutdown() throws InterruptedException {
    consumer.wakeup();
    shutdownLatch.await();
  }
}

Synchronous Commits

In the examples above, we have assumed that the consumer is configured to auto-commit offsets (this is the default). Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset. When this happens, the last committed position may be as old as the auto-commit interval itself. Any messages which have arrived since the last commit will have to be read again.

Clearly if you want to reduce the window for duplicates, you can reduce the auto-commit interval, but some users may want even finer control over offsets. The consumer therefore supports a commit API which gives you full control over offsets. The simplest and most reliable way to manually commit offsets is using a synchronous commit with commitSync(). As its name suggests, this method blocks until the commit has completed successfully.

Note that when you use the commit API directly, you should first disable auto-commit in the configuration by setting the enable.auto.commit property to false.

private void doCommitSync() {
  try {
    consumer.commitSync();
  } catch (WakeupException e) {
    // we're shutting down, but finish the commit first and then
    // rethrow the exception so that the main loop can exit
    doCommitSync();
    throw e;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
  }
}

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      doCommitSync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

In this example, we’ve added a try/catch block around the call to commitSync. The CommitFailedException is thrown when the commit cannot be completed because the group has been rebalanced. This is the main thing we have to be careful of when using the Java client. Since all network IO (including heartbeating) and message processing are done in the foreground, it is possible for the session timeout to expire while a batch of messages is being processed. To handle this, you have two choices.

First you can adjust the session.timeout.ms setting to ensure that the handler has enough time to finish processing messages. You can then tune max.partition.fetch.bytes to limit the amount of data returned in a single batch, though you will have to consider how many partitions are in the subscribed topics.

The second option is to do message processing in a separate thread, but you will have to manage flow control to ensure that the threads can keep up. For example, just pushing messages into a blocking queue would probably not be sufficient unless the rate of processing can keep up with the rate of delivery (in which case you might not need a separate thread anyway). It may even exacerbate the problem if the poll loop is stuck blocking on a call to offer() while the background thread is handling an even larger batch of messages. The Java API offers a pause() method to help in these situations, which we will cover in another example.

For now, we recommend setting session.timeout.ms large enough that commit failures from rebalances are rare. As mentioned above, the only drawback to this is a longer delay before partitions can be re-assigned in the event of a hard failure (where the consumer cannot be cleanly shut down with close()). This should be rare in practice.

Note also that we have to be a little careful in this example since the wakeup() might be triggered while the commit is pending. The recursive call is safe since the wakeup will only be triggered once.

In librdkafka, we can get similar behavior with rd_kafka_commit, which is used for both synchronous and asynchronous commits. The approach is slightly different, however, since rd_kafka_consumer_poll returns single messages instead of batches as the Java consumer does.

void consume_loop(rd_kafka_t *rk,
                  rd_kafka_topic_partition_list_t *topics) {
  static int MIN_COMMIT_COUNT = 1000;

  int msg_count = 0;
  rd_kafka_resp_err_t err;

  if ((err = rd_kafka_subscribe(rk, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
    exit(1);
  }

  while (!shutdown) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
    if (rkmessage) {
      msg_process(rkmessage);
      rd_kafka_message_destroy(rkmessage);

      if ((++msg_count % MIN_COMMIT_COUNT) == 0)
        rd_kafka_commit(rk, NULL, 0);
    }
  }

  err = rd_kafka_consumer_close(rk);
  if (err)
    fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
  else
    fprintf(stderr, "%% Consumer closed\n");
}

In this example, we trigger a synchronous commit every 1000 messages. The second argument to rd_kafka_commit is the list of offsets to be committed; if set to NULL, librdkafka will commit the latest offsets for the assigned partitions. The third argument in rd_kafka_commit is a flag which controls whether this call is asynchronous. We could also trigger the commit on expiration of a timeout to ensure that the committed position is updated regularly.
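Combining a message count with a timeout takes only a little bookkeeping. The sketch below is written in Java for brevity and is independent of either client; CommitTriggerSketch and shouldCommit are hypothetical names, and the caller is assumed to pass in the current time and to perform the actual commit whenever the method returns true.

```java
// Hypothetical illustration: decide when a commit is due, by count or by elapsed time.
class CommitTriggerSketch {
  private final int minCommitCount;
  private final long maxIntervalMs;
  private int uncommitted = 0;
  private long lastCommitMs;

  CommitTriggerSketch(int minCommitCount, long maxIntervalMs, long nowMs) {
    this.minCommitCount = minCommitCount;
    this.maxIntervalMs = maxIntervalMs;
    this.lastCommitMs = nowMs;
  }

  // Call after every poll, with the number of messages just processed (possibly 0).
  boolean shouldCommit(int newMessages, long nowMs) {
    uncommitted += newMessages;
    boolean due = uncommitted >= minCommitCount
        || (uncommitted > 0 && nowMs - lastCommitMs >= maxIntervalMs);
    if (due) {
      // the caller is expected to commit now, so reset the bookkeeping
      uncommitted = 0;
      lastCommitMs = nowMs;
    }
    return due;
  }
}
```

Calling shouldCommit(0, now) after an empty poll lets a quiet topic still commit its last few messages once the interval has passed.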

Delivery Guarantees: This is as good a time as any to talk about delivery semantics. Using auto-commit gives you “at least once” delivery: Kafka guarantees that no messages will be missed, but duplicates are possible. In the above example, we get the same since the commit follows the message processing. By changing the order, however, we can get “at most once” delivery. But we have to be a little careful with the commit failure, so we change doCommitSync to return whether or not the commit succeeded. There’s also no longer any need to catch the WakeupException in the synchronous commit.

private boolean doCommitSync() {
  try {
    consumer.commitSync();
    return true;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
    return false;
  }
}

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      if (doCommitSync())
        records.forEach(record -> process(record));
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

And in librdkafka:

void consume_loop(rd_kafka_t *rk,
                  rd_kafka_topic_partition_list_t *topics) {
  rd_kafka_resp_err_t err;

  if ((err = rd_kafka_subscribe(rk, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
    exit(1);
  }

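  while (!shutdown) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
    if (rkmessage) {
      /* commit first; process the message only if the commit succeeded */
      if (!rd_kafka_commit_message(rk, rkmessage, 0))
        msg_process(rkmessage);
      /* always release the message, even when the commit failed */
      rd_kafka_message_destroy(rkmessage);
    }
  }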

  err = rd_kafka_consumer_close(rk);
  if (err)
    fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
  else
    fprintf(stderr, "%% Consumer closed\n");
}

For simplicity in this example, we’ve used rd_kafka_commit_message prior to processing the message. Committing on every message would produce a lot of overhead in practice. A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded.

Correct offset management is crucial because it affects delivery semantics. As of version 0.9.0.0, the best Kafka can give you is at-least-once or at-most-once. If you are not careful, however, it might not give you either. Exactly-once delivery is under active investigation, but it is not currently possible without depending on another system (e.g. a transactional RDBMS).
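The difference between the two guarantees can be seen in a small simulation that needs no broker at all. The sketch below is a hypothetical model (DeliverySemanticsSketch is not part of any client): a consumer handles one message at a time, crashes once at a chosen offset, and restarts from the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of commit ordering; it does not talk to Kafka.
class DeliverySemanticsSketch {
  // Returns every offset that was processed, including any repeats.
  static List<Integer> run(int numMessages, int crashAt, boolean commitBeforeProcess) {
    List<Integer> processed = new ArrayList<>();
    int committed = 0;      // offset to resume from after a restart
    int position = 0;       // offset currently being handled
    boolean crashed = false;
    while (position < numMessages) {
      if (commitBeforeProcess)
        committed = position + 1;   // "at most once": commit, then process
      else
        processed.add(position);    // "at least once": process, then commit
      if (!crashed && position == crashAt) {
        crashed = true;             // crash between the two steps
        position = committed;       // restart from the last committed offset
        continue;
      }
      if (commitBeforeProcess)
        processed.add(position);
      else
        committed = position + 1;
      position++;
    }
    return processed;
  }
}
```

With five messages and a crash at offset 2, processing before committing handles offset 2 twice, while committing before processing never handles it at all.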

Asynchronous Commits

Each call to the commit API results in an offset commit request being sent to the broker. Using the synchronous API, the consumer is blocked until that request returns successfully. This may reduce overall throughput since the consumer might otherwise be able to process records while that commit is pending. One way to deal with this is to increase the amount of data that is returned in each poll(). The consumer has a configuration setting fetch.min.bytes which controls how much data is returned in each fetch. The broker will hold onto the fetch until enough data is available (or fetch.max.wait.ms expires). The tradeoff, however, is that this also increases the number of duplicates that have to be dealt with in a worst-case failure.

A second option is to use asynchronous commits. Instead of waiting for the request to complete, the consumer can send the request and return immediately.

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

And in librdkafka:

void consume_loop(rd_kafka_t *rk,
                  rd_kafka_topic_partition_list_t *topics) {
  static int MIN_COMMIT_COUNT = 1000;

  int msg_count = 0;
  rd_kafka_resp_err_t err;

  if ((err = rd_kafka_subscribe(rk, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
    exit(1);
  }

  while (!shutdown) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
    if (rkmessage) {
      msg_process(rkmessage);
      rd_kafka_message_destroy(rkmessage);

      if ((++msg_count % MIN_COMMIT_COUNT) == 0)
        rd_kafka_commit(rk, NULL, 1);
    }
  }

  err = rd_kafka_consumer_close(rk);
  if (err)
    fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
  else
    fprintf(stderr, "%% Consumer closed\n");
}

The only difference between this example and the previous one is that we have enabled asynchronous commit in the call to rd_kafka_commit.

So if it helps performance, why not always use async commits? The main reason is that the consumer does not retry the request if the commit fails. This is something that commitSync gives you for free; it will retry indefinitely until the commit succeeds or an unrecoverable error is encountered. The problem with asynchronous commits is dealing with commit ordering. By the time the consumer finds out that a commit has failed, we may already have processed the next batch of messages and even sent the next commit. In this case, a retry of the old commit could cause duplicate consumption.

Instead of complicating the consumer internals to try and handle this problem in a sane way, the API gives you a callback which is invoked when the commit either succeeds or fails. If you like, you can use this callback to retry the commit, but you will have to deal with the same reordering problem.

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
          if (exception != null)
            log.debug("Commit failed for offsets {}", offsets, exception);
        }
      });
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

A similar feature is available in librdkafka, but we have to configure it on initialization:

static void on_commit(rd_kafka_t *rk,
                      rd_kafka_resp_err_t err,
                      rd_kafka_topic_partition_list_t *offsets,
                      void *opaque) {
  if (err)
    fprintf(stderr, "%% Failed to commit offsets: %s\n", rd_kafka_err2str(err));
}

void init_rd_kafka() {
  rd_kafka_conf_t *conf = rd_kafka_conf_new();
  rd_kafka_conf_set_offset_commit_cb(conf, on_commit);

  // initialization omitted
}
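If you do decide to retry from the commit callback, one possible guard against the reordering problem is to number the commit attempts and retry a failed one only when no newer commit has been sent in the meantime. The sketch below shows just that bookkeeping; CommitRetryGuardSketch and its methods are hypothetical and not part of either client API.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration: retry a failed commit only if it is still the newest one.
class CommitRetryGuardSketch {
  private final AtomicLong latestSeq = new AtomicLong();

  // Call when sending an asynchronous commit; returns the number of this attempt.
  long nextCommit() {
    return latestSeq.incrementAndGet();
  }

  // Call from the commit callback on failure, with the number of the failed attempt.
  boolean shouldRetry(long failedSeq) {
    // a newer commit already covers later offsets, so retrying would move backwards
    return failedSeq == latestSeq.get();
  }
}
```

The application would capture the value returned by nextCommit() when it issues a commit and pass it to shouldRetry() from the corresponding callback.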

Offset commit failures are merely annoying if the following commits succeed since they won’t actually result in duplicate reads. However, if the last commit fails before a rebalance occurs or before the consumer is shut down, then offsets will be reset to the last commit and you will likely see duplicates. A common pattern is therefore to combine async commits in the poll loop with sync commits on rebalances or shutdown. Committing on close is straightforward, but we need a way to hook into rebalances. For this, the subscribe() method introduced earlier has a variant which accepts a ConsumerRebalanceListener, which has two methods to hook into rebalance behavior. In the example below, we incorporate synchronous commits on rebalances and on close.

private void doCommitSync() {
  try {
    consumer.commitSync();
  } catch (WakeupException e) {
    // we're shutting down, but finish the commit first and then
    // rethrow the exception so that the main loop can exit
    doCommitSync();
    throw e;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
  }
}

public void run() {
  try {
    consumer.subscribe(topics, new ConsumerRebalanceListener() {
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        doCommitSync();
      }

      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
    });

    while (true) {
      ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
      records.forEach(record -> process(record));
      consumer.commitAsync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    try {
      doCommitSync();
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }
}

Each rebalance has two phases: partition revocation and partition assignment. The revocation method is always called before a rebalance and is our last chance to commit offsets before the partitions are re-assigned. The assignment method is always called after the rebalance and can be used to set the initial position of the assigned partitions. In this case, we’ve used the revocation hook to commit the current offsets synchronously.

In general, asynchronous commits should be considered less safe than synchronous commits. Consecutive commit failures before a crash will result in increased duplicate processing. You can mitigate this danger by adding logic to handle commit failures in the callback or by mixing calls to commitSync() occasionally, but we wouldn’t recommend too much complexity unless testing shows it is necessary. If you need more reliability, synchronous commits are there for you, and you can still scale up by increasing the number of topic partitions and the number of consumers in the group. But if you just want to maximize throughput and you’re willing to accept some increase in the number of duplicates, then asynchronous commits may be a good option.

A somewhat obvious point, but one that’s worth making is that asynchronous commits only make sense for “at least once” message delivery. To get “at most once,” you need to know if the commit succeeded before consuming the message. This implies a synchronous commit unless you have the ability to “unread” a message after you find that the commit failed.