Share Consumers for Confluent Platform

Share consumers are an alternative to traditional consumers. Groups of share consumers, called share groups, are an alternative to traditional consumer groups. Share consumers create share groups implicitly through their configuration.

Traditional consumer groups use a distributed processing model where consumer members collectively read messages from an Apache Kafka® topic. In the traditional model, each topic partition is assigned to only one group member at a time. In contrast, share groups and their share consumer members use a concurrent processing model. Share group members simultaneously process different messages from the same partition, effectively treating the partition more like a message queue.

Important

Share groups are only available for the Kafka Java clients.

Key capabilities and differences from consumer groups

Share groups enable Kafka share consumers to cooperatively consume and process records from topics. This contrasts with traditional consumer groups where each partition is assigned to only one consumer within the group.

The fundamental differences between a share group and a consumer group are:

Cooperative consumption

Consumers within a share group cooperatively consume records, and partitions may be assigned to multiple consumers. This allows for increased concurrency beyond the number of topic partitions.

Scalability beyond partitions

The number of consumers in a share group can exceed the number of partitions in a topic. This is a significant advantage over traditional consumer groups where the maximum parallelism is limited by the number of partitions.

Individual record acknowledgement

Records are acknowledged individually, though the system is optimized for batch processing to improve efficiency.

Delivery attempt counting

Delivery attempts to consumers in a share group are counted, which enables automated handling of unprocessable records.

When a share consumer in a share group fetches records, it receives available records from any of the topic-partitions matching its subscriptions. Records are acquired for delivery to this share consumer with a time-limited acquisition lock. While a record is acquired, it is unavailable to other share consumers. The default lock duration is 30 seconds, controlled by the share.record.lock.duration.ms group configuration parameter. The lock is released automatically once its duration elapses, making the record available to another share consumer.

A share consumer holding the lock can handle the record in the following ways:

  • Acknowledge successful processing of the record.

  • Release the record, making it available for another delivery attempt.

  • Renew the lock, enabling long-running processing in the share consumer.

  • Reject the record, indicating it’s unprocessable and preventing further delivery attempts for that record.

  • Do nothing, in which case the lock is automatically released when its duration expires.

The Kafka cluster limits the number of records acquired for consumers for each topic-partition within a share group. Once this limit is reached, fetch operations temporarily return no further records until the number of acquired records decreases, either because acquired records are acknowledged, released, or rejected, or because their locks time out. This limit is controlled by the broker configuration property group.share.partition.max.record.locks. By limiting the duration of the acquisition lock and automatically releasing the locks, the broker ensures delivery progresses even in the presence of share consumer failures.
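The acquisition lock lifecycle and the per-partition lock limit can be illustrated with a toy, in-memory Java model. This is a sketch under simplifying assumptions, not the broker's implementation: ToySharePartition and its methods are hypothetical, time is passed in explicitly as milliseconds, locks are not tied to a particular consumer, and rejection and delivery counting are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Toy, in-memory model of acquisition locks on one share partition.
// ToySharePartition is hypothetical; it is not how the broker is implemented.
class ToySharePartition {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    private final long lockDurationMs; // stands in for share.record.lock.duration.ms
    private final int maxRecordLocks;  // stands in for the per-partition record lock limit
    private final Map<Long, State> states = new HashMap<>();
    private final Map<Long, Long> lockExpiryMs = new HashMap<>(); // offset -> lock deadline

    ToySharePartition(long lockDurationMs, int maxRecordLocks, int recordCount) {
        this.lockDurationMs = lockDurationMs;
        this.maxRecordLocks = maxRecordLocks;
        for (long offset = 0; offset < recordCount; offset++) {
            states.put(offset, State.AVAILABLE);
        }
    }

    // Locks whose deadline has passed are released, making the record AVAILABLE again.
    private void expireLocks(long nowMs) {
        Iterator<Map.Entry<Long, Long>> it = lockExpiryMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> lock = it.next();
            if (nowMs >= lock.getValue()) {
                states.put(lock.getKey(), State.AVAILABLE);
                it.remove();
            }
        }
    }

    // Acquires up to maxRecords AVAILABLE records in offset order, stopping early
    // once maxRecordLocks records are locked at the same time.
    List<Long> acquire(int maxRecords, long nowMs) {
        expireLocks(nowMs);
        List<Long> acquired = new ArrayList<>();
        for (long offset = 0; offset < states.size(); offset++) {
            if (acquired.size() >= maxRecords || lockExpiryMs.size() >= maxRecordLocks) {
                break;
            }
            if (states.get(offset) == State.AVAILABLE) {
                states.put(offset, State.ACQUIRED);
                lockExpiryMs.put(offset, nowMs + lockDurationMs);
                acquired.add(offset);
            }
        }
        return acquired;
    }

    // Acknowledging a still-locked record completes it and frees its lock.
    // Returns false if the record is not currently locked (for example, its lock expired).
    boolean acknowledge(long offset, long nowMs) {
        expireLocks(nowMs);
        if (states.get(offset) != State.ACQUIRED) {
            return false;
        }
        states.put(offset, State.ACKNOWLEDGED);
        lockExpiryMs.remove(offset);
        return true;
    }

    // Releasing a still-locked record makes it AVAILABLE for another delivery attempt.
    boolean release(long offset, long nowMs) {
        expireLocks(nowMs);
        if (states.get(offset) != State.ACQUIRED) {
            return false;
        }
        states.put(offset, State.AVAILABLE);
        lockExpiryMs.remove(offset);
        return true;
    }

    State state(long offset) {
        return states.get(offset);
    }
}
```

In this model, with a 30-second lock duration and a limit of two locks, a first acquire() call locks offsets 0 and 1, a second call returns nothing until a record is acknowledged or a lock expires, and any record whose lock elapses becomes available to the next caller.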

Configuration for client developers

To create a share consumer, create an instance of KafkaShareConsumer in a Java application. When developing a client application that uses share groups, you can define both share consumer-level and group-level configuration properties.

You define the following share consumer-level properties in your consumer code:

share.acknowledgement.mode
  • Description: Controls the acknowledgement mode for a share consumer.

  • Type: string

  • Default: implicit

  • Valid Values: [implicit, explicit]

  • Importance: medium

If your KafkaShareConsumer configuration sets share.acknowledgement.mode=explicit, then every record returned from poll() must be explicitly acknowledged with KafkaShareConsumer.acknowledge() before the next call to poll().

If you do not set share.acknowledgement.mode, or you set share.acknowledgement.mode=implicit, then implicit mode is used: the share consumer automatically acknowledges every delivered record as successfully processed the next time the application calls poll(), commitSync(), or commitAsync().
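As a minimal sketch, the following builds a Properties object that opts a share consumer in to explicit acknowledgement. The class and method names are hypothetical, and the bootstrap server and group ID are placeholder values; the property keys are the ones described in this section.

```java
import java.util.Properties;

// Hypothetical helper that assembles share consumer settings for explicit acknowledgement.
class ExplicitAckConfig {
    static Properties explicitAckProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-share-group");          // placeholder share group name
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Every record returned by poll() must now be acknowledged with acknowledge()
        props.setProperty("share.acknowledgement.mode", "explicit");
        return props;
    }
}
```

Pass the returned Properties to the KafkaShareConsumer constructor. Omitting the share.acknowledgement.mode entry, or setting it to implicit, keeps the default implicit mode.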

share.acquire.mode
  • Description: Controls how a share consumer obtains records.

  • Type: string

  • Default: batch_optimized

  • Valid Values: [batch_optimized, record_limit]

  • Importance: medium

The behavior of the max.poll.records client configuration will change depending on the selected acquire mode.

If set to record_limit, the number of records returned in each poll() will not exceed the value of max.poll.records.

If set to batch_optimized, the number of records returned in each poll() call may exceed max.poll.records to align with batch boundaries for optimization.
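The difference between the two acquire modes can be approximated with a toy calculation of how many records a single poll() returns when the broker holds records in batches. This is a hypothetical model: AcquireModeModel is not part of any Kafka API, and the batch_optimized rule used here (keep adding whole batches while the running count is below max.poll.records) is an assumption made only to show how the limit can be exceeded.

```java
import java.util.List;

// Toy model of records returned by one poll() under each acquire mode.
// Hypothetical: the broker's real batching logic is more involved.
class AcquireModeModel {
    // record_limit: never return more than maxPollRecords.
    static int recordLimit(List<Integer> batchSizes, int maxPollRecords) {
        int total = 0;
        for (int size : batchSizes) {
            total += size;
        }
        return Math.min(total, maxPollRecords);
    }

    // batch_optimized (assumed rule): only whole batches are returned, and batches are
    // added while the running count is below maxPollRecords, so the last batch may
    // push the count past the limit.
    static int batchOptimized(List<Integer> batchSizes, int maxPollRecords) {
        int count = 0;
        for (int size : batchSizes) {
            if (count >= maxPollRecords) {
                break;
            }
            count += size;
        }
        return count;
    }
}
```

With three 300-record batches and max.poll.records=500, this model returns 500 records in record_limit mode and 600 records (two whole batches) in batch_optimized mode.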

You can define group-level properties using Confluent Cloud, Confluent CLI, or the Confluent Cloud API.

You can also define these group-level configuration properties using the kafka-configs tool.

share.auto.offset.reset
  • Description: The strategy to initialize the share-partition start offset.

  • Type: string

  • Default: latest

  • Valid Values: [latest, earliest, by_duration:<duration>]

  • Importance: medium

share.heartbeat.interval.ms
  • Description: The heartbeat interval given to the members of a share group.

  • Type: int

  • Default: 5000 (5 seconds)

  • Valid Values: [1,…]

  • Importance: medium

If not specified, uses the broker’s group.share.heartbeat.interval.ms configuration. If specified, the minimum is limited by the broker’s group.share.min.heartbeat.interval.ms configuration, and the maximum is limited by the broker’s group.share.max.heartbeat.interval.ms configuration.

share.isolation.level
  • Description: Controls how to read records written transactionally.

  • Type: string

  • Default: read_uncommitted

  • Valid Values: [read_committed, read_uncommitted]

  • Importance: medium

If set to read_committed, the share group delivers only transactional records that were committed. If set to read_uncommitted, the share group delivers all records, including those from aborted transactions. Non-transactional records are delivered in either mode.
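The filtering rule above can be sketched as a toy model, assuming every transaction has already been committed or aborted and ignoring transaction control records. IsolationFilter and TxnStatus are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of which records each share.isolation.level value delivers.
// Hypothetical types; open transactions and control records are not modelled.
class IsolationFilter {
    enum TxnStatus { NON_TRANSACTIONAL, COMMITTED, ABORTED }

    // Returns the offsets (list indexes) of the records the share group would deliver.
    static List<Integer> deliveredOffsets(List<TxnStatus> log, String isolationLevel) {
        boolean readCommitted;
        if ("read_committed".equals(isolationLevel)) {
            readCommitted = true;
        } else if ("read_uncommitted".equals(isolationLevel)) {
            readCommitted = false;
        } else {
            throw new IllegalArgumentException("unknown isolation level: " + isolationLevel);
        }
        List<Integer> delivered = new ArrayList<>();
        for (int offset = 0; offset < log.size(); offset++) {
            // read_committed hides records from aborted transactions; nothing else is filtered
            if (readCommitted && log.get(offset) == TxnStatus.ABORTED) {
                continue;
            }
            delivered.add(offset);
        }
        return delivered;
    }
}
```

For a log of a non-transactional record, a committed transactional record, an aborted transactional record, and another non-transactional record (offsets 0 to 3), read_committed delivers offsets 0, 1, and 3, while read_uncommitted delivers all four.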

share.record.lock.duration.ms
  • Description: The record acquisition lock duration in milliseconds for share groups.

  • Type: int

  • Default: 30000 (30 seconds)

  • Valid Values: [1000,…]

  • Importance: medium

If not specified, uses the broker’s group.share.record.lock.duration.ms configuration. If specified, the minimum is limited by the broker’s group.share.min.record.lock.duration.ms configuration, and the maximum is limited by the broker’s group.share.max.record.lock.duration.ms configuration.

share.session.timeout.ms
  • Description: The timeout to detect client failures when using the share group protocol.

  • Type: int

  • Default: 45000 (45 seconds)

  • Valid Values: [1,…]

  • Importance: medium

If not specified, uses the broker’s group.share.session.timeout.ms configuration. If specified, the minimum is limited by the broker’s group.share.min.session.timeout.ms configuration, and the maximum is limited by the broker’s group.share.max.session.timeout.ms configuration.
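The heartbeat, lock duration, and session timeout settings all follow the same pattern: an unset group value falls back to the broker's setting, and a set value must lie within the broker's minimum and maximum. The hypothetical helper below sketches that rule. It assumes an out-of-range value is rejected rather than silently clamped; check the behavior of your Kafka version before relying on that.

```java
// Hypothetical helper relating a group-level value to the broker's settings.
// Assumption: an out-of-range group value is treated as invalid, not clamped.
class GroupConfigBounds {
    static int effectiveValue(Integer groupValue, int brokerDefault, int brokerMin, int brokerMax) {
        if (groupValue == null) {
            // Not specified at the group level: use the broker's group.share.* setting
            return brokerDefault;
        }
        if (groupValue < brokerMin || groupValue > brokerMax) {
            throw new IllegalArgumentException(
                "value " + groupValue + " is outside [" + brokerMin + ", " + brokerMax + "]");
        }
        return groupValue;
    }
}
```

For example, with illustrative broker bounds of 45000 to 60000 ms and a broker session timeout of 45000 ms, an unset share.session.timeout.ms resolves to 45000, a value of 50000 is used as-is, and a value of 1000 falls outside the bounds.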

share.delivery.count.limit
  • Description: Maximum delivery attempts before a record is skipped.

  • Type: int

  • Default: 5

  • Valid Values: [2, …]

  • Importance: medium


Controls how many times the broker will attempt to deliver a record to share consumers before skipping it. The value must fall within the bounds set by the broker’s group.share.min.delivery.count.limit and group.share.max.delivery.count.limit configurations.
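The effect of the delivery count limit on a record that keeps failing can be sketched as a toy loop. DeliveryCountModel is hypothetical, and the model assumes that every failed attempt ends with the record being released or its lock expiring, so that each redelivery counts as one more delivery.

```java
// Toy model of share.delivery.count.limit for a single record.
// DeliveryCountModel is hypothetical, not the broker's implementation.
class DeliveryCountModel {
    // Number of times the record is delivered when its first failuresBeforeSuccess
    // attempts fail. Each failed attempt leads to redelivery until the limit is
    // reached, after which the broker stops delivering the record.
    static int deliveries(int deliveryCountLimit, int failuresBeforeSuccess) {
        int count = 0;
        while (count < deliveryCountLimit) {
            count++;
            if (count > failuresBeforeSuccess) {
                return count; // this attempt succeeded and the record was acknowledged
            }
        }
        return count; // limit reached: the record is skipped
    }

    // True if some attempt within the limit succeeds.
    static boolean processed(int deliveryCountLimit, int failuresBeforeSuccess) {
        return failuresBeforeSuccess < deliveryCountLimit;
    }
}
```

With share.delivery.count.limit=5, a record that fails twice is processed on its third delivery, whereas a record that always fails is delivered five times and then skipped.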

Important

Production deployments should explicitly set this value based on workload characteristics rather than relying on the default. Defaults are permissive for experimentation but may not be suitable for production use.

share.partition.max.record.locks
  • Description: Maximum concurrent record locks per partition.

  • Type: int

  • Default: 2,000

  • Valid Values: [100, …]

  • Importance: medium

Limits the number of records that can be acquired (locked) simultaneously for consumers in this share group for each topic partition. Once this limit is reached, fetch operations will yield no further records until the number of acquired records decreases. The value must fall within the bounds set by the broker’s group.share.min.partition.max.record.locks and group.share.max.partition.max.record.locks configurations.

Important

Production deployments should explicitly set this value based on workload characteristics rather than relying on the default. Defaults are permissive for experimentation but may not be suitable for production use.

share.renew.acknowledge.enable
  • Description: Controls whether acknowledgment renewal is enabled.

  • Type: boolean

  • Default: true

  • Valid Values: [true, false]

  • Importance: medium

When set to false, attempts to renew acquisition locks (by acknowledging records with AcknowledgeType.RENEW) return an INVALID_RECORD_STATE error. Use this setting to prevent share consumers from extending locks for long-running processing.

Broker configuration for share groups

Administrators can configure broker-level settings that set bounds for per-group share group configurations. These are configured in the broker’s server.properties file or via dynamic configuration.

group.share.max.delivery.count.limit
  • Description: Upper bound for per-group delivery retry limit.

  • Type: int

  • Default: 10

  • Valid Values: [5, 25]

  • Importance: medium

Sets the maximum value that share.delivery.count.limit can be configured to for any share group on this broker.

group.share.min.delivery.count.limit
  • Description: Lower bound for per-group delivery retry limit.

  • Type: int

  • Default: 2

  • Valid Values: [2, 5]

  • Importance: medium

Sets the minimum value that share.delivery.count.limit can be configured to for any share group on this broker.

group.share.max.partition.max.record.locks
  • Description: Upper bound for per-group concurrent record locks.

  • Type: int

  • Default: 4,000

  • Valid Values: [2000, 10000]

  • Importance: medium

Sets the maximum value that share.partition.max.record.locks can be configured to for any share group on this broker.

group.share.min.partition.max.record.locks
  • Description: Lower bound for per-group concurrent record locks.

  • Type: int

  • Default: 100

  • Valid Values: [100, 2000]

  • Importance: medium

Sets the minimum value that share.partition.max.record.locks can be configured to for any share group on this broker.

Manage share groups

Administrators can use the kafka-share-groups.sh tool to list, describe, reset, and delete share groups. Only share groups without any active members can be deleted. For information on using this tool, see the kafka-share-groups tool usage details.

List all share groups:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --list

View current start offset and lag of a share group:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-share-group

This command produces output like the following:

GROUP           TOPIC           PARTITION  START-OFFSET  LAG
my-share-group  topic1          0          4             0

The start offset is the earliest offset for in-flight records being evaluated for delivery to share consumers. Some records after the start offset may already have completed delivery.

Note

The admin client needs DESCRIBE access to all the topics used in the group.

Describe active members in a share group:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-share-group --members

Reset offsets of a share group to the latest offset:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group my-share-group --topic topic1 --to-latest --execute

This command produces output like the following:

GROUP           TOPIC           PARTITION  NEW-OFFSET
my-share-group  topic1          0          10

You can also use --to-earliest or --to-datetime <String: datetime> (format: YYYY-MM-DDThh:mm:ss.sss) to reset to different offsets. Use --dry-run instead of --execute to preview which offsets would be reset.

Delete offsets of individual topics in a share group:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group my-share-group --topic topic1

Delete one or more share groups:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --delete --group my-share-group

Monitor share groups

Share partition lag

Share partition lag represents the number of records that have not yet been fully processed by consumers in a share group. It is the primary metric used to automate the scaling of share consumer instances. Unlike standard consumer lag, which is the difference between two offsets, share partition lag accounts for in-flight records whose states are not contiguous by offset. A record can be in one of these states:

  • AVAILABLE: Not yet acquired by any consumer

  • ACQUIRED: Currently assigned to a consumer with an active acquisition lock

  • ACKNOWLEDGED: Processing complete

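  • ARCHIVED: No longer available for delivery, for example because it was rejected, reached the delivery count limit, or was removed by compaction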

Share partition lag is calculated as:

(highest offset) - (share-partition start offset) + 1 - (processed in-flight records)
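A worked example of the formula, with hypothetical helper names: assume the share-partition start offset is 10, the highest offset is 15, and the in-flight records at offsets 10 through 15 are in the states ACQUIRED, ACKNOWLEDGED, AVAILABLE, ARCHIVED, ACKNOWLEDGED, and AVAILABLE. Treating ACKNOWLEDGED and ARCHIVED as processed, three records are processed, so the lag is 15 - 10 + 1 - 3 = 3, which matches the three records that are still AVAILABLE or ACQUIRED.

```java
import java.util.List;

// Worked example of the share partition lag formula.
// ShareLagExample is hypothetical; the State names follow the list of record states.
class ShareLagExample {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    // (highest offset) - (share-partition start offset) + 1 - (processed in-flight records)
    static long lag(long highestOffset, long startOffset, long processedInFlight) {
        return highestOffset - startOffset + 1 - processedInFlight;
    }

    // Counts in-flight records already in a terminal state (ACKNOWLEDGED or ARCHIVED).
    static long processedInFlight(List<State> inFlight) {
        long processed = 0;
        for (State s : inFlight) {
            if (s == State.ACKNOWLEDGED || s == State.ARCHIVED) {
                processed++;
            }
        }
        return processed;
    }
}
```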

To monitor lag effectively, you must distinguish between record states. Records in a terminal state (ACKNOWLEDGED or ARCHIVED) have finished processing and no longer contribute to the lag calculation; they are the processed in-flight records subtracted in the formula.

Users can retrieve share partition lag data via these APIs:

  • Admin API using Admin.listShareGroupOffsets(), which returns SharePartitionOffsetInfo objects containing startOffset(), leaderEpoch(), and lag() values.

  • JMX through kafka.server:type=tenant-metrics,group-protocol=share,partition={partitionid},group-id={sharegroupid},topic={topicname}, which exposes the lag value for the share partition.

Note

Share partition lag must be enabled for JMX monitoring to be available. Set the confluent.share.lag.calculator.enabled value to true. This property enables share lag monitoring, and is false by default.

Additionally, add org.apache.kafka.common.metrics.JmxReporter to the metric.reporters broker configuration to expose the metric in JMX.

Share group metrics

Metrics for share groups are emitted under the kafka.server:type=group-coordinator-metrics MBean. Relevant metrics include:

group-count,protocol={share}

Total number of groups per group type: Share.

share-acknowledgement

The total number of offsets acknowledged for share groups.

record-acknowledgement

The number of records acknowledged per acknowledgement type.

partition-load-time

The time taken to load the share partitions.

These metrics provide visibility into the health and performance of share groups on the Kafka cluster.

Troubleshooting share groups

INVALID_RECORD_STATE errors

If share consumers receive INVALID_RECORD_STATE errors, check the share.renew.acknowledge.enable configuration for the share group. This error is returned when acknowledgment renewal is attempted but share.renew.acknowledge.enable=false.

To resolve, either:

  • Set share.renew.acknowledge.enable=true to allow acknowledgment renewal, or

  • Modify the share consumer application to avoid renewing acknowledgments

Production tuning recommendations

Share group defaults are permissive to help with experimentation and development. For production deployments:

  • Explicitly set share.delivery.count.limit based on your workload’s retry requirements rather than using the broker default.

  • Explicitly set share.partition.max.record.locks based on your expected concurrency and throughput requirements.

  • Monitor share partition lag metrics to validate that configuration values are appropriate for your workload.

Share consumers and share groups were introduced in Kafka 4.1. This section gives client developers an overview of share groups and highlights the configurations relevant to broker administrators.

Manage access

You can control who has access to share groups by setting group and topic permissions using RBAC in the Confluent CLI.

Managing share group access requires the following permissions:

Action                  Required topic permissions  Required group permissions
ShareConsumer read      DeveloperRead               DeveloperRead
User read queue         DeveloperRead               DeveloperRead
Delete share group      None                        DeveloperManage
Delete offsets          DeveloperRead               DeveloperManage
Reset offsets           DeveloperRead               DeveloperRead
View default settings   None                        DeveloperManage
Alter default settings  None                        ResourceOwner

Confluent CLI

You can use the Confluent CLI to grant permissions for share groups and topics.

Grant share group permissions

To grant READ permissions to a share group, use the confluent iam rbac role-binding create command:

confluent iam rbac role-binding create \
  --principal User:u-a03bcd \
  --role DeveloperRead \
  --kafka-cluster 0000000000000000000000 \
  --resource Group:my-share-group

The command returns the role binding details:

+----------------+----------------------------+
| Principal      | User:u-a03bcd              |
| Role           | DeveloperRead              |
| KafkaCluster   | 0000000000000000000000     |
| ResourceType   | Group                      |
| Name           | my-share-group             |
| PatternType    | LITERAL                    |
+----------------+----------------------------+

Grant topic permissions

To grant READ permissions to a topic, use the confluent iam rbac role-binding create command:

confluent iam rbac role-binding create \
  --principal User:u-a03bcd \
  --role DeveloperRead \
  --kafka-cluster 0000000000000000000000 \
  --resource Topic:my-topic

The command returns the role binding details:

+----------------+----------------------------+
| Principal      | User:u-a03bcd              |
| Role           | DeveloperRead              |
| KafkaCluster   | 0000000000000000000000     |
| ResourceType   | Topic                      |
| Name           | my-topic                   |
| PatternType    | LITERAL                    |
+----------------+----------------------------+

For details, see confluent iam rbac role-binding create.

Using the KafkaShareConsumer for Share Groups

Use the org.apache.kafka.clients.consumer.KafkaShareConsumer class to consume records from a Kafka cluster using share groups. Setting up a KafkaShareConsumer involves configuration and a subscription process similar to a regular KafkaConsumer, but with key differences in how messages are handled and acknowledged.

Configuration

The KafkaShareConsumer can be instantiated using various constructors that accept either a Map<String, Object> or a Properties object for configuration, optionally including key and value deserializers.

Example:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-share-group"); // All consumers in the same share group will have the same group.id
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);

Note

After creating a KafkaShareConsumer, you must call close() to prevent resource leaks.

Subscribing to Topics

Share consumers in a share group dynamically subscribe to topics using the subscribe(Collection<String> topics) method. Topic subscriptions are not incremental; a new list replaces any current assignment. Kafka delivers each message in the subscribed topics to one share consumer within the share group.

Example:

consumer.subscribe(Arrays.asList("my-topic"));

Polling for Records and Liveness

After subscribing, the share consumer joins the share group automatically when poll(Duration) is invoked. This method fetches data for the subscribed topics. To ensure consumer liveness and continued record reception, poll() must be called periodically. The share consumer sends periodic heartbeats to the broker, and if heartbeats are not sent within the share group’s session timeout, then the share consumer is considered dead and its partitions are reassigned.

Example:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // Process records
}
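The liveness rule can be expressed as a one-line check. LivenessCheck is a hypothetical helper; the real check is performed on the broker side, and the exact boundary behavior at the timeout is an assumption.

```java
// Toy liveness rule; LivenessCheck is a hypothetical helper, not a Kafka API.
class LivenessCheck {
    // A member counts as alive while its most recent heartbeat is no older than the session timeout.
    static boolean isAlive(long lastHeartbeatMs, long nowMs, long sessionTimeoutMs) {
        return nowMs - lastHeartbeatMs <= sessionTimeoutMs;
    }
}
```

With share.session.timeout.ms at its 45-second default, a member whose last heartbeat was 40 seconds ago is still considered alive in this model, while one silent for 50 seconds is not.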

Record Delivery and Acknowledgement

When a share consumer in a share group fetches records using poll(), it receives available records from any matching topic-partitions. These records are acquired with a time-limited acquisition lock, making them unavailable to other share consumers during this period. The default lock duration is 30 seconds. The lock duration is controlled by the share.record.lock.duration.ms group configuration parameter. The lock is automatically released if its duration elapses, making the record available for another delivery attempt.

Share consumers can handle acquired records in these ways:

Acknowledge successful processing

Marks the record as successfully processed.

Release the record

Makes the record available for another delivery attempt.

Reject the record

Indicates the record is unprocessable and prevents further delivery attempts.

Renew the lock

Extends the acquisition lock on the record for long-running processing.

Do nothing

The lock automatically releases when its duration expires.

The share consumer can choose to use implicit or explicit acknowledgement of the records it processes by using the share consumer share.acknowledgement.mode configuration property.

Implicit Acknowledgement

If the share consumer configuration property share.acknowledgement.mode is set to implicit or is not set at all, then the share consumer uses implicit acknowledgement. All delivered records are implicitly marked as successfully processed and acknowledged when commitSync(), commitAsync(), or poll() is called.

Example (Implicit with poll):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // Implicitly acknowledges previous batch
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // doProcessing(record);
    }
}

Example (implicit with commitSync):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // doProcessing(record);
    }
    consumer.commitSync(); // Implicitly acknowledges all delivered records
}

For more information, see KafkaShareConsumer Javadocs.

Explicit Acknowledgement

If the share consumer configuration property share.acknowledgement.mode is set to explicit, then the share consumer uses explicit acknowledgement. The application must acknowledge all records returned from poll(Duration) using acknowledge(ConsumerRecord, AcknowledgeType) before its next call to poll(Duration). The AcknowledgeType argument indicates whether the record was processed successfully (ACCEPT), should be released for another attempt (RELEASE), permanently rejected (REJECT), or needs extended processing time (RENEW). Acknowledgements are committed when commitSync(), commitAsync(), or poll() is called.

The available acknowledgement types are:

ACCEPT

Marks the record as successfully processed.

RELEASE

Makes the record available for another delivery attempt.

REJECT

Indicates the record is unprocessable and prevents further delivery attempts.

RENEW

Extends the acquisition lock timeout for records that require additional processing time beyond the default lock duration. Prevents the record from being re-delivered while still being processed. Use acquisitionLockTimeoutMs() to determine when to renew the lock.

Example (Explicit acknowledgement with commitSync):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
            // doProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);
        }
    }
    consumer.commitSync();
}
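The example above rejects a record on any exception. A variation distinguishes failures that may succeed on a later attempt (RELEASE; the record is delivered again, so each retry uses one of the share.delivery.count.limit attempts) from records that can never be processed (REJECT). The sketch below uses local, hypothetical enums that mirror the AcknowledgeType value names so that it compiles without the Kafka client; how to classify a failure as transient is an application decision.

```java
// Hypothetical policy mapping a processing outcome to an acknowledgement type.
// AckType mirrors the names of Kafka's AcknowledgeType values but is a local enum.
class AckPolicy {
    enum AckType { ACCEPT, RELEASE, REJECT }
    enum Outcome { SUCCESS, TRANSIENT_FAILURE, PERMANENT_FAILURE }

    static AckType choose(Outcome outcome) {
        switch (outcome) {
            case SUCCESS:
                return AckType.ACCEPT;  // processed successfully
            case TRANSIENT_FAILURE:
                return AckType.RELEASE; // make the record available for another attempt
            default:
                return AckType.REJECT;  // unprocessable: prevent further delivery attempts
        }
    }
}
```

In the loop above, the catch block would call consumer.acknowledge(record, ...) with the matching AcknowledgeType instead of always using REJECT.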

Example (Explicit acknowledgement with RENEW for long-running processing):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
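            // needsMoreTime(record, lockTimeoutMs) is a hypothetical application helper
            // that decides whether processing may outlast the acquisition lock
            Optional<Integer> lockTimeout = consumer.acquisitionLockTimeoutMs();
            if (lockTimeout.isPresent() && needsMoreTime(record, lockTimeout.get())) {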
                // Extend the lock to prevent re-delivery during processing
                consumer.acknowledge(record, AcknowledgeType.RENEW);
                consumer.commitSync();
            }
            // doLongRunningProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);
        }
    }
    consumer.commitSync();
}
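The RENEW example leaves the needsMoreTime helper undefined. One possible policy, sketched as a hypothetical pure-Java helper, is to renew when the time left on the lock is smaller than the expected remaining processing time plus a safety margin. It assumes the lock started when poll() returned the record and that the lock timeout comes from acquisitionLockTimeoutMs(); the margin and the work estimate are application choices.

```java
// Hypothetical needsMoreTime-style policy; not part of the Kafka client API.
class RenewDecision {
    // Returns true when the lock would likely expire before processing finishes.
    // Assumes the lock started at acquiredAtMs and lasts lockTimeoutMs.
    static boolean shouldRenew(long acquiredAtMs, long nowMs, int lockTimeoutMs,
                               long expectedRemainingWorkMs, long safetyMarginMs) {
        long remainingLockMs = acquiredAtMs + lockTimeoutMs - nowMs;
        return remainingLockMs < expectedRemainingWorkMs + safetyMarginMs;
    }
}
```

For a record acquired at time 0 with a 30-second lock, checking at 10 seconds leaves 20 seconds on the lock: 5 seconds of remaining work plus a 2-second margin needs no renewal, while 25 seconds of remaining work does.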

For more information, see KafkaShareConsumer Javadocs.

Multithreaded Processing

The KafkaShareConsumer is not thread-safe, and unsynchronized access will lead to ConcurrentModificationException. The only exception is wakeup(), which can safely be called from an external thread to interrupt an active operation (for example, poll()), causing a WakeupException to be thrown in the blocking thread. This mechanism is useful for safely shutting down the share consumer from another thread.

Example of safe shutdown using wakeup():

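import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaShareConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaShareConsumer<String, String> consumer;

    public KafkaShareConsumerRunner(KafkaShareConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Safe to call from another thread: wakeup() is the only thread-safe method
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}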

Note

Although thread interrupts can also abort a blocking operation, their use is discouraged because they may prevent a clean shutdown of the consumer. Use wakeup() to abort blocking operations instead.

Transactional Records and Isolation Level

The handling of transactional records in share groups is governed by the share.isolation.level configuration property, which applies to the entire share group.

read_uncommitted (Default)

The share group consumes all non-transactional and transactional records, bounded by the high-water mark.

read_committed

The share group consumes only non-transactional records and committed transactional records. This level is bounded by the last stable offset, meaning an open transaction can block the share group’s progress.
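A toy model of the bound each isolation level imposes. IsolationBound is hypothetical and returns an exclusive upper bound. Suppose the high-water mark is 100 and a transaction that is still open begins at offset 60, so the last stable offset stays at 60: under read_committed the group can deliver only records below offset 60 until that transaction completes, while under read_uncommitted it can deliver up to the high-water mark.

```java
// Toy model of the offset bound for each share.isolation.level value.
// deliverableBound is hypothetical; only records with offsets below the returned
// value can be delivered.
class IsolationBound {
    static long deliverableBound(String isolationLevel, long highWatermark, long lastStableOffset) {
        if ("read_committed".equals(isolationLevel)) {
            return lastStableOffset; // an open transaction holds this bound back
        }
        if ("read_uncommitted".equals(isolationLevel)) {
            return highWatermark;
        }
        throw new IllegalArgumentException("unknown isolation level: " + isolationLevel);
    }
}
```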