Share Consumers for Confluent Cloud
Share consumers are an alternative to traditional consumers, and groups of share consumers, called share groups, are an alternative to traditional consumer groups. Share consumers create share groups implicitly through their configuration.
Traditional consumer groups use a distributed processing model where consumer members collectively read messages from an Apache Kafka® topic. In the traditional model, each topic partition is assigned to only one group member at a time. In contrast, share groups and their share consumer members use a concurrent processing model. Share group members simultaneously process different messages from the same partition, effectively treating it more like a message queue.
Important
Share groups are available only on Dedicated and Enterprise clusters, and only to Java clients.
Key capabilities and differences from consumer groups
Share groups enable Kafka share consumers to cooperatively consume and process records from topics. This contrasts with traditional consumer groups where each partition is assigned to only one consumer within the group.
The fundamental differences between a share group and a consumer group are:
- Cooperative consumption
Consumers within a share group cooperatively consume records, and partitions may be assigned to multiple consumers. This allows for increased concurrency beyond the number of topic partitions.
- Scalability beyond partitions
The number of consumers in a share group can exceed the number of partitions in a topic. This is a significant advantage over traditional consumer groups where the maximum parallelism is limited by the number of partitions.
- Individual record acknowledgement
Records are acknowledged individually, though the system is optimized for batch processing to improve efficiency.
- Delivery attempt counting
Delivery attempts to consumers in a share group are counted, which enables automated handling of unprocessable records.
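To make the delivery-count mechanism concrete, the following is an illustrative model, not a real client or broker API: it sketches how a share partition might track delivery attempts for one record. The limit of 5 attempts matches the Share Group Delivery Count limit listed in the Limits section of this document; the class and field names are assumptions for illustration only.

```java
public class DeliveryCountSketch {
    // Matches the non-overridable Share Group Delivery Count limit (5).
    static final int MAX_DELIVERY_ATTEMPTS = 5;

    enum RecordState { AVAILABLE, ACQUIRED, ARCHIVED }

    static class InFlightRecord {
        RecordState state = RecordState.AVAILABLE;
        int deliveryCount = 0;
    }

    // A failed delivery (release or lock expiry) counts as an attempt; once
    // the count reaches the limit, the record is archived and never redelivered.
    static void recordDeliveryFailed(InFlightRecord r) {
        r.deliveryCount++;
        r.state = r.deliveryCount >= MAX_DELIVERY_ATTEMPTS
                ? RecordState.ARCHIVED
                : RecordState.AVAILABLE;
    }

    public static void main(String[] args) {
        InFlightRecord r = new InFlightRecord();
        for (int attempt = 1; attempt <= MAX_DELIVERY_ATTEMPTS; attempt++) {
            r.state = RecordState.ACQUIRED; // delivered to some consumer
            recordDeliveryFailed(r);        // consumer releases or lock expires
            System.out.println("attempt " + attempt + " -> " + r.state);
        }
    }
}
```

In this model, attempts 1 through 4 return the record to the available state, and the fifth failed attempt archives it, which is how a share group automatically stops redelivering an unprocessable record.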
When a share consumer in a share group fetches records, it receives available records from any of the topic-partitions matching its subscriptions. Records are acquired for delivery to this share consumer with a time-limited acquisition lock. While a record is acquired, it is unavailable to other share consumers. The default lock duration is 30 seconds, controlled by the share.record.lock.duration.ms group configuration parameter. The lock is released automatically once its duration elapses, making the record available to another share consumer.
A share consumer holding the lock can handle the record in the following ways:
- Acknowledge successful processing of the record.
- Release the record, making it available for another delivery attempt.
- Renew the lock, enabling long-running processing in the share consumer.
- Reject the record, indicating it’s unprocessable and preventing further delivery attempts for that record.
- Do nothing, in which case the lock is automatically released when its duration expires.
Configuration for client developers
To create a share consumer, create an instance of KafkaShareConsumer in a Java application. When developing a client application that utilizes share groups, there are share consumer-level configuration properties and group-level configuration properties you can define.
You define the following share consumer-level properties in your consumer code:
share.acknowledgement.mode

Description: Controls the acknowledgement mode for a share consumer.
Type: string
Default: implicit
Valid Values: [implicit, explicit]
Importance: medium

If your KafkaShareConsumer configuration sets share.acknowledgement.mode=explicit, then every record returned from poll must be explicitly acknowledged with KafkaShareConsumer.acknowledge. If you do not set the share.acknowledgement.mode configuration, or if you set share.acknowledgement.mode=implicit, then implicit mode is used, which automatically acknowledges or negatively acknowledges messages and commits offsets.

share.acquire.mode

Description: Controls how a share consumer obtains records.
Type: string
Default: batch_optimized
Valid Values: [batch_optimized, record_limit]
Importance: medium

The behavior of the max.poll.records client configuration changes depending on the selected acquire mode. If set to record_limit, the number of records returned in each poll() will not exceed the value of max.poll.records. If set to batch_optimized, the number of records returned in each poll() call may exceed max.poll.records to align with batch boundaries for optimization.
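As a minimal sketch, the consumer-level properties above go directly into the client configuration alongside the usual consumer settings. The bootstrap server address and group name below are placeholder values.

```java
import java.util.Properties;

public class ShareConsumerConfigSketch {
    // Sketch: building the consumer-level share properties described above.
    // The bootstrap server and group.id are placeholders for illustration.
    public static Properties shareConsumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-share-group");
        // Require an explicit acknowledge() for every polled record
        // (the default mode is implicit).
        props.setProperty("share.acknowledgement.mode", "explicit");
        // Treat max.poll.records as a hard cap per poll()
        // (the default mode, batch_optimized, may exceed it).
        props.setProperty("share.acquire.mode", "record_limit");
        props.setProperty("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(shareConsumerProps());
    }
}
```

This Properties object would then be passed to the KafkaShareConsumer constructor, as shown in the configuration examples later in this document.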
You can define group-level properties using Confluent Cloud, Confluent CLI, or the Confluent Cloud API.
You can also define these group-level configuration properties using the kafka-configs tool.
share.auto.offset.reset

Description: The strategy to initialize the share-partition start offset.
Type: string
Default: latest
Valid Values: [latest, earliest, by_duration:<duration>]
Importance: medium

share.heartbeat.interval.ms

Description: The heartbeat interval given to the members of a share group.
Type: int
Default: 5000 (5 seconds)
Valid Values: [1,…]
Importance: medium

If not specified, uses the broker’s group.share.heartbeat.interval.ms configuration. If specified, the minimum is limited by the broker’s group.share.min.heartbeat.interval.ms configuration, and the maximum is limited by the broker’s group.share.max.heartbeat.interval.ms configuration.

share.isolation.level

Description: Controls how to read records written transactionally.
Type: string
Default: read_uncommitted
Valid Values: [read_committed, read_uncommitted]
Importance: medium

If set to read_committed, the share group only delivers transactional records which were committed. If set to read_uncommitted, the share group returns all messages, even transactional messages which were aborted. Non-transactional records are returned unconditionally in either mode.

share.record.lock.duration.ms

Description: The record acquisition lock duration in milliseconds for share groups.
Type: int
Default: 30000 (30 seconds)
Valid Values: [1000,…]
Importance: medium

If not specified, uses the broker’s group.share.record.lock.duration.ms configuration. If specified, the minimum is limited by the broker’s group.share.min.record.lock.duration.ms configuration, and the maximum is limited by the broker’s group.share.max.record.lock.duration.ms configuration.

share.session.timeout.ms

Description: The timeout to detect client failures when using the share group protocol.
Type: int
Default: 45000 (45 seconds)
Valid Values: [1,…]
Importance: medium

If not specified, uses the broker’s group.share.session.timeout.ms configuration. If specified, the minimum is limited by the broker’s group.share.min.session.timeout.ms configuration, and the maximum is limited by the broker’s group.share.max.session.timeout.ms configuration.
Manage share groups
Administrators can manage share groups using Confluent Cloud, Confluent CLI, or the Confluent Cloud API.
Administrators can use the kafka-share-groups.sh tool to list, describe, or delete share groups. Only share groups without any active members can be deleted. For information on using this tool, see the kafka-share-groups tool usage details.
List all share groups:
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --list
View current start offset of a share group:
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-share-group
Describe active members in a share group:
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-share-group --members
Delete offsets of individual topics in a share group:
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group my-share-group --topic topic1
Delete one or more share groups:
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --delete --group my-share-group
Monitor share groups
Share partition lag
Share partition lag represents the number of records that have not yet been fully processed by consumers in a share group. Share partition lag is the primary metric used to automate the scaling of share consumer instances. Unlike standard consumer lag, share partition lag accounts for the non-contiguous state of records in these states:
AVAILABLE: Not yet acquired by any consumer
ACQUIRED: Currently assigned to a consumer with an active acquisition lock
ACKNOWLEDGED: Processing complete
ARCHIVED: Removed by compaction
Share partition lag is calculated as:
(highest offset) - (share-partition start offset) + 1 - (processed in-flight records)
To monitor lag effectively, you must distinguish between record states. Records in a Terminal state have been acknowledged or archived. These records no longer contribute to the lag calculation.
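The lag formula above can be sketched with sample numbers. The offsets below are assumptions chosen for illustration, not values from a real cluster.

```java
public class ShareLagSketch {
    // Sketch of the lag formula from this section:
    //   lag = (highest offset) - (share-partition start offset) + 1
    //         - (processed in-flight records)
    // "Processed in-flight records" are those in a terminal state
    // (acknowledged or archived), which no longer contribute to lag.
    static long sharePartitionLag(long highestOffset,
                                  long startOffset,
                                  long processedInFlight) {
        return highestOffset - startOffset + 1 - processedInFlight;
    }

    public static void main(String[] args) {
        // Example: highest offset 1099, start offset 1000, and 40 in-flight
        // records already acknowledged or archived -> 60 records of lag.
        System.out.println(sharePartitionLag(1099, 1000, 40)); // prints 60
    }
}
```

The "+ 1" accounts for the record at the highest offset itself; subtracting the terminal-state records is what distinguishes share partition lag from standard consumer lag, since processed records need not be contiguous.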
Users can retrieve share partition lag data via these APIs:
- Admin API, using Admin.listShareGroupOffsets(), which returns SharePartitionOffsetInfo objects containing startOffset(), leaderEpoch(), and lag() values.
- Confluent Metrics API, using the io.confluent.kafka.server/share_lag_offsets metric.
Manage access
You can control who has access to share groups by setting group and topic permissions using RBAC in Cloud Console or the Confluent CLI.
Managing share group access requires the following permissions:
| Action | Required topic permissions | Required group permissions |
|---|---|---|
| Create a share group | DeveloperRead | DeveloperRead |
| Read from a share group (use the queue) | DeveloperRead or DeveloperWrite | DeveloperRead |
| Delete group | DeveloperRead or DeveloperWrite | DeveloperRead or DeveloperManage |
| Delete offsets | DeveloperRead or DeveloperWrite | DeveloperRead or DeveloperManage |
| Reset offsets | DeveloperRead | ResourceOwner |
| View default settings | None | DeveloperRead |
| Alter default settings | None | ResourceOwner |
Cloud Console
You can manage who has access to each share group with group-level and topic-level permissions in Cloud Console.
To use share groups, you must have at least READ access.
To delete a share group, you must have the DeveloperManage role, and the group must be empty, with no subscribed consumers.
Grant share group permissions
In Cloud Console, explicitly grant READ permissions to individual share groups:
1. In Cloud Console, navigate to Accounts and access > Access.
2. Select the share group to which you want to grant access.
3. Select the accounts to which you want to grant access.
4. Select the role with at least READ permissions for the selected share group.
5. Save the permissions.
Grant topic permissions
Use the Topics RBAC UI screens to explicitly grant permissions to individual topics:
1. Select the topic to which you want to grant access.
2. Select the accounts to which you want to grant access.
3. Select the role with READ permissions for the selected topic.
4. Save the permissions.
For more about managing permissions, see Role-based Access Control (RBAC) on Confluent Cloud.
Confluent CLI
You can also use the Confluent CLI to grant permissions for share groups and topics.
Grant share group permissions
To grant READ permissions to a share group, use the confluent iam rbac role-binding create command:
confluent iam rbac role-binding create \
--principal User:u-a03bcd \
--role DeveloperRead \
--environment env-a12b34 \
--cloud-cluster lkc-xyxmz \
--kafka-cluster-id lkc-xyxmz \
--resource Group:my-share-group
The command returns the role binding details:
+----------------+-------------------+
| Principal | User:u-a03bcd |
| Role | DeveloperRead |
| Environment | env-a12b34 |
| CloudCluster | lkc-xyxmz |
| ResourceType | Group |
| Name | my-share-group |
| PatternType | LITERAL |
+----------------+-------------------+
Grant topic permissions
To grant READ permissions to a topic, use the confluent iam rbac role-binding create command:
confluent iam rbac role-binding create \
--principal User:u-a03bcd \
--role DeveloperRead \
--environment env-a12b34 \
--cloud-cluster lkc-xyxmz \
--kafka-cluster-id lkc-xyxmz \
--resource Topic:my-topic
The command returns the role binding details:
+----------------+-------------------+
| Principal | User:u-a03bcd |
| Role | DeveloperRead |
| Environment | env-a12b34 |
| CloudCluster | lkc-xyxmz |
| ResourceType | Topic |
| Name | my-topic |
| PatternType | LITERAL |
+----------------+-------------------+
For details, see confluent iam rbac role describe.
Using the KafkaShareConsumer for Share Groups
Use the org.apache.kafka.clients.consumer.KafkaShareConsumer class to consume records from a Kafka cluster using share groups. Setting up a KafkaShareConsumer involves configuration and a subscription process similar to a regular KafkaConsumer, but with key differences in how messages are handled and acknowledged.
Configuration
The KafkaShareConsumer can be instantiated using various constructors that accept either a Map<String, Object> or Properties object for configuration, optionally including key and value deserializers.
Example:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-share-group"); // All consumers in the same share group will have the same group.id
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
Note
When you are finished with a KafkaShareConsumer, you must call close() to prevent resource leaks.
Subscribing to Topics
Share consumers in a share group dynamically subscribe to topics using the subscribe(Collection<String> topics) method. Topic subscriptions are not incremental; a new list replaces any current assignment. Kafka delivers each message in the subscribed topics to one share consumer within the share group.
Example:
consumer.subscribe(Arrays.asList("my-topic"));
Polling for Records and Liveness
After subscribing, the share consumer joins the share group automatically when poll(Duration) is invoked. This method fetches data for the subscribed topics. To ensure consumer liveness and continued record reception, poll() must be called periodically. The share consumer sends periodic heartbeats to the broker, and if heartbeats are not sent within the share group’s session timeout, the share consumer is considered dead and its partitions are reassigned.
Example:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // Process records
}
Record Delivery and Acknowledgement
When a share consumer in a share group fetches records using poll(), it receives available records from any matching topic-partitions. These records are acquired with a time-limited acquisition lock, making them unavailable to other share consumers during this period. The default lock duration is 30 seconds, controlled by the share.record.lock.duration.ms group configuration parameter. The lock is automatically released if its duration elapses, making the record available for another delivery attempt.
Share consumers can handle acquired records in these ways:
- Acknowledge successful processing
Marks the record as successfully processed.
- Release the record
Makes the record available for another delivery attempt.
- Reject the record
Indicates the record is unprocessable and prevents further delivery attempts.
- Renew the lock
Extends the lock on the record for long-running processing.
- Do nothing
The lock automatically releases when its duration expires.
The share consumer can choose implicit or explicit acknowledgement of the records it processes through the share.acknowledgement.mode share consumer configuration property.
Implicit Acknowledgement
If the share consumer configuration property share.acknowledgement.mode is set to implicit or is not set at all, then the share consumer uses implicit acknowledgement. All delivered records are implicitly marked as successfully processed and acknowledged when commitSync(), commitAsync(), or poll() is called.
Example (Implicit with poll):
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // Implicitly acknowledges previous batch
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // doProcessing(record);
    }
}
Example (implicit with commitSync):
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // doProcessing(record);
    }
    consumer.commitSync(); // Implicitly acknowledges all delivered records
}
For more information, see KafkaShareConsumer Javadocs.
Explicit Acknowledgement
If the share consumer configuration property share.acknowledgement.mode is set to explicit, then the share consumer uses explicit acknowledgement. The application must acknowledge all records returned from poll(Duration) using acknowledge(ConsumerRecord, AcknowledgeType) before its next call to poll(Duration). The AcknowledgeType argument indicates whether the record was processed successfully (ACCEPT), should be released for another attempt (RELEASE), permanently rejected (REJECT), or needs extended processing time (RENEW). Acknowledgements are committed when commitSync(), commitAsync(), or poll() is called.
The available acknowledgement types are:
- ACCEPT: Marks the record as successfully processed.
- RELEASE: Makes the record available for another delivery attempt.
- REJECT: Indicates the record is unprocessable and prevents further delivery attempts.
- RENEW: Extends the acquisition lock timeout for records that require additional processing time beyond the default lock duration. Prevents the record from being re-delivered while still being processed. Use acquisitionLockTimeoutMs() to determine when to renew the lock.
Example (Explicit acknowledgement with commitSync):
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// doProcessing(record);
consumer.acknowledge(record, AcknowledgeType.ACCEPT);
} catch (Exception e) {
consumer.acknowledge(record, AcknowledgeType.REJECT);
}
}
consumer.commitSync();
}
Example (Explicit acknowledgement with RENEW for long-running processing):
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
            // Check if processing exceeds acquisition lock timeout
            Optional<Integer> lockTimeout = consumer.acquisitionLockTimeoutMs();
            if (needsMoreTime(record) && lockTimeout.isPresent()) {
                // Extend the lock to prevent re-delivery during processing
                consumer.acknowledge(record, AcknowledgeType.RENEW);
                consumer.commitSync();
            }
            // doLongRunningProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);
        }
    }
    consumer.commitSync();
}
For more information, see KafkaShareConsumer Javadocs.
Multithreaded Processing
The KafkaShareConsumer is not thread-safe, and unsynchronized access will lead to ConcurrentModificationException. The only exception is wakeup(), which can safely be called from an external thread to interrupt an active operation (for example, poll()), causing a WakeupException to be thrown in the blocking thread. This mechanism is useful for safely shutting down the share consumer from another thread.
Example of safe shutdown using wakeup():
public class KafkaShareConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaShareConsumer consumer;

    public KafkaShareConsumerRunner(KafkaShareConsumer consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}
Note
While thread interrupts are possible, wakeup() is the recommended method for aborting blocking operations for a clean consumer shutdown; thread interrupts are discouraged.
Transactional Records and Isolation Level
The handling of transactional records in share groups is governed by the share.isolation.level configuration property, which applies to the entire share group.
- read_uncommitted (default): The share group consumes all non-transactional and transactional records, bounded by the high-water mark.
- read_committed: The share group consumes only non-transactional records and committed transactional records. This level is bounded by the last stable offset, meaning an open transaction can block the share group’s progress.
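The two isolation levels can be illustrated with a small model. This is a sketch of the delivery rule only, not broker code; the enum and method names are assumptions for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

public class IsolationLevelSketch {
    // Illustrative transaction status of a record in the log.
    enum TxStatus { NON_TRANSACTIONAL, COMMITTED, ABORTED }

    // Models which records each share.isolation.level delivers:
    // read_committed filters out aborted transactional records;
    // read_uncommitted delivers everything up to the high-water mark.
    static List<TxStatus> visible(List<TxStatus> records, String isolationLevel) {
        if ("read_committed".equals(isolationLevel)) {
            return records.stream()
                    .filter(s -> s != TxStatus.ABORTED)
                    .collect(Collectors.toList());
        }
        return records;
    }

    public static void main(String[] args) {
        List<TxStatus> log = List.of(
                TxStatus.NON_TRANSACTIONAL, TxStatus.COMMITTED, TxStatus.ABORTED);
        System.out.println(visible(log, "read_committed").size());   // prints 2
        System.out.println(visible(log, "read_uncommitted").size()); // prints 3
    }
}
```

Note that non-transactional records pass through unconditionally in both modes, consistent with the share.isolation.level description in the configuration section above.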
Limits
The following table lists the limits for share groups and share consumers.
| Limit Name | Description | Limit | Overridable |
|---|---|---|---|
| Share Group Delivery Count | The maximum number of delivery attempts in a share group for an individual message. | 5 | No |
| Share Group Lock Duration | The acquisition lock duration in a share group for an individual message, in milliseconds. | 30000 | Yes. Use the share.record.lock.duration.ms setting: min 1000, max 300000 (5 minutes). |
| Share Partition Number of Acquisition Locks | The maximum number of messages that can be in an acquired state per topic-partition. Increase the number of partitions in the share group’s subscribed topics when you need more messages in an acquired state. | 2000 | No |
| Share Group Number of Share Consumers | The maximum number of share consumers that can be members of a single share group. | 200 | No |
| Share Group Heartbeat Interval | The heartbeat interval given to the members, in milliseconds. | 5000 | Yes. Use the share.heartbeat.interval.ms setting: min 5000, max 15000. |
| Share Group Session Timeout | The timeout to detect client failures using the share group protocol, in milliseconds. | 45000 | Yes. Use the share.session.timeout.ms setting: min 45000, max 60000. |
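The acquisition-lock limit above drives a simple capacity calculation: since at most 2000 messages per topic-partition can be in the acquired state, the number of partitions caps the records a share group can have in flight at once. The sketch below works this arithmetic for an assumed 6-partition topic.

```java
public class AcquiredCapacitySketch {
    // Non-overridable limit from the table above: max acquired messages
    // per topic-partition.
    static final int LOCKS_PER_PARTITION = 2000;

    // Maximum records that can be in the acquired state across a topic.
    static int maxAcquiredRecords(int partitions) {
        return partitions * LOCKS_PER_PARTITION;
    }

    public static void main(String[] args) {
        // Example: a 6-partition topic allows at most 12000 acquired records;
        // add partitions if consumers need more records in flight at once.
        System.out.println(maxAcquiredRecords(6)); // prints 12000
    }
}
```

This matches the guidance in the limits table: the acquisition-lock limit itself cannot be raised, so increasing partition count is the way to raise in-flight capacity.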