Kafka Command-Line Interface (CLI) Tools

After you download Apache Kafka® and extract the files, you can access several CLI tools in the bin directory.

These tools enable you to start and stop Kafka, create and update topics, manage partitions, and perform many other common operations.

You can get details on how to run each tool by running it with no arguments, or by using the --help argument.

Confluent Tip

You can use all of the Kafka CLI tools with Confluent Platform. They are installed in the $CONFLUENT_HOME/bin directory when you install Confluent Platform, along with additional Confluent tools. Confluent has dropped the .sh extensions, so you do not need to use the extensions when calling the Confluent versions of these tools. In addition, when you pass a properties file, remember that Confluent Platform properties files are stored in the $CONFLUENT_HOME/etc directory. For more information, see CLI Tools for Confluent Platform.

The following sections group the tools by function and provide basic usage information. In some cases, a tool is listed in more than one section.

Manage Kafka and configure metadata

This section contains tools to start Kafka with either ZooKeeper or KRaft for metadata management, and to manage brokers and cluster metadata.

kafka-server-start.sh

Use this tool to start a Kafka server. You must pass the path to the properties file you want to use. If you are using ZooKeeper for metadata management, you must start ZooKeeper first. For KRaft mode, first generate a cluster ID and store it in the properties file. For an example of how to start Kafka, see the Kafka quickstart.
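
For example, the following command starts a broker in KRaft mode using the sample properties file that ships with Kafka (the path may differ in your installation):

bin/kafka-server-start.sh config/kraft/server.properties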

kafka-server-stop.sh

Stops the running Kafka server. When you run this tool, you do not need to pass any arguments.
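
For example:

bin/kafka-server-stop.sh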

zookeeper-server-start.sh

Starts the ZooKeeper server. ZooKeeper is the default method for metadata management for Kafka versions prior to 3.4. To run this tool, you must pass the path to the ZooKeeper properties file. For an example of how to use this tool, see the Kafka quickstart.
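
For example, the following command starts ZooKeeper with the sample properties file included in the Kafka distribution:

bin/zookeeper-server-start.sh config/zookeeper.properties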

zookeeper-server-stop.sh

Stops the ZooKeeper server. Running this tool does not require arguments.

kafka-storage.sh

This tool is used to generate a Cluster UUID and format storage with the cluster ID when running Kafka in KRaft mode. You must explicitly create a cluster ID for a KRaft cluster, and format the storage specifying that ID.

For example, the following command generates a cluster ID and stores it in a variable named KAFKA_CLUSTER_ID. The next command formats storage with that ID.

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

For another example of how to use this tool, see the Kafka quickstart.

kafka-cluster.sh

Use this tool to get the ID of a cluster or unregister a cluster. The following example shows how to retrieve the cluster ID, which requires a bootstrap-server argument.

bin/kafka-cluster.sh cluster-id --bootstrap-server localhost:9092

The output for this command might look like the following.

Cluster ID: WZEKwK-b123oT3ZOSU0dgw

zookeeper-shell.sh

This tool connects to the ZooKeeper shell.
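
For example, the following commands open the shell against a ZooKeeper instance (assumed here to be listening on localhost:2181) and then list the registered broker IDs:

bin/zookeeper-shell.sh localhost:2181
ls /brokers/ids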

kafka-features.sh

Manages feature flags to enable or disable functionality at runtime in Kafka. Pass the describe argument to describe the currently active feature flags, upgrade to upgrade one or more feature flags, downgrade to downgrade one or more, and disable to disable one or more feature flags, which is the same as downgrading the version to zero.
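
For example, the following command (with a placeholder bootstrap server) describes the currently active feature flags:

bin/kafka-features.sh --bootstrap-server localhost:9092 describe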

kafka-broker-api-versions.sh

This tool retrieves and displays the API versions supported by each broker. Example:

bin/kafka-broker-api-versions.sh --bootstrap-server host1:9092

kafka-metadata-quorum.sh

Describes the metadata quorum status. This tool is useful when you are debugging a cluster in KRaft mode. Pass the describe command to describe the current state of the metadata quorum.

The following code example displays a summary of the metadata quorum:

bin/kafka-metadata-quorum.sh --bootstrap-server host1:9092 describe --status

The output for this command might look like the following.

ClusterId:              fMCL8kv1SWm87L_Md-I2hg
LeaderId:               3002
LeaderEpoch:            2
HighWatermark:          10
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   -1
CurrentVoters:          [3000,3001,3002]
CurrentObservers:       [0,1,2]

kafka-metadata-shell.sh

The kafka-metadata-shell tool enables you to interactively examine the metadata stored in a KRaft cluster.

The following example shows how to open the shell:

bin/kafka-metadata-shell.sh --directory tmp/kraft-combined-logs/_cluster_metadata-0/

After the shell loads, you can explore the contents of the metadata log, and exit. The following code shows an example of this.

Loading...
 [ Kafka Metadata Shell ]
 >> ls
 brokers  configs  features  linkIds  links  shell  topicIds  topics
 >> ls /topics
 test
 >> cat /topics/test/0/data
 {
   "partitionId" : 0,
   "topicId" : "5zoAlv-xEh9xRANKXt1Lbg",
   "replicas" : [ 1 ],
   "isr" : [ 1 ],
   "removingReplicas" : null,
   "addingReplicas" : null,
   "leader" : 1,
   "leaderEpoch" : 0,
   "partitionEpoch" : 0
   }
 >> exit

For more information, see the Kafka Wiki.

kafka-configs.sh

Use this tool to change and describe topic, client, user, broker, IP, or cluster link configuration settings. To change a property, set the entity-type to the desired entity (for example, topics, brokers, or users), and use the alter option. The following example shows how you might add the delete.retention.ms configuration property for a topic with kafka-configs.sh.

bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-default --alter --add-config delete.retention.ms=172800000

See Topic Operations for more examples of how to work with topics.

zookeeper-security-migration.sh

This tool is used to restrict or provide access to ZooKeeper metadata. The tool updates the ACLs of znodes.
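
For example, the following command (the ZooKeeper address is a placeholder) sets the znode ACLs to secure:

bin/zookeeper-security-migration.sh --zookeeper.connect localhost:2181 --zookeeper.acl secure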

Manage topics, partitions, and replication

kafka-topics.sh

Use this tool to create, delete, describe, or change a topic. You can also use the tool to retrieve a list of topics associated with a Kafka cluster. For more information, see Topic Operations.

Example:

bin/kafka-topics.sh --bootstrap-server host1:9092 --create --topic test-topic --partitions 3

kafka-configs.sh

Use this tool to change and describe topic, client, user, broker, IP, or cluster link configuration settings. To change a property, set the entity-type to the desired entity (for example, topics, brokers, or users), and use the alter option. The following example shows how you might add the delete.retention.ms configuration property for a topic with kafka-configs.sh.

bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-default --alter --add-config delete.retention.ms=172800000

See Topic Operations for more examples of how to work with topics.

kafka-get-offsets.sh

A tool to retrieve topic-partition offsets.
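
For example, the following command (the topic name is a placeholder) retrieves the latest offset for each partition of a topic; a --time value of -1 requests the latest offsets:

bin/kafka-get-offsets.sh --bootstrap-server localhost:9092 --topic test-topic --time -1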

kafka-leader-election.sh

This tool attempts to elect a new leader for a set of topic partitions.

Run this tool manually to restore leadership if the auto.leader.rebalance.enable property is set to false.
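
For example, the following command triggers preferred leader election for all topic partitions:

bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type PREFERRED --all-topic-partitions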

kafka-transactions.sh

A tool to list and describe transactions. Use it to detect and abort hanging transactions. For more information, see Detect and Abort Hanging Transactions.
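
For example, the following command lists the transactions known to the cluster:

bin/kafka-transactions.sh --bootstrap-server localhost:9092 list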

kafka-reassign-partitions.sh

A tool to move topic partitions between replicas. You pass a JSON-formatted file to specify the new replicas. To learn more, see Changing the replication factor in the Confluent documentation.
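
For example, given a reassignment file named reassign.json (the topic, partition, and replica values are illustrative):

{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[1,2]}]}

you might run the following command to start the reassignment:

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --execute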

kafka-delete-records.sh

This tool deletes partition records. Use this if a topic receives bad data. Pass a JSON-formatted file that specifies the topic, partition, and offset for data deletion. Data will be deleted up to the offset specified. Example:

bin/kafka-delete-records.sh --bootstrap-server host1:9092 --offset-json-file deleteme.json
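
The deleteme.json file passed above might look like the following, which deletes records in partition 0 of test-topic up to offset 3 (the topic, partition, and offset values are illustrative):

{
  "partitions": [
    {"topic": "test-topic", "partition": 0, "offset": 3}
  ],
  "version": 1
}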

kafka-log-dirs.sh

This tool gets a list of replicas per log directory on a broker.
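
For example, the following command outputs a JSON description of the replicas in each log directory on the brokers:

bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe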

kafka-replica-verification.sh

Used to verify that all replicas of a topic contain the same data. Requires a broker-list parameter that contains a comma-separated list of <hostname:port> entries specifying the server/port to connect to.
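
Example:

bin/kafka-replica-verification.sh --broker-list host1:9092,host2:9092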

kafka-mirror-maker.sh

DEPRECATED: For an alternative, see connect-mirror-maker.sh. Enables the creation of a replica of an existing Kafka cluster. Example of the replacement tool: bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters secondary. To learn more, see Kafka mirroring.

connect-mirror-maker.sh

Replicates topics from one cluster to another using the Connect framework. You must pass an mm2.properties MM2 configuration file. For more information, see KIP-382: MirrorMaker 2.0.

Client, producer, and consumer tools

kafka-verifiable-consumer.sh

This tool consumes messages from a topic and emits consumer events, such as group rebalances, received messages, and committed offsets, as JSON objects to STDOUT. It is intended for internal testing.
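
For example, the following command (the topic and group names are placeholders) consumes from a topic and emits events as JSON:

bin/kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --group-id test-group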

kafka-configs.sh

Use this tool to change and describe topic, client, user, broker, ip, or cluster link configuration settings. To change a property, specify the entity-type to the desired entity (topic, broker, user, etc), and use the alter option. The following example shows how you might add the delete.retention configuration property for a topic with kafka-configs.

/bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-default --alter --add-config delete.retention.ms=172800000

See Topic Operations for more examples of how to work with topics.

kafka-verifiable-producer.sh

Used for internal testing. This tool produces increasing integers to the specified topic and prints JSON metadata to STDOUT on each send request. This tool shows which messages have been acked and which have not.
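
For example, the following command produces 100 integers to a placeholder topic:

bin/kafka-verifiable-producer.sh --bootstrap-server localhost:9092 --topic test-topic --max-messages 100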

kafka-console-consumer.sh

A tool that enables you to consume records from a topic. Requires a bootstrap-server parameter that contains a comma-separated list of <hostname:port> entries specifying the server/port to connect to.

Example:

bin/kafka-console-consumer.sh --bootstrap-server HOST1:PORT1,HOST2:PORT2 --consumer.config config.properties --topic testTopic --property "print.key=true"

kafka-console-producer.sh

Use this tool to produce records to a topic. Requires a bootstrap-server parameter that contains a comma-separated list of <hostname:port> entries specifying the server/port to connect to. Example:

bin/kafka-console-producer.sh --bootstrap-server HOST1:PORT1,HOST2:PORT2 --producer.config config.properties --topic testTopic --property "parse.key=true" --property "key.separator=:"

kafka-producer-perf-test.sh

Enables you to produce a large quantity of data to test producer performance for the Kafka cluster.

Example:

bin/kafka-producer-perf-test.sh --topic topic-a --num-records 200000 --record-size 1000 --throughput 10000000 --producer-props bootstrap.servers=host1:9092

kafka-consumer-groups.sh

This tool gets a list of the active consumer groups in the cluster, and can describe a group's members, offsets, and lag.

For example, to show the position of all consumers in a group named user-group, you might use the following command.

bin/kafka-consumer-groups.sh \
         --bootstrap-server localhost:9092 \
         --describe --group user-group

This would result in output like the following (CONSUMER-ID entries truncated for readability).

TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG CONSUMER-ID       HOST         CLIENT-ID
user           0          2               4               2   consumer-1-...    /127.0.0.1   consumer-1
user           1          2               3               1   consumer-1-...    /127.0.0.1   consumer-1
user           2          2               3               1   consumer-2-...    /127.0.0.1   consumer-2

For more examples, see Manage Consumer Groups.

kafka-consumer-perf-test.sh

This tool tests the consumer performance for the Kafka cluster.
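
For example, the following command (the topic name is a placeholder) consumes 100,000 messages and reports throughput statistics:

bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic test-topic --messages 100000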

Manage Kafka Connect

connect-distributed.sh

Runs Connect workers in distributed mode, meaning across multiple machines. Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both in the active tasks and for configuration and offset commit data.
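
For example, the following command starts a distributed worker with the sample properties file that ships with Kafka:

bin/connect-distributed.sh config/connect-distributed.properties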

connect-standalone.sh

Runs Connect in standalone mode, meaning all work is performed in a single process. This is good for getting started, but lacks fault tolerance. For more information, see Kafka Connect.
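
For example, the following command starts a standalone worker with the sample worker and file source connector properties files included with Kafka:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties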

connect-mirror-maker.sh

Replicates topics from one cluster to another using the Connect framework. You must pass an mm2.properties MM2 configuration file. For more information, see KIP-382: MirrorMaker 2.0.

Manage Kafka Streams

kafka-streams-application-reset.sh

For Kafka Streams applications, this tool resets the application and forces it to reprocess its data from the beginning. Useful for debugging and testing.

For example, the following command would reset the my-streams-app application:

bin/kafka-streams-application-reset.sh --application-id my-streams-app \
                                       --input-topics my-input-topic \
                                       --intermediate-topics rekeyed-topic

For more information, see Kafka Streams Application Reset Tool in the Confluent documentation.

Manage security

kafka-acls.sh

Tool for adding, removing, and listing ACLs. For example, if you wanted to allow two principals, Jose and Jane, to read and write the user topic from specific IP addresses, you could use a command like the following:

bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Jose --allow-principal User:Jane --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic user

For more information, see ACL Command-line interface.

kafka-delegation-tokens.sh

This tool creates, renews, expires, and describes delegation tokens. For more information, see Authentication using Delegation Tokens in the Confluent Documentation.
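
For example, the following command creates a token; the renewer principal and the client.properties file are placeholders for your own values:

bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1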

zookeeper-security-migration.sh

This tool is used to restrict or provide access to ZooKeeper metadata. The tool updates the ACLs of znodes.

Test and troubleshoot

This section contains tools you can use for testing and troubleshooting your applications.

kafka-dump-log.sh

This tool can be used in KRaft mode to parse a metadata log file and output its contents to the console. Requires a comma-separated list of log files. The tool will scan the provided files and decode the metadata records.

The following example shows using the cluster-metadata-decoder argument to decode the metadata records in a log segment.

bin/kafka-dump-log.sh --cluster-metadata-decoder --files tmp/kraft-combined-logs/_cluster_metadata-0/00000000000000023946.log

trogdor.sh

Trogdor is a test framework for Kafka. It can run benchmarks and other workloads, and it can inject faults to stress test the system. For more information, see the Trogdor documentation.

kafka-run-class.sh

This tool is a thin wrapper around the Kafka Java class. It is called by other tools, and should not be run or modified directly.