Kafka Streams Groups Tool for Kafka Streams in Confluent Platform
Use kafka-streams-groups to manage Streams groups for the Streams Rebalance Protocol (KIP-1071). With it you can list and describe groups, inspect members and offsets/lag, reset or delete offsets for input topics, and delete groups, optionally including internal topics.
A Streams group is a broker-coordinated group type for Kafka Streams that uses Streams-specific RPCs and metadata. A Streams group is distinct from classic consumer groups. The CLI surfaces Streams-specific states, assignments, and input-topic offsets to simplify visibility and administration.
Important
Use the kafka-streams-groups tool with care. Mutating operations, like offset resets/deletes and group deletion, affect how applications reprocess data when they’re restarted. Always preview with the --dry-run option before executing, and ensure application instances are stopped/inactive and the group is empty before executing the command.
The Streams Groups tool enables the following operations:
List Streams groups across a cluster and display or filter by group state: Empty, Not Ready, Assigning, Reconciling, Stable, Dead.
Describe a Streams group and show:
Group state, group epoch, and target assignment epoch. For additional details, use the --state and --verbose options.
Per-member information such as epochs, current vs. target assignments, and whether a member still uses the classic protocol, with the --members and --verbose options.
Input-topic offsets and lag, with the --offsets option, to understand processing lag.
Reset input-topic offsets for a Streams group to control reprocessing boundaries using precise specifiers: earliest, latest, to-offset, to-datetime, by-duration, shift-by, from-file. Requires the --dry-run or --execute option and inactive instances.
Delete offsets for input topics to force re-consumption on next start.
Delete a Streams group to clean up broker-side Streams metadata for offsets, topology, and assignments. Optionally delete all, or a subset of, internal topics at the same time by using --internal-topics.
Usage
The script is located at ${CONFLUENT_HOME}/bin/kafka-streams-groups and connects to your cluster by using the --bootstrap-server option. For secured clusters, pass AdminClient properties by using --command-config.
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server <host:port> [COMMAND] [OPTIONS]
kafka-streams-groups complements the Streams Admin API for Streams groups. The CLI exposes list, describe, and delete operations, plus offset management, similar to the consumer-group tools but tailored to Streams groups as defined in KIP-1071.
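For the secured-cluster case, the following sketch writes a client properties file and shows how it would be passed with --command-config. The file path, the SASL_SSL/PLAIN mechanism, the `<api-key>`/`<api-secret>` placeholders, and the `/opt/confluent` fallback are assumptions for illustration; use whatever settings your cluster's security setup requires.

```shell
# Sketch only: path, mechanism, and credentials below are placeholders.
CONFIG_FILE="${CONFIG_FILE:-${TMPDIR:-/tmp}/streams-admin.properties}"

# Standard Kafka client security properties, read by the tool's AdminClient.
cat > "${CONFIG_FILE}" <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";
EOF

# The file holds credentials, so restrict it to the current user.
chmod 600 "${CONFIG_FILE}"

# Print the resulting invocation instead of running it, so nothing touches a cluster.
echo "${CONFLUENT_HOME:-/opt/confluent}/bin/kafka-streams-groups" \
  --bootstrap-server "<host:port>" --command-config "${CONFIG_FILE}" --list
```

Remove the `echo` and fill in the placeholders to run the command against a real cluster.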
Commands
List Streams groups
Run the following command to list and discover groups.
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 --list
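To also show each group's state, or to narrow the list to one state, --list can be combined with --state. The wrapper below is a minimal sketch: `KSG`, `BOOTSTRAP`, and `list_streams_groups` are illustrative names, and passing a state name as the value of --state is an assumption based on the "display or filter by state" behavior described on this page, so check `--help` on your build.

```shell
# Illustrative names; KSG defaults to the script location used on this page.
KSG="${KSG:-${CONFLUENT_HOME:-/opt/confluent}/bin/kafka-streams-groups}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

list_streams_groups() {
  if [ "$#" -gt 0 ]; then
    # Assumed form: filter the listing to groups in the given state.
    "${KSG}" --bootstrap-server "${BOOTSTRAP}" --list --state "$1"
  else
    # List every group and display its state.
    "${KSG}" --bootstrap-server "${BOOTSTRAP}" --list --state
  fi
}

# Usage:
#   list_streams_groups          # all groups, with state
#   list_streams_groups Stable   # only groups in the Stable state
```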
Describe Streams groups
Run the following commands to inspect a group’s state, members, and lag.
# Describe a group: state + epochs
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
--describe --group my-streams-app --state --verbose
# Describe a group: members (assignments vs target, classic/streams)
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
--describe --group my-streams-app --members --verbose
# Describe a group: input-topic offsets and lag
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
--describe --group my-streams-app --offsets
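Before a reset or delete, it can help to save the three describe views so you can compare them afterwards. The following is a sketch under that assumption; `KSG`, `BOOTSTRAP`, `snapshot_group`, and the output file naming are hypothetical, and only flags shown in the commands above are used.

```shell
# Illustrative names; KSG defaults to the script location used on this page.
KSG="${KSG:-${CONFLUENT_HOME:-/opt/confluent}/bin/kafka-streams-groups}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

snapshot_group() {
  local group="$1"
  local outdir="${2:-.}"
  local stamp
  local view
  # One timestamp for all three files so they sort together.
  stamp="$(date +%Y%m%dT%H%M%S)"
  mkdir -p "${outdir}" || return 1
  for view in state members offsets; do
    # Writes <group>-<view>-<timestamp>.txt; stops at the first failure.
    "${KSG}" --bootstrap-server "${BOOTSTRAP}" \
      --describe --group "${group}" "--${view}" \
      > "${outdir}/${group}-${view}-${stamp}.txt" || return 1
  done
}

# Usage: snapshot_group my-streams-app ./snapshots
```

Add --verbose to the state and members views if you want the extra detail in the saved files.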
Reset input-topic offsets
Ensure all application instances are stopped/inactive. Always preview changes with --dry-run before using --execute.
Run the following commands to reset input-topic offsets for a Streams group.
# Preview resetting all input topics to a specific timestamp
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
--group my-streams-app \
--reset-offsets --all-input-topics --to-datetime 2025-01-31T23:57:00.000 \
--dry-run
# Apply the reset
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
--group my-streams-app \
--reset-offsets --all-input-topics --to-datetime 2025-01-31T23:57:00.000 \
--execute
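The preview-then-apply sequence above can be wrapped so that --execute never runs without a preceding --dry-run and an explicit confirmation. This is a minimal sketch, not part of the tool: `KSG`, `BOOTSTRAP`, and `reset_with_preview` are illustrative names, and the function does not check that application instances are stopped.

```shell
# Illustrative names; KSG defaults to the script location used on this page.
KSG="${KSG:-${CONFLUENT_HOME:-/opt/confluent}/bin/kafka-streams-groups}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

reset_with_preview() {
  local group="$1"
  local answer
  shift   # remaining arguments: exactly one reset specifier, e.g. --to-earliest

  # Step 1: preview. Stop here if the preview itself fails.
  "${KSG}" --bootstrap-server "${BOOTSTRAP}" --group "${group}" \
    --reset-offsets --all-input-topics "$@" --dry-run || return 1

  # Step 2: apply only after an explicit "yes" on stdin.
  printf 'Apply this reset to %s? Type yes to continue: ' "${group}" >&2
  read -r answer || answer=""
  if [ "${answer}" = "yes" ]; then
    "${KSG}" --bootstrap-server "${BOOTSTRAP}" --group "${group}" \
      --reset-offsets --all-input-topics "$@" --execute
  else
    echo "Aborted; no offsets were changed." >&2
    return 2
  fi
}

# Usage:
#   reset_with_preview my-streams-app --to-datetime 2025-01-31T23:57:00.000
#   reset_with_preview my-streams-app --to-earliest
```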
Delete offsets to force re-consumption
Run the following commands to delete offsets for all or specific input topics so that the group re-reads data on restart.
# Delete offsets for all input topics (execute)
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
--group my-streams-app \
--delete-offsets --all-input-topics --execute
# Delete offsets for specific topics
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
--group my-streams-app \
--delete-offsets --topic input-a --topic input-b --execute
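When the topic list comes from a script, the repeated --topic flags can be built from the function's arguments. The helper below is a sketch with illustrative names (`KSG`, `BOOTSTRAP`, `delete_offsets_for_topics`); it produces the same command shape as the second example above and refuses to run with an empty topic list.

```shell
# Illustrative names; KSG defaults to the script location used on this page.
KSG="${KSG:-${CONFLUENT_HOME:-/opt/confluent}/bin/kafka-streams-groups}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

delete_offsets_for_topics() {
  local group="$1"
  local topic
  shift
  if [ "$#" -eq 0 ]; then
    echo "usage: delete_offsets_for_topics <group> <topic>..." >&2
    return 64
  fi
  # Turn "a b" into "--topic a --topic b": append one pair per topic,
  # then drop the original name from the front of the argument list.
  for topic in "$@"; do
    set -- "$@" --topic "${topic}"
    shift
  done
  "${KSG}" --bootstrap-server "${BOOTSTRAP}" --group "${group}" \
    --delete-offsets "$@" --execute
}

# Usage: delete_offsets_for_topics my-streams-app input-a input-b
```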
Delete a Streams group (cleanup)
Ensure all application instances are stopped/inactive.
Run the following commands to delete broker-side Streams metadata for a group and optionally remove a subset of internal topics.
# Delete Streams group metadata
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
--delete --group my-streams-app
# Delete a subset of internal topics alongside the group (use with care)
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
--delete --group my-streams-app \
--internal-topics my-app-repartition-0,my-app-changelog
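To build the value for --internal-topics, one option is to filter a topic listing by the Kafka Streams naming convention for internal topics, `<application.id>-<name>-changelog` and `<application.id>-<name>-repartition`. The filter below is a sketch: `internal_topics_csv` is a hypothetical helper, and it matches on names only, so it would also pick up topics of another application whose application.id starts with the same prefix. Review the result before passing it to --delete.

```shell
# Reads topic names on stdin (one per line) and prints a comma-separated
# list of those that look like internal topics of the given application.id.
internal_topics_csv() {
  local app_id="$1"
  # index(...) == 1 is a literal prefix match, so dots in the id are not
  # treated as regex wildcards.
  awk -v prefix="${app_id}-" '
    index($0, prefix) == 1 && ($0 ~ /-changelog$/ || $0 ~ /-repartition$/)
  ' | paste -sd, -
}

# Usage (kafka-topics ships in the same bin directory):
#   "${CONFLUENT_HOME}/bin/kafka-topics" --bootstrap-server localhost:9092 --list \
#     | internal_topics_csv my-streams-app
```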
All options and flags
Core actions
--list: List Streams groups. Use --state to display or filter by state.
--describe: Describe a group selected by --group. Combine with --state (group state and epochs), --members (members and assignments), or --offsets (offsets and lag for input and repartition topics). Add --verbose for additional details (for example, leader epochs where applicable).
--reset-offsets: Reset input-topic offsets (one group at a time; instances should be inactive).
  Specifier (choose exactly one): --to-earliest, --to-latest, --to-current, --to-offset <n>, --by-duration <PnDTnHnMnS>, --to-datetime <YYYY-MM-DDTHH:mm:SS.sss>, --shift-by <n> (±), --from-file (CSV).
  Scope: --all-input-topics or one or more --topic <name>; some builds also support --all-topics (all input topics per broker topology metadata).
  Safety: Requires --dry-run or --execute.
--delete-offsets: Delete offsets for --all-input-topics, specific --topic names, or --from-file.
--delete: Delete Streams group metadata; optionally pass --internal-topics <list> to delete a subset of internal topics.
Common flags
--group <id>: Target Streams group (application.id).
--all-groups: Operate on all groups (allowed with --delete).
--bootstrap-server <host:port>: Broker(s) to connect to (required).
--command-config <file>: Properties for AdminClient (security, timeouts, etc.).
--timeout <ms>: Wait time for group stabilization in some operations (default: 5000 ms).
--dry-run, --execute: Preview vs. apply for mutating operations.
--help, --version, --verbose: Usage, version, verbosity.
Best practices and safety
Preview changes with --dry-run to verify topic scope and impact before --execute.
Use --internal-topics carefully: deleting internal topics removes the topics that back the application's state. Only do this when you intend to rebuild state from input topics.