Kafka Streams Groups Tool for Kafka Streams in Confluent Platform

Use kafka-streams-groups to manage Streams groups that use the Streams Rebalance Protocol (KIP-1071). With the tool you can list and describe groups, inspect members and offsets/lag, reset or delete offsets for input topics, and delete groups, optionally including their internal topics.

A Streams group is a broker-coordinated group type for Kafka Streams that uses Streams-specific RPCs and metadata. A Streams group is distinct from classic consumer groups. The CLI surfaces Streams-specific states, assignments, and input-topic offsets to simplify visibility and administration.

Important

Use the kafka-streams-groups tool with care. Mutating operations, like offset resets/deletes and group deletion, affect how applications reprocess data when they’re restarted. Always preview with the --dry-run option first. Before you apply a change, ensure that all application instances are stopped/inactive and the group is empty.

The Streams Groups tool enables the following operations:

  • List Streams groups across a cluster and display or filter by group state: Empty, Not Ready, Assigning, Reconciling, Stable, Dead.

  • Describe a Streams group and show:

    • Group state, group epoch, target assignment epoch. For additional details, use the --state and --verbose options.

    • Per-member info such as epochs, current vs. target assignments, and whether a member still uses the classic protocol, with --members and --verbose options.

    • Input-topic offsets and lag with the --offsets option, to understand processing lag.

  • Reset input-topic offsets for a Streams group to control reprocessing boundaries using precise specifiers: to-earliest, to-latest, to-current, to-offset, to-datetime, by-duration, shift-by, from-file. Requires either --dry-run or --execute, and all application instances must be inactive.

  • Delete offsets for input topics to force re-consumption on next start.

  • Delete a Streams group to clean up broker-side Streams metadata for offsets, topology, and assignments. Optionally delete all, or a subset of, internal topics at the same time by using --internal-topics.

Usage

The script is located at ${CONFLUENT_HOME}/bin/kafka-streams-groups and connects to your cluster by using the --bootstrap-server option. For secured clusters, pass AdminClient properties by using --command-config.

${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server <host:port> [COMMAND] [OPTIONS]
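
For a secured cluster, the file passed to --command-config holds ordinary Kafka client security properties. The following is a minimal sketch for a SASL_SSL cluster with the PLAIN mechanism. The function name write_admin_config, the file path, and the credentials are hypothetical placeholders, and your cluster may require a different mechanism or additional TLS settings.

```shell
#!/usr/bin/env bash
# Sketch: write an AdminClient properties file for a SASL_SSL/PLAIN cluster.
# write_admin_config, the path, and the credentials are hypothetical placeholders.
write_admin_config() {
  local path="$1" user="$2" password="$3"
  # umask 077 so that a newly created file is readable only by its owner.
  ( umask 077; cat > "$path" ) <<EOF
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${user}" password="${password}";
EOF
}

# Example invocation (not run here):
#   write_admin_config /tmp/admin.properties "$KAFKA_USER" "$KAFKA_PASSWORD"
#   "${CONFLUENT_HOME}/bin/kafka-streams-groups" --bootstrap-server <host:port> \
#     --command-config /tmp/admin.properties --list
```

The sketch does not escape quotes inside the user name or password, so credentials that contain a double quote need manual handling.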

kafka-streams-groups complements the Streams Admin API for Streams groups. The CLI exposes list, describe, and delete operations, plus offset management, similar to the consumer-group tools but tailored to the Streams groups defined in KIP-1071.

Commands

List Streams groups

Run the following command to list the Streams groups in a cluster.

${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 --list

Describe Streams groups

Run the following commands to inspect a group’s state, members, and lag.

# Describe a group: state + epochs
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
  --describe --group my-streams-app --state --verbose

# Describe a group: members (current vs. target assignments, classic vs. streams protocol)
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
  --describe --group my-streams-app --members --verbose

# Describe a group: input-topic offsets and lag
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
  --describe --group my-streams-app --offsets
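
When troubleshooting, it can be convenient to collect all three views in one pass. The sketch below wraps the commands above in a small function. KSG_BIN, BOOTSTRAP, and describe_group are hypothetical names introduced for this example and are not part of the tool.

```shell
#!/usr/bin/env bash
# Sketch: print the state, members, and offsets views for one Streams group.
# KSG_BIN, BOOTSTRAP, and describe_group are hypothetical names for this example.
KSG_BIN="${KSG_BIN:-${CONFLUENT_HOME:-}/bin/kafka-streams-groups}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

describe_group() {
  local group="$1" view
  for view in --state --members --offsets; do
    echo "== ${group} ${view} =="
    # Stop at the first failing call so that errors are not buried in later output.
    "$KSG_BIN" --bootstrap-server "$BOOTSTRAP" \
      --describe --group "$group" "$view" || return 1
  done
}

# Example invocation (not run here):
#   describe_group my-streams-app
```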

Reset input-topic offsets

Ensure all application instances are stopped/inactive. Always preview changes with --dry-run before using --execute.

Run the following commands to reset input-topic offsets for a Streams group.

# Preview resetting all input topics to a specific timestamp
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
  --group my-streams-app \
  --reset-offsets --all-input-topics --to-datetime 2025-01-31T23:57:00.000 \
  --dry-run

# Apply the reset
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
  --group my-streams-app \
  --reset-offsets --all-input-topics --to-datetime 2025-01-31T23:57:00.000 \
  --execute
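
The preview-then-apply sequence above can be scripted so that the same arguments are used for both runs. The following is a minimal sketch, not part of the tool: KSG_BIN, BOOTSTRAP, and reset_with_preview are hypothetical names, and the confirmation prompt is a design choice made for this example.

```shell
#!/usr/bin/env bash
# Sketch: run a reset with --dry-run, then apply it only after confirmation.
# KSG_BIN, BOOTSTRAP, and reset_with_preview are hypothetical names for this example.
KSG_BIN="${KSG_BIN:-${CONFLUENT_HOME:-}/bin/kafka-streams-groups}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

reset_with_preview() {
  local group="$1"; shift
  # The remaining arguments (scope and specifier) are passed through unchanged,
  # so the preview and the real run always see identical options.
  local -a base=("$KSG_BIN" --bootstrap-server "$BOOTSTRAP"
                 --group "$group" --reset-offsets "$@")

  "${base[@]}" --dry-run || return 1

  local answer
  read -r -p "Apply this reset to ${group}? [y/N] " answer
  if [[ "$answer" == "y" ]]; then
    "${base[@]}" --execute
  else
    echo "Aborted; no offsets were changed."
  fi
}

# Example invocation (not run here):
#   reset_with_preview my-streams-app --all-input-topics --by-duration PT1H
```

Building the command once and appending only --dry-run or --execute avoids the risk of previewing one scope and applying another.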

Delete offsets to force re-consumption

Run the following commands to delete offsets for all or specific input topics so that the group re-reads data on restart.

# Delete offsets for all input topics (execute)
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
  --group my-streams-app \
  --delete-offsets --all-input-topics --execute

# Delete offsets for specific topics
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
  --group my-streams-app \
  --delete-offsets --topic input-a --topic input-b --execute
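
When the topic list comes from a script or a variable, the repeated --topic flags can be built in a loop. The sketch below assumes the same command shape as the examples above. KSG_BIN, BOOTSTRAP, and delete_offsets_for_topics are hypothetical names for this example.

```shell
#!/usr/bin/env bash
# Sketch: delete offsets for an explicit list of input topics.
# KSG_BIN, BOOTSTRAP, and delete_offsets_for_topics are hypothetical names.
KSG_BIN="${KSG_BIN:-${CONFLUENT_HOME:-}/bin/kafka-streams-groups}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

delete_offsets_for_topics() {
  local group="$1"; shift
  local -a topic_args=()
  local topic
  # Turn each topic name into its own "--topic <name>" pair.
  for topic in "$@"; do
    topic_args+=(--topic "$topic")
  done
  # Refuse to run with an empty list rather than issue an unscoped command.
  if (( ${#topic_args[@]} == 0 )); then
    echo "No topics given; nothing to delete." >&2
    return 2
  fi
  "$KSG_BIN" --bootstrap-server "$BOOTSTRAP" --group "$group" \
    --delete-offsets "${topic_args[@]}" --execute
}

# Example invocation (not run here):
#   delete_offsets_for_topics my-streams-app input-a input-b
```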

Delete a Streams group (cleanup)

Ensure all application instances are stopped/inactive.

Run the following commands to delete broker-side Streams metadata for a group and optionally remove a subset of internal topics.

# Delete Streams group metadata
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
  --delete --group my-streams-app

# Delete a subset of internal topics alongside the group (use with care)
${CONFLUENT_HOME}/bin/kafka-streams-groups --bootstrap-server localhost:9092 \
  --delete --group my-streams-app \
  --internal-topics my-app-repartition-0,my-app-changelog

All options and flags

Core actions

  • --list: List Streams groups. Use --state to display/filter by state.

  • --describe: Describe a group selected by --group. Combine with:

    • --state (group state and epochs), --members (members and assignments), --offsets (input and repartition topics offsets/lag).

    • --verbose for additional details (e.g., leader epochs where applicable).

  • --reset-offsets: Reset input-topic offsets (one group at a time; instances should be inactive). Choose exactly one specifier:

    • --to-earliest, --to-latest, --to-current, --to-offset <n>

    • --by-duration <PnDTnHnMnS>, --to-datetime <YYYY-MM-DDTHH:mm:SS.sss>

    • --shift-by <n> (positive or negative), --from-file <file> (CSV)

    Scope:

    • --all-input-topics or one or more --topic <name>; some builds also support --all-topics (all input topics per broker topology metadata).

    Safety:

    • Requires --dry-run or --execute.

  • --delete-offsets: Delete offsets for --all-input-topics, specific --topic names, or --from-file.

  • --delete: Delete Streams group metadata; optionally pass --internal-topics <list> to delete a subset of internal topics.
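
The --to-datetime and --by-duration values listed above are easy to mistype by hand. The helpers below are a sketch that generates them in the documented formats. They assume GNU date, and they assume that a value without a time zone is read as UTC, which you should verify for your version. The function names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: build values for --to-datetime and --by-duration.
# Assumes GNU date; datetime_minutes_ago and iso_duration are hypothetical names.

# Print the UTC time N minutes ago as YYYY-MM-DDTHH:mm:SS.sss (milliseconds fixed at 000).
datetime_minutes_ago() {
  local minutes="$1"
  date -u -d "${minutes} minutes ago" +%Y-%m-%dT%H:%M:%S.000
}

# Print an ISO-8601 duration for the given hours and minutes, for example PT1H30M.
iso_duration() {
  local hours="$1" minutes="$2"
  printf 'PT%dH%dM\n' "$hours" "$minutes"
}

# Example invocation (not run here):
#   kafka-streams-groups ... --reset-offsets --all-input-topics \
#     --to-datetime "$(datetime_minutes_ago 30)" --dry-run
```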

Common flags

  • --group <id>: Target Streams group (application.id).

  • --all-groups: Operate on all groups (allowed with --delete).

  • --bootstrap-server <host:port>: Broker(s) to connect to (required).

  • --command-config <file>: Properties for AdminClient (security, timeouts, etc.).

  • --timeout <ms>: Wait time for group stabilization in some operations (default: 5000ms).

  • --dry-run, --execute: Preview vs. apply for mutating operations.

  • --help, --version, --verbose: Show usage, print the version, and include additional detail in output, respectively.

Best practices and safety

  • Preview changes with --dry-run to verify topic scope and impact before --execute.

  • Use --internal-topics carefully: deleting internal topics removes the topics that back the application's state. Only do this when you intend to rebuild state from input topics.