Share Consumers for Confluent Cloud¶
Share consumers are an alternative to traditional consumers. Groups of these consumers, share groups, are an alternative to traditional consumer groups. Share consumers, through their configuration, inherently create share groups.
Traditional consumer groups use a distributed processing model where consumer members collectively read messages from an Apache Kafka® topic. In the traditional model, each topic partition is assigned to only one group member at a time. In contrast, share groups and their share consumer members use a concurrent processing model. Share group members simultaneously process different messages from the same partition, effectively treating it more like a message queue.
Important
A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent’s sole discretion.
As a preview feature, share groups are available to Java clients only.
Key capabilities and differences from consumer groups¶
Share groups enable Kafka consumers to cooperatively consume and process records from topics. This contrasts with traditional consumer groups where each partition is assigned to only one consumer within the group. This diagram illustrates the interaction between a partition, the broker, share consumers, and share groups.
The fundamental differences between a share group and a consumer group are:
- Cooperative consumption: Consumers within a share group cooperatively consume records, and partitions may be assigned to multiple consumers. This allows for increased concurrency beyond the number of topic partitions.
- Scalability beyond partitions: The number of consumers in a share group can exceed the number of partitions in a topic. This is a significant advantage over traditional consumer groups where the maximum parallelism is limited by the number of partitions.
- Individual record acknowledgement: Records are acknowledged individually, though the system is optimized for batch processing to improve efficiency.
- Delivery attempt counting: Delivery attempts to consumers in a share group are counted, which enables automated handling of unprocessable records.
When a consumer in a share group fetches records, it receives available records
from any of the topic-partitions matching its subscriptions. Records are
acquired for delivery to this consumer with a time-limited acquisition lock.
While a record is acquired, it is unavailable to other consumers. The default
lock duration is 30 seconds, controlled by the share.record.lock.duration.ms
group configuration parameter. The lock is released automatically once its
duration elapses, making the record available to another consumer.
A consumer holding the lock can handle the record in the following ways:
- Acknowledge successful processing of the record.
- Release the record, making it available for another delivery attempt.
- Reject the record, indicating it’s unprocessable and preventing further delivery attempts for that record.
- Do nothing, in which case the lock is automatically released when its duration expires.
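The four outcomes above can be read as a small state machine. The following is a minimal sketch in plain Java, not Kafka's broker implementation: the class ShareRecordModel, its State enum, and its method names are hypothetical, and the current time is passed in explicitly so that lock expiry is easy to follow.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of share-group record states (illustrative only). */
class ShareRecordModel {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, REJECTED }

    private static final class Entry {
        State state = State.AVAILABLE;
        int deliveryCount = 0;
        long lockExpiresAtMs = 0;
    }

    private final Map<Long, Entry> records = new HashMap<>();
    private final long lockDurationMs;

    ShareRecordModel(long lockDurationMs) {
        this.lockDurationMs = lockDurationMs;
    }

    /** Registers a record at the given offset as available for delivery. */
    void add(long offset) {
        records.put(offset, new Entry());
    }

    /** Tries to acquire the record at time nowMs; returns true if a lock was granted. */
    boolean acquire(long offset, long nowMs) {
        Entry e = records.get(offset);
        expireIfNeeded(e, nowMs);
        if (e.state != State.AVAILABLE) {
            return false;
        }
        e.state = State.ACQUIRED;
        e.deliveryCount++; // every acquisition counts as one delivery attempt
        e.lockExpiresAtMs = nowMs + lockDurationMs;
        return true;
    }

    void accept(long offset)  { transition(offset, State.ACKNOWLEDGED); }
    void release(long offset) { transition(offset, State.AVAILABLE); }
    void reject(long offset)  { transition(offset, State.REJECTED); }

    State state(long offset, long nowMs) {
        Entry e = records.get(offset);
        expireIfNeeded(e, nowMs);
        return e.state;
    }

    int deliveryCount(long offset) {
        return records.get(offset).deliveryCount;
    }

    // Only a record that is currently acquired can be acknowledged, released, or rejected.
    private void transition(long offset, State next) {
        Entry e = records.get(offset);
        if (e.state != State.ACQUIRED) {
            throw new IllegalStateException("record is not acquired");
        }
        e.state = next;
    }

    // "Do nothing": once the lock duration has elapsed, the record becomes available again.
    private void expireIfNeeded(Entry e, long nowMs) {
        if (e.state == State.ACQUIRED && nowMs >= e.lockExpiresAtMs) {
            e.state = State.AVAILABLE;
        }
    }
}
```

In this model, a record acquired at time 0 with a 30000 ms lock cannot be acquired again at time 1000, but it can be acquired again at time 30000, at which point its delivery count is 2. For simplicity, the sketch does not handle acknowledgements that arrive after a lock has expired.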
The Kafka cluster limits the number of records acquired for consumers for each
topic-partition within a share group. Once this limit is reached, fetching
operations will temporarily yield no further records until the number of
acquired records decreases (for example, as records are acknowledged or as
locks time out). This limit is controlled by the broker configuration property
group.share.partition.max.record.locks. By limiting the duration of the
acquisition lock and automatically releasing the locks, the broker ensures
delivery progresses even in the presence of consumer failures.
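The effect of the per-partition limit can be pictured with a simple counter. This is a rough sketch under stated assumptions rather than the broker's actual bookkeeping: the class PartitionLockLimitSketch and its methods are hypothetical, and the limit value used in the usage note is arbitrary.

```java
/** Toy counter that caps the number of in-flight (acquired) records for one share-partition. */
class PartitionLockLimitSketch {
    private final int maxRecordLocks;
    private int acquired = 0;

    PartitionLockLimitSketch(int maxRecordLocks) {
        this.maxRecordLocks = maxRecordLocks;
    }

    /** Returns how many of the requested records can be acquired right now. */
    int tryAcquire(int requested) {
        int granted = Math.min(requested, maxRecordLocks - acquired);
        acquired += granted;
        return granted;
    }

    /** Frees capacity when records are acknowledged or their locks time out. */
    void free(int count) {
        acquired = Math.max(0, acquired - count);
    }
}
```

With a limit of 3, a fetch asking for 2 records is granted 2, a following fetch asking for 5 is granted only 1, and further fetches are granted nothing until free() is called.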
Note
Confluent Cloud users cannot set broker configuration properties.
Configuration for client developers¶
To create a share consumer, create an instance of KafkaShareConsumer in a
Java application. When developing a client application that utilizes share
groups, there are consumer-level configuration properties and group-level
configuration properties you can define.
You define the following consumer-level properties in your consumer code:
share.acknowledgement.mode
- Description: Controls the acknowledgement mode for a share consumer.
- Type: string
- Default: implicit
- Valid Values: [implicit, explicit]
- Importance: medium
If your KafkaShareConsumer configuration sets share.acknowledgement.mode=explicit, then every record returned from poll must be explicitly acknowledged with KafkaShareConsumer.acknowledge. If you do not set the share.acknowledgement.mode configuration, or if you set share.acknowledgement.mode=implicit, then implicit mode is used.
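As an illustration, the consumer-level property can be set alongside the other client properties. The sketch below uses only java.util.Properties; the helper class ShareConsumerConfigSketch and its method name are hypothetical, and the deserializer choices are assumptions for a topic with string keys and values.

```java
import java.util.Properties;

/** Illustrative helper that assembles client properties for explicit acknowledgement. */
class ShareConsumerConfigSketch {
    static Properties explicitAckProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId); // the share group name
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer-level setting: every polled record must then be acknowledged explicitly.
        props.setProperty("share.acknowledgement.mode", "explicit");
        return props;
    }
}
```

The resulting Properties object is the kind of configuration that a KafkaShareConsumer constructor accepts. Leaving out the last setProperty call falls back to the default implicit mode.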
You define the following group-level configuration properties using the kafka-configs tool:
share.auto.offset.reset
- Description: The strategy to initialize the share-partition start offset.
- Type: string
- Default: latest
- Valid Values: [latest, earliest, by_duration:<duration>]
- Importance: medium
share.heartbeat.interval.ms
- Description: The heartbeat interval given to the members of a share group.
- Type: int
- Default: 5000 (5 seconds)
- Valid Values: [1,…]
- Importance: medium
If not specified, uses the broker’s group.share.heartbeat.interval.ms configuration. If specified, the minimum is limited by the broker’s group.share.min.heartbeat.interval.ms configuration, and the maximum is limited by the broker’s group.share.max.heartbeat.interval.ms configuration.
share.isolation.level
- Description: Controls how to read records written transactionally.
- Type: string
- Default: read_uncommitted
- Valid Values: [read_committed, read_uncommitted]
- Importance: medium
If set to read_committed, the share group only delivers transactional records which were committed. If set to read_uncommitted, the share group returns all messages, even transactional messages which were aborted. Non-transactional records are returned unconditionally in either mode. Confluent Cloud users cannot set broker-level configurations.
share.record.lock.duration.ms
- Description: The record acquisition lock duration in milliseconds for share groups.
- Type: int
- Default: 30000 (30 seconds)
- Valid Values: [1000,…]
- Importance: medium
If not specified, uses the broker’s group.share.record.lock.duration.ms configuration. If specified, the minimum is limited by the broker’s group.share.min.record.lock.duration.ms configuration, and the maximum is limited by the broker’s group.share.max.record.lock.duration.ms configuration. Confluent Cloud users cannot set broker-level configurations.
share.session.timeout.ms
- Description: The timeout to detect client failures when using the share group protocol.
- Type: int
- Default: 45000 (45 seconds)
- Valid Values: [1,…]
- Importance: medium
If not specified, uses the broker’s group.share.session.timeout.ms configuration. If specified, the minimum is limited by the broker’s group.share.min.session.timeout.ms configuration, and the maximum is limited by the broker’s group.share.max.session.timeout.ms configuration. Confluent Cloud users cannot set broker-level configurations.
Manage share groups using the kafka-share-groups tool¶
Administrators can use the kafka-share-groups.sh tool to list, describe, or
delete share groups. Only share groups without any active members can be
deleted. For information on using this tool, see the kafka-share-groups tool usage details.
List all share groups:
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --list
View current start offset of a share group:
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-share-group
Describe active members in a share group:
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-share-group --members
Delete offsets of individual topics in a share group:
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group my-share-group --topic topic1
Delete one or more share groups:
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --delete --group my-share-group
Monitoring Share Groups¶
Metrics for share groups are emitted under the kafka.server:type=group-coordinator-metrics MBean. Relevant metrics include:
group-count,protocol={share}
- Total number of groups per group type: Share.
share-acknowledgement
- The total number of offsets acknowledged for share groups.
record-acknowledgement
- The number of records acknowledged per acknowledgement type.
partition-load-time
- The time taken to load the share partitions.
These metrics provide visibility into the health and performance of share groups on the Kafka cluster.
Using the KafkaShareConsumer for Share Groups¶
Use the org.apache.kafka.clients.consumer.KafkaShareConsumer class to consume
records from a Kafka cluster using share groups. Setting up a
KafkaShareConsumer involves configuration and a subscription process similar
to a regular KafkaConsumer, but with key differences in how messages are
handled and acknowledged.
Configuration¶
The KafkaShareConsumer can be instantiated using various constructors that
accept either a Map<String, Object> or Properties object for configuration,
optionally including key and value deserializers.
Example:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-share-group"); // All consumers in the same share group will have the same group.id
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
Note
After creating a KafkaShareConsumer, you must call close() to prevent resource leaks.
Subscribing to Topics¶
Consumers in a share group dynamically subscribe to topics using the
subscribe(Collection<String> topics) method. Topic subscriptions are not
incremental; a new list replaces any current assignment. Kafka delivers each
message in the subscribed topics to one consumer within the share group.
Example:
consumer.subscribe(Arrays.asList("my-topic"));
Polling for Records and Liveness¶
After subscribing, the consumer joins the share group automatically when
poll(Duration) is invoked. This method fetches data for the subscribed topics.
To ensure consumer liveness and continued record reception, poll() must be
called periodically. The consumer sends periodic heartbeats to the broker, and
if heartbeats are not sent within the share group’s session timeout, then the
consumer will be considered dead and its partitions will be reassigned.
Example:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // Process records
}
Record Delivery and Acknowledgement¶
When a consumer in a share group fetches records using poll(), it receives
available records from any matching topic-partitions. These records are acquired
with a time-limited acquisition lock, making them unavailable to other
consumers during this period. The default lock duration is 30 seconds. The lock
duration is controlled by the share.record.lock.duration.ms group
configuration parameter. The lock is automatically released if its duration
elapses, making the record available for another delivery attempt.
Consumers can handle acquired records in four ways:
- Acknowledge successful processing: Marks the record as successfully processed.
- Release the record: Makes the record available for another delivery attempt.
- Reject the record: Indicates the record is unprocessable and prevents further delivery attempts.
- Do nothing: The lock automatically releases when its duration expires.
The consumer can choose to use implicit or explicit acknowledgement of the records it
processes by using the consumer share.acknowledgement.mode configuration property.
Implicit Acknowledgement¶
If the consumer configuration property share.acknowledgement.mode is set to implicit
or is not set at all, then the consumer uses implicit acknowledgement. All delivered records are
implicitly marked as successfully processed and acknowledged when commitSync(),
commitAsync(), or poll() is called.
Example (implicit acknowledgement with poll):
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // Implicitly acknowledges previous batch
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // doProcessing(record);
    }
}
Example (implicit acknowledgement with commitSync):
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // doProcessing(record);
    }
    consumer.commitSync(); // Implicitly acknowledges all delivered records
}
For more information, see KafkaShareConsumer Javadocs.
Explicit Acknowledgement¶
If the consumer configuration property share.acknowledgement.mode is set to explicit,
then the consumer uses explicit acknowledgement. The application must acknowledge all records
returned from poll(Duration) using acknowledge(ConsumerRecord, AcknowledgeType) before
its next call to poll(Duration). The AcknowledgeType argument indicates
whether the record was processed successfully (ACCEPT), should be released for another
attempt (RELEASE), or permanently rejected (REJECT). Acknowledgements are committed
when commitSync(), commitAsync(), or poll() is called.
Example (explicit acknowledgement with commitSync):
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
            // doProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);
        }
    }
    consumer.commitSync();
}
For more information, see KafkaShareConsumer Javadocs.
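One way to decide between the three acknowledgement types is to classify failures as retryable or permanent. The following is a minimal, Kafka-free sketch of that decision, offered as an assumption about how an application might structure it: AckType is a local stand-in that only mirrors the ACCEPT, RELEASE, and REJECT names, and TransientFailure is a hypothetical exception type, not part of the Kafka client.

```java
/** Illustrative mapping from a processing outcome to an acknowledgement type. */
class AckDecisionSketch {
    // Local stand-in mirroring the names of the acknowledgement types described above.
    enum AckType { ACCEPT, RELEASE, REJECT }

    /** Hypothetical marker for failures that may succeed on a later delivery attempt. */
    static class TransientFailure extends RuntimeException {
        TransientFailure(String message) {
            super(message);
        }
    }

    /** Runs the processing step and picks the acknowledgement type from its outcome. */
    static AckType decide(Runnable processing) {
        try {
            processing.run();
            return AckType.ACCEPT;   // processed successfully
        } catch (TransientFailure e) {
            return AckType.RELEASE;  // make the record available for another attempt
        } catch (RuntimeException e) {
            return AckType.REJECT;   // treat as unprocessable, no further attempts
        }
    }
}
```

In a real consumer loop, the returned value would be translated into the corresponding AcknowledgeType and passed to acknowledge(). Because delivery attempts are counted, releasing a record repeatedly does not retry it forever.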
Multithreaded Processing¶
The KafkaShareConsumer is not thread-safe, and unsynchronized access will
lead to ConcurrentModificationException. The only exception is wakeup(),
which can safely be called from an external thread to interrupt an active
operation (for example, poll()), causing a WakeupException to be thrown in the
blocking thread. This mechanism is useful for safely shutting down the consumer
from another thread.
Example of safe shutdown using wakeup():
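import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaShareConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaShareConsumer<String, String> consumer;

    public KafkaShareConsumerRunner(KafkaShareConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Safe to call from another thread: sets the flag, then interrupts a blocking poll()
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}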
Note
While thread interrupts are possible, wakeup() is the preferred method
for aborting blocking operations for a clean consumer shutdown.
Transactional Records and Isolation Level¶
The handling of transactional records in share groups is governed by the
share.isolation.level configuration property, which applies to the
entire share group.
- read_uncommitted (default): The share group consumes all non-transactional and transactional records, bounded by the high-water mark.
- read_committed: The share group consumes only non-transactional records and committed transactional records. This level is bounded by the last stable offset, meaning an open transaction can block the share group’s progress.
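The difference between the two bounds can be illustrated with a simplified model of a partition log. This sketch is an approximation for intuition only, not how the broker filters records: the IsolationLevelSketch class, the TxnState enum, and the rule that the last stable offset equals the offset of the earliest record in an open transaction (or the high-water mark if there is none) are simplifying assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of which offsets a share group could deliver under each isolation level. */
class IsolationLevelSketch {
    enum TxnState { NONE, COMMITTED, ABORTED, OPEN } // NONE = non-transactional record

    static final class LogRecord {
        final long offset;
        final TxnState txn;

        LogRecord(long offset, TxnState txn) {
            this.offset = offset;
            this.txn = txn;
        }
    }

    /** read_uncommitted: every record below the high-water mark, including aborted ones. */
    static List<Long> readUncommitted(List<LogRecord> log, long highWaterMark) {
        List<Long> deliverable = new ArrayList<>();
        for (LogRecord r : log) {
            if (r.offset < highWaterMark) {
                deliverable.add(r.offset);
            }
        }
        return deliverable;
    }

    /** read_committed: records below the last stable offset, skipping aborted ones. */
    static List<Long> readCommitted(List<LogRecord> log, long highWaterMark) {
        long lastStableOffset = highWaterMark;
        for (LogRecord r : log) {
            if (r.txn == TxnState.OPEN) {
                lastStableOffset = Math.min(lastStableOffset, r.offset);
            }
        }
        List<Long> deliverable = new ArrayList<>();
        for (LogRecord r : log) {
            if (r.offset < lastStableOffset && r.txn != TxnState.ABORTED) {
                deliverable.add(r.offset);
            }
        }
        return deliverable;
    }
}
```

For a log with offsets 0 (non-transactional), 1 (committed), 2 (aborted), 3 (open transaction), and 4 (non-transactional), and a high-water mark of 5, the read_uncommitted model yields offsets 0 through 4, while the read_committed model yields only 0 and 1: offset 2 is filtered out, and the open transaction at offset 3 holds back offset 4.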