Share Groups for Confluent Platform
This topic provides operational guidance for enabling and monitoring share groups on Confluent Platform brokers, including server-side configuration settings, commands, and JMX metrics. For a Docker example that gets a test environment up and running, see Configure Share Groups for Docker in Confluent Platform.
For client-side details, see Share Consumers for Confluent Platform.
Important
A Preview feature is a Confluent Platform component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time at the sole discretion of Confluent.
Brokers and share groups
Share groups are not enabled by default. Broker administrators must enable them, and can then tune the parameters that govern their behavior and resource consumption. To enable share groups, use the kafka-features.sh tool to set share.version=1 on the cluster. On a new cluster, this typically involves the following steps:
Generate a cluster UUID.
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
Format the log directories.
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
Start the Kafka server.
bin/kafka-server-start.sh config/server.properties
Set the feature version.
bin/kafka-features.sh --bootstrap-server localhost:9092 upgrade --feature share.version=1
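To confirm that the upgrade took effect, the finalized feature levels can be inspected with the describe subcommand of kafka-features.sh. The following is a minimal sketch rather than an official procedure: the helper name share_version_level is hypothetical, and it assumes that describe prints one whitespace-separated line per feature containing the labels Feature: and FinalizedVersionLevel:, which may differ between releases.

```shell
# share_version_level: hypothetical helper that reads `kafka-features.sh describe`
# output on stdin and prints the finalized level of share.version.
# Assumes lines shaped like:
#   Feature: share.version  SupportedMinVersion: ...  FinalizedVersionLevel: 1  Epoch: ...
share_version_level() {
  awk '
    $1 == "Feature:" && $2 == "share.version" {
      for (i = 3; i < NF; i++) {
        if ($i == "FinalizedVersionLevel:") { print $(i + 1); found = 1 }
      }
    }
    # Exit non-zero when share.version was not reported at all.
    END { if (!found) exit 1 }
  '
}

# Usage (assumes a broker listening on localhost:9092):
#   bin/kafka-features.sh --bootstrap-server localhost:9092 describe | share_version_level
```

A printed value of 1 would indicate that share groups are enabled; a non-zero exit status means no share.version line was found in the output.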
Configurations for share groups
The following broker configurations are pertinent to share groups. For more information, see Kafka Broker and Controller Configuration Reference for Confluent Platform.
Share coordinator configurations
These properties control the operation, resource usage, and persistence of the share coordinator component on the broker.
- group.share.max.share.sessions
- group.share.assignors
- share.coordinator.state.topic.num.partitions
- share.coordinator.state.topic.replication.factor
- share.coordinator.state.topic.segment.bytes
- share.coordinator.state.topic.min.isr
- share.coordinator.state.topic.compression.codec
- share.coordinator.append.linger.ms
- share.coordinator.load.buffer.size
- share.coordinator.snapshot.update.records.per.snapshot
- share.coordinator.threads
- share.coordinator.write.timeout.ms
- share.fetch.purgatory.purge.interval.requests
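On a single-broker test cluster, the share coordinator's state topic typically cannot satisfy a replication factor or minimum ISR greater than 1. The sketch below shows one way to set those two properties in a properties file before starting the broker. The helper set_prop is hypothetical, and the value 1 is an assumption suited only to single-broker testing, not a recommended production setting.

```shell
# set_prop FILE KEY VALUE: hypothetical helper that sets KEY=VALUE in a
# Java-style properties file, replacing an existing entry or appending one.
set_prop() {
  file="$1"; key="$2"; value="$3"
  tmp="$(mktemp)"
  # Copy every line whose key differs from KEY, then append the new entry.
  awk -F= -v k="$key" '$1 != k' "$file" > "$tmp"
  printf '%s=%s\n' "$key" "$value" >> "$tmp"
  # Write back in place so the original file keeps its permissions.
  cat "$tmp" > "$file"
  rm -f "$tmp"
}

# Usage (single-broker test environment only; the values are assumptions):
#   set_prop config/server.properties share.coordinator.state.topic.replication.factor 1
#   set_prop config/server.properties share.coordinator.state.topic.min.isr 1
```

Matching on the exact key, rather than on a regular expression, avoids treating the dots in property names as wildcards.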
Group configurations
These properties set the broker-wide defaults and bounds for share group behavior. Dynamic group configurations can often override the defaults, constrained by these bounds.
- group.share.delivery.count.limit
- group.share.heartbeat.interval.ms
- group.share.max.heartbeat.interval.ms
- group.share.max.record.lock.duration.ms
- group.share.max.session.timeout.ms
- group.share.max.size
- group.share.min.heartbeat.interval.ms
- group.share.min.record.lock.duration.ms
- group.share.min.session.timeout.ms
- group.share.partition.max.record.locks
- group.share.record.lock.duration.ms
- group.share.session.timeout.ms
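Because the min and max properties act as bounds for the corresponding timeouts and intervals, it can help to sanity-check a planned value against them before editing the broker configuration. The check below is only a sketch: in_bounds is a hypothetical helper, and the numbers in the usage comment are illustrative rather than documented defaults.

```shell
# in_bounds MIN VALUE MAX: hypothetical helper that succeeds only when
# MIN <= VALUE <= MAX (all three are integers, for example milliseconds).
in_bounds() {
  [ "$1" -le "$2" ] && [ "$2" -le "$3" ]
}

# Usage (illustrative values for group.share.min.record.lock.duration.ms,
# group.share.record.lock.duration.ms, and group.share.max.record.lock.duration.ms):
#   in_bounds 15000 30000 60000 && echo "record lock duration is within bounds"
```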
Dynamic group configurations
These configurations are dynamic: they are set per share group rather than per broker, and the broker manages them.
The following five configurations apply to an individual group, whereas all of the configurations in the previous two lists apply to the broker.
- share.isolation.level
- share.auto.offset.reset
- share.record.lock.duration.ms
- share.heartbeat.interval.ms
- share.session.timeout.ms
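Dynamic group configurations are usually applied with the kafka-configs.sh tool. The sketch below assumes that the tool accepts --entity-type groups for group-level settings and that a broker listens on localhost:9092. The wrapper set_share_group_config, the group name, and the values in the usage comment are hypothetical.

```shell
# Both variables can be overridden from the environment.
KAFKA_CONFIGS="${KAFKA_CONFIGS:-bin/kafka-configs.sh}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

# set_share_group_config GROUP KEY=VALUE [KEY=VALUE ...]
# Hypothetical wrapper that applies one or more dynamic configs to a share group.
set_share_group_config() {
  group="$1"; shift
  # Join the remaining KEY=VALUE arguments with commas for --add-config.
  configs="$(IFS=,; printf '%s' "$*")"
  "$KAFKA_CONFIGS" --bootstrap-server "$BOOTSTRAP" --alter \
    --entity-type groups --entity-name "$group" --add-config "$configs"
}

# Usage (group name and values are illustrative):
#   set_share_group_config my-share-group \
#     share.auto.offset.reset=earliest share.record.lock.duration.ms=45000
```

Values set this way are expected to stay within the bounds defined by the broker-level group configurations.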
Metrics for share groups
Confluent Platform exposes several metrics through Java Management Extensions (JMX) that are useful for monitoring share groups. For more information, see Monitor Kafka with JMX in Confluent Platform.
Broker topic metrics (general share fetch)
TotalShareFetchRequestsPerSec
- MBean: kafka.server:type=BrokerTopicMetrics,name=TotalShareFetchRequestsPerSec,topic={topic}
The share fetch request rate per second. This metric is tracked both in aggregate across all topics and per topic.
FailedShareFetchRequestsPerSec
- MBean: kafka.server:type=BrokerTopicMetrics,name=FailedShareFetchRequestsPerSec,topic={topic}
The rate per second of share fetch requests that failed. This metric is tracked both in aggregate across all topics and per topic.
TotalShareAcknowledgementRequestsPerSec
- MBean: kafka.server:type=BrokerTopicMetrics,name=TotalShareAcknowledgementRequestsPerSec,topic={topic}
The share acknowledgement request rate per second. This metric is tracked both in aggregate across all topics and per topic.
FailedShareAcknowledgementRequestsPerSec
- MBean: kafka.server:type=BrokerTopicMetrics,name=FailedShareAcknowledgementRequestsPerSec,topic={topic}
The rate per second of share acknowledgement requests that failed. This metric is tracked both in aggregate across all topics and per topic.
Share group metrics
RecordAcknowledgementsPerSec
- MBean: kafka.server:type=ShareGroupMetrics,name=RecordAcknowledgementsPerSec,ackType={Accept|Release|Reject}
The rate per second of records acknowledged, segmented by acknowledgement type: Accept, Release, or Reject.
PartitionLoadTimeMs
- MBean: kafka.server:type=ShareGroupMetrics,name=PartitionLoadTimeMs
The time taken (in milliseconds) to load the share partitions.
RequestTopicPartitionsFetchRatio
- MBean: kafka.server:type=ShareGroupMetrics,name=RequestTopicPartitionsFetchRatio,group={group_id}
The ratio of topic-partitions successfully acquired for fetching to the total number of topic-partitions requested in a share fetch, for a specific share group.
TopicPartitionsAcquireTimeMs
- MBean: kafka.server:type=ShareGroupMetrics,name=TopicPartitionsAcquireTimeMs,group={group_id}
The time elapsed (in milliseconds) to acquire any topic partition for a fetch, for a specific share group.
Share partition metrics
AcquisitionLockTimeoutPerSec
- MBean: kafka.server:type=SharePartitionMetrics,name=AcquisitionLockTimeoutPerSec,group={group_id},topic={topic_name},partition={partition}
The rate per second at which acquisition locks time out because records were not acknowledged within the configured timeout. This is tracked per share partition.
InFlightMessageCount
- MBean: kafka.server:type=SharePartitionMetrics,name=InFlightMessageCount,group={group_id},topic={topic_name},partition={partition}
The current number of in-flight messages (unacknowledged records) for the share partition.
InFlightBatchCount
- MBean: kafka.server:type=SharePartitionMetrics,name=InFlightBatchCount,group={group_id},topic={topic_name},partition={partition}
The number of in-flight batches currently being processed for the share partition.
InFlightBatchMessageCount
- MBean: kafka.server:type=SharePartitionMetrics,name=InFlightBatchMessageCount,group={group_id},topic={topic_name},partition={partition}
The number of messages in the in-flight batch for the share partition.
FetchLockTimeMs
- MBean: kafka.server:type=SharePartitionMetrics,name=FetchLockTimeMs,group={group_id},topic={topic_name},partition={partition}
The time elapsed (in milliseconds) while a share partition is held under lock for fetching messages.
FetchLockRatio
- MBean: kafka.server:type=SharePartitionMetrics,name=FetchLockRatio,group={group_id},topic={topic_name},partition={partition}
The fraction of time the share partition is held under lock for fetching messages.
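When scripting JMX queries, the per-partition object names follow the pattern given in the share partition MBean entries. The helper below, a hypothetical share_partition_mbean, simply fills in that pattern. The usage comment assumes the JmxTool class that ships with Kafka and a broker exposing JMX on port 9999, neither of which is stated in this topic.

```shell
# share_partition_mbean NAME GROUP TOPIC PARTITION
# Hypothetical helper that prints the JMX object name of a share partition metric.
share_partition_mbean() {
  printf 'kafka.server:type=SharePartitionMetrics,name=%s,group=%s,topic=%s,partition=%s\n' \
    "$1" "$2" "$3" "$4"
}

# Usage (assumes JMX is enabled on the broker at localhost:9999;
# the group and topic names are illustrative):
#   bin/kafka-run-class.sh org.apache.kafka.tools.JmxTool \
#     --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
#     --object-name "$(share_partition_mbean InFlightMessageCount my-share-group orders 0)"
```

Building the name in one place keeps the group, topic, and partition tags consistent across the six share partition metrics.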
Share session cache metrics
ShareSessionEvictionsPerSec
- MBean: kafka.server:type=ShareSessionCache,name=ShareSessionEvictionsPerSec
The share session eviction rate per second.
SharePartitionsCount
- MBean: kafka.server:type=ShareSessionCache,name=SharePartitionsCount
The number of cached share partitions in the session cache.
ShareSessionsCount
- MBean: kafka.server:type=ShareSessionCache,name=ShareSessionsCount
The number of cached share sessions.
Delayed operation metrics
ExpiresPerSec
- MBean: kafka.server:type=DelayedShareFetchMetrics,name=ExpiresPerSec
The rate per second at which delayed share fetch operations expire.
PurgatorySize
- MBean: kafka.server:type=DelayedOperationPurgatory,delayedOperation=ShareFetch,name=PurgatorySize
The number of requests currently waiting in the share fetch purgatory.
NumDelayedOperations
- MBean: kafka.server:type=DelayedOperationPurgatory,delayedOperation=ShareFetch,name=NumDelayedOperations
The number of delayed operations in the share fetch purgatory.