Share Consumers for Confluent Cloud

Share consumers are an alternative to traditional consumers. Groups of these consumers, share groups, are an alternative to traditional consumer groups. Share consumers, through their configuration, inherently create share groups.

Traditional consumer groups use a distributed processing model where consumer members collectively read messages from an Apache Kafka® topic. In the traditional model, each topic partition is assigned to only one group member at a time. In contrast, share groups and their share consumer members use a concurrent processing model. Share group members simultaneously process different messages from the same partition, effectively treating it more like a message queue.

Important

Share groups are available on only Dedicated and Enterprise clusters. Share groups are available to only Java clients.

Key capabilities and differences from consumer groups

Share groups enable Kafka share consumers to cooperatively consume and process records from topics. This contrasts with traditional consumer groups where each partition is assigned to only one consumer within the group.

The fundamental differences between a share group and a consumer group are:

Cooperative consumption

Consumers within a share group cooperatively consume records, and partitions may be assigned to multiple consumers. This allows for increased concurrency beyond the number of topic partitions.

Scalability beyond partitions

The number of consumers in a share group can exceed the number of partitions in a topic. This is a significant advantage over traditional consumer groups where the maximum parallelism is limited by the number of partitions.

Individual record acknowledgement

Records are acknowledged individually, though the system is optimized for batch processing to improve efficiency.

Delivery attempt counting

Delivery attempts to consumers in a share group are counted, which enables automated handling of unprocessable records.

When a share consumer in a share group fetches records, it receives available records from any of the topic-partitions matching its subscriptions. Records are acquired for delivery to this share consumer with a time-limited acquisition lock. While a record is acquired, it is unavailable to other share consumers. The default lock duration is 30 seconds, controlled by the share.record.lock.duration.ms group configuration parameter. The lock is released automatically once its duration elapses, making the record available to another share consumer.

A share consumer holding the lock can handle the record in the following ways:

  • Acknowledge successful processing of the record.

  • Release the record, making it available for another delivery attempt.

  • Renew the lock, enabling long running processing in the share consumer.

  • Reject the record, indicating it’s unprocessable and preventing further delivery attempts for that record.

  • Do nothing, in which case the lock is automatically released when its duration expires.
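The lifecycle above can be pictured as a small state machine. The following is a toy, in-memory sketch and not the broker's implementation: the class SharedRecordModel and its methods are hypothetical names, and it assumes a record that exhausts its delivery attempts is archived rather than redelivered.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model of a single record in a share partition; not the broker's implementation. */
final class SharedRecordModel {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    private final int maxDeliveries;
    private final Duration lockDuration;
    private State state = State.AVAILABLE;
    private int deliveryCount = 0;
    private Instant lockExpiry = Instant.MIN;

    SharedRecordModel(int maxDeliveries, Duration lockDuration) {
        this.maxDeliveries = maxDeliveries;
        this.lockDuration = lockDuration;
    }

    /** Fetch: lock the record for one consumer and count the delivery attempt. */
    boolean acquire(Instant now) {
        expireLockIfNeeded(now);
        if (state != State.AVAILABLE) {
            return false;
        }
        state = State.ACQUIRED;
        deliveryCount++;
        lockExpiry = now.plus(lockDuration);
        return true;
    }

    void accept(Instant now)  { if (holdsLock(now)) state = State.ACKNOWLEDGED; }
    void reject(Instant now)  { if (holdsLock(now)) state = State.ARCHIVED; }
    void release(Instant now) { if (holdsLock(now)) makeAvailableOrArchive(); }
    void renew(Instant now)   { if (holdsLock(now)) lockExpiry = now.plus(lockDuration); }

    State state(Instant now) {
        expireLockIfNeeded(now);
        return state;
    }

    int deliveryCount() { return deliveryCount; }

    private boolean holdsLock(Instant now) {
        expireLockIfNeeded(now);
        return state == State.ACQUIRED;
    }

    /** "Do nothing": once the lock duration elapses, the record is handed back. */
    private void expireLockIfNeeded(Instant now) {
        if (state == State.ACQUIRED && !now.isBefore(lockExpiry)) {
            makeAvailableOrArchive();
        }
    }

    /** After the last allowed attempt, the record is archived instead of redelivered. */
    private void makeAvailableOrArchive() {
        state = deliveryCount >= maxDeliveries ? State.ARCHIVED : State.AVAILABLE;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2025-01-01T00:00:00Z");
        SharedRecordModel record = new SharedRecordModel(5, Duration.ofSeconds(30));
        record.acquire(t0);
        System.out.println(record.state(t0.plusSeconds(10))); // lock still held
        System.out.println(record.state(t0.plusSeconds(30))); // lock duration elapsed
    }
}
```

The model takes the current time as a parameter so that lock expiry can be exercised without waiting; a real share consumer never manages these states itself.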

Configuration for client developers

To create a share consumer, create an instance of KafkaShareConsumer in a Java application. When developing a client application that utilizes share groups, there are share consumer-level configuration properties and group-level configuration properties you can define.

You define the following share consumer-level properties in your consumer code:

share.acknowledgement.mode
  • Description: Controls the acknowledgement mode for a share consumer.

  • Type: string

  • Default: implicit

  • Valid Values: [implicit, explicit]

  • Importance: medium

If your KafkaShareConsumer configuration sets share.acknowledgement.mode=explicit, then every record returned from poll must be explicitly acknowledged with KafkaShareConsumer.acknowledge.

If you do not set share.acknowledgement.mode, or if you set share.acknowledgement.mode=implicit, then implicit mode is used, which automatically acknowledges or negatively acknowledges messages and commits offsets.

share.acquire.mode
  • Description: Controls how a share consumer obtains records.

  • Type: string

  • Default: batch_optimized

  • Valid Values: [batch_optimized, record_limit]

  • Importance: medium

The behavior of the max.poll.records client configuration will change depending on the selected acquire mode.

If set to record_limit, the number of records returned in each poll() will not exceed the value of max.poll.records.

If set to batch_optimized, the number of records returned in each poll() call may exceed max.poll.records to align with batch boundaries for optimization.
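As a minimal sketch of how these two client-level properties might be set together, the following builds a Properties object that selects explicit acknowledgement and record_limit acquisition. The class name, the helper name, and the max.poll.records value of 100 are illustrative assumptions; connection and deserializer settings are omitted and would need to be added before passing the result to a KafkaShareConsumer constructor.

```java
import java.util.Properties;

final class ShareConsumerConfigSketch {
    /** Builds only the share-specific client settings; connection settings are omitted. */
    static Properties shareSettings(String groupId, int maxPollRecords) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        // Every record returned by poll() must be acknowledged by the application.
        props.setProperty("share.acknowledgement.mode", "explicit");
        // Cap each poll() at max.poll.records instead of aligning to batch boundaries.
        props.setProperty("share.acquire.mode", "record_limit");
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        shareSettings("my-share-group", 100)
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```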

You can define group-level properties using Confluent Cloud, Confluent CLI, or the Confluent Cloud API.

You can also define these group-level configuration properties using the kafka-configs tool.

share.auto.offset.reset
  • Description: The strategy to initialize the share-partition start offset.

  • Type: string

  • Default: latest

  • Valid Values: [latest, earliest, by_duration:<duration>]

  • Importance: medium
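The by_duration form needs a concrete duration string. The sketch below assumes the <duration> placeholder takes an ISO-8601 duration (for example PT1H30M), which is how Apache Kafka documents this setting; confirm the accepted format for your cluster before relying on it. The class and method names are hypothetical, and the code only formats the value. It does not apply the configuration.

```java
import java.time.Duration;

final class ShareOffsetResetSketch {
    /** Formats a by_duration value; Duration.toString() emits ISO-8601 text. */
    static String byDuration(Duration lookback) {
        if (lookback.isNegative()) {
            throw new IllegalArgumentException("duration must not be negative");
        }
        return "by_duration:" + lookback;
    }

    public static void main(String[] args) {
        // Start 90 minutes back from the current time.
        System.out.println(byDuration(Duration.ofMinutes(90)));
    }
}
```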

share.heartbeat.interval.ms
  • Description: The heartbeat interval given to the members of a share group.

  • Type: int

  • Default: 5000 (5 seconds)

  • Valid Values: [1,…]

  • Importance: medium

If not specified, uses the broker’s group.share.heartbeat.interval.ms configuration. If specified, the minimum is limited by the broker’s group.share.min.heartbeat.interval.ms configuration, and the maximum is limited by the broker’s group.share.max.heartbeat.interval.ms configuration.

share.isolation.level
  • Description: Controls how to read records written transactionally.

  • Type: string

  • Default: read_uncommitted

  • Valid Values: [read_committed, read_uncommitted]

  • Importance: medium

If set to read_committed, the share group only delivers transactional records which were committed. If set to read_uncommitted, the share group returns all messages, even transactional messages which were aborted. Non-transactional records are returned unconditionally in either mode.

share.record.lock.duration.ms
  • Description: The record acquisition lock duration in milliseconds for share groups.

  • Type: int

  • Default: 30000 (30 seconds)

  • Valid Values: [1000,…]

  • Importance: medium

If not specified, uses the broker’s group.share.record.lock.duration.ms configuration. If specified, the minimum is limited by the broker’s group.share.min.record.lock.duration.ms configuration, and the maximum is limited by the broker’s group.share.max.record.lock.duration.ms configuration.

share.session.timeout.ms
  • Description: The timeout to detect client failures when using the share group protocol.

  • Type: int

  • Default: 45000 (45 seconds)

  • Valid Values: [1,…]

  • Importance: medium

If not specified, uses the broker’s group.share.session.timeout.ms configuration. If specified, the minimum is limited by the broker’s group.share.min.session.timeout.ms configuration, and the maximum is limited by the broker’s group.share.max.session.timeout.ms configuration.

Manage share groups

Administrators can manage share groups using Confluent Cloud, Confluent CLI, or the Confluent Cloud API.

Administrators can use the kafka-share-groups.sh tool to list, describe, or delete share groups. Only share groups without any active members can be deleted. For information on using this tool, see the kafka-share-groups tool usage details.

List all share groups:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --list

View current start offset of a share group:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-share-group

Describe active members in a share group:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-share-group --members

Delete offsets of individual topics in a share group:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group my-share-group --topic topic1

Delete one or more share groups:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --delete --group my-share-group

Monitor share groups

Share partition lag

Share partition lag represents the number of records that have not yet been fully processed by consumers in a share group. Share partition lag is the primary metric used to automate the scaling of share consumer instances. Unlike standard consumer lag, share partition lag accounts for the non-contiguous state of records in these states:

  • AVAILABLE: Not yet acquired by any consumer

  • ACQUIRED: Currently assigned to a consumer with an active acquisition lock

  • ACKNOWLEDGED: Processing complete

  • ARCHIVED: No longer available for delivery, for example after being rejected or reaching the maximum delivery count

Share partition lag is calculated as:

(highest offset) - (share-partition start offset) + 1 - (processed in-flight records)

To monitor lag effectively, you must distinguish between record states. Records in a terminal state have been acknowledged or archived, and they no longer contribute to the lag calculation.
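A worked example may make the formula concrete. The numbers below are made up: the highest offset is 109, the share-partition start offset is 100, and 3 of the in-flight records have already reached a terminal state. The class and method names are hypothetical.

```java
final class ShareLagExample {
    /** lag = (highest offset) - (share-partition start offset) + 1 - (processed in-flight records) */
    static long sharePartitionLag(long highestOffset, long startOffset, long processedInFlight) {
        return highestOffset - startOffset + 1 - processedInFlight;
    }

    public static void main(String[] args) {
        // Offsets 100..109 are in flight (10 records); 3 are acknowledged or archived.
        System.out.println(sharePartitionLag(109, 100, 3)); // prints 7
    }
}
```

Seven records remain either available or acquired, so they still count toward the lag.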

Users can retrieve share partition lag data via these APIs:

  • Admin API using Admin.listShareGroupOffsets(), which returns SharePartitionOffsetInfo objects containing startOffset(), leaderEpoch(), and lag() values.

  • Confluent Metrics API using the io.confluent.kafka.server/share_lag_offsets metric

Manage access

You can control who has access to share groups by setting group and topic permissions using RBAC in Cloud Console or the Confluent CLI.

Managing share group access requires the following permissions:

Action                  Required topic permissions        Required group permissions
----------------------  --------------------------------  --------------------------------
Create a share group    DeveloperRead                     DeveloperRead
User read queue         DeveloperRead or DeveloperWrite   DeveloperRead
Delete group            DeveloperRead or DeveloperWrite   DeveloperRead or DeveloperManage
Delete offsets          DeveloperRead or DeveloperWrite   DeveloperRead or DeveloperManage
Reset offsets           DeveloperRead                     ResourceOwner
View default settings   None                              DeveloperRead
Alter default settings  None                              ResourceOwner

Cloud Console

You can manage who has access to each share group with group-level and topic-level permissions through Accounts and access in Cloud Console.

To use share groups, you must have at least READ access.

To delete a share group, you must have the DeveloperManage role. Before deleting, ensure the group is empty, with no subscribed consumers.

Grant share group permissions

In Cloud Console, explicitly grant READ permissions to individual share groups.

  1. In Cloud Console, navigate to Accounts and access > Access.

  2. Select the share group to which you want to grant access.

  3. Select the accounts to which you want to grant access.

  4. Select the role with at least READ permissions for the selected share group.

  5. Save the permissions.

Grant topic permissions

Use the Topics RBAC UI screens to explicitly grant permissions to individual topics.

  1. Select the topic to which you want to grant access.

  2. Select the accounts to which you want to grant access.

  3. Select the role with READ permissions for the selected topic.

  4. Save the permissions.

For more about managing permissions, see Role-based Access Control (RBAC) on Confluent Cloud.

Confluent CLI

You can also use the Confluent CLI to grant permissions for share groups and topics.

Grant share group permissions

To grant READ permissions to a share group, use the confluent iam rbac role-binding create command:

confluent iam rbac role-binding create \
  --principal User:u-a03bcd \
  --role DeveloperRead \
  --environment env-a12b34 \
  --cloud-cluster lkc-xyxmz \
  --kafka-cluster-id lkc-xyxmz \
  --resource Group:my-share-group

The command returns the role binding details:

+----------------+-------------------+
| Principal      | User:u-a03bcd     |
| Role           | DeveloperRead     |
| Environment    | env-a12b34        |
| CloudCluster   | lkc-xyxmz         |
| ResourceType   | Group             |
| Name           | my-share-group    |
| PatternType    | LITERAL           |
+----------------+-------------------+

Grant topic permissions

To grant READ permissions to a topic, use the confluent iam rbac role-binding create command:

confluent iam rbac role-binding create \
  --principal User:u-a03bcd \
  --role DeveloperRead \
  --environment env-a12b34 \
  --cloud-cluster lkc-xyxmz \
  --kafka-cluster-id lkc-xyxmz \
  --resource Topic:my-topic

The command returns the role binding details:

+----------------+-------------------+
| Principal      | User:u-a03bcd     |
| Role           | DeveloperRead     |
| Environment    | env-a12b34        |
| CloudCluster   | lkc-xyxmz         |
| ResourceType   | Topic             |
| Name           | my-topic          |
| PatternType    | LITERAL           |
+----------------+-------------------+

For details, see confluent iam rbac role describe.

Using the KafkaShareConsumer for Share Groups

Use the org.apache.kafka.clients.consumer.KafkaShareConsumer class to consume records from a Kafka cluster using share groups. Setting up a KafkaShareConsumer involves configuration and a subscription process similar to a regular KafkaConsumer, but with key differences in how messages are handled and acknowledged.

Configuration

The KafkaShareConsumer can be instantiated using various constructors that accept either a Map<String, Object> or Properties object for configuration, optionally including key and value deserializers.

Example:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-share-group"); // All consumers in the same share group will have the same group.id
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);

Note

After creating a KafkaShareConsumer, you must call close() to prevent resource leaks.

Subscribing to Topics

Share consumers in a share group dynamically subscribe to topics using the subscribe(Collection<String> topics) method. Topic subscriptions are not incremental; a new list replaces any current assignment. Kafka delivers each message in the subscribed topics to one share consumer within the share group.

Example:

consumer.subscribe(Arrays.asList("my-topic"));

Polling for Records and Liveness

After subscribing, the share consumer joins the share group automatically when poll(Duration) is invoked. This method fetches data for the subscribed topics. To ensure consumer liveness and continued record reception, poll() must be called periodically. The share consumer sends periodic heartbeats to the broker. If the broker receives no heartbeat within the share group’s session timeout, it considers the share consumer dead and reassigns its partitions.

Example:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // Process records
}

Record Delivery and Acknowledgement

When a share consumer in a share group fetches records using poll(), it receives available records from any matching topic-partitions. These records are acquired with a time-limited acquisition lock, making them unavailable to other share consumers during this period. The default lock duration is 30 seconds. The lock duration is controlled by the share.record.lock.duration.ms group configuration parameter. The lock is automatically released if its duration elapses, making the record available for another delivery attempt.

Share consumers can handle acquired records in these ways:

Acknowledge successful processing

Marks the record as successfully processed.

Release the record

Makes the record available for another delivery attempt.

Reject the record

Indicates the record is unprocessable and prevents further delivery attempts.

Renew the record

Renews the lock on the record for long-running processing.

Do nothing

The lock automatically releases when its duration expires.

The share consumer can choose to use implicit or explicit acknowledgement of the records it processes by using the share consumer share.acknowledgement.mode configuration property.

Implicit Acknowledgement

If the share consumer configuration property share.acknowledgement.mode is set to implicit or is not set at all, then the share consumer uses implicit acknowledgement. All delivered records are implicitly marked as successfully processed and acknowledged when commitSync(), commitAsync(), or poll() is called.

Example (Implicit with poll):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // Implicitly acknowledges previous batch
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // doProcessing(record);
    }
}

Example (implicit with commitSync):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // doProcessing(record);
    }
    consumer.commitSync(); // Implicitly acknowledges all delivered records
}

For more information, see KafkaShareConsumer Javadocs.

Explicit Acknowledgement

If the share consumer configuration property share.acknowledgement.mode is set to explicit, then the share consumer uses explicit acknowledgement. The application must acknowledge all records returned from poll(Duration) using acknowledge(ConsumerRecord, AcknowledgeType) before its next call to poll(Duration). The AcknowledgeType argument indicates whether the record was processed successfully (ACCEPT), should be released for another attempt (RELEASE), permanently rejected (REJECT), or needs extended processing time (RENEW). Acknowledgements are committed when commitSync(), commitAsync(), or poll() is called.

The available acknowledgement types are:

ACCEPT

Marks the record as successfully processed.

RELEASE

Makes the record available for another delivery attempt.

REJECT

Indicates the record is unprocessable and prevents further delivery attempts.

RENEW

Extends the acquisition lock timeout for records that require additional processing time beyond the default lock duration. Prevents the record from being re-delivered while still being processed. Use acquisitionLockTimeoutMs() to determine when to renew the lock.

Example (Explicit acknowledgement with commitSync):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
            // doProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);
        }
    }
    consumer.commitSync();
}
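The example above rejects a record on any exception. One possible refinement, sketched here under stated assumptions, is to release records that failed for a transient reason and reject only those that look unprocessable. The Ack enum is a local stand-in that mirrors the names of Kafka's AcknowledgeType so the sketch compiles without the client library, and treating TimeoutException as transient is a hypothetical policy, not a recommendation from the library.

```java
import java.util.concurrent.TimeoutException;

final class AckDecisionSketch {
    /** Local stand-in mirroring the names of Kafka's AcknowledgeType values. */
    enum Ack { ACCEPT, RELEASE, REJECT }

    /** Hypothetical policy: null means success, timeouts are retried, other failures are rejected. */
    static Ack decide(Exception failure) {
        if (failure == null) {
            return Ack.ACCEPT;
        }
        if (failure instanceof TimeoutException) {
            return Ack.RELEASE; // transient: allow another delivery attempt
        }
        return Ack.REJECT;      // treated as unprocessable: no further attempts
    }

    public static void main(String[] args) {
        System.out.println(decide(null));
        System.out.println(decide(new TimeoutException("downstream busy")));
        System.out.println(decide(new IllegalArgumentException("malformed payload")));
    }
}
```

In a real consumer loop, the chosen value would be passed to acknowledge(ConsumerRecord, AcknowledgeType). Because delivery attempts are counted, a record that is released repeatedly still stops being redelivered once it reaches the delivery limit.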

Example (Explicit acknowledgement with RENEW for long-running processing):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
            // Check if processing exceeds acquisition lock timeout
            Optional<Integer> lockTimeout = consumer.acquisitionLockTimeoutMs();
            if (needsMoreTime(record) && lockTimeout.isPresent()) {
                // Extend the lock to prevent re-delivery during processing
                consumer.acknowledge(record, AcknowledgeType.RENEW);
                consumer.commitSync();
            }
            // doLongRunningProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);
        }
    }
    consumer.commitSync();
}

For more information, see KafkaShareConsumer Javadocs.

Multithreaded Processing

The KafkaShareConsumer is not thread-safe, and unsynchronized access will lead to ConcurrentModificationException. The only exception is wakeup(), which can safely be called from an external thread to interrupt an active operation (for example, poll()), causing a WakeupException to be thrown in the blocking thread. This mechanism is useful for safely shutting down the share consumer from another thread.

Example of safe shutdown using wakeup():

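import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaShareConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaShareConsumer<String, String> consumer;

    public KafkaShareConsumerRunner(KafkaShareConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Safe to call from another thread: sets the flag, then interrupts a blocking poll().
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}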

Note

While thread interrupts are possible, wakeup() is the preferred method for aborting blocking operations for a clean consumer shutdown.

Transactional Records and Isolation Level

The handling of transactional records in share groups is governed by the share.isolation.level configuration property, which applies to the entire share group.

read_uncommitted (Default)

The share group consumes all non-transactional and transactional records, bounded by the high-water mark.

read_committed

The share group consumes only non-transactional records and committed transactional records. This level is bounded by the last stable offset, meaning an open transaction can block the share group’s progress.

Limits

The limits for share groups and share consumers are:

Share Group Delivery Count
  • Description: The maximum number of delivery attempts in a share group for an individual message.

  • Limit: 5

  • Overridable: No

Share Group Lock Duration
  • Description: The acquisition lock duration, in milliseconds, in a share group for an individual message.

  • Limit: 30000

  • Overridable: Yes. Use the Share Group Lock Duration setting: share.record.lock.duration.ms. Min 1000 and max 300000 (5 minutes).

Share Partition Number of Acquisition Locks
  • Description: The maximum number of messages that can be in an acquired state per topic:partition. Increase the number of partitions in the share group’s subscribed topics when you need more messages in an acquired state.

  • Limit: 2000

  • Overridable: No

Share Group Number of Share Consumers
  • Description: The maximum number of share consumers that can be members of a single share group.

  • Limit: 200

  • Overridable: No

Share Group Heartbeat Interval
  • Description: The heartbeat interval, in milliseconds, given to the members.

  • Limit: 5000

  • Overridable: Yes. Use the Heartbeat interval setting: share.heartbeat.interval.ms. Min 5000 and max 15000.

Share Group Session Timeout
  • Description: The timeout, in milliseconds, to detect client failures using the share group protocol.

  • Limit: 45000

  • Overridable: Yes. Use the Session timeout setting: share.session.timeout.ms. Min 45000 and max 60000.
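A small pre-flight check can catch out-of-range overrides before they are submitted. The sketch below hard-codes the ranges listed above; the class and method names are hypothetical, and the ranges should be re-checked against the current limits rather than treated as fixed.

```java
final class ShareGroupLimitsSketch {
    static boolean inRange(long value, long min, long max) {
        return value >= min && value <= max;
    }

    /** share.record.lock.duration.ms: min 1000, max 300000. */
    static boolean validLockDurationMs(long ms) { return inRange(ms, 1_000, 300_000); }

    /** share.heartbeat.interval.ms: min 5000, max 15000. */
    static boolean validHeartbeatIntervalMs(long ms) { return inRange(ms, 5_000, 15_000); }

    /** share.session.timeout.ms: min 45000, max 60000. */
    static boolean validSessionTimeoutMs(long ms) { return inRange(ms, 45_000, 60_000); }

    /** Upper bound on acquired records for a topic: 2000 per partition. */
    static long maxAcquiredRecords(int partitions) { return 2_000L * partitions; }

    public static void main(String[] args) {
        System.out.println(validLockDurationMs(120_000));    // true
        System.out.println(validHeartbeatIntervalMs(1_000)); // false
        System.out.println(maxAcquiredRecords(6));           // 12000
    }
}
```

The last helper illustrates why adding partitions raises the number of messages that can be in an acquired state at once.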