Kafka Command-Line Interface (CLI) Tools

Apache Kafka® provides a suite of command-line interface (CLI) tools that you can run from the bin directory after you download and extract the Kafka files. These tools offer a range of capabilities, including starting and stopping Kafka, managing topics, and handling partitions. To learn how to use a tool, run it with no arguments or with the --help argument for detailed instructions.

Confluent Tip

You can use all of the Kafka CLI tools with Confluent Platform. They are installed in the $CONFLUENT_HOME/bin folder when you install Confluent Platform, along with additional Confluent tools. Confluent has dropped the .sh extensions, so you do not need to use the extensions when calling the Confluent versions of these tools. In addition, when you pass a properties file, remember that Confluent Platform properties files are stored in the $CONFLUENT_HOME/etc directory. For more information, see CLI Tools for Confluent Platform.

The following sections group the tools by function and provide basic usage information. In some cases, a tool is listed in more than one section.

Manage Kafka and configure metadata

This section contains tools to start Kafka with either ZooKeeper or KRaft mode, and to manage brokers.

kafka-server-start.sh

Use the kafka-server-start tool to start a Kafka server. You must pass the path to the properties file you want to use. If you are using ZooKeeper for metadata management, you must start ZooKeeper first. For KRaft mode, first generate a cluster ID and store it in the properties file. For an example of how to start Kafka, see the Kafka quickstart.

kafka-server-stop.sh

Use the kafka-server-stop tool to stop the running Kafka server. When you run this tool, you do not need to pass any arguments.

zookeeper-server-start.sh

Use the zookeeper-server-start tool to start the ZooKeeper server. ZooKeeper is the default method for metadata management for Kafka versions prior to 3.4. To run this tool, you must pass the path to the ZooKeeper properties file. For an example of how to use this tool, see the Kafka quickstart.

zookeeper-server-stop.sh

Use the zookeeper-server-stop tool to stop the ZooKeeper server. Running this tool does not require arguments.

kafka-storage.sh

Use the kafka-storage tool to generate a Cluster UUID and format storage with the generated UUID when running Kafka in KRaft mode. You must explicitly create a cluster ID for a KRaft cluster, and format the storage specifying that ID.

For example, the following command generates a cluster ID and stores it in a variable named KAFKA_CLUSTER_ID. The next command formats storage with that ID.

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

For another example of how to use this tool, see the Kafka quickstart.

kafka-cluster.sh

Use the kafka-cluster tool to get the ID of a cluster or unregister a cluster. The following example shows how to retrieve the cluster ID, which requires a bootstrap-server argument.

bin/kafka-cluster.sh cluster-id --bootstrap-server localhost:9092

The output for this command might look like the following.

Cluster ID: WZEKwK-b123oT3ZOSU0dgw

zookeeper-shell.sh

Use the zookeeper-shell tool to connect to the interactive ZooKeeper shell.

Following is an example of how to connect to the ZooKeeper shell:

bin/zookeeper-shell.sh localhost:2181

Your results might look like the following:

Welcome to ZooKeeper!
JLine support is disabled

kafka-features.sh

Use the kafka-features tool to manage feature flags to disable or enable functionality at runtime in Kafka. Pass the describe argument to describe the current active feature flags, upgrade to upgrade one or more feature flags, downgrade to downgrade one or more, and disable to disable one or more feature flags, which is the same as downgrading the version to zero.

kafka-broker-api-versions.sh

The kafka-broker-api-versions tool retrieves and displays broker information. For example, the following command outputs the version of Kafka that is running on the broker:

bin/kafka-broker-api-versions.sh --bootstrap-server host1:9092 --version

This command might have the following output:

3.3.1 (Commit:e23c59d00e687ff5)

kafka-metadata-quorum.sh

Use the kafka-metadata-quorum tool to query the metadata quorum status. This tool is useful when you are debugging a cluster in KRaft mode. Pass the describe command to describe the current state of the metadata quorum.

The following code example displays a summary of the metadata quorum:

bin/kafka-metadata-quorum.sh --bootstrap-server host1:9092 describe --status

The output for this command might look like the following.

ClusterId:              fMCL8kv1SWm87L_Md-I2hg
LeaderId:               3002
LeaderEpoch:            2
HighWatermark:          10
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   -1
CurrentVoters:          [3000,3001,3002]
CurrentObservers:       [0,1,2]
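
When you script health checks, you may want a single field from this summary. The following is a minimal sketch, assuming the output keeps the Name: value layout shown above. The leader_id function name is hypothetical, and the tool is only invoked if it is found under bin in the current directory.

```shell
# leader_id: read `describe --status` output on stdin and print the LeaderId value.
# Assumes one "Name: value" pair per line, as in the sample output.
leader_id() {
  awk -F: '$1 == "LeaderId" { gsub(/[ \t]/, "", $2); print $2 }'
}

# Query the quorum only when the tool is present in the current directory.
if [ -x bin/kafka-metadata-quorum.sh ]; then
  bin/kafka-metadata-quorum.sh --bootstrap-server host1:9092 describe --status | leader_id
fi
```

For the sample output above, the function would print 3002.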

kafka-metadata-shell.sh

The kafka-metadata-shell tool enables you to interactively examine the metadata stored in a KRaft cluster.

The following example shows how to open the shell:

kafka-metadata-shell.sh --directory tmp/kraft-combined-logs/__cluster_metadata-0/

After the shell loads, you can explore the contents of the metadata log, and exit. The following code shows an example of this.

Loading...
 [ Kafka Metadata Shell ]
 >> ls
 brokers  configs  features  linkIds  links  shell  topicIds  topics
 >> ls /topics
 test
 >> cat /topics/test/0/data
 {
   "partitionId" : 0,
   "topicId" : "5zoAlv-xEh9xRANKXt1Lbg",
   "replicas" : [ 1 ],
   "isr" : [ 1 ],
   "removingReplicas" : null,
   "addingReplicas" : null,
   "leader" : 1,
   "leaderEpoch" : 0,
   "partitionEpoch" : 0
 }
 >> exit

For more information, see the Kafka Wiki.

kafka-configs.sh

Use the kafka-configs tool to change and describe topic, client, user, broker, or IP configuration settings. To change a property, set entity-type to the desired entity (topic, broker, user, and so on), and use the alter option. The following example shows how you might add the delete.retention.ms configuration property for a topic with kafka-configs.

bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-default --alter --add-config delete.retention.ms=172800000

See Topic Operations for more examples of how to work with topics.

zookeeper-security-migration.sh

Use the zookeeper-security-migration tool to restrict or provide access to ZooKeeper metadata. The tool updates the ACLs of znodes.

Manage topics, partitions, and replication

kafka-topics.sh

Use the kafka-topics tool to create or delete a topic. You can also use the tool to retrieve a list of topics associated with a Kafka cluster. For more information, see Topic Operations.

To change a topic, see kafka-configs.sh, or how to modify a topic.

For example, the following command creates a topic named test-topic with three partitions:

bin/kafka-topics.sh --bootstrap-server host1:9092 --create --topic test-topic --partitions 3

kafka-configs.sh

Use the kafka-configs tool to change and describe topic, client, user, broker, or IP configuration settings. To change a property, set entity-type to the desired entity (topic, broker, user, and so on), and use the alter option. The following example shows how you might add the delete.retention.ms configuration property for a topic with kafka-configs.

bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-default --alter --add-config delete.retention.ms=172800000

See Topic Operations for more examples of how to work with topics.

kafka-get-offsets.sh

Use the kafka-get-offsets tool to retrieve topic-partition offsets.
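
As an illustration, the following sketch adds up the latest offsets of every partition in a topic, which gives a rough upper bound on the number of records written. It assumes the tool prints one topic:partition:offset line per partition. The sum_offsets function, the host host1:9092, and the topic test-topic are hypothetical names, and the tool is only invoked if it is found under bin in the current directory.

```shell
# sum_offsets: read "topic:partition:offset" lines on stdin and print the offset total.
sum_offsets() {
  awk -F: '{ sum += $NF } END { print sum + 0 }'
}

# Fetch the offsets only when the tool is present in the current directory.
if [ -x bin/kafka-get-offsets.sh ]; then
  bin/kafka-get-offsets.sh --bootstrap-server host1:9092 --topic test-topic | sum_offsets
fi
```

With the input lines test-topic:0:5 and test-topic:1:7, the function would print 12.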

kafka-leader-election.sh

Use the kafka-leader-election tool to attempt to elect a new leader for a set of topic partitions.

Run this tool manually to restore leadership if the auto.leader.rebalance.enable property is set to false.
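
To limit the election to specific partitions, you can pass a JSON file that lists them. The following is a minimal sketch, assuming a hypothetical topic named test-topic with two partitions and a broker at host1:9092. The file name election.json is arbitrary, and the tool is only invoked if it is found under bin in the current directory.

```shell
# Write the list of partitions that should get a new leader.
cat > election.json <<'EOF'
{
  "partitions": [
    {"topic": "test-topic", "partition": 0},
    {"topic": "test-topic", "partition": 1}
  ]
}
EOF

# Trigger a preferred-leader election for the listed partitions.
if [ -x bin/kafka-leader-election.sh ]; then
  bin/kafka-leader-election.sh --bootstrap-server host1:9092 \
    --election-type preferred --path-to-json-file election.json
fi
```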

kafka-transactions.sh

Use the kafka-transactions tool to list and describe transactions. You can also use it to detect and abort hanging transactions. For more information, see Detect and Abort Hanging Transactions.

kafka-reassign-partitions.sh

Use the kafka-reassign-partitions tool to move topic partitions between replicas. You pass a JSON-formatted file to specify the new replicas. To learn more, see Changing the replication factor in the Confluent documentation.
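
The following sketch shows what such a reassignment file might look like and how it could be applied. The topic test-topic, broker IDs 1 through 3, the host host1:9092, and the file name reassign.json are all hypothetical. The tool is only invoked if it is found under bin in the current directory.

```shell
# Describe the desired replica set for each partition.
cat > reassign.json <<'EOF'
{
  "version": 1,
  "partitions": [
    {"topic": "test-topic", "partition": 0, "replicas": [1, 2]},
    {"topic": "test-topic", "partition": 1, "replicas": [2, 3]}
  ]
}
EOF

if [ -x bin/kafka-reassign-partitions.sh ]; then
  # Start the reassignment.
  bin/kafka-reassign-partitions.sh --bootstrap-server host1:9092 \
    --reassignment-json-file reassign.json --execute
  # Check whether the reassignment has completed.
  bin/kafka-reassign-partitions.sh --bootstrap-server host1:9092 \
    --reassignment-json-file reassign.json --verify
fi
```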

kafka-delete-records.sh

Use the kafka-delete-records tool to delete partition records. Use this if a topic receives bad data. Pass a JSON-formatted file that specifies the topic, partition, and offset for data deletion. Data will be deleted up to the offset specified. Example:

bin/kafka-delete-records.sh --bootstrap-server host1:9092 --offset-json-file deleteme.json
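
The offsets file might be created as in the following sketch, which assumes a hypothetical topic named test-topic and deletes records in partition 0 with offsets lower than 3. The tool is only invoked if it is found under bin in the current directory.

```shell
# List each partition and the offset before which records should be deleted.
cat > deleteme.json <<'EOF'
{
  "partitions": [
    {"topic": "test-topic", "partition": 0, "offset": 3}
  ],
  "version": 1
}
EOF

# Delete the records described in the file.
if [ -x bin/kafka-delete-records.sh ]; then
  bin/kafka-delete-records.sh --bootstrap-server host1:9092 --offset-json-file deleteme.json
fi
```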

kafka-log-dirs.sh

Use the kafka-log-dirs tool to get a list of replicas per log directory on a broker.

kafka-replica-verification.sh

Use the kafka-replica-verification tool to verify that all replicas of a topic contain the same data. The tool requires a broker-list parameter that contains a comma-separated list of <hostname:port> entries specifying the servers and ports to connect to.

kafka-mirror-maker.sh

DEPRECATED: For an alternative, see connect-mirror-maker.sh. The kafka-mirror-maker tool enables the creation of a replica of an existing Kafka cluster. The following example uses the alternative tool:

bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters secondary

To learn more, see Kafka mirroring.

connect-mirror-maker.sh

Use the connect-mirror-maker tool to replicate topics from one cluster to another using the Connect framework. You must pass an mm2.properties MM2 configuration file. For more information, see KIP-382: MirrorMaker 2.0 or Getting up to speed with MirrorMaker 2.
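
As a rough illustration, an MM2 configuration file for one-way replication might look like the following sketch. The cluster aliases primary and secondary, the hosts host1:9092 and host2:9092, and the choice to replicate every topic are assumptions made for this example. The tool is only invoked if it is found under bin in the current directory.

```shell
# Write a minimal MirrorMaker 2 configuration that replicates primary to secondary.
cat > mm2.properties <<'EOF'
# Aliases for the clusters involved in replication
clusters = primary, secondary

# Connection details for each cluster
primary.bootstrap.servers = host1:9092
secondary.bootstrap.servers = host2:9092

# Enable the primary to secondary flow and replicate all topics
primary->secondary.enabled = true
primary->secondary.topics = .*
EOF

# Start MirrorMaker 2 with the configuration file.
if [ -x bin/connect-mirror-maker.sh ]; then
  bin/connect-mirror-maker.sh mm2.properties
fi
```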

Client, producer, and consumer tools

kafka-verifiable-consumer.sh

The kafka-verifiable-consumer tool consumes messages from a topic and emits consumer events, such as group rebalances, received messages, and committed offsets, as JSON objects to STDOUT. This tool is intended for internal testing.

kafka-configs.sh

Use the kafka-configs tool to change and describe topic, client, user, broker, or IP configuration settings. To change a property, set entity-type to the desired entity (topic, broker, user, and so on), and use the alter option. The following example shows how you might add the delete.retention.ms configuration property for a topic with kafka-configs.

bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-default --alter --add-config delete.retention.ms=172800000

See Topic Operations for more examples of how to work with topics.

kafka-verifiable-producer.sh

The kafka-verifiable-producer tool produces increasing integers to the specified topic and prints JSON metadata to STDOUT on each send request. This tool shows which messages have been acked and which have not. This tool is intended for internal testing.

kafka-console-consumer.sh

Use the kafka-console-consumer tool to consume records from a topic. The tool requires a bootstrap-server parameter that contains a comma-separated list of <hostname:port> entries specifying the servers and ports to connect to.

Confluent Tip

If you are using Confluent, you can use the Confluent CLI and the kafka topic command to produce and consume from a topic.

Example:

bin/kafka-console-consumer.sh --bootstrap-server HOST1:PORT1,HOST2:PORT2 --consumer.config config.properties --topic testTopic --property "print.key=true"

kafka-console-producer.sh

Use the kafka-console-producer tool to produce records to a topic. The tool requires a bootstrap-server parameter that contains a comma-separated list of <hostname:port> entries specifying the servers and ports to connect to. Example:

kafka-console-producer.sh --bootstrap-server HOST1:PORT1,HOST2:PORT2 --producer.config config.properties --topic testTopic --property "parse.key=true" --property "key.separator=:"

Confluent Tip

If you are using Confluent, you can use the Confluent CLI and the kafka topic command to produce and consume from a topic.

kafka-producer-perf-test.sh

The kafka-producer-perf-test tool enables you to produce a large quantity of data to test producer performance for the Kafka cluster.

Example:

bin/kafka-producer-perf-test.sh --topic topic-a --num-records 200000 --record-size 1000 --throughput 10000000 --producer-props bootstrap.servers=host1:9092

kafka-consumer-groups.sh

Use the kafka-consumer-groups tool to get a list of the active groups in the cluster, or to describe the consumers in a group.

For example, to show the position of all consumers in a group named user-group, you might use the following command.

bin/kafka-consumer-groups.sh \
         --bootstrap-server localhost:9092 \
         --describe --group user-group

This would result in output like the following (CONSUMER-ID entries truncated for readability).

TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG CONSUMER-ID       HOST         CLIENT-ID
user           0          2               4               2   consumer-1-...    /127.0.0.1   consumer-1
user           1          2               3               1   consumer-1-...    /127.0.0.1   consumer-1
user           2          2               3               1   consumer-2-...    /127.0.0.1   consumer-2
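
Output like this can be summarized in a script. The following is a minimal sketch that totals the LAG column. It locates the column by its header name because the set of columns can differ between Kafka versions. The total_lag function name is hypothetical, and the tool is only invoked if it is found under bin in the current directory.

```shell
# total_lag: read `--describe` output on stdin and print the sum of the LAG column.
total_lag() {
  awk '
    # Until the header row is seen, look for the column named LAG.
    lag_col == 0 {
      for (i = 1; i <= NF; i++) if ($i == "LAG") lag_col = i
      next
    }
    # Data rows: add numeric lag values and skip placeholders such as "-".
    $lag_col ~ /^[0-9]+$/ { sum += $lag_col }
    END { print sum + 0 }
  '
}

# Describe the group only when the tool is present in the current directory.
if [ -x bin/kafka-consumer-groups.sh ]; then
  bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --describe --group user-group | total_lag
fi
```

For the sample output above, the function would print 4.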

For more examples, see Manage Consumer Groups.

kafka-consumer-perf-test.sh

The kafka-consumer-perf-test tool tests consumer performance for the Kafka cluster.

Manage Kafka Connect

connect-distributed.sh

Use the connect-distributed tool to run Connect workers in distributed mode, meaning across multiple machines. Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both in the active tasks and for configuration and offset commit data.

connect-standalone.sh

Use the connect-standalone tool to run Kafka Connect in standalone mode, meaning all work is performed in a single process. This mode is good for getting started but lacks fault tolerance. For more information, see Kafka Connect.
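
A standalone worker takes a worker properties file followed by one or more connector properties files. The following sketch writes a connector file modeled on the file source example from the Kafka quickstart and starts a worker with it. The file name file-source.properties is arbitrary, and the sketch assumes that the FileStreamSource connector is available on the worker's plugin path, which may require extra setup in recent Kafka versions. The tool is only invoked if it is found under bin in the current directory.

```shell
# Write a connector configuration that streams lines from test.txt to a topic.
cat > file-source.properties <<'EOF'
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
EOF

# Start a standalone worker with the worker config that ships with Kafka.
if [ -x bin/connect-standalone.sh ]; then
  bin/connect-standalone.sh config/connect-standalone.properties file-source.properties
fi
```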

connect-mirror-maker.sh

Use the connect-mirror-maker tool to replicate topics from one cluster to another using the Connect framework. You must pass an mm2.properties MM2 configuration file. For more information, see KIP-382: MirrorMaker 2.0 or Getting up to speed with MirrorMaker 2.

Manage Kafka Streams

kafka-streams-application-reset.sh

For Kafka Streams applications, the kafka-streams-application-reset tool resets the application and forces it to reprocess its data from the beginning. This is useful for debugging and testing.

For example, the following command would reset the my-streams-app application:

kafka-streams-application-reset.sh --application-id my-streams-app \
                                   --input-topics my-input-topic \
                                   --intermediate-topics rekeyed-topic

For more information, see Kafka Streams Application Reset Tool in the Confluent documentation.

Manage security

kafka-acls.sh

Use the kafka-acls tool to add, remove, and list ACLs. For example, to give two principals, Jose and Jane, read and write permissions on the user topic from specific IP addresses, you could use a command like the following:

bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Jose --allow-principal User:Jane --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic user

For more information, see ACL Command-line interface.

kafka-delegation-tokens.sh

Use the kafka-delegation-tokens tool to create, renew, expire and describe delegation tokens. Delegation tokens are shared secrets between Kafka brokers and clients, and are a lightweight authentication mechanism meant to complement existing SASL/SSL methods. For more information, see Authentication using Delegation Tokens in the Confluent Documentation.

zookeeper-security-migration.sh

Use the zookeeper-security-migration tool to restrict or provide access to ZooKeeper metadata. The tool updates the ACLs of znodes.

Test and troubleshoot

This section contains tools you can use for testing and troubleshooting your applications.

kafka-e2e-latency.sh

The kafka-e2e-latency tool is a performance testing tool used to measure end-to-end latency in Kafka. It works by sending messages to a Kafka topic and then consuming those messages from a Kafka consumer. The tool calculates the time difference between when a message was produced and when it was consumed, giving you an idea of the end-to-end latency for your Kafka cluster. This tool is useful for testing the performance of your Kafka cluster and identifying any bottlenecks or issues that may be affecting latency.

To run the tool, you provide details such as the message size, number of messages and acks setting for the producer. For more about end-to-end latency, see Configure Kafka to Minimize Latency.

Following are the required arguments:
  • broker_list: The location of the bootstrap broker for both the producer and the consumer.
  • topic: The topic name used by both the producer and the consumer to send and receive messages.
  • num_messages: The number of messages to send.
  • producer_acks: The producer setting for acks.
  • message_size_bytes: The size of each message in bytes.

For example:

kafka-e2e-latency.sh localhost:9092 test 10000 1 20
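
Because the arguments are positional, their order matters. The following sketch names each value before passing it, using the same values as the example above. The variable names mirror the argument list and are otherwise arbitrary, and the tool is only invoked if it is found under bin in the current directory.

```shell
# Name each positional argument so the order is explicit.
broker_list="localhost:9092"   # bootstrap broker for the producer and the consumer
topic="test"                   # topic used to send and receive messages
num_messages=10000             # number of messages to send
producer_acks=1                # producer acks setting
message_size_bytes=20          # size of each message in bytes

# Assemble the arguments in the order the tool expects.
set -- "$broker_list" "$topic" "$num_messages" "$producer_acks" "$message_size_bytes"
echo "kafka-e2e-latency.sh $*"

# Run the test only when the tool is present in the current directory.
if [ -x bin/kafka-e2e-latency.sh ]; then
  bin/kafka-e2e-latency.sh "$@"
fi
```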

kafka-dump-log.sh

The kafka-dump-log tool can be used in KRaft mode to parse a metadata log file and output its contents to the console. The tool requires a comma-separated list of log files, which it scans to decode the metadata records.

The following example shows using the cluster-metadata-decoder argument to decode the metadata records in a log segment.

bin/kafka-dump-log.sh --cluster-metadata-decoder --files tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000023946.log

kafka-jmx.sh

The kafka-jmx tool enables you to read JMX metrics from a given endpoint. This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.

trogdor.sh

Trogdor is a test framework for Kafka. Trogdor can run benchmarks and other workloads. Trogdor can also inject faults in order to stress test the system. For more information, see Trogdor and TROGDOR.

kafka-run-class.sh

The kafka-run-class tool is a thin wrapper around the Kafka Java class. It is called by other tools, and should not be run or modified directly.