Kafka Consumer for Confluent Cloud

An Apache Kafka® consumer is a client application that subscribes to, reads from, and processes events related to a topic. This section provides an overview of the Kafka consumer and an introduction to the configuration settings for tuning.

A Quick Consumer Review

The Kafka consumer works by issuing “fetch” requests to the brokers leading the partitions it wants to consume. Each fetch request specifies an offset in the log, and the consumer receives back a chunk of the log containing all the messages beginning at that offset. The consumer has significant control over this position and can rewind it to re-consume data if desired.

Consumer groups

A consumer group is a set of consumers that cooperate to consume topic data. You place a consumer in a group by setting its group.id configuration property; group membership takes effect when you call the subscribe method of the KafkaConsumer API. If you don’t set group.id, the subscribe and commit methods throw an exception when called.
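
For example, the following minimal Java consumer is a sketch that sets group.id, subscribes to a topic, and polls for records. The bootstrap endpoint, topic, and group names are placeholders, and the authentication settings a real Confluent Cloud connection requires are omitted.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BasicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Replace with your bootstrap endpoint; security settings are omitted in this sketch.
        props.put("bootstrap.servers", "pkc-xxxxx.region.provider.confluent.cloud:9092");
        props.put("group.id", "my-consumer-group");   // joins (or implicitly creates) this consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));  // group management starts here
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}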

When a consumer is assigned to a group, the topic partitions are distributed among the consumers in that group. New members can join, and existing members can leave a group. When this happens, Kafka reassigns the topic partitions to the consumers within the group, ensuring that each member receives a proportional share. This process of topic partition distribution is known as rebalancing.

In a Kafka cluster, one of the brokers is designated as the group coordinator. This specific broker takes on the responsibility of managing a particular consumer group’s membership, tracking the heartbeats of the consumers within that group, and triggering rebalances when needed.

The broker that becomes the group coordinator is selected based on the consumer group’s ID. The group’s ID is hashed to a specific partition of Kafka’s internal offsets topic, __consumer_offsets. The broker which is the leader for that particular partition is then assigned as the coordinator for that consumer group. This mechanism ensures that the duties of group coordination are spread out evenly across the various brokers in the Kafka cluster.

Groups and rebalance protocols

Recall that rebalancing is the process of assigning (or reassigning) Kafka topic partitions to the members of a consumer group. Kafka now supports two rebalance protocols that determine how rebalancing happens within a consumer group. These protocols are:

  • classic, which was the only protocol prior to Kafka version 4.0
  • consumer, a newer protocol that reached General Availability (GA) in Kafka 4.0.

Overview of the rebalance protocols

Confluent Cloud supports both the classic and the consumer rebalance protocols, so your consumer clients can use either one. However, the new consumer rebalance protocol is preferred.

The new consumer rebalance protocol improves consumer group scalability by removing the group-wide synchronization barrier, making rebalancing truly incremental. Removing this barrier also enhances stability by preserving existing partition assignments as much as possible during rebalances. The new consumer rebalance protocol also reduces rebalance times and simplifies consumers. By offloading or simplifying consumer-side responsibilities, the entire rebalance is streamlined and less prone to delays caused by complex client-side operations.

Regardless of the rebalance protocol in use, the coordinator begins the group formation process by validating and finalizing the partition assignments. The coordinator computes assignments based on the topic subscriptions and assignment preferences communicated by the members (in the new consumer protocol) or by the group leader (in the classic protocol).

When a consumer starts up, it finds its group’s coordinator and sends a request to join the group. The coordinator then begins a rebalance. Every rebalance, regardless of which rebalance protocol the group uses, results in a new generation of the group.

Each member in a group must send heartbeats to the coordinator to remain a group member. If the coordinator does not receive a heartbeat from a member before expiration of the configured session timeout, the coordinator kicks the member out of the group and triggers a rebalance to reassign its partitions to other members.

The following comparison shows the differences between the classic and consumer rebalance protocols for each aspect of group behavior:

  • Use case
    • Classic protocol: Good for architectures that need continued support for the classic protocol.
    • Consumer protocol: Ideal for dynamic groups (cloud functions or scaling). The consumer rebalance protocol is enabled in Confluent Cloud.
  • Group leader
    • Classic protocol: One member performs assignment on behalf of the group.
    • Consumer protocol: Not used. No leader election occurs.
  • Coordinator role
    • Classic protocol: Assignments are computed by the group leader (a consumer) and applied by the broker coordinator.
    • Consumer protocol: The broker coordinator computes assignments but does not receive one global plan.
  • Rebalance type
    • Classic protocol: Eager or cooperative, depending on the assignor. In some circumstances, all consumers may pause and revoke all partitions.
    • Consumer protocol: Incremental reassignment with minimal disruption.
  • Join group flow
    • Classic protocol: Triggered on join/leave; reassigns all partitions.
    • Consumer protocol: Incremental assignment; a consumer joins a group by sending a heartbeat request; fewer disruptions.
  • Assignment
    • Classic protocol: Handled by one group leader among the consumers.
    • Consumer protocol: The broker coordinator receives consumer subscriptions, validates them, computes the assignment, and then orchestrates it.

The next section explains how groups come into being and how Kafka determines the group’s rebalance protocol.

How a group’s rebalance protocol is determined

A Kafka consumer group is implicitly created when the first consumer with a specific group.id starts and attempts to subscribe to a topic. There is no explicit “create group” command or administrative action required.

The first consumer to join a group determines the rebalance protocol for that group. If this initial consumer has the configuration group.protocol=consumer set, the group will use the consumer rebalance protocol. Otherwise, the group defaults to the classic rebalance protocol.

You can, for example, change a group’s rebalance protocol by shutting down all its consumers and bringing them back up. The following is a step-by-step explanation of how a consumer group’s protocol can change from classic to the consumer rebalance protocol:

  1. Initially, in this scenario, the consumer group is operating with the classic protocol. The broker’s group coordinator stores metadata about this group, including the protocol it’s currently using (classic).

  2. You shut down all the consumer instances belonging to this group. At this point, the group becomes empty from the broker’s perspective. There are no active members sending heartbeat requests.

    Even though the group is empty, the broker retains the metadata associated with that group, including the last known protocol it was using (classic in this scenario). The group isn’t entirely “forgotten.”

  3. The first consumer instance you bring back up is configured with group.protocol=consumer. This consumer sends a request to the coordinator to join the group, and the request explicitly specifies that it wants to use the consumer protocol.

  4. Because the group is currently empty, the broker accepts this new member and updates the stored metadata for that consumer group. The broker now records that this group should use the consumer rebalance protocol.

As other consumer instances (also configured with group.protocol=consumer) are brought back online and send join requests using the same group.id, the broker sees that the group is now configured to use the consumer protocol and allows them to join using that protocol.

A consumer using the classic protocol that attempts to join won’t be rejected unless it uses an incompatible protocol type, for example, Connect. In that case, the join fails with an InconsistentGroupProtocolException.

Upgrading or switching consumer protocols

There are a few different ways to upgrade or switch consumer groups between the classic and consumer protocols. If you want to migrate or upgrade your clients to the new consumer rebalance protocol, your choice is between a rolling upgrade or an empty group restart. When choosing between the two approaches, take into consideration these aspects of your consumers:

  • Client version compatibility. Check and ensure your consumer libraries support the Kafka 4.0 version and the group.protocol configuration. For the consumer rebalance protocol to be effective, all consumers in the group should use it.
  • Broker group.version. The broker’s group.version feature flag must be set to a version that supports the desired protocol. By default, Confluent Cloud uses group.version=1 to support the new consumer rebalance protocol as well as the older, classic protocol.

Rolling upgrades are the preferred method for production environments to minimize downtime. If your environment can tolerate some downtime, you can do an empty group restart.

You can also switch back to the classic rebalance protocol from the consumer rebalance protocol. In this case, you would do a rolling upgrade, switching from group.protocol=consumer back to the default either by removing the setting or by explicitly setting group.protocol=classic. If you choose this path, make sure you understand the potential feature loss and coordinate carefully with your team.

Note

If a consumer attempts to join a group operating with an incompatible protocol, the consumer receives an InconsistentGroupProtocolException.

Before migrating to the consumer rebalance protocol

Check your consumers and adjust them as necessary before migrating or upgrading to the new consumer rebalance protocol. A consumer uses the new consumer rebalance protocol under the following conditions:

  • The consumer uses a client version that supports the consumer protocol.
  • The consumer configuration explicitly sets the group.protocol=consumer property.

Ensure that classic legacy configurations are removed from your consumer properties. Several configurations and two APIs from the classic rebalance protocol are no longer applicable in the new, consumer rebalance protocol:

  • The heartbeat.interval.ms property on the consumer side is replaced by the server-side group.consumer.heartbeat.interval.ms in the consumer rebalance protocol.
  • The session.timeout.ms property on the consumer side is replaced by the server-side group.consumer.session.timeout.ms in the consumer rebalance protocol.
  • The partition.assignment.strategy property on the classic consumer is replaced in the new consumer rebalance protocol by the server-side group.consumer.assignors on the broker and the group.remote.assignor on the consumer.
  • The enforceRebalance(String) and enforceRebalance() APIs are no longer supported with consumers using the new rebalance protocol.

If a consumer configured with group.protocol=consumer uses any of these legacy properties or methods, they are either ignored or result in errors. For more information, see the server-side consumer group configuration settings.

Finally, the new consumer rebalance protocol also introduces two new methods, subscribe(SubscriptionPattern) and subscribe(SubscriptionPattern, ConsumerRebalanceListener), that allow consumers to subscribe using a regular expression. With these methods, the regular expression uses the RE2J format and is evaluated on the server side.

Tip

For information on using the command line to manage your consumer groups, see the Kafka consumer group tool section later on this page.

How to do a rolling deployment

When upgrading or switching a consumer group to use the group.protocol=consumer configuration via a rolling deployment, a group can temporarily exist in a transitional state where some consumers are using the consumer rebalance protocol and others the classic protocol. While Kafka attempts to interoperate between these protocols during this phase, it’s crucial to understand that this is not a recommended long-term operating model and can lead to potential issues and limitations such as:

  • The broker needs to maintain compatibility with both protocols, which can introduce overhead and prevent the group from fully leveraging the optimizations of the new protocol.
  • Managing a group with mixed protocols can increase the complexity of the rebalance process and potentially lead to less predictable behavior, especially in edge cases or during rapid membership changes.
  • The differences in how the protocols handle assignment could lead to temporary imbalances in partition distribution among consumers using different protocols.
  • Monitoring and troubleshooting the rebalance process and overall group health become harder when different consumers are operating under different protocol rules.

Follow this procedure to do a rolling upgrade:

  1. Ensure that all consumers in the group are using a client version that supports the consumer rebalance protocol.
  2. Remove any classic configurations from the consumer properties.
  3. Deploy new versions of your consumer applications configured with group.protocol=consumer one at a time.
  4. Restart each consumer instance after the configuration change.
  5. As the first consumer using the consumer protocol joins the existing classic group (with a compatible assignor), the group’s metadata on the broker is updated.
  6. Continue rolling out the new configuration to all consumers in the group.
  7. Once all consumers are upgraded, the group will fully use the new protocol.

How to do an empty group restart

This method is simpler than a rolling restart but involves more downtime.

  1. Shut down all consumer instances in the group, making the group empty.
  2. Ensure that all consumers in the group are using a client version that supports the consumer rebalance protocol.
  3. Remove any classic configurations from the consumer properties.
  4. Bring all consumer instances back online with the group.protocol=consumer configuration.
  5. The first joining consumer with the new protocol will set the group’s protocol for subsequent members.

Offset management

After the consumer receives its assignment from the coordinator, it must determine the initial position for each assigned partition. When the group is first created, before any messages are consumed, the position is set according to a configurable offset reset policy (auto.offset.reset). Typically, consumption starts either at the earliest offset or the latest offset.

Committing offsets and reset policy

As a consumer reads messages from its assigned partitions, it must commit the offsets of the messages it has read. If a consumer crashes or shuts down, its partitions are reassigned to another group member. That member resumes reading from the last committed offset in each partition. If the consumer crashes before committing any offsets, then the next consumer will start from the position defined by the auto.offset.reset policy.

Auto-commit offsets

The offset commit policy is crucial to providing the message delivery guarantees needed by your application. By default, the consumer is configured to use an automatic commit policy, which triggers a commit on a periodic interval. The consumer also supports a commit API which can be used for manual offset management. Correct offset management is crucial because it affects delivery semantics.

By default, the consumer is configured to auto-commit offsets. The auto.commit.interval.ms property sets the upper time bound of the commit interval. Using auto-commit offsets can give you “at-least-once” delivery, but you must consume all data returned from a ConsumerRecords<K, V> poll(Duration timeout) call before any subsequent poll calls, or before closing the consumer.

To explain further: when auto-commit is enabled, every time the poll method is called and data is fetched, the consumer is ready to automatically commit the offsets of the messages returned by that poll. If processing of these messages is not completed before the next auto-commit interval, there’s a risk of losing processing progress if the consumer crashes or is otherwise restarted. In that case, when the consumer restarts, it begins consuming from the last committed offset, which can be as old as the auto-commit interval, and any messages received since the last commit are read again.
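
The following fragment sketches this at-least-once pattern with auto-commit enabled. It assumes the consumer, imports, and hypothetical process method and running flag from the earlier example; the key point is that every record returned by a poll is handled before the next poll call.

props.put("enable.auto.commit", "true");        // the default
props.put("auto.commit.interval.ms", "5000");   // upper bound on the commit interval (default 5 seconds)

consumer.subscribe(List.of("my-topic"));
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        process(record);   // finish handling every record before the next poll
    }
    // On the next poll, the offsets of the records above become eligible for auto-commit.
}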

Manual commit API

If you want to reduce the window for duplicates, you can reduce the auto-commit interval, but some users may want even finer control over offsets. The consumer therefore supports a commit API which gives you full control over offsets. Note that when you use the commit API directly, you should first disable auto-commit in the configuration by setting the enable.auto.commit property to false.

Each call to the commit API results in an offset commit request being sent to the broker. Using the synchronous API, the consumer is blocked until that request returns successfully. This may reduce overall throughput since the consumer might otherwise be able to process records while that commit is pending.
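
As a sketch, the loop below disables auto-commit and commits synchronously after processing each batch. It reuses the hypothetical consumer, process method, and running flag from the earlier examples, and assumes the matching imports, including org.apache.kafka.clients.consumer.CommitFailedException.

props.put("enable.auto.commit", "false");

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
    try {
        consumer.commitSync();   // blocks until the broker acknowledges the commit
    } catch (CommitFailedException e) {
        // The group rebalanced and this consumer lost its partitions;
        // the records above may be redelivered to another member.
        System.err.println("Offset commit failed: " + e.getMessage());
    }
}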

Tip

To improve throughput, you can increase the amount of data returned when polling. Set the fetch.min.bytes configuration to a higher value. The broker will wait until that much data is available (or until fetch.max.wait.ms expires) before responding.

Be aware that this can increase the number of duplicate messages you may need to handle in failure scenarios.

Asynchronous commits

The commit API supports both synchronous and asynchronous modes. A synchronous commit blocks until the broker confirms the offset. An asynchronous commit sends the request and returns immediately, without waiting for the response.

If it helps performance, why not always use asynchronous commits? The main reason is that the consumer does not retry the request if the commit fails. This is something that committing synchronously gives you for free; it retries until the timeout provided by the user, or until the API timeout from configs, depending on which commitSync function was called. The problem with asynchronous commits is dealing with commit ordering. By the time the consumer finds out that a commit has failed, you may already have processed the next batch of messages and even sent the next commit. In this case, a retry of the old commit could cause duplicate consumption.

Instead of complicating the consumer internals to try and handle this problem in a sane way, the API gives you a callback which is invoked when the commit either succeeds or fails. If you like, you can use this callback to retry the commit, but you will have to deal with the same reordering problem.
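
For example, a commit callback (sketched below with simple logging) lets you observe failed asynchronous commits; note that blindly retrying inside the callback can reintroduce the ordering problem described above.

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // A later commit may already cover newer offsets, so retrying this one
        // blindly could move the committed position backward.
        System.err.println("Async commit failed for " + offsets + ": " + exception);
    }
});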

Dealing with commit failures and rebalances

Offset commit failures are merely annoying if the following commits succeed since they won’t actually result in duplicate reads. However, if the last commit fails before a rebalance occurs or before the consumer is shut down, then offsets are reset to the last commit and you may see duplicates. A common pattern is to combine asynchronous commits in the poll loop with sync commits on rebalances or shut down. Committing on close is straightforward, but you need a way to hook into rebalances.

Each rebalance has two phases: partition revocation and partition assignment. The revocation method is always called before a rebalance and is the last chance to commit offsets before the partitions are re-assigned. The assignment method is always called after the rebalance and is used to set the initial position of the assigned partitions. In this case, the revocation hook is used to commit the current offsets synchronously.
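
A sketch of this pattern, combining asynchronous commits in the poll loop with a synchronous commit in the revocation hook, might look like the following. It assumes the consumer, imports, and hypothetical process method and running flag from the earlier examples.

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

consumer.subscribe(List.of("my-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Last chance to commit before these partitions move to another member.
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Positions for newly assigned partitions are restored from committed offsets.
    }
});

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            process(record);
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        consumer.commitAsync(currentOffsets, null);   // fast, non-blocking commit in the loop
    }
} finally {
    try {
        consumer.commitSync(currentOffsets);          // final synchronous commit on shutdown
    } finally {
        consumer.close();
    }
}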

Sync vs async: safety and performance tradeoffs

In general, you should consider asynchronous commits less safe than synchronous commits. Consecutive commit failures before a crash can result in increased duplicate processing. You can mitigate this danger by adding logic to handle commit failures in the callback or by mixing in occasional synchronous commits. However, don’t add too much complexity unless testing shows it is necessary. With the consumer rebalance protocol, where rebalances may be partial and incremental, fewer synchronous commits may be needed, especially if most assignments are preserved across rebalances.

If you need more reliability, synchronous commits are there for you, and you can still scale up by increasing the number of topic partitions and the number of consumers in the group. But if you just want to maximize throughput and you’re willing to accept some increase in the number of duplicates, then asynchronous commits may be a good option.

A somewhat obvious point, but one that’s worth making is that asynchronous commits only make sense for “at least once” message delivery. To get “at most once,” you need to know if the commit succeeded before consuming the message. This implies a synchronous commit unless you have the ability to “unread” a message after you find that the commit failed.

The examples show several detailed uses of the commit API and discuss the tradeoffs in terms of performance and reliability.

Coordinating offset commits with external systems

When writing to an external system, the consumer’s position must be coordinated with what is stored as output. The simplest way to achieve this is to store the offsets in the same place as the output. For example, a connector can populate data in HDFS along with the offsets of the data it reads, so that either the data and offsets are both updated, or neither is.

A similar pattern is followed for many other data systems that require these stronger semantics, and for which the messages do not have a primary key to allow for deduplication.
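
A common way to implement this pattern, sketched below, is to bypass group management with manual assignment and to seek to the offset that was saved alongside the output. Here, offsetStore, its methods, and transform are hypothetical placeholders for whatever transactional store and processing your application uses; the consumer setup and imports from earlier are assumed.

TopicPartition partition = new TopicPartition("my-topic", 0);
consumer.assign(List.of(partition));                    // manual assignment: no group rebalancing

long nextOffset = offsetStore.loadOffset(partition);    // hypothetical: offset stored with the output
consumer.seek(partition, nextOffset);

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        // Write the result and the next offset to read in one atomic operation.
        offsetStore.saveAtomically(transform(record), record.offset() + 1);
    }
}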

Exactly-once processing and transactions

Kafka provides exactly-once processing through the transactional APIs available in Kafka Streams and the core producer/consumer client. These guarantees ensure that messages are neither lost nor duplicated during processing or transfer between topics.

Kafka Streams achieves this by committing both offsets and output results as part of a single atomic transaction. Similarly, a transactional producer or consumer can be used to coordinate offset commits and message writes to support exactly-once semantics.

By default, Kafka guarantees at-least-once delivery. You can implement at-most-once delivery by disabling retries on the producer and committing offsets in the consumer prior to processing a batch of messages.
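
The read-process-write loop below sketches how a transactional producer can commit the consumer’s offsets and the produced records atomically. The topic names, transactional ID, and transform method are hypothetical, connection, serializer, and security settings are omitted, and the consumer runs with enable.auto.commit=false and isolation.level=read_committed.

Properties producerProps = new Properties();   // plus bootstrap.servers, serializers, security (omitted)
producerProps.put("transactional.id", "my-transactional-id");   // hypothetical, unique per producer instance
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();

Properties consumerProps = new Properties();   // plus bootstrap.servers, deserializers, security (omitted)
consumerProps.put("group.id", "my-consumer-group");
consumerProps.put("enable.auto.commit", "false");
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of("input-topic"));

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    if (records.isEmpty()) {
        continue;
    }
    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        producer.send(new ProducerRecord<>("output-topic", record.key(), transform(record.value())));
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    }
    // The consumed offsets and the produced records commit (or abort) together.
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
}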

Tip

Consumers can fetch/consume from out-of-sync follower replicas if using a fetch-from-follower configuration. To learn more, see Multi-Region Clusters.

Kafka consumer configuration

The consumer configuration settings depend on the rebalance protocol you want your groups to use: consumer or classic. This section discusses only the key configuration settings common to both protocols and those specific to each. The full list of consumer configuration settings is available in Kafka Consumer Configurations.

Note

By default, Confluent Cloud enables the consumer rebalance protocol (group.version=1). However, consumers can use either the classic or consumer rebalance protocol, because Confluent Cloud remains backward compatible with the classic rebalance protocol.

Core configuration properties

The following are the core properties supported for all consumers, whether they use the consumer or the classic rebalance protocol.

Property Description
bootstrap.servers You are required to set this property so that the consumer can find the Kafka cluster.
client.id Optional, but you should set this property to easily correlate requests on the broker with the client instance which made it. Typically, all consumers within the same group will share the same client ID in order to enforce client quotas.
group.id Optional, but you should always configure a group ID unless you are using the simple assignment API and you don’t need to store offsets in Kafka.
max.poll.interval.ms This property specifies the maximum time allowed between calls to the consumer’s poll method (Consume method in .NET) before the consumer process is assumed to have failed. The default is 300 seconds and can be safely increased if your application requires more time to process messages. If you are using the Java consumer, you can also adjust max.poll.records to tune the number of records that are handled on every loop iteration.

Offset management involves two main settings: whether auto-commit is enabled and the offset reset policy.

Property Description
enable.auto.commit This setting enables auto-commit (the default), which means the consumer automatically commits offsets periodically at the interval set by auto.commit.interval.ms. The default interval is 5 seconds.
auto.offset.reset Defines the behavior of the consumer when there is no committed position (which occurs when the group is first initialized) or when an offset is out of range. You can choose either to reset the position to the earliest offset or the latest offset (the default). You can also select none if you would rather set the initial offset yourself and you are willing to handle out of range errors manually.
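
Putting the core properties together, a consumer configuration might look like the following sketch. The endpoint, IDs, and chosen values are placeholders rather than recommendations, and security settings are omitted.

Properties props = new Properties();
props.put("bootstrap.servers", "pkc-xxxxx.region.provider.confluent.cloud:9092");  // required (placeholder)
props.put("client.id", "inventory-service");   // optional: correlate broker-side requests with this client
props.put("group.id", "inventory-consumers");  // needed for group management and stored offsets
props.put("max.poll.interval.ms", "300000");   // default: 5 minutes allowed between poll calls
props.put("enable.auto.commit", "true");       // default: commit offsets automatically
props.put("auto.commit.interval.ms", "5000");  // default: commit roughly every 5 seconds
props.put("auto.offset.reset", "earliest");    // when no committed offset exists, start from the earliest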

Consumer rebalance protocol configuration

The following properties apply to consumers using the consumer rebalance protocol (KIP-848) introduced in Kafka 4.0. Your client must support version 4.0 to use this protocol. Consumers must configure the following to use the new consumer rebalance protocol:

Property Description
group.protocol Required. Set this property to consumer to ensure the consumer rebalance protocol is used. Otherwise, the classic rebalance protocol is assumed by default.
group.remote.assignor Optional. The server-side assignor to use. If no assignor is specified, the group coordinator will pick one.
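
For example, opting a consumer into the new protocol is a single property. The snippet below is a sketch; the optional server-side assignor line is left as a commented placeholder because its value depends on the assignors exposed by your cluster.

props.put("group.protocol", "consumer");   // use the KIP-848 consumer rebalance protocol
// Optional: request a specific server-side assignor; if unset, the group coordinator picks one.
// props.put("group.remote.assignor", "<assignor name>");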

When group.protocol is set to consumer, the following configurations and APIs from classic consumer group management are no longer supported:

  • heartbeat.interval.ms
  • session.timeout.ms
  • partition.assignment.strategy
  • enforceRebalance(String) and enforceRebalance()

Configure a consumer for classic rebalance protocol

The following properties apply to consumers using the classic rebalance protocol.

Property Description
group.protocol Optional. By default, group.protocol=classic is assumed, so you don’t need to set this property at all.
session.timeout.ms Control the session timeout by overriding this value. The default is 10 seconds in the C/C++ and Java clients, but you can increase the time to avoid excessive rebalancing, for example due to poor network connectivity or long GC pauses. The main drawback to using a larger session timeout is that it will take longer for the coordinator to detect when a consumer instance has crashed, which means it will also take longer for another consumer in the group to take over its partitions. For normal shutdowns, however, the consumer sends an explicit request to the coordinator to leave the group which triggers an immediate rebalance.
heartbeat.interval.ms This controls how often the consumer will send heartbeats to the coordinator. It is also the way that the consumer detects when a rebalance is needed, so a lower heartbeat interval will generally mean faster rebalancing. The default setting is three seconds. For larger groups, it may be wise to increase this setting.
partition.assignment.strategy Sets the partition assignment strategy for a consumer, meaning how partition ownership is distributed between consumer instances when group management is used. All consumers in the same consumer group must have the same partition strategy.
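
As a sketch, a classic-protocol consumer might tune these properties as follows. The timeouts shown are the defaults described above, and the assignor is one deliberate choice among the options listed in the partition assignment section below.

props.put("session.timeout.ms", "10000");     // how long the coordinator waits for heartbeats before evicting a member
props.put("heartbeat.interval.ms", "3000");   // how often this consumer heartbeats to the coordinator
props.put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");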

Configure partition assignment

Understanding how Apache Kafka® consumers are assigned partitions is crucial for building scalable and reliable applications. This section delves into the mechanisms behind partition assignment, contrasting the approach taken by the new consumer rebalance protocol with the behavior of the classic protocol.

For the new consumer rebalance protocol

Under the new consumer rebalance protocol (introduced by KIP-848), partition assignment is primarily governed by the configured partition assignment strategy set on the broker. These strategies, called assignors, act as a blueprint, instructing the group coordinator on how to distribute the available partitions of subscribed topics among the consumer group members.

For the consumer rebalance protocol, Confluent Cloud supports two assignors:

org.apache.kafka.coordinator.group.assignor.UniformAssignor

Distributes topic partitions among group members for a balanced and potentially rack-aware assignment. The assignor employs two different strategies based on the nature of topic subscriptions across the group members.

  • A uniform homogeneous assignment builder: This strategy is used when all members have subscribed to the same set of topics.
  • A uniform heterogeneous assignment builder: This strategy is used when members have varied topic subscriptions.

This is the default assignor on Confluent Cloud brokers. For details on this assignor, see the UniformAssignor.java source.

org.apache.kafka.coordinator.group.assignor.RangeAssignor

Assigns contiguous partition ranges to members of a consumer group such that:

  • Each subscribed member receives at least one partition from each topic it subscribes to.
  • Each member receives the same partition number from every subscribed topic when co-partitioning is possible.

For details on this assignor, see the RangeAssignor.java source.

In addition to the assignor, member subscriptions play a crucial role in how the coordinator assigns partitions. Each group member submits its subscription, either a list of topic names or a regular expression, to the group coordinator to indicate the topics it wants to consume.

This assignment process is governed by the group.remote.assignor consumer configuration property. The property specifies the name of the assignor to use: UniformAssignor or RangeAssignor. If not specified, the group coordinator uses the default assignor, which for Confluent Cloud is org.apache.kafka.coordinator.group.assignor.UniformAssignor.

Note

For a discussion of server-side assignors that may help you understand assignment with the new consumer rebalance protocol, see the Server Side Assignors for Consumer Groups: KIP-848 presentation. However, be aware that this talk is not specific to Confluent Cloud, and the following are not available in Confluent Cloud:

  • the rack awareness feature
  • the broker-side group.consumer.assignors configuration
  • custom assignment strategies

For the classic rebalance protocol

partition.assignment.strategy sets the partition assignment strategy for a consumer, meaning how partition ownership is distributed between consumer instances when group management is used. All consumers in the same consumer group must have the same partition strategy.

partition.assignment.strategy accepts a comma-separated list of fully qualified class names that implement the ConsumerPartitionAssignor interface. The list enables you to update the strategy for a group while temporarily keeping the old one for consumers that have not yet transitioned to the new strategy.
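
For example, switching a live group to the cooperative sticky assignor is typically done in two rolling bounces, sketched below, so that old and new members always share at least one common strategy.

// Rolling bounce 1: list the new strategy first, but keep the old one so that
// members that have not been bounced yet still share a common strategy.
props.put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
      + "org.apache.kafka.clients.consumer.RangeAssignor");

// Rolling bounce 2 (after every member runs the configuration above): drop the old strategy.
props.put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");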

When you configure Kafka consumers, the choice of assignment strategy is important and depends on the specific requirements for partition balancing, consumer group stability, and rebalance behavior. In most cases, the default (range assignment) works well, but for specific use cases, changing the assignment strategy can significantly impact performance and reliability.

Available options are:

  • Range Assignment (Default)
    • How it Works: The org.apache.kafka.clients.consumer.RangeAssignor works by evenly distributing partitions of each topic across the consumers in a consumer group. It sorts both the partitions and consumers. Partitions are assigned to consumers in chunks (ranges), aiming for an even distribution.
    • Advantages: It works well when partition count is higher than consumer count, providing a simple and efficient means of partition distribution.
    • Disadvantages: Can result in uneven load distribution if the number of partitions is not a multiple of the number of consumers.
  • Round Robin Assignment
    • How it Works: The org.apache.kafka.clients.consumer.RoundRobinAssignor distributes partitions across all consumers one by one in a round-robin fashion. It ensures a more even distribution of partitions across consumers, regardless of the number of partitions.
    • Advantages: Leads to a more balanced partition allocation across consumers, useful when handling a varying number of partitions or when partitions have significantly different sizes.
    • Disadvantages: May lead to more rebalances compared to Range Assignment in some scenarios.
  • Sticky Assignor
    • How it Works: The org.apache.kafka.clients.consumer.StickyAssignor aims to maintain a stable partition assignment while still balancing the partitions across consumers. It tries to keep the previously assigned partitions to a consumer as unchanged as possible if a rebalance occurs.
    • Advantages: It minimizes the number of partition reassignments across rebalances, reducing the potential for missed messages or duplicated processing.
    • Disadvantages: While it offers stability, it might not always result in the most balanced partition distribution if the cluster or consumer group changes frequently.
  • Cooperative Sticky Assignor
    • How it Works: An evolution of the StickyAssignor, the org.apache.kafka.clients.consumer.CooperativeStickyAssignor enables more incremental rebalancing, which can reduce the latency and resources required during the rebalance process.
    • Advantages: It supports more granular changes to the consumer group memberships or to the partitions themselves, making rebalances less impactful.
    • Disadvantages: Not all consumers or versions of Kafka support this assignor; requires careful management to ensure compatibility across the consumer group.

Message handling

While the Java consumer does all IO and processing in the foreground thread, librdkafka-based clients (C/C++, Python, Go and C#) use a background thread. The main consequence of this is that polling is totally safe when used from multiple threads. You can use this to parallelize message handling in multiple threads. From a high level, poll is taking messages off of a queue which is filled in the background.

Another consequence of using a background thread is that all heartbeats and rebalancing are executed in the background. The benefit of this is that you don’t need to worry about message handling causing the consumer to “miss” a rebalance. The drawback, however, is that the background thread will continue heartbeating even if your message processor dies. If this happens, then the consumer will continue to hold on to its partitions and the read lag will continue to build until the process is shut down.

Although the clients have taken different approaches internally, they are not as far apart as they seem. To provide the same abstraction in the Java client, you could place a queue in between the poll loop and the message processors. The poll loop would fill the queue and the processors would pull messages off of it.
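
A minimal sketch of that queue-based approach for the Java client follows. The process method, queue size, and running flag are hypothetical, imports are assumed, and offset management and flow control (for example, pausing partitions when the queue is full) are left out for brevity.

BlockingQueue<ConsumerRecord<String, String>> queue = new ArrayBlockingQueue<>(1000);

// Worker thread: pulls records off the queue and processes them.
Thread worker = new Thread(() -> {
    try {
        while (true) {
            process(queue.take());   // hypothetical processing step
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});
worker.start();

// Poll loop: keeps calling poll so the consumer stays in the group while work happens on the worker thread.
try {
    while (running) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
            queue.put(record);   // blocks when the queue is full, which also slows the poll loop
        }
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}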

Kafka consumer group tool

Kafka includes the kafka-consumer-groups command-line utility to view and manage consumer groups, which is also provided with Confluent Platform. Find the tool in the bin folder under your installation directory.

You can also use the Confluent CLI to complete some of these tasks. For more information, see the Confluent CLI reference.

List consumer groups

You can get a list of the active groups in the cluster using the kafka-consumer-groups tool. On a large cluster, this may take a while since it collects the list by inspecting each broker in the cluster.

bin/kafka-consumer-groups --bootstrap-server host:9092 --list

The output is a list of all consumer groups for the cluster, including consumers for internal use. The output might resemble:

_confluent-controlcenter-7-6-0-1
ConfluentTelemetryReporterSampler--4418883999569981189
test-1234
_confluent-controlcenter-7-6-0-lastProduceTimeConsumer
_confluent-controlcenter-7-6-0-1-command

To use the Confluent CLI for this task, see confluent kafka consumer group list.

Describe groups

The kafka-consumer-groups tool can also be used to collect information on a current group. For example, to see the current assignments for the test-1234 group, you could use the following command:

bin/kafka-consumer-groups --bootstrap-server host:9092 --describe --group test-1234

The output from this command lists the clients, topics, partitions and more for that group and resembles:

GROUP      TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID        HOST            CLIENT-ID
test-1234  test-metrics 5          258420          258519          -               test-client-1234    /127.0.0.1     test-client
test-1234  test-metrics 10         257002          257097          -               test-client-1234    /127.0.0.1     test-client
test-1234  test-metrics 4          259580          259660          -               test-client-1234    /127.0.0.1     test-client
test-1234  test-metrics 7          254004          254131          -               test-client-1234    /127.0.0.1     test-client

You can use the --members --verbose flags with the command to see if groups have upgraded from classic to the new rebalance protocol:

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
GROUP           CONSUMER-ID             HOST            CLIENT-ID           #PARTITIONS     CURRENT-EPOCH   CURRENT-ASSIGNMENT          TARGET-EPOCH    TARGET-ASSIGNMENT           UPGRADED
my-group        T4tbGKsvT7CtsxVBH5J2QQ  /127.0.0.1      consumer-member     3               3               my_topic:0,1;new_topic:0    3               my_topic:0,1;new_topic:0    true
my-group        6mfoIHq4n3BT7n1HCdqihb  /127.0.0.1      classic-member      1               2               -                           3               new_topic:1                 false

The UPGRADED column is shown when a group is in migration. A true value in this column means a consumer member upgraded to use the new consumer rebalance protocol. A false value means the member is using the classic protocol.

If you happen to invoke this while a rebalance is in progress, the command will report an error. Retry and you should see the assignments for all the members in the current generation.

To use the Confluent CLI for this task, see confluent kafka consumer group describe.

Reset offsets

You can also use this tool to reset the consumer offset in scenarios where a consumer is stalled or significantly lagging. Make sure you deactivate the consumer before you reset its offset. You have many options for changing the offset. For example, you can reset offsets by shifting forward or backward with --shift-by or reset them to the beginning with --to-earliest. For all the options, see the kafka-consumer-groups tool usage details.

To reset the offsets back by 20 positions, use the following command:

bin/kafka-consumer-groups.sh --bootstrap-server host:9092 --group test-1234 --reset-offsets --shift-by -20 --topic test-metrics --execute

The output will contain the group, topic, and partitions, and the new offset for each:

GROUP             TOPIC                          PARTITION  NEW-OFFSET
test-1234         test-metrics                   5          258400
test-1234         test-metrics                   10         256082
test-1234         test-metrics                   4          259560
test-1234         test-metrics                   7          254984

Consumer examples

Confluent provides several resources to help you get started with Kafka consumers.