Share Groups for Confluent Platform

This topic provides operational guidance for enabling and monitoring share groups on Confluent Platform brokers, including server-side configuration settings, commands, and JMX metrics. For a Docker example that gets a test environment up and running, see Configure Share Groups for Docker in Confluent Platform.

For client-side details, see Share Consumers for Confluent Platform.

Important

A Preview feature is a Confluent Platform component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time at the sole discretion of Confluent.

Brokers and share groups

Share groups are not enabled by default. Broker administrators must enable share groups and can tune parameters related to their behavior and resource consumption. Administrators enable share groups by using the kafka-features.sh tool to set share.version=1 on a cluster. This typically involves the following steps:

  1. Generate a cluster UUID.

    KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
    
  2. Format the log directories.

    bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
    
  3. Start the Kafka server.

    bin/kafka-server-start.sh config/server.properties
    
  4. Set the feature version.

    bin/kafka-features.sh --bootstrap-server localhost:9092 upgrade --feature share.version=1
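
After the upgrade, it can be useful to confirm that the feature was finalized. The following is a minimal sketch rather than an official procedure. It assumes that the describe subcommand of kafka-features.sh prints one line per feature that contains a FinalizedVersionLevel field, and the helper name share_version_level is hypothetical.

```shell
# Read `kafka-features.sh describe` output on stdin and print the finalized
# level of share.version. Prints nothing if the feature is not listed.
# Assumption: each feature is reported on one line of "Key: value" pairs.
share_version_level() {
  awk '
    /^Feature: share\.version[[:space:]]/ {
      for (i = 1; i < NF; i++)
        if ($i == "FinalizedVersionLevel:") { print $(i + 1); exit }
    }
  '
}

# Usage against a running broker (assumes localhost:9092):
#   bin/kafka-features.sh --bootstrap-server localhost:9092 describe | share_version_level
```

A printed value of 1 would indicate that share groups are enabled on the cluster.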
    

Configurations for share groups

The following broker configurations are pertinent to share groups. For more information, see Kafka Broker and Controller Configuration Reference for Confluent Platform.

Share coordinator configurations

These properties control the operation, resource usage, and persistence of the share coordinator component on the broker.

  • group.share.max.share.sessions

  • group.share.assignors

  • share.coordinator.state.topic.num.partitions

  • share.coordinator.state.topic.replication.factor

  • share.coordinator.state.topic.segment.bytes

  • share.coordinator.state.topic.min.isr

  • share.coordinator.state.topic.compression.codec

  • share.coordinator.append.linger.ms

  • share.coordinator.load.buffer.size

  • share.coordinator.snapshot.update.records.per.snapshot

  • share.coordinator.threads

  • share.coordinator.write.timeout.ms

  • share.fetch.purgatory.purge.interval.requests
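
On a single-broker test cluster, such as the one started with the steps earlier in this topic, the share coordinator state topic cannot have more than one replica. The following sketch prints overrides for that case. The helper name is hypothetical, the values are assumptions for a one-broker setup only, and your server.properties file might already contain them.

```shell
# Print server.properties overrides for a single-broker test cluster.
# Assumption: exactly one broker, so the state topic can have only one replica.
single_broker_share_overrides() {
  cat <<'EOF'
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1
EOF
}

# Usage (before starting the broker):
#   single_broker_share_overrides >> config/server.properties
```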

Group configurations

These properties establish the default or boundary settings for share group behavior. In many cases, a dynamic group configuration can override the default, within the configured minimum and maximum.

  • group.share.delivery.count.limit

  • group.share.heartbeat.interval.ms

  • group.share.max.heartbeat.interval.ms

  • group.share.max.record.lock.duration.ms

  • group.share.max.session.timeout.ms

  • group.share.max.size

  • group.share.min.heartbeat.interval.ms

  • group.share.min.record.lock.duration.ms

  • group.share.min.session.timeout.ms

  • group.share.partition.max.record.locks

  • group.share.record.lock.duration.ms

  • group.share.session.timeout.ms
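
Several of these properties come in minimum, default, and maximum triples, for example the three record lock duration settings. The following is a minimal sketch of a sanity check for such a triple, assuming that the default is expected to lie between the minimum and the maximum. The helper names and the sample values are hypothetical and are not necessarily your cluster's settings.

```shell
# Succeed when min <= value <= max (all integers, in milliseconds).
within_bounds() {
  [ "$1" -le "$2" ] && [ "$2" -le "$3" ]
}

# Print "ok" or "out of range" for a min/value/max triple.
check_bounds() {
  if within_bounds "$1" "$2" "$3"; then
    echo "ok"
  else
    echo "out of range"
  fi
}

# Usage with illustrative values for
# group.share.min.record.lock.duration.ms, group.share.record.lock.duration.ms,
# and group.share.max.record.lock.duration.ms:
#   check_bounds 15000 30000 60000
```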

Dynamic group configurations

These configurations are dynamic, meaning they are applied per share group and managed by the broker.

The following five configurations apply to the group itself, whereas all of the configurations in the previous two lists apply to the broker.

  • share.isolation.level

  • share.auto.offset.reset

  • share.record.lock.duration.ms

  • share.heartbeat.interval.ms

  • share.session.timeout.ms
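
Dynamic group configurations can be changed at runtime with the kafka-configs.sh tool. The following sketch wraps that command in a hypothetical helper. The group name my-share-group, the value 45000, and the broker address localhost:9092 are assumptions for illustration, and the DRY_RUN switch belongs to this sketch, not to the tool.

```shell
# Set one dynamic configuration on a share group.
# Assumptions: run from the Kafka installation directory; the broker is at
# localhost:9092 unless BOOTSTRAP is set.
# Set DRY_RUN=1 to print the command instead of running it.
set_share_group_config() {
  group="$1"; key="$2"; value="$3"
  set -- bin/kafka-configs.sh --bootstrap-server "${BOOTSTRAP:-localhost:9092}" \
    --alter --entity-type groups --entity-name "$group" \
    --add-config "$key=$value"
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Usage:
#   set_share_group_config my-share-group share.record.lock.duration.ms 45000
```

Running the helper with DRY_RUN=1 first lets you review the exact command before it changes the group.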

Metrics for share groups

Confluent Platform exposes several metrics through Java Management Extensions (JMX) that are useful for monitoring share groups. For more information, see Monitor Kafka with JMX in Confluent Platform.

Broker topic metrics (general share fetch)

TotalShareFetchRequestsPerSec

MBean: kafka.server:type=BrokerTopicMetrics,name=TotalShareFetchRequestsPerSec,topic={topic}

The share fetch request rate per second. This metric is tracked both in aggregate across all topics and per topic.

FailedShareFetchRequestsPerSec

MBean: kafka.server:type=BrokerTopicMetrics,name=FailedShareFetchRequestsPerSec,topic={topic}

The share fetch request rate per second for requests that failed. This metric is tracked both in aggregate across all topics and per topic.

TotalShareAcknowledgementRequestsPerSec

MBean: kafka.server:type=BrokerTopicMetrics,name=TotalShareAcknowledgementRequestsPerSec,topic={topic}

The share acknowledgement request rate per second. This metric is tracked both in aggregate across all topics and per topic.

FailedShareAcknowledgementRequestsPerSec

MBean: kafka.server:type=BrokerTopicMetrics,name=FailedShareAcknowledgementRequestsPerSec,topic={topic}

The share acknowledgement request rate per second for requests that failed. This metric is tracked both in aggregate across all topics and per topic.
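
The total and failed rates are often easier to interpret together, as a failure ratio. The following is a minimal sketch of that arithmetic. The helper name is hypothetical, and the two inputs are assumed to be rate values that you have already read from the corresponding Failed and Total MBeans.

```shell
# Print failed/total as a fraction with four decimal places.
# Prints 0.0000 when total is zero to avoid dividing by zero.
share_failure_ratio() {
  awk -v failed="$1" -v total="$2" 'BEGIN {
    if (total + 0 == 0) { printf "%.4f\n", 0 }
    else { printf "%.4f\n", failed / total }
  }'
}

# Usage: 5 failed share fetch requests per second out of 200 in total.
#   share_failure_ratio 5 200
```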

Share group metrics

RecordAcknowledgementsPerSec

MBean: kafka.server:type=ShareGroupMetrics,name=RecordAcknowledgementsPerSec,ackType={Accept|Release|Reject}

The rate per second of records acknowledged, segmented by the acknowledgement type, such as Accept, Release, or Reject.

PartitionLoadTimeMs

MBean: kafka.server:type=ShareGroupMetrics,name=PartitionLoadTimeMs

The time taken (in milliseconds) to load the share partitions.

RequestTopicPartitionsFetchRatio

MBean: kafka.server:type=ShareGroupMetrics,name=RequestTopicPartitionsFetchRatio,group={group_id}

The ratio of topic-partitions successfully acquired for fetching to the total number of topic-partitions requested in a share fetch for a specific share group.

TopicPartitionsAcquireTimeMs

MBean: kafka.server:type=ShareGroupMetrics,name=TopicPartitionsAcquireTimeMs,group={group_id}

The time elapsed (in milliseconds) to acquire any topic partition for a fetch for a specific share group.

Share partition metrics

AcquisitionLockTimeoutPerSec

MBean: kafka.server:type=SharePartitionMetrics,name=AcquisitionLockTimeoutPerSec,group={group_id},topic={topic_name},partition={partition}

The rate per second at which acquisition locks time out because records were not acknowledged within the configured timeout. This is tracked per share partition.

InFlightMessageCount

MBean: kafka.server:type=SharePartitionMetrics,name=InFlightMessageCount,group={group_id},topic={topic_name},partition={partition}

The current number of in-flight messages (unacknowledged records) for the share partition.

InFlightBatchCount

MBean: kafka.server:type=SharePartitionMetrics,name=InFlightBatchCount,group={group_id},topic={topic_name},partition={partition}

The number of in-flight batches currently being processed for the share partition.

InFlightBatchMessageCount

MBean: kafka.server:type=SharePartitionMetrics,name=InFlightBatchMessageCount,group={group_id},topic={topic_name},partition={partition}

The number of messages in the in-flight batch for the share partition.

FetchLockTimeMs

MBean: kafka.server:type=SharePartitionMetrics,name=FetchLockTimeMs,group={group_id},topic={topic_name},partition={partition}

The time elapsed (in milliseconds) while a share partition is held under lock for fetching messages.

FetchLockRatio

MBean: kafka.server:type=SharePartitionMetrics,name=FetchLockRatio,group={group_id},topic={topic_name},partition={partition}

The fraction of time the share partition is held under lock for fetching messages.
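
Because every share partition metric uses the same MBean pattern, the object name can be built from the metric name, group, topic, and partition. The following sketch shows one way to do that. The helper name, the group my-share-group, and the topic orders are hypothetical, and the commented usage assumes that JMX is exposed on localhost:9999 and that your installation includes bin/kafka-jmx.sh.

```shell
# Print the MBean object name for a share partition metric.
# Arguments: metric name, group ID, topic name, partition number.
share_partition_mbean() {
  printf 'kafka.server:type=SharePartitionMetrics,name=%s,group=%s,topic=%s,partition=%s\n' \
    "$1" "$2" "$3" "$4"
}

# Usage (assumed JMX port and tool location):
#   bin/kafka-jmx.sh \
#     --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
#     --object-name "$(share_partition_mbean InFlightMessageCount my-share-group orders 0)" \
#     --one-time true
```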

Share session cache metrics

ShareSessionEvictionsPerSec

MBean: kafka.server:type=ShareSessionCache,name=ShareSessionEvictionsPerSec

The share session eviction rate per second.

SharePartitionsCount

MBean: kafka.server:type=ShareSessionCache,name=SharePartitionsCount

The number of cached share partitions in the session cache.

ShareSessionsCount

MBean: kafka.server:type=ShareSessionCache,name=ShareSessionsCount

The number of cached share sessions.

Delayed operation metrics

ExpiresPerSec

MBean: kafka.server:type=DelayedShareFetchMetrics,name=ExpiresPerSec

The expired delayed share fetch operation rate per second.

PurgatorySize

MBean: kafka.server:type=DelayedOperationPurgatory,delayedOperation=ShareFetch,name=PurgatorySize

The number of requests currently waiting in the share fetch purgatory.

NumDelayedOperations

MBean: kafka.server:type=DelayedOperationPurgatory,delayedOperation=ShareFetch,name=NumDelayedOperations

The number of delayed operations for the share fetch purgatory.