Streams Rebalance Protocol for Kafka Streams in Confluent Platform

The Streams Rebalance Protocol is a broker-driven rebalancing system designed specifically for Kafka Streams applications. Following the pattern of KIP-848, which moved rebalance coordination of plain consumers from clients to brokers, KIP-1071 extends this model to Kafka Streams workloads.

In the classic protocol, clients compute new assignments during rebalance events that involve all members of the group. With the Streams Rebalance Protocol, assignments are computed continuously on the broker. The streams application also registers with the broker as a streams group rather than a consumer group, and the broker manages and exposes all metadata required to coordinate the Kafka Streams application instances.

This approach brings Kafka Streams coordination in line with the modern broker-driven rebalance model introduced for consumers in KIP-848, providing a dedicated group type with streams-specific semantics and metadata management.

The following features are available in the current release:

  • Core Streams Group Rebalance Protocol: The group.protocol=streams configuration enables the dedicated streams rebalance protocol. This separates streams groups from consumer groups and provides a streams-specific group membership lifecycle and metadata management on the broker.

  • Sticky Task Assignor: A basic task-assignment strategy that minimizes task movement during rebalances is included.

  • Interactive Query Support: IQ operations are compatible with the new streams protocol.

  • New Admin RPC: The StreamsGroupDescribe RPC provides streams-specific metadata separate from consumer group information, with corresponding access via the Admin interface.

  • CLI Integration: You can list, describe, and delete streams groups via the kafka-streams-groups script.

  • Offline Migration: After shutting down all members and waiting for their session.timeout.ms to expire (or forcing an explicit group leave), a classic group can be converted to a streams group and a streams group can be converted to a classic group. Committed offsets are the only broker-side group data that is preserved. Internal topics (changelog and repartition topics) continue to exist as regular Kafka topics.

Why use the Streams Rebalance Protocol?

KIP-1071 delivers broker-driven rebalancing specifically optimized for Kafka Streams applications. This reduces coordination overhead and improves failure detection, leading to more stable and responsive stream processing workloads.

  • Broker-driven coordination: Centralizes task assignment logic on brokers instead of the client. This provides consistent, authoritative task assignment decisions from a single coordination point and reduces the potential for split-brain scenarios.

  • Faster, more stable rebalances: Reduces rebalance duration and impact by removing the global synchronization point. This helps minimize application downtime during membership changes or failures.

  • Better observability: Provides dedicated metrics and admin interfaces that separate streams from consumer groups, leading to clearer troubleshooting with broker-side observability.

Not included in the current release

This release covers a subset of the functionality detailed in KIP-1071. The following features are not included in this release:

  • Static membership: Not supported. A client that sets group.instance.id is rejected.

  • Topology updates: Updates to existing topologies are not supported. If you change a topology significantly, for example, by adding new source topics or changing the number of sub-topologies, you must create a new streams group.

  • High availability assignor: Only the sticky assignor is supported. Warmup tasks and rack-aware assignment are not supported in this release.

  • Regular expressions: Pattern-based topic subscription is not supported.

  • Online migration: Migration between the classic and streams protocols is available only as an offline upgrade. All instances must be stopped and the group must become empty before the application is restarted with the new protocol enabled. Rolling-bounce online upgrades are not supported.

  • Custom Kafka clients: The KafkaClientSupplier interface is not fully supported. A custom KafkaClientSupplier can supply only the restore and global consumers, the producer, and the admin client. It cannot supply the “main” consumer when streams groups are enabled.

Enable the protocol

Starting with Kafka 4.2, the Streams Rebalance Protocol is enabled by default on new clusters. Both brokers and clients must be running Kafka 4.2 or later to use this protocol.

  • Enable the feature on existing clusters:

    ${CONFLUENT_HOME}/bin/kafka-features.sh --bootstrap-server localhost:9092 upgrade --feature streams.version=1
    
  • Disable the feature:

    ${CONFLUENT_HOME}/bin/kafka-features.sh --bootstrap-server localhost:9092 downgrade --feature streams.version=0
    
  • In your Kafka Streams application configuration, set group.protocol=streams.

Migration from classic protocol

Warning

Due to a critical broker-side bug in the offline migration code (KAFKA-20254), do not attempt to migrate existing groups from the classic protocol to the Streams Rebalance Protocol in this release (Confluent Platform version 8.2.0, Kafka Streams 4.2.0). Newly created streams groups are not impacted.

Only offline migration is supported. To migrate a Kafka Streams application from the classic protocol to the streams rebalance protocol:

  1. Shut down all application instances.

  2. Wait for the session.timeout.ms to expire so the group becomes empty (or force an explicit group leave).

  3. Update the application configuration to set group.protocol=streams.

  4. Restart the application instances.

Committed offsets are the only broker-side group data that is preserved. All other group metadata is recreated when the application starts with the new protocol. Internal topics (changelog and repartition topics) continue to exist as regular Kafka topics.

Similarly, you can convert a streams group back to a classic group by following the same process and setting group.protocol=classic.

Online migration, which means migrating while the application is running, is not available. Plan for a maintenance window when migrating between protocols.

Architecture and how it works

Streams groups

The protocol introduces the concept of a streams group in parallel with a consumer group. Streams clients use a dedicated heartbeat RPC, named StreamsGroupHeartbeat, to join a group, leave a group, and update the group coordinator about their currently owned tasks and their client-specific metadata.

The group coordinator manages a streams group similarly to a consumer group, continuously updating the group member metadata via heartbeat responses and running assignment logic when changes are detected. A new group type named streams is introduced to the group coordinator, with new record key and value types for group metadata, topology metadata, and group member metadata. These records are persisted in the __consumer_offsets topic.

A group is a streams group, a share group, or a consumer group. Its type is determined by the first heartbeat request that uses the corresponding GroupId.

Topology configuration and validation

To assign tasks among streams clients, the group coordinator uses topology metadata that is initialized when a member joins the group and persisted in the __consumer_offsets topic.

Whenever a member joins the streams group, the first heartbeat request contains metadata of the topology. The metadata describes the topology as a set of subtopologies, each identified by a unique string identifier and containing metadata relevant for creation of internal topics and assignment.

Topology validation and NOT_READY state

During the handling of the streams group heartbeat, the group coordinator may detect that source/sink or internal topics required by the topology do not exist or differ in their configuration from what is required for the topology to execute successfully. This triggers a “topology configuration” process, in which the group coordinator performs the following steps:

  1. Check that all configured source topics exist.

  2. Check that “copartition groups” are satisfied, which means that all source topics that are supposed to be copartitioned are actually copartitioned.

  3. Derive the required number of partitions for all internal topics from the source topic configuration.

  4. Check that all internal topics exist with the right configuration.

If any source topics or internal topics are missing, the group enters the NOT_READY state. While the group is NOT_READY, heartbeats are handled as usual and typically do not fail, but the status in the heartbeat response indicates which kind of problem exists. All members receive an empty assignment while the group is in the NOT_READY state.
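
The four checks above can be sketched as follows. This is a simplified model and not the group coordinator's actual code: the class name, the method signature, the topic names, and the shape of the inputs are all hypothetical. In particular, step 3 assumes that an internal topic needs as many partitions as the largest of the source topics it is derived from, and step 2 assumes that copartition groups contain only source topics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Simplified model of the four checks; not the group coordinator's actual code.
class TopologyCheckSketch {

    // existingTopics: topic name -> partition count for every topic present in the cluster.
    // internalTopics: internal topic name -> source topics it is derived from (hypothetical shape).
    static List<String> findProblems(Set<String> sourceTopics,
                                     List<Set<String>> copartitionGroups,
                                     Map<String, Set<String>> internalTopics,
                                     Map<String, Integer> existingTopics) {
        List<String> problems = new ArrayList<>();

        // Step 1: every configured source topic must exist.
        for (String topic : new TreeSet<>(sourceTopics)) {
            if (!existingTopics.containsKey(topic)) {
                problems.add("missing source topic: " + topic);
            }
        }
        if (!problems.isEmpty()) {
            return problems; // the remaining steps need the source partition counts
        }

        // Step 2: source topics in one copartition group must share a partition count.
        for (Set<String> group : copartitionGroups) {
            long distinctCounts = group.stream().map(existingTopics::get).distinct().count();
            if (distinctCounts > 1) {
                problems.add("not copartitioned: " + new TreeSet<>(group));
            }
        }

        for (Map.Entry<String, Set<String>> entry : internalTopics.entrySet()) {
            // Step 3 (assumption): required partitions = largest count among the upstream source topics.
            int required = entry.getValue().stream().mapToInt(existingTopics::get).max().getAsInt();
            // Step 4: the internal topic must exist with the required partition count.
            Integer actual = existingTopics.get(entry.getKey());
            if (actual == null) {
                problems.add("missing internal topic: " + entry.getKey());
            } else if (actual != required) {
                problems.add("internal topic " + entry.getKey() + " has " + actual
                        + " partitions, needs " + required);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Hypothetical cluster: both source topics exist, the internal topic does not.
        Map<String, Integer> existing = Map.of("orders", 4, "customers", 4);
        List<String> problems = findProblems(
                Set.of("orders", "customers"),
                List.of(Set.of("orders", "customers")),
                Map.of("wordcount-repartition", Set.of("orders")),
                existing);
        problems.forEach(System.out::println);
    }
}
```

Each returned string stands in for the kind of problem that the heartbeat response status would report. An empty list models a topology whose topics all exist with the expected partition counts.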

Centralized assignment configuration

Core assignment options are configured centrally on the broker, without relying on each client’s configuration. This makes it possible to tune a streams group without redeploying the streams application. The core assignment option introduced on the broker side is num.standby.replicas. It can be configured both globally on the broker and dynamically for specific streams groups through the IncrementalAlterConfigs and DescribeConfigs RPCs.

The last-used assignment configuration is stored in the group metadata on the broker. This way, if an assignment configuration is dynamically changed, reassignment can be triggered immediately.

Administration

Admin API

Use the “streams groups” methods of the Admin interface to manage streams groups programmatically. These APIs are mostly backed by the same implementations as the consumer group API.

The main differences from consumer group APIs are:

  • The describeStreamsGroups method uses the StreamsGroupDescribe RPC and returns different information than the corresponding consumer group method.

  • A streams group has an extra state, named NOT_READY, and no legacy states from the classic protocol.

  • removeMembersFromConsumerGroup does not have a corresponding API in this version, as it uses the LeaveGroup RPC for classic consumer groups, which is not available for KIP-848-style groups.

kafka-streams-groups tool

Starting in Kafka Streams 4.2 (Confluent Platform 8.2), a tool named kafka-streams-groups is available in ${CONFLUENT_HOME}/bin for working with streams groups. You can use it to list, describe, and delete streams groups.

The kafka-streams-groups tool replaces kafka-streams-application-reset for streams groups.

You can delete a previous consumer group by using kafka-consumer-groups before starting the application with the new protocol, but this also deletes all offsets for that group.

Configuration

Broker configuration

The following broker configurations control the behavior of streams groups.

  • group.coordinator.rebalance.protocols: The list of enabled rebalance protocols. Streams groups are enabled when “streams” is in this list.

  • group.streams.session.timeout.ms: The default timeout for all streams groups (unless overridden for a specific streams group) to detect client failures when using the streams group protocol.

  • group.streams.min.session.timeout.ms: The minimum session timeout.

  • group.streams.max.session.timeout.ms: The maximum session timeout.

  • group.streams.heartbeat.interval.ms: The default heartbeat interval given to the members.

  • group.streams.min.heartbeat.interval.ms: The minimum heartbeat interval.

  • group.streams.max.heartbeat.interval.ms: The maximum heartbeat interval.

  • group.streams.max.size: The maximum number of streams clients that a single streams group can accommodate.

  • group.streams.num.standby.replicas: The default number of standby replicas for each task.

  • group.streams.max.standby.replicas: The maximum number of standby replicas that can be set through dynamic group configuration.

  • group.streams.initial.rebalance.delay.ms: The first rebalance of a new (that is, previously empty) group is delayed by this amount to allow more members to join the group.

Group configuration

Configurations for the resource type GROUP are available in DescribeConfigs and IncrementalAlterConfigs to override the default broker configurations dynamically for specific groups. These can be set using the Admin Java interface or the bin/kafka-configs.sh utility.

For complete details, see the group configuration documentation.

The following group-level configurations are available for streams groups:

  • streams.session.timeout.ms: The timeout to detect client failures when using the streams group protocol.

  • streams.heartbeat.interval.ms: The heartbeat interval given to the members.

  • streams.num.standby.replicas: The number of standby replicas for each task.

  • streams.initial.rebalance.delay.ms: The first rebalance of a group is delayed by this amount to allow more members to join the group.
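
The relationship between the group-level settings and the broker defaults can be sketched as a simple lookup with a fallback. The helper below is hypothetical and is not broker code. It assumes, based on the names listed in this section, that the broker default for a group-level key is the same key prefixed with group. and the values used are sample values only, not the actual defaults.

```java
import java.util.Map;

// Hypothetical helper, not broker code: models group-level overrides
// falling back to the broker-wide defaults.
class EffectiveGroupConfig {

    // Resolves a group-level key such as "streams.num.standby.replicas".
    // Assumption: the broker default uses the same name prefixed with "group.".
    static String resolve(String groupKey,
                          Map<String, String> groupOverrides,
                          Map<String, String> brokerDefaults) {
        String override = groupOverrides.get(groupKey);
        if (override != null) {
            return override; // a dynamic group-level setting wins
        }
        return brokerDefaults.get("group." + groupKey);
    }

    public static void main(String[] args) {
        // Sample values for illustration only.
        Map<String, String> brokerDefaults = Map.of(
                "group.streams.num.standby.replicas", "0",
                "group.streams.session.timeout.ms", "45000");
        Map<String, String> wordcountOverrides = Map.of(
                "streams.num.standby.replicas", "1");

        System.out.println(resolve("streams.num.standby.replicas", wordcountOverrides, brokerDefaults));
        System.out.println(resolve("streams.session.timeout.ms", wordcountOverrides, brokerDefaults));
    }
}
```

In this sketch, the wordcount group uses its own standby replica setting and inherits the session timeout from the broker.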

You can use the kafka-configs tool to set these configurations, for example:

${CONFLUENT_HOME}/bin/kafka-configs --bootstrap-server localhost:9092 --alter --entity-type groups \
  --entity-name wordcount --add-config streams.num.standby.replicas=1

To operate streams groups, use kafka-streams-groups to list, describe, and delete them. In the new protocol, session.timeout.ms, heartbeat.interval.ms, initial.rebalance.delay.ms, and num.standby.replicas are group-level configurations. Setting session.timeout.ms, heartbeat.interval.ms, or num.standby.replicas on the client has no effect. Similarly, the existing cluster configuration initial.rebalance.delay.ms does not apply to streams groups.

Streams configuration

The following client configuration enables the streams rebalance protocol:

  • group.protocol: Specifies which rebalance protocol the application uses. Set it to streams to enable the Streams Rebalance Protocol. In earlier versions, the default is classic.
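
As a minimal sketch of this client-side setting, the following builds the configuration for a hypothetical application. The class name is invented for illustration, and the application ID wordcount and the broker address are placeholders. Plain string keys are used so that the snippet compiles without the Kafka Streams library. In a real application, the resulting Properties object would typically be passed to the KafkaStreams constructor.

```java
import java.util.Properties;

// Illustrative only: the class name and values are placeholders, not part of Kafka.
class StreamsProtocolConfig {

    // Builds a client configuration that opts in to the Streams Rebalance Protocol.
    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Opt in to the streams protocol; "classic" keeps the classic protocol.
        props.setProperty("group.protocol", "streams");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("wordcount", "localhost:9092");
        System.out.println("group.protocol=" + props.getProperty("group.protocol"));
    }
}
```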

Ignored configurations

The following configurations are ignored when the Streams Rebalance Protocol is enabled:

  • acceptable.recovery.lag

  • max.warmup.replicas

  • num.standby.replicas (use group-level configuration instead)

  • probing.rebalance.interval.ms

  • rack.aware.assignment.tags

  • rack.aware.assignment.strategy

  • rack.aware.assignment.traffic_cost

  • rack.aware.assignment.non_overlap_cost

  • task.assignor.class

  • session.timeout.ms (use group-level configuration instead)

  • heartbeat.interval.ms (use group-level configuration instead)
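
Because these settings are silently ignored, it can help to flag them before deployment. The helper below is a hypothetical sketch and not part of Kafka. It checks a client configuration against the keys listed above, and it only looks at unprefixed keys, so settings supplied with a client prefix would need extra handling.

```java
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Kafka: reports settings that have no
// effect when group.protocol=streams.
class IgnoredStreamsConfigCheck {

    // Keys copied from the list of ignored configurations above.
    static final List<String> IGNORED_KEYS = List.of(
            "acceptable.recovery.lag",
            "max.warmup.replicas",
            "num.standby.replicas",
            "probing.rebalance.interval.ms",
            "rack.aware.assignment.tags",
            "rack.aware.assignment.strategy",
            "rack.aware.assignment.traffic_cost",
            "rack.aware.assignment.non_overlap_cost",
            "task.assignor.class",
            "session.timeout.ms",
            "heartbeat.interval.ms");

    // Returns the ignored keys present in props, sorted by name.
    // The result is empty unless group.protocol is set to streams.
    static Set<String> findIgnored(Properties props) {
        Set<String> found = new TreeSet<>();
        if (!"streams".equals(props.getProperty("group.protocol"))) {
            return found;
        }
        for (String key : IGNORED_KEYS) {
            if (props.containsKey(key)) {
                found.add(key);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("group.protocol", "streams");
        props.setProperty("num.standby.replicas", "1");
        props.setProperty("session.timeout.ms", "45000");
        for (String key : findIgnored(props)) {
            System.out.println("Ignored under the streams protocol: " + key);
        }
    }
}
```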

Monitoring and metrics

The existing group metrics are extended to differentiate between streams groups and consumer groups and account for streams group states. For complete details, see the streams groups metrics documentation.

Group count by protocol

Number of groups by protocol type. The list of protocols is extended with the protocol=streams variation:

kafka.server:type=group-coordinator-metrics,name=group-count,protocol={consumer|classic|streams}

Streams group count by state

Number of streams groups based on state:

kafka.server:type=group-coordinator-metrics,name=streams-group-count,state={empty|not_ready|assigning|reconciling|stable|dead}

Streams group rebalances

Rate and total count of streams group rebalances:

kafka.server:type=group-coordinator-metrics,name=streams-group-rebalance-rate

kafka.server:type=group-coordinator-metrics,name=streams-group-rebalance-count

Note

This website includes content developed at the Apache Software Foundation under the terms of the Apache License v2.