Share Consumers for Confluent Cloud

Share consumers are an alternative to traditional consumers. Groups of these consumers, share groups, are an alternative to traditional consumer groups. Share consumers, through their configuration, inherently create share groups.

Traditional consumer groups use a distributed processing model where consumer members collectively read messages from an Apache Kafka® topic. In the traditional model, each topic partition is assigned to only one group member at a time. In contrast, share groups and their share consumer members use a concurrent processing model. Share group members simultaneously process different messages from the same partition, effectively treating it more like a message queue.

Important

A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent’s sole discretion.

As a preview feature, share groups are available to Java clients only.

Key capabilities and differences from consumer groups

Share groups enable Kafka consumers to cooperatively consume and process records from topics. This contrasts with traditional consumer groups where each partition is assigned to only one consumer within the group. This diagram illustrates the interaction between a partition, the broker, share consumers, and share groups.

../_images/clients-shareconsumer-flow.svg

The fundamental differences between a share group and a consumer group are:

cooperative consumption
Consumers within a share group cooperatively consume records, and partitions may be assigned to multiple consumers. This allows for increased concurrency beyond the number of topic partitions.
scalability beyond partitions
The number of consumers in a share group can exceed the number of partitions in a topic. This is a significant advantage over traditional consumer groups where the maximum parallelism is limited by the number of partitions.
individual record acknowledgement
Records are acknowledged individually, though the system is optimized for batch processing to improve efficiency.
delivery attempt counting
Delivery attempts to consumers in a share group are counted, which enables automated handling of unprocessable records.

When a consumer in a share group fetches records, it receives available records from any of the topic-partitions matching its subscriptions. Records are acquired for delivery to this consumer with a time-limited acquisition lock. While a record is acquired, it is unavailable to other consumers. The default lock duration is 30 seconds, controlled by the share.record.lock.duration.ms group configuration parameter. The lock is released automatically once its duration elapses, making the record available to another consumer.

A consumer holding the lock can handle the record in the following ways:

  • Acknowledge successful processing of the record.
  • Release the record, making it available for another delivery attempt.
  • Reject the record, indicating it’s unprocessable and preventing further delivery attempts for that record.
  • Do nothing, in which case the lock is automatically released when its duration expires.

The Kafka cluster limits the number of records acquired for consumers for each topic-partition within a share group. Once this limit is reached, fetching operations will temporarily yield no further records until the number of acquired records decreases (as locks naturally time out). This limit is controlled by the broker configuration property group.share.partition.max.record.locks. By limiting the duration of the acquisition lock and automatically releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.
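The record lifecycle described above can be sketched as a small in-memory state machine. This is a toy model for illustration only, not the broker's implementation: the class name SharePartitionModel, its state names, and the maxDeliveryAttempts cap are assumptions made for this sketch, and time is passed in explicitly so the behavior is easy to follow.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy, in-memory model of one share-partition. Illustration only, not Kafka code. */
final class SharePartitionModel {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    private static final class Entry {
        State state = State.AVAILABLE;
        int deliveryCount = 0;
        long lockExpiresAtMs = 0L;
    }

    private final Map<Long, Entry> entries = new HashMap<>();
    private final long lockDurationMs;      // stands in for share.record.lock.duration.ms
    private final int maxRecordLocks;       // stands in for group.share.partition.max.record.locks
    private final int maxDeliveryAttempts;  // hypothetical cap, chosen for this sketch

    SharePartitionModel(long firstOffset, long lastOffset, long lockDurationMs,
                        int maxRecordLocks, int maxDeliveryAttempts) {
        for (long offset = firstOffset; offset <= lastOffset; offset++) {
            entries.put(offset, new Entry());
        }
        this.lockDurationMs = lockDurationMs;
        this.maxRecordLocks = maxRecordLocks;
        this.maxDeliveryAttempts = maxDeliveryAttempts;
    }

    /** Tries to acquire one record for a consumer; returns false if it cannot be delivered now. */
    boolean acquire(long offset, long nowMs) {
        expireLocks(nowMs);
        Entry e = entries.get(offset);
        if (e == null || e.state != State.AVAILABLE || acquiredCount() >= maxRecordLocks) {
            return false;
        }
        e.state = State.ACQUIRED;
        e.deliveryCount++;
        e.lockExpiresAtMs = nowMs + lockDurationMs;
        return true;
    }

    /** Acknowledge successful processing. */
    void accept(long offset) {
        Entry e = acquiredEntry(offset);
        if (e != null) e.state = State.ACKNOWLEDGED;
    }

    /** Release the record for another delivery attempt. */
    void release(long offset) {
        Entry e = acquiredEntry(offset);
        if (e != null) makeAvailableOrArchive(e);
    }

    /** Reject the record as unprocessable; it is never delivered again. */
    void reject(long offset) {
        Entry e = acquiredEntry(offset);
        if (e != null) e.state = State.ARCHIVED;
    }

    /** "Do nothing" case: locks whose duration has elapsed are released automatically. */
    void expireLocks(long nowMs) {
        for (Entry e : entries.values()) {
            if (e.state == State.ACQUIRED && e.lockExpiresAtMs <= nowMs) {
                makeAvailableOrArchive(e);
            }
        }
    }

    State stateOf(long offset) {
        return entries.get(offset).state;
    }

    private Entry acquiredEntry(long offset) {
        Entry e = entries.get(offset);
        return (e != null && e.state == State.ACQUIRED) ? e : null;
    }

    private void makeAvailableOrArchive(Entry e) {
        // Counting delivery attempts lets the model stop redelivering a record that keeps failing.
        e.state = (e.deliveryCount >= maxDeliveryAttempts) ? State.ARCHIVED : State.AVAILABLE;
    }

    private int acquiredCount() {
        int count = 0;
        for (Entry e : entries.values()) {
            if (e.state == State.ACQUIRED) count++;
        }
        return count;
    }
}
```

In this model, a third acquire call fails while two records are already locked and maxRecordLocks is 2, and it succeeds again once one of the locked records is acknowledged or its lock expires.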

Note

Confluent Cloud users cannot set broker configuration properties.

Configuration for client developers

To create a share consumer, create an instance of KafkaShareConsumer in a Java application. When developing a client application that utilizes share groups, there are consumer-level configuration properties and group-level configuration properties you can define.

You define the following consumer-level properties in your consumer code:

share.acknowledgement.mode
  • Description: Controls the acknowledgement mode for a share consumer.
  • Type: string
  • Default: implicit
  • Valid Values: [implicit, explicit]
  • Importance: medium

If your KafkaShareConsumer configuration sets share.acknowledgement.mode=explicit, then every record returned from poll must be explicitly acknowledged with KafkaShareConsumer.acknowledge.

If you do not set the share.acknowledgement.mode configuration or if you set share.acknowledgement.mode=implicit, then implicit mode is used.
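As a minimal sketch, the explicit mode can be selected in the same Properties object that is passed to the KafkaShareConsumer constructor. The helper class ShareConsumerProps and its method name are hypothetical, introduced only for this example; the connection and security settings that a Confluent Cloud cluster requires are not shown.

```java
import java.util.Properties;

/** Hypothetical helper that builds share consumer properties with explicit acknowledgement. */
final class ShareConsumerProps {
    static Properties explicitAckProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // All consumers that use the same group.id belong to the same share group.
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Omit this line (or set it to "implicit") to keep the default implicit mode.
        props.setProperty("share.acknowledgement.mode", "explicit");
        return props;
    }
}
```

The returned object would then be passed to new KafkaShareConsumer<>(props), as in the configuration example later in this document.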

You define these group-level configuration properties by using the kafka-configs tool:

share.auto.offset.reset
  • Description: The strategy to initialize the share-partition start offset.
  • Type: string
  • Default: latest
  • Valid Values: [latest, earliest, by_duration:<duration>]
  • Importance: medium
share.heartbeat.interval.ms
  • Description: The heartbeat interval given to the members of a share group.
  • Type: int
  • Default: 5000 (5 seconds)
  • Valid Values: [1,…]
  • Importance: medium

If not specified, the share group uses the broker’s group.share.heartbeat.interval.ms configuration. If specified, the minimum is limited by the broker’s group.share.min.heartbeat.interval.ms configuration, and the maximum is limited by the broker’s group.share.max.heartbeat.interval.ms configuration.

share.isolation.level
  • Description: Controls how to read records written transactionally.
  • Type: string
  • Default: read_uncommitted
  • Valid Values: [read_committed, read_uncommitted]
  • Importance: medium

If set to read_committed, the share group delivers only transactional records that were committed. If set to read_uncommitted, the share group returns all messages, including transactional messages that were aborted. Non-transactional records are returned unconditionally in either mode. Confluent Cloud users cannot set broker-level configurations.

share.record.lock.duration.ms
  • Description: The record acquisition lock duration in milliseconds for share groups.
  • Type: int
  • Default: 30000 (30 seconds)
  • Valid Values: [1000,…]
  • Importance: medium

If not specified, the share group uses the broker’s group.share.record.lock.duration.ms configuration. If specified, the minimum is limited by the broker’s group.share.min.record.lock.duration.ms configuration, and the maximum is limited by the broker’s group.share.max.record.lock.duration.ms configuration. Confluent Cloud users cannot set broker-level configurations.

share.session.timeout.ms
  • Description: The timeout to detect client failures when using the share group protocol.
  • Type: int
  • Default: 45000 (45 seconds)
  • Valid Values: [1,…]
  • Importance: medium

If not specified, the share group uses the broker’s group.share.session.timeout.ms configuration. If specified, the minimum is limited by the broker’s group.share.min.session.timeout.ms configuration, and the maximum is limited by the broker’s group.share.max.session.timeout.ms configuration. Confluent Cloud users cannot set broker-level configurations.

Manage share groups using the kafka-share-groups tool

Administrators can use the kafka-share-groups.sh tool to list, describe, or delete share groups. Only share groups without any active members can be deleted. For information on using this tool, see the kafka-share-groups tool usage details.

List all share groups:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --list

View current start offset of a share group:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-share-group

Describe active members in a share group:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-share-group --members

Delete offsets of individual topics in a share group:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group my-share-group --topic topic1

Delete one or more share groups:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --delete --group my-share-group

Monitoring Share Groups

Metrics for share groups are emitted under the kafka.server:type=group-coordinator-metrics MBean. Relevant metrics include:

group-count,protocol={share}
Total number of groups per group type: Share.
share-acknowledgement
The total number of offsets acknowledged for share groups.
record-acknowledgement
The number of records acknowledged per acknowledgement type.
partition-load-time
The time taken to load the share partitions.

These metrics provide visibility into the health and performance of share groups on the Kafka cluster.

Using the KafkaShareConsumer for Share Groups

Use the org.apache.kafka.clients.consumer.KafkaShareConsumer class to consume records from a Kafka cluster using share groups. Setting up a KafkaShareConsumer involves configuration and a subscription process similar to a regular KafkaConsumer, but with key differences in how messages are handled and acknowledged.

Configuration

The KafkaShareConsumer can be instantiated using various constructors that accept either a Map<String, Object> or Properties object for configuration, optionally including key and value deserializers.

Example:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-share-group"); // All consumers in the same share group will have the same group.id
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);

Note

After creating a KafkaShareConsumer, you must call close() to prevent resource leaks.

Subscribing to Topics

Consumers in a share group dynamically subscribe to topics using the subscribe(Collection<String> topics) method. Topic subscriptions are not incremental; a new list replaces any current assignment. Kafka delivers each message in the subscribed topics to one consumer within the share group.

Example:

consumer.subscribe(Arrays.asList("my-topic"));

Polling for Records and Liveness

After subscribing, the consumer joins the share group automatically when poll(Duration) is invoked. This method fetches data for the subscribed topics. To ensure consumer liveness and continued record reception, poll() must be called periodically. The consumer sends periodic heartbeats to the broker, and if heartbeats are not sent within the share group’s session timeout, the consumer is considered dead and its partitions are reassigned.

Example:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // Process records
}

Record Delivery and Acknowledgement

When a consumer in a share group fetches records using poll(), it receives available records from any matching topic-partitions. These records are acquired with a time-limited acquisition lock, making them unavailable to other consumers during this period. The default lock duration is 30 seconds. The lock duration is controlled by the share.record.lock.duration.ms group configuration parameter. The lock is automatically released if its duration elapses, making the record available for another delivery attempt.

Consumers can handle acquired records in four ways:

Acknowledge successful processing
Marks the record as successfully processed.
Release the record
Makes the record available for another delivery attempt.
Reject the record
Indicates the record is unprocessable and prevents further delivery attempts.
Do nothing
The lock automatically releases when its duration expires.

The consumer can choose to use implicit or explicit acknowledgement of the records it processes by using the consumer share.acknowledgement.mode configuration property.

Implicit Acknowledgement

If the consumer configuration property share.acknowledgement.mode is set to implicit or is not set at all, then the consumer uses implicit acknowledgement. All delivered records are implicitly marked as successfully processed and acknowledged when commitSync(), commitAsync(), or poll() is called.

Example (implicit with poll):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // Implicitly acknowledges previous batch
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // doProcessing(record);
    }
}

Example (implicit with commitSync):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        // doProcessing(record);
    }
    consumer.commitSync(); // Implicitly acknowledges all delivered records
}

For more information, see KafkaShareConsumer Javadocs.

Explicit Acknowledgement

If the consumer configuration property share.acknowledgement.mode is set to explicit, then the consumer uses explicit acknowledgement. The application must acknowledge all records returned from poll(Duration) using acknowledge(ConsumerRecord, AcknowledgeType) before its next call to poll(Duration). The AcknowledgeType argument indicates whether the record was processed successfully (ACCEPT), should be released for another attempt (RELEASE), or should be permanently rejected (REJECT). Acknowledgements are committed when commitSync(), commitAsync(), or poll() is called.

Example (explicit acknowledgement with commitSync):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        try {
            // doProcessing(record);
            consumer.acknowledge(record, AcknowledgeType.ACCEPT);
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);
        }
    }
    consumer.commitSync();
}

For more information, see KafkaShareConsumer Javadocs.
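The example above rejects every record whose processing throws an exception. An application may instead want to release records that failed for a transient reason, so that they get another delivery attempt, and reject only records that can never be processed. The following sketch shows one possible policy. The class AckDecision, its Outcome enum (a local stand-in that mirrors the ACCEPT, RELEASE, and REJECT constants), and the choice of which exceptions count as transient are all assumptions made for illustration.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** Hypothetical policy that maps a processing result to an acknowledgement outcome. */
final class AckDecision {
    /** Local stand-in mirroring the ACCEPT, RELEASE, and REJECT constants of AcknowledgeType. */
    enum Outcome { ACCEPT, RELEASE, REJECT }

    /**
     * @param error the exception thrown while processing the record, or null on success
     */
    static Outcome forResult(Throwable error) {
        if (error == null) {
            return Outcome.ACCEPT;   // processed successfully
        }
        if (error instanceof IOException || error instanceof TimeoutException) {
            return Outcome.RELEASE;  // assumed transient: allow another delivery attempt
        }
        return Outcome.REJECT;       // assumed permanent: prevent further delivery attempts
    }
}
```

In a real consumer loop, the outcome would be translated to the AcknowledgeType constant of the same name and passed to consumer.acknowledge(record, type) before the next call to poll(Duration).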

Multithreaded Processing

The KafkaShareConsumer is not thread-safe, and unsynchronized access will lead to ConcurrentModificationException. The only exception is wakeup(), which can safely be called from an external thread to interrupt an active operation (for example, poll()), causing a WakeupException to be thrown in the blocking thread. This mechanism is useful for safely shutting down the consumer from another thread.

Example of safe shutdown using wakeup():

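import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaShareConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaShareConsumer<String, String> consumer;

    public KafkaShareConsumerRunner(KafkaShareConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            // Always close the consumer on the thread that owns it
            consumer.close();
        }
    }

    // Safe to call from another thread: wakeup() is the only thread-safe method
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}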

Note

While thread interrupts are possible, wakeup() is the recommended method for aborting blocking operations, because interrupts can prevent a clean consumer shutdown.

Transactional Records and Isolation Level

The handling of transactional records in share groups is governed by the share.isolation.level configuration property, which applies to the entire share group.

read_uncommitted (Default)
The share group consumes all non-transactional and transactional records, bounded by the high-water mark.
read_committed
The share group consumes only non-transactional records and committed transactional records. This level is bounded by the last stable offset, meaning an open transaction can block the share group’s progress.
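The difference between the two isolation levels can be illustrated with a toy model of a partition log. This is a simplified sketch, not how the broker is implemented: the class IsolationModel and its types are hypothetical, the log is assumed to be sorted by offset, the high-water mark is treated as an exclusive upper bound, and the last stable offset is modeled as the offset of the first record that belongs to an open transaction.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of share.isolation.level. Illustration only, not Kafka code. */
final class IsolationModel {
    enum Txn { NONE, OPEN, COMMITTED, ABORTED }

    static final class LogRecord {
        final long offset;
        final Txn txn;

        LogRecord(long offset, Txn txn) {
            this.offset = offset;
            this.txn = txn;
        }
    }

    /** Offset of the first record in an open transaction, or the high-water mark if there is none. */
    static long lastStableOffset(List<LogRecord> log, long highWaterMark) {
        for (LogRecord r : log) {
            if (r.offset < highWaterMark && r.txn == Txn.OPEN) {
                return r.offset;
            }
        }
        return highWaterMark;
    }

    /** Offsets that the share group could deliver under the given isolation level. */
    static List<Long> deliverable(List<LogRecord> log, long highWaterMark, boolean readCommitted) {
        long bound = readCommitted ? lastStableOffset(log, highWaterMark) : highWaterMark;
        List<Long> offsets = new ArrayList<>();
        for (LogRecord r : log) {
            if (r.offset >= bound) {
                continue; // beyond the bound for this isolation level
            }
            if (readCommitted && r.txn == Txn.ABORTED) {
                continue; // aborted transactional records are filtered out
            }
            offsets.add(r.offset);
        }
        return offsets;
    }
}
```

In this model, a single open transaction at offset 3 hides offset 3 and every later record from a read_committed share group until the transaction completes, while a read_uncommitted share group sees everything below the high-water mark.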