.. _kafka_consumer:

Kafka Consumers
---------------

The Confluent Platform ships with the standard Java consumer first released in Kafka 0.9.0.0, the high-performance C/C++ client `librdkafka `_, and clients for `Python `_ and `Go `_.

.. note::

   The older Scala consumers are still supported for now, but they are not covered in this documentation, and we encourage users to migrate away from them. In particular, :ref:`Kafka security extensions ` and the new broker-based balanced consumer groups are not supported with the old consumers.

Concepts
~~~~~~~~

A **consumer group** is a set of consumers which cooperate to consume data from some topics. The partitions of all the topics are divided among the consumers in the group. As new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This is known as *rebalancing* the group.

The main difference between the older "high-level" consumer and the new consumer is that the former depended on Zookeeper for group management, while the latter uses a group protocol built into Kafka itself. In this protocol, one of the brokers is designated as the group's **coordinator** and is responsible for managing the members of the group as well as their partition assignments.

The coordinator of each group is chosen from the leaders of the internal offsets topic ``__consumer_offsets``, which is used to store committed offsets. Basically, the group's ID is hashed to one of the partitions of this topic, and the leader of that partition is selected as the coordinator. In this way, management of consumer groups is divided roughly equally across all the brokers in the cluster, which allows the number of groups to scale by increasing the number of brokers.

When the consumer starts up, it finds the coordinator for its group and sends a request to join the group.
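The coordinator selection scheme described above can be sketched in a few lines of Python. This is a simplified model, not broker code: the real broker hashes the group ID with Java's ``String.hashCode`` modulo the partition count of ``__consumer_offsets`` (``offsets.topic.num.partitions``, 50 by default); the function names below are ours.

```python
# Simplified sketch of how a group's coordinator partition is chosen.
# The broker hashes the group ID onto one of the partitions of the
# __consumer_offsets topic; the leader of that partition becomes the
# group's coordinator. Names here are illustrative only.

def java_string_hashcode(s):
    """Equivalent of Java's String.hashCode for ASCII strings."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    # interpret the result as a signed 32-bit integer, as Java does
    return h - 0x100000000 if h >= 0x80000000 else h

def coordinator_partition(group_id, num_offsets_partitions=50):
    # a non-negative modulus of the hash picks the partition
    return abs(java_string_hashcode(group_id)) % num_offsets_partitions

print(coordinator_partition("foo"))  # some partition in [0, 50)
```

Because the mapping depends only on the group ID, every client and broker can compute the same coordinator without extra coordination, and different groups naturally spread across different partition leaders.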
The coordinator then begins a group rebalance so that the new member is assigned its fair share of the group's partitions. Every rebalance results in a new **generation** of the group. Each member in the group must send heartbeats to the coordinator in order to remain a member of the group. If no heartbeat is received before expiration of the configured **session timeout**, then the coordinator will kick the member out of the group and reassign its partitions to another member.

**Offset Management**: After the consumer receives its assignment from the coordinator, it must determine the initial position for each assigned partition. When the group is first created, before any messages have been consumed, the position is set according to a configurable offset reset policy (``auto.offset.reset``). Typically, consumption starts either at the earliest offset or the latest offset.

As a consumer in the group reads messages from the partitions assigned by the coordinator, it must commit the offsets corresponding to the messages it has read. If the consumer crashes or is shut down, its partitions will be re-assigned to another member, which will begin consumption from the last committed offset of each partition. If the consumer crashes before any offset has been committed, then the consumer which takes over its partitions will use the reset policy.

The offset commit policy is crucial to providing the message delivery guarantees needed by your application. By default, the consumer is configured to use an automatic commit policy, which triggers a commit on a periodic interval. The consumer also supports a commit API which can be used for manual offset management. In the examples below, we show several detailed examples of the commit API and discuss the tradeoffs in terms of performance and reliability.
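The interaction between committed offsets and the reset policy can be illustrated with a small broker-free simulation. The function and parameter names below are ours, not part of any client API:

```python
# Broker-free sketch of how a consumer picks its starting position for
# a partition: resume from the last committed offset if one exists,
# otherwise fall back to the auto.offset.reset policy.

def resume_position(committed, log_start, log_end, reset_policy):
    """committed: last committed offset for the partition, or None."""
    if committed is not None:
        return committed
    if reset_policy == "earliest":
        return log_start
    if reset_policy == "latest":
        return log_end
    # policy "none": the application must handle this itself
    raise LookupError("no committed offset and reset policy is 'none'")

# A fresh group with no commits starts according to the policy...
print(resume_position(None, 0, 100, "latest"))    # 100
print(resume_position(None, 0, 100, "earliest"))  # 0
# ...while a restarted or reassigned consumer resumes from its last commit.
print(resume_position(42, 0, 100, "latest"))      # 42
```

Note how the reset policy only matters in the two cases described above: when the group has never committed, or (not modeled here) when a committed offset falls out of the log's retained range.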
Configuration
~~~~~~~~~~~~~

The full list of configuration settings is available in the `Kafka documentation `_. Below we highlight several of the key configuration settings and how they affect the consumer's behavior.

**Core Configuration**: The only required setting is ``bootstrap.servers``, but we recommend always setting a ``client.id``, since this allows you to easily correlate requests on the broker with the client instance which made them. Typically, all consumers within the same group will share the same client ID in order to enforce :ref:`client quotas `.

**Group Configuration**: You should always configure ``group.id`` unless you are using the simple assignment API and you don't need to store offsets in Kafka.

You can control the session timeout by overriding the ``session.timeout.ms`` value. The default is 30 seconds, but you can safely increase it to avoid excessive rebalances if you find that your application needs more time to process messages. This concern is mainly relevant if you are using the Java consumer and handling messages in the same thread. In that case, you may also want to adjust ``max.poll.records`` to tune the number of records that must be handled on every loop iteration. See :ref:`basic usage ` below for more detail on this issue.

The main drawback to using a larger session timeout is that it will take longer for the coordinator to detect when a consumer instance has crashed, which means it will also take longer for another consumer in the group to take over its partitions. For normal shutdowns, however, the consumer sends an explicit request to the coordinator to leave the group, which triggers an immediate rebalance.

The other setting which affects rebalance behavior is ``heartbeat.interval.ms``. This controls how often the consumer will send heartbeats to the coordinator. It is also the way that the consumer detects when a rebalance is needed, so a lower heartbeat interval will generally mean faster rebalances.
The default setting is three seconds. For larger groups, it may be wise to increase this setting.

**Offset Management**: The two main settings affecting offset management are whether auto-commit is enabled and the offset reset policy. First, if ``enable.auto.commit`` is set to true (which is the default), then the consumer will automatically commit offsets periodically at the interval set by ``auto.commit.interval.ms``. The default is 5 seconds.

Second, use ``auto.offset.reset`` to define the behavior of the consumer when there is no committed position (which would be the case when the group is first initialized) or when an offset is out of range. You can choose either to reset the position to the "earliest" offset or the "latest" offset (the default). You can also select "none" if you would rather set the initial offset yourself and you are willing to handle out-of-range errors manually.

Initialization
~~~~~~~~~~~~~~

The Java consumer is constructed with a standard ``Properties`` object.

.. sourcecode:: java

   Properties config = new Properties();
   config.put("client.id", InetAddress.getLocalHost().getHostName());
   config.put("group.id", "foo");
   config.put("bootstrap.servers", "host1:9092,host2:9092");
   new KafkaConsumer<>(config);

Configuration errors will result in a ``KafkaException`` raised from the constructor of ``KafkaConsumer``.

The C/C++ (librdkafka) configuration is similar, but we need to handle configuration errors directly when setting properties:

.. literalinclude:: consumer-initialization.c
   :language: c

The Python client can be configured via a dictionary as follows:

.. sourcecode:: python

   from confluent_kafka import Consumer

   conf = {'bootstrap.servers': "host1:9092,host2:9092",
           'group.id': "foo",
           'default.topic.config': {'auto.offset.reset': 'smallest'}}

   consumer = Consumer(conf)

The Go client uses a ``ConfigMap`` object to pass configuration to the consumer:

.. sourcecode:: go

   import (
       "github.com/confluentinc/confluent-kafka-go/kafka"
   )

   consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
       "bootstrap.servers":    "host1:9092,host2:9092",
       "group.id":             "foo",
       "default.topic.config": kafka.ConfigMap{"auto.offset.reset": "smallest"}})

.. _consumer_basic_usage:

Basic Usage
~~~~~~~~~~~

Although the Java client and librdkafka share many of the same configuration options and underlying features, they take fairly different approaches when it comes to their threading model and how they handle consumer liveness. Before diving into the examples, it's helpful to understand the API designs of each client.

Java Client
^^^^^^^^^^^

The Java client is designed around an event loop which is driven by the ``poll()`` API. This design is motivated by the UNIX ``select`` and ``poll`` system calls. A basic consumption loop with the Java API usually takes the following form:

.. sourcecode:: java

   while (running) {
     ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
     process(records); // application-specific processing
     consumer.commitSync();
   }

There is no background thread in the Java consumer. The API depends on calls to ``poll()`` to drive all of its IO, including:

* Joining the consumer group and handling partition rebalances.
* Sending periodic heartbeats if part of an active generation.
* Sending periodic offset commits (if autocommit is enabled).
* Sending and receiving fetch requests for assigned partitions.

Due to this single-threaded model, no heartbeats can be sent while the application is handling the records returned from a call to ``poll()``. This means that the consumer will fall out of the consumer group if either the event loop terminates or if a delay in record processing causes the session timeout to expire before the next iteration of the loop. This is actually by design. One of the problems that the Java client attempts to solve is ensuring the liveness of consumers in the group.
As long as the consumer is assigned partitions, no other members in the group can consume from the same partitions, so it is important to ensure that it is actually making progress and has not become a zombie. This feature protects your application from a large class of failures, but the downside is that it puts the burden on you to tune the session timeout so that the consumer does not exceed it in its normal record processing. In the 0.9 release of Kafka, this was difficult because there was no direct way to limit the amount of data returned from ``poll()``, but 0.10 added a new configuration option, ``max.poll.records``, which places an upper bound on the number of records returned from each call. We recommend both using a fairly high session timeout (e.g. 30 to 60 seconds) and keeping the number of records processed on each iteration bounded, so that worst-case behavior is predictable.

If you fail to tune these settings appropriately, the consequence is typically a ``CommitFailedException`` raised from the call to commit offsets for the processed records. If you are using the automatic commit policy, then you might not even notice when this happens, since the consumer silently ignores commit failures internally (unless it's occurring often enough to impact lag metrics). You can catch this exception and either ignore it or perform any needed rollback logic.

.. sourcecode:: java

   while (running) {
     ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
     process(records); // application-specific processing
     try {
       consumer.commitSync();
     } catch (CommitFailedException e) {
       // application-specific rollback of processed records
     }
   }

C/C++ Client (librdkafka)
^^^^^^^^^^^^^^^^^^^^^^^^^

Librdkafka uses a multi-threaded approach to Kafka consumption.
From a user's perspective, interaction with the API is not too different from the example used by the Java client, with the user calling ``rd_kafka_consumer_poll`` in a loop, though this API returns only a single message or event at a time:

.. literalinclude:: consumer-usage-1.c
   :language: c

Unlike the Java client, librdkafka does all fetching and coordinator communication in background threads. This frees you from the complication of tuning the session timeout according to the expected processing time. However, since the background thread will keep the consumer alive until the client has been closed, it is up to you to ensure that your process does not become a zombie, since it will continue to hold onto assigned partitions in that case. Note that partition rebalances also take place in a background thread, which means you still have to handle the potential for commit failures, as the consumer may no longer have the same partition assignment when the commit begins. This is unnecessary if you enable autocommit, since commit failures will then be ignored silently, which also implies that you have no way to roll back processing.

.. literalinclude:: consumer-usage-2.c
   :language: c

Python Client
^^^^^^^^^^^^^

The Python client uses librdkafka internally, so it also uses a multi-threaded approach to Kafka consumption. From a user's perspective, interaction with the API is not too different from the example used by the Java client, with the user calling the ``poll()`` method in a loop, though this API returns only a single message at a time:

.. sourcecode:: python

   try:
       msg_count = 0
       while running:
           msg = consumer.poll(timeout=1.0)
           if msg is None:
               continue

           msg_process(msg)  # application-specific processing
           msg_count += 1
           if msg_count % MIN_COMMIT_COUNT == 0:
               consumer.commit(asynchronous=False)
   finally:
       # Shut down consumer
       consumer.close()

Go Client
^^^^^^^^^

The Go client uses librdkafka internally, so it also uses a multi-threaded approach to Kafka consumption.
From a user's perspective, interaction with the API is not too different from the example used by the Java client, with the user calling the ``Poll()`` method in a loop, though this API returns only a single message at a time:

.. sourcecode:: go

   for run == true {
       ev := consumer.Poll(0)
       switch e := ev.(type) {
       case *kafka.Message:
           // application-specific processing
       case kafka.Error:
           fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
           run = false
       default:
           fmt.Printf("Ignored %v\n", e)
       }
   }

Detailed Examples
~~~~~~~~~~~~~~~~~

Below we provide detailed examples using the consumer API, with special attention paid to offset management and delivery semantics. These examples are intended to give you a starting point for building your consumer application.

Basic Poll Loop
^^^^^^^^^^^^^^^

The consumer API is centered around the ``poll()`` method, which is used to retrieve records from the brokers. The ``subscribe()`` method controls which topics will be fetched in poll. Typically, consumer usage involves an initial call to ``subscribe()`` to set up the topics of interest and then a loop which calls ``poll()`` until the application is shut down.

The consumer intentionally avoids a specific threading model. It is not safe for multi-threaded access and it has no background threads of its own. In particular, this means that all IO occurs in the thread calling ``poll()``. In the example below, we wrap the poll loop in a ``Runnable``, which makes it easy to use with an ``ExecutorService``.

.. sourcecode:: java

   public abstract class BasicConsumeLoop<K, V> implements Runnable {
     private final KafkaConsumer<K, V> consumer;
     private final List<String> topics;
     private final AtomicBoolean shutdown;
     private final CountDownLatch shutdownLatch;

     public BasicConsumeLoop(Properties config, List<String> topics) {
       this.consumer = new KafkaConsumer<>(config);
       this.topics = topics;
       this.shutdown = new AtomicBoolean(false);
       this.shutdownLatch = new CountDownLatch(1);
     }

     public abstract void process(ConsumerRecord<K, V> record);

     public void run() {
       try {
         consumer.subscribe(topics);

         while (!shutdown.get()) {
           ConsumerRecords<K, V> records = consumer.poll(500);
           records.forEach(record -> process(record));
         }
       } finally {
         consumer.close();
         shutdownLatch.countDown();
       }
     }

     public void shutdown() throws InterruptedException {
       shutdown.set(true);
       shutdownLatch.await();
     }
   }

We've hard-coded the poll timeout to 500 milliseconds. If no records are received before this timeout expires, then ``poll()`` will return an empty record set. It's not a bad idea to add a shortcut check for this case if your message processing involves any setup overhead.

To shut down the consumer, we've added a flag which is checked on each loop iteration. After shutdown is triggered, the consumer will wait at most 500 milliseconds (plus the message processing time) before shutting down, since shutdown might be triggered while we are in ``poll()``. A better approach is provided in the next example.

Note that you should always call ``close()`` after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately, which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If not closed properly, the broker will trigger the rebalance only after the session timeout has expired. We've added a latch to this example to ensure that the consumer has time to finish closing before finishing shutdown.
The same example looks similar in librdkafka:

.. literalinclude:: consumer-detailed-example.c
   :language: c

In Python:

.. sourcecode:: python

   running = True

   def basic_consume_loop(consumer, topics):
       try:
           consumer.subscribe(topics)

           while running:
               msg = consumer.poll(timeout=1.0)
               if msg is None:
                   continue

               if msg.error():
                   if msg.error().code() == KafkaError._PARTITION_EOF:
                       # End of partition event
                       sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                        (msg.topic(), msg.partition(), msg.offset()))
                   elif msg.error():
                       raise KafkaException(msg.error())
               else:
                   msg_process(msg)
       finally:
           # Close down consumer to commit final offsets.
           consumer.close()

   def shutdown():
       global running
       running = False

And in Go:

.. sourcecode:: go

   err = consumer.SubscribeTopics(topics, nil)

   for run == true {
       ev := consumer.Poll(0)
       switch e := ev.(type) {
       case *kafka.Message:
           fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
       case kafka.PartitionEOF:
           fmt.Printf("%% Reached %v\n", e)
       case kafka.Error:
           fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
           run = false
       default:
           fmt.Printf("Ignored %v\n", e)
       }
   }

   consumer.Close()

Although the APIs are similar, the C/C++, Python, and Go clients use a different approach than the Java client beneath the surface. While the Java consumer does all IO and processing in the foreground thread, these clients use a background thread. The main consequence of this is that calling ``rd_kafka_consumer_poll`` or ``Consumer.poll()`` is totally safe when used from multiple threads. You can use this to parallelize message handling in multiple threads. From a high level, poll is taking messages off of a queue which is filled in the background.

Another consequence of using a background thread is that all heartbeats and rebalances are executed in the background. The benefit of this is that you don't need to worry about message handling causing the consumer to "miss" a rebalance.
The drawback, however, is that the background thread will continue heartbeating even if your message processor dies. If this happens, then the consumer will continue to hold onto its partitions, and the read lag will continue to build until the process is shut down.

Although the clients have taken different approaches internally, they are not as far apart as they seem. To provide the same abstraction in the Java client, you could place a queue in between the poll loop and the message processors. The poll loop would fill the queue and the processors would pull messages off of it.

Shutdown with Wakeup
^^^^^^^^^^^^^^^^^^^^

An alternative pattern for the poll loop in the Java consumer is to use ``Long.MAX_VALUE`` for the timeout. To break from the loop, we can use the consumer's ``wakeup()`` method from a separate thread. This will raise a ``WakeupException`` from the thread blocking in ``poll()``. If the thread is not currently blocking, then this will wake up the next poll invocation.

.. sourcecode:: java

   public abstract class ConsumeLoop<K, V> implements Runnable {
     private final KafkaConsumer<K, V> consumer;
     private final List<String> topics;
     private final CountDownLatch shutdownLatch;

     public ConsumeLoop(KafkaConsumer<K, V> consumer, List<String> topics) {
       this.consumer = consumer;
       this.topics = topics;
       this.shutdownLatch = new CountDownLatch(1);
     }

     public abstract void process(ConsumerRecord<K, V> record);

     public void run() {
       try {
         consumer.subscribe(topics);

         while (true) {
           ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
           records.forEach(record -> process(record));
         }
       } catch (WakeupException e) {
         // ignore, we're closing
       } catch (Exception e) {
         log.error("Unexpected error", e);
       } finally {
         consumer.close();
         shutdownLatch.countDown();
       }
     }

     public void shutdown() throws InterruptedException {
       consumer.wakeup();
       shutdownLatch.await();
     }
   }

Synchronous Commits
^^^^^^^^^^^^^^^^^^^

In the examples above, we have assumed that the consumer is configured to auto-commit offsets (this is the default).
Auto-commit basically works as a cron with a period set through the ``auto.commit.interval.ms`` configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset. When this happens, the last committed position may be as old as the auto-commit interval itself. Any messages which have arrived since the last commit will have to be read again.

Clearly, if you want to reduce the window for duplicates, you can reduce the auto-commit interval, but some users may want even finer control over offsets. The consumer therefore supports a commit API which gives you full control over offsets. The simplest and most reliable way to manually commit offsets is using a synchronous commit with ``commitSync()``. As its name suggests, this method blocks until the commit has completed successfully. Note that when you use the commit API directly, you should first disable auto-commit in the configuration by setting the ``enable.auto.commit`` property to ``false``.

.. sourcecode:: java

   private void doCommitSync() {
     try {
       consumer.commitSync();
     } catch (WakeupException e) {
       // we're shutting down, but finish the commit first and then
       // rethrow the exception so that the main loop can exit
       doCommitSync();
       throw e;
     } catch (CommitFailedException e) {
       // the commit failed with an unrecoverable error. if there is any
       // internal state which depended on the commit, you can clean it
       // up here. otherwise it's reasonable to ignore the error and go on
       log.debug("Commit failed", e);
     }
   }

   public void run() {
     try {
       consumer.subscribe(topics);

       while (true) {
         ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
         records.forEach(record -> process(record));
         doCommitSync();
       }
     } catch (WakeupException e) {
       // ignore, we're closing
     } catch (Exception e) {
       log.error("Unexpected error", e);
     } finally {
       consumer.close();
       shutdownLatch.countDown();
     }
   }

In this example, we've added a try/catch block around the call to ``commitSync``. The ``CommitFailedException`` is thrown when the commit cannot be completed because the group has been rebalanced. This is the main thing we have to be careful of when using the Java client. Since all network IO (including heartbeating) and message processing is done in the foreground, it is possible for the session timeout to expire while a batch of messages is being processed. To handle this, you have two choices.

First, you can adjust the ``session.timeout.ms`` setting to ensure that the handler has enough time to finish processing messages. You can then tune ``max.partition.fetch.bytes`` to limit the amount of data returned in a single batch, though you will have to consider how many partitions are in the subscribed topics.

The second option is to do message processing in a separate thread, but you will have to manage flow control to ensure that the threads can keep up. For example, just pushing messages into a blocking queue would probably not be sufficient, unless the rate of processing can keep up with the rate of delivery (in which case you might not need a separate thread anyway). It may even exacerbate the problem if the poll loop is stuck blocking on a call to ``offer()`` while the background thread is handling an even larger batch of messages. The Java API offers a ``pause()`` method to help in these situations, which we will cover in another example.
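To make the queue-based handoff and its flow-control behavior concrete, here is a broker-free sketch in Python: a bounded queue sits between a stand-in for the poll loop and a worker thread, so a slow worker blocks the producer instead of letting memory grow without bound. All names here are illustrative; no Kafka client is involved.

```python
# Broker-free sketch of handing records from a poll loop to a worker
# thread through a BOUNDED queue. The bound provides flow control: if
# the worker falls behind, put() blocks the poll loop rather than
# buffering records without limit.
import queue
import threading

records = queue.Queue(maxsize=10)  # small bound to force backpressure
processed = []

def worker():
    while True:
        record = records.get()
        if record is None:        # sentinel value: shut down
            break
        processed.append(record)  # application-specific processing
        records.task_done()

t = threading.Thread(target=worker)
t.start()

# Stand-in for the poll loop: put() blocks whenever the queue is full.
for i in range(100):
    records.put(i)

records.put(None)  # signal shutdown to the worker
t.join()
print(len(processed))  # 100
```

This is the simple blocking-queue variant the text warns about: it keeps memory bounded, but the poll loop still stalls when the worker is slow, which is exactly the situation where the Java consumer's ``pause()`` method becomes useful.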
For now, we recommend setting ``session.timeout.ms`` large enough that commit failures from rebalances are rare. As mentioned above, the only drawback to this is a longer delay before partitions can be re-assigned in the event of a hard failure (where the consumer cannot be cleanly shut down with ``close()``). This should be rare in practice.

Note also that we have to be a little careful in this example, since the ``wakeup()`` might be triggered while the commit is pending. The recursive call is safe since the wakeup will only be triggered once.

In C/C++ (librdkafka), we can get similar behavior with ``rd_kafka_commit``, which is used for both synchronous and asynchronous commits. The approach is slightly different, however, since ``rd_kafka_consumer_poll`` returns single messages instead of batches as the Java consumer does.

.. literalinclude:: consumer-synchronous-commits-1.c
   :language: c

In this example, we trigger a synchronous commit every 1000 messages. The second argument to ``rd_kafka_commit`` is the list of offsets to be committed; if set to ``NULL``, librdkafka will commit the latest offsets for the assigned positions. The third argument to ``rd_kafka_commit`` is a flag which controls whether this call is asynchronous. We could also trigger the commit on expiration of a timeout to ensure the committed position is updated regularly.

Since the Python client uses librdkafka internally, it uses a similar pattern by setting the ``asynchronous`` parameter to the ``Consumer.commit()`` method call. This method can also accept the mutually exclusive keyword parameters ``offsets``, to explicitly list the offsets for each assigned topic partition, and ``message``, which will commit offsets relative to a ``Message`` object returned by ``poll()``.

.. sourcecode:: python

   def consume_loop(consumer, topics):
       try:
           consumer.subscribe(topics)

           msg_count = 0
           while running:
               msg = consumer.poll(timeout=1.0)
               if msg is None:
                   continue

               if msg.error():
                   if msg.error().code() == KafkaError._PARTITION_EOF:
                       # End of partition event
                       sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                        (msg.topic(), msg.partition(), msg.offset()))
                   elif msg.error():
                       raise KafkaException(msg.error())
               else:
                   msg_process(msg)
                   msg_count += 1
                   if msg_count % MIN_COMMIT_COUNT == 0:
                       consumer.commit(asynchronous=False)
       finally:
           # Close down consumer to commit final offsets.
           consumer.close()

The Go client also uses librdkafka internally, and so it uses a similar pattern but only provides a synchronous ``Commit()`` method call. Other variants of commit methods also accept a list of offsets to commit or a ``Message`` in order to commit offsets relative to a consumed message. When using manual offset commit, be sure to disable the ``enable.auto.commit`` configuration.

.. sourcecode:: go

   msg_count := 0
   for run == true {
       ev := consumer.Poll(0)
       switch e := ev.(type) {
       case *kafka.Message:
           msg_count += 1
           if msg_count % MIN_COMMIT_COUNT == 0 {
               consumer.Commit()
           }
           fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
       case kafka.PartitionEOF:
           fmt.Printf("%% Reached %v\n", e)
       case kafka.Error:
           fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
           run = false
       default:
           fmt.Printf("Ignored %v\n", e)
       }
   }

**Delivery Guarantees**: This is as good a time as any to talk about delivery semantics. Using auto-commit gives you "at least once" delivery: Kafka guarantees that no messages will be missed, but duplicates are possible. In the above example, we get the same guarantee, since the commit follows the message processing. By changing the order, however, we can get "at most once" delivery. But we have to be a little careful with the commit failure, so we change ``doCommitSync`` to return whether or not the commit succeeded.
There's also no longer any need to catch the ``WakeupException`` in the synchronous commit.

.. sourcecode:: java

   private boolean doCommitSync() {
     try {
       consumer.commitSync();
       return true;
     } catch (CommitFailedException e) {
       // the commit failed with an unrecoverable error. if there is any
       // internal state which depended on the commit, you can clean it
       // up here. otherwise it's reasonable to ignore the error and go on
       log.debug("Commit failed", e);
       return false;
     }
   }

   public void run() {
     try {
       consumer.subscribe(topics);

       while (true) {
         ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
         if (doCommitSync())
           records.forEach(record -> process(record));
       }
     } catch (WakeupException e) {
       // ignore, we're closing
     } catch (Exception e) {
       log.error("Unexpected error", e);
     } finally {
       consumer.close();
       shutdownLatch.countDown();
     }
   }

And in C/C++ (librdkafka):

.. literalinclude:: consumer-synchronous-commits-2.c
   :language: c

And in Python:

.. sourcecode:: python

   def consume_loop(consumer, topics):
       try:
           consumer.subscribe(topics)

           while running:
               msg = consumer.poll(timeout=1.0)
               if msg is None:
                   continue

               if msg.error():
                   if msg.error().code() == KafkaError._PARTITION_EOF:
                       # End of partition event
                       sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                        (msg.topic(), msg.partition(), msg.offset()))
                   elif msg.error():
                       raise KafkaException(msg.error())
               else:
                   consumer.commit(asynchronous=False)
                   msg_process(msg)
       finally:
           # Close down consumer to commit final offsets.
           consumer.close()

And in Go:

.. sourcecode:: go

   for run == true {
       ev := consumer.Poll(0)
       switch e := ev.(type) {
       case *kafka.Message:
           err = consumer.CommitMessage(e)
           if err == nil {
               msg_process(e)
           }
       case kafka.PartitionEOF:
           fmt.Printf("%% Reached %v\n", e)
       case kafka.Error:
           fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
           run = false
       default:
           fmt.Printf("Ignored %v\n", e)
       }
   }

For simplicity, in the C/C++ example we've used ``rd_kafka_commit_message`` prior to processing the message.
Committing on every message would produce a lot of overhead in practice. A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded.

Correct offset management is crucial because it affects delivery semantics. As of version 0.9.0.0, the best Kafka can give you is at-least-once or at-most-once. If you are not careful, however, it might not give you either. Exactly-once delivery is under active investigation, but it is not currently possible without depending on another system (e.g. a transactional RDBMS).

Asynchronous Commits
^^^^^^^^^^^^^^^^^^^^

Each call to the commit API results in an offset commit request being sent to the broker. Using the synchronous API, the consumer is blocked until that request returns successfully. This may reduce overall throughput, since the consumer might otherwise be able to process records while that commit is pending.

One way to deal with this is to increase the amount of data that is returned in each ``poll()``. The consumer has a configuration setting, ``fetch.min.bytes``, which controls how much data is returned in each fetch. The broker will hold onto the fetch until enough data is available (or ``fetch.max.wait.ms`` expires). The tradeoff, however, is that this also increases the number of duplicates that have to be dealt with in a worst-case failure.

A second option is to use asynchronous commits. Instead of waiting for the request to complete, the consumer can send the request and return immediately.

.. sourcecode:: java

   public void run() {
     try {
       consumer.subscribe(topics);

       while (true) {
         ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
         records.forEach(record -> process(record));
         consumer.commitAsync();
       }
     } catch (WakeupException e) {
       // ignore, we're closing
     } catch (Exception e) {
       log.error("Unexpected error", e);
     } finally {
       consumer.close();
       shutdownLatch.countDown();
     }
   }

And in C/C++ (librdkafka):

.. literalinclude:: consumer-asynchronous-commits-1.c
   :language: c

The only difference between this example and the previous one is that we have enabled asynchronous commit in the call to ``rd_kafka_commit``.

The change in Python is very similar. The ``asynchronous`` parameter to ``commit()`` is changed to ``True``. Here we pass the value in explicitly, but asynchronous commits are the default if the parameter is not included.

.. sourcecode:: python

   def consume_loop(consumer, topics):
       try:
           consumer.subscribe(topics)

           msg_count = 0
           while running:
               msg = consumer.poll(timeout=1.0)
               if msg is None:
                   continue

               if msg.error():
                   if msg.error().code() == KafkaError._PARTITION_EOF:
                       # End of partition event
                       sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                        (msg.topic(), msg.partition(), msg.offset()))
                   elif msg.error():
                       raise KafkaException(msg.error())
               else:
                   msg_process(msg)
                   msg_count += 1
                   if msg_count % MIN_COMMIT_COUNT == 0:
                       consumer.commit(asynchronous=True)
       finally:
           # Close down consumer to commit final offsets.
           consumer.close()

In Go, simply execute the commit in a goroutine to commit asynchronously:

.. sourcecode:: go

   msg_count := 0
   for run == true {
       ev := consumer.Poll(0)
       switch e := ev.(type) {
       case *kafka.Message:
           msg_count += 1
           if msg_count % MIN_COMMIT_COUNT == 0 {
               go func() {
                   if _, err := consumer.Commit(); err != nil {
                       fmt.Fprintf(os.Stderr, "%% Commit error: %v\n", err)
                   }
               }()
           }
           fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
       case kafka.PartitionEOF:
           fmt.Printf("%% Reached %v\n", e)
       case kafka.Error:
           fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
           run = false
       default:
           fmt.Printf("Ignored %v\n", e)
       }
   }

So if it helps performance, why not always use async commits? The main reason is that the consumer does not retry the request if the commit fails. This is something that ``commitSync`` gives you for free; it will retry indefinitely until the commit succeeds or an unrecoverable error is encountered. The problem with asynchronous commits is dealing with commit ordering.
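Concretely, the commit ordering problem looks like this in a small broker-free simulation (the ``OffsetStore`` class and method names are ours, purely for illustration): a late retry of a commit for an older offset can overwrite a newer committed offset, so a subsequent restart would re-read already-processed messages.

```python
# Broker-free sketch of the async commit reordering hazard: retrying a
# failed commit of an OLDER offset after a NEWER commit has succeeded
# moves the stored position backwards, producing duplicate reads after
# a restart. A monotonic check avoids this. Names are illustrative.

class OffsetStore:
    def __init__(self):
        self.committed = 0

    def commit(self, offset):
        self.committed = offset  # naive: last writer wins

    def commit_monotonic(self, offset):
        # only ever move the committed position forward
        self.committed = max(self.committed, offset)

naive = OffsetStore()
naive.commit(200)       # the second batch's commit succeeds first
naive.commit(100)       # a late retry of the first batch's commit lands
print(naive.committed)  # 100 -- the stored offset moved backwards

safe = OffsetStore()
safe.commit_monotonic(200)
safe.commit_monotonic(100)  # the late retry is now harmless
print(safe.committed)       # 200
```

This is also why a common mitigation, when retrying from a commit callback, is to track a monotonically increasing sequence or offset and drop retries that would move the committed position backwards.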
By the time the consumer finds out that a commit has failed, we may already have processed the next batch of messages and even sent the next commit. In this case, a retry of the old commit could cause duplicate consumption. Instead of complicating the consumer internals to try and handle this problem in a sane way, the API gives you a callback which is invoked when the commit either succeeds or fails. If you like, you can use this callback to retry the commit, but you will have to deal with the same reordering problem.

.. sourcecode:: java

   public void run() {
     try {
       consumer.subscribe(topics);

       while (true) {
         ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
         records.forEach(record -> process(record));
         consumer.commitAsync(new OffsetCommitCallback() {
           public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                  Exception exception) {
             if (exception != null)
               log.debug("Commit failed for offsets {}", offsets, exception);
           }
         });
       }
     } catch (WakeupException e) {
       // ignore, we're closing
     } catch (Exception e) {
       log.error("Unexpected error", e);
     } finally {
       consumer.close();
       shutdownLatch.countDown();
     }
   }

A similar feature is available in C/C++ (librdkafka), but it has to be configured on initialization:

.. literalinclude:: consumer-asynchronous-commits-2.c
   :language: c

Similarly, in Python the commit callback can be any callable and can be passed as a configuration parameter to the consumer constructor.

.. sourcecode:: python

   from confluent_kafka import Consumer

   def commit_completed(err, partitions):
       if err:
           print(str(err))
       else:
           print("Committed partition offsets: " + str(partitions))

   conf = {'bootstrap.servers': "host1:9092,host2:9092",
           'group.id': "foo",
           'default.topic.config': {'auto.offset.reset': 'smallest'},
           'on_commit': commit_completed}

   consumer = Consumer(conf)

In Go, rebalance events are exposed as events returned by the ``Poll()`` method.
To see these events, create the consumer with the ``go.application.rebalance.enable`` configuration property set, and handle the ``AssignedPartitions`` and ``RevokedPartitions`` events by explicitly calling ``Assign()`` and ``Unassign()`` respectively:

.. sourcecode:: go

   consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
       "bootstrap.servers":               "host1:9092,host2:9092",
       "group.id":                        "foo",
       "go.application.rebalance.enable": true})
   if err != nil {
       panic(err)
   }

   msg_count := 0
   for run == true {
       ev := consumer.Poll(0)
       switch e := ev.(type) {
       case kafka.AssignedPartitions:
           fmt.Fprintf(os.Stderr, "%% %v\n", e)
           consumer.Assign(e.Partitions)
       case kafka.RevokedPartitions:
           fmt.Fprintf(os.Stderr, "%% %v\n", e)
           consumer.Unassign()
       case *kafka.Message:
           msg_count += 1
           if msg_count%MIN_COMMIT_COUNT == 0 {
               consumer.Commit()
           }
           fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
       case kafka.PartitionEOF:
           fmt.Printf("%% Reached %v\n", e)
       case kafka.Error:
           fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
           run = false
       default:
           fmt.Printf("Ignored %v\n", e)
       }
   }

Offset commit failures are merely annoying if the following commits succeed, since they won't actually result in duplicate reads. However, if the last commit fails before a rebalance occurs or before the consumer is shut down, then offsets will be reset to the last commit and you will likely see duplicates. A common pattern is therefore to combine async commits in the poll loop with sync commits on rebalances and on shutdown.

Committing on close is straightforward, but we need a way to hook into rebalances. For this, the ``subscribe()`` method introduced earlier has a variant which accepts a ``ConsumerRebalanceListener``, which has two methods to hook into rebalance behavior. In the example below, we incorporate synchronous commits on rebalances and on close.
.. sourcecode:: java

   private void doCommitSync() {
     try {
       consumer.commitSync();
     } catch (WakeupException e) {
       // we're shutting down, but finish the commit first and then
       // rethrow the exception so that the main loop can exit
       doCommitSync();
       throw e;
     } catch (CommitFailedException e) {
       // the commit failed with an unrecoverable error. if there is any
       // internal state which depended on the commit, you can clean it
       // up here. otherwise it's reasonable to ignore the error and go on
       log.debug("Commit failed", e);
     }
   }

   public void run() {
     try {
       consumer.subscribe(topics, new ConsumerRebalanceListener() {
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
           doCommitSync();
         }

         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
       });

       while (true) {
         ConsumerRecords<K, V> records = consumer.poll(Long.MAX_VALUE);
         records.forEach(record -> process(record));
         consumer.commitAsync();
       }
     } catch (WakeupException e) {
       // ignore, we're closing
     } catch (Exception e) {
       log.error("Unexpected error", e);
     } finally {
       try {
         doCommitSync();
       } finally {
         consumer.close();
         shutdownLatch.countDown();
       }
     }
   }

Each rebalance has two phases: partition revocation and partition assignment. The revocation method is always called before a rebalance and is our last chance to commit offsets before the partitions are re-assigned. The assignment method is always called after the rebalance and can be used to set the initial position of the assigned partitions. In this case, we've used the revocation hook to commit the current offsets synchronously.

In general, asynchronous commits should be considered less safe than synchronous commits. Consecutive commit failures before a crash will result in increased duplicate processing. You can mitigate this danger by adding logic to handle commit failures in the callback or by mixing in occasional calls to ``commitSync()``, but we wouldn't recommend too much complexity unless testing shows it is necessary.
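If you do add retry logic to the commit callback, one way to avoid reintroducing the reordering problem described earlier is to tag each commit attempt with a monotonically increasing sequence number and abandon any retry that is no longer the latest attempt. The sketch below is a hypothetical, broker-free illustration in plain Python; ``CommitGuard``, ``next_attempt``, and ``on_complete`` are invented names for this example and are not part of any Kafka client API.

```python
# Hypothetical sketch (plain Python, no broker): tag each commit attempt
# with a monotonically increasing sequence number, and only retry a failed
# commit if no newer attempt has been started since. This prevents a retry
# of a stale commit from rewinding the committed position.

class CommitGuard:
    def __init__(self):
        self.latest_seq = 0   # sequence number of the newest attempt
        self.committed = -1   # highest offset successfully committed

    def next_attempt(self):
        """Call just before each async commit; returns the attempt's sequence."""
        self.latest_seq += 1
        return self.latest_seq

    def on_complete(self, seq, offset, error):
        """Commit callback: decide whether a failed commit should be retried."""
        if error is None:
            self.committed = max(self.committed, offset)
            return "committed"
        if seq < self.latest_seq:
            # a newer commit attempt exists; retrying this one could
            # overwrite a larger committed offset with a smaller one
            return "drop"
        return "retry"

guard = CommitGuard()
s1 = guard.next_attempt()                        # async commit of offset 100
s2 = guard.next_attempt()                        # async commit of offset 200
guard.on_complete(s2, 200, None)                 # newer commit succeeds first
result = guard.on_complete(s1, 100, "timeout")   # older commit fails late
print(result)  # -> drop: the stale commit is abandoned rather than retried
```

A real implementation would track one sequence number per partition, since commits for different partitions complete independently.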
If you need more reliability, synchronous commits are there for you, and you can still scale up by increasing the number of topic partitions and the number of consumers in the group. But if you just want to maximize throughput and you're willing to accept some increase in the number of duplicates, then asynchronous commits may be a good option.

A somewhat obvious point, but one worth making, is that asynchronous commits only make sense for "at least once" message delivery. To get "at most once," you need to know whether the commit succeeded before consuming the message. This implies a synchronous commit unless you have the ability to "unread" a message after you find that the commit failed.

Administration
~~~~~~~~~~~~~~

Since 0.9, the Kafka release includes an admin utility for viewing the status of consumer groups.

List Groups
^^^^^^^^^^^

To get a list of the active groups in the cluster, you can use the ``kafka-consumer-groups`` utility included in the Kafka distribution. On a large cluster, this may take a little time since the list is collected by inspecting each broker in the cluster.

.. sourcecode:: bash

   $ bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --list

Describe Group
^^^^^^^^^^^^^^

The ``kafka-consumer-groups`` utility can also be used to collect information on a current group. For example, to see the current assignments for the ``foo`` group, use the following command:

.. sourcecode:: bash

   $ bin/kafka-consumer-groups --bootstrap-server host:9092 --new-consumer --describe --group foo

If you happen to invoke this while a rebalance is in progress, the command will report an error. Try again and you should see the assignments for all the members in the current generation.