Kafka Command-Line Interface (CLI) Tools
Apache Kafka® is an open-source distributed streaming system used for stream processing, real-time data pipelines, and data integration at scale.
Kafka provides a suite of command-line interface (CLI) tools, available in the bin directory after downloading Kafka and extracting the files. These tools offer a range of capabilities, including starting and stopping Kafka, managing topics, and handling partitions. To learn how to use a tool, run it with no arguments or with the --help argument for detailed instructions.
Use Kafka CLI Tools with Confluent Cloud or Confluent Platform
Quickly manage and interact with your Kafka cluster using built-in Kafka CLI tools. Sign up for Confluent Cloud to start with $400 in free credits, or download Confluent Platform for local development.
To use the Kafka tools with Confluent Cloud, see How to Use Kafka Tools With Confluent Cloud.
You can use all of the Kafka CLI tools with Confluent Platform. They are installed in the $CONFLUENT_HOME/bin directory when you install Confluent Platform, along with additional Confluent tools. Confluent has dropped the .sh extension, so you do not need to use it when calling the Confluent versions of these tools. In addition, when you pass a properties file, remember that Confluent Platform properties files are stored in the $CONFLUENT_HOME/etc directory. For more information, see CLI Tools for Confluent Platform.
The following sections group the tools by function and provide basic usage information. In some cases, a tool is listed in more than one section.
Manage Kafka and configure metadata
This section contains tools to start Kafka running in KRaft mode and to manage brokers.
kafka-server-start.sh
Use the kafka-server-start tool to start a Kafka server. You must pass the path to the properties file you want to use. To start Kafka in KRaft mode, first generate a cluster ID and store it in the properties file.
Usage details
USAGE: ./kafka-server-start.sh [-daemon] server.properties [--override property=value]*
Option Description
------ -----------
--override <String> Optional property that should override values set in
server.properties file
--version Print version information and exit.
kafka-server-stop.sh
Use the kafka-server-stop tool to stop a running Kafka server. This tool requires no arguments, but starting with Kafka 3.7 you can optionally pass either a process-role value of broker or controller, or a node-id value that indicates the node you want to stop.
For example, to stop all brokers, you would use the following command:
./bin/kafka-server-stop.sh --process-role=broker
To stop node 1234, you would use the following command:
./bin/kafka-server-stop.sh --node-id=1234
Usage details
USAGE: ./kafka-server-stop.sh {[--process-role=value] | [--node-id=value]}
kafka-storage.sh
Use the kafka-storage tool to generate a Cluster UUID and format storage with the generated UUID when running Kafka in KRaft mode. You must explicitly create a cluster ID for a KRaft cluster, and format the storage specifying that ID.
For example, the following command generates a cluster ID and stores it in a variable named KAFKA_CLUSTER_ID. The next command formats storage with that ID.
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server.properties
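The ID printed by random-uuid is a 22-character, URL-safe base64 string (16 random bytes, no padding). As a sketch, assuming bash, you can sanity-check an ID before formatting storage; the sample value below stands in for a live random-uuid call:

```shell
#!/usr/bin/env bash
# Live generation (requires a Kafka download):
# KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
KAFKA_CLUSTER_ID="fMCL8kv1SWm87L_Md-I2hg"   # sample ID with the same shape as real output

# A KRaft cluster ID is 16 random bytes encoded as URL-safe base64 without
# padding, which always yields 22 characters from [A-Za-z0-9_-].
if [[ "$KAFKA_CLUSTER_ID" =~ ^[A-Za-z0-9_-]{22}$ ]]; then
  echo "valid cluster ID: $KAFKA_CLUSTER_ID"
else
  echo "unexpected cluster ID format" >&2
  exit 1
fi

# With a valid ID, format storage (requires a Kafka download):
# bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server.properties
```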
Usage details
USAGE: kafka-storage.sh [-h] {info,format,version-mapping,feature-dependencies,random-uuid} ...
The Kafka storage tool.
positional arguments:
{info,format,version-mapping,feature-dependencies,random-uuid}
info Get information about the Kafka log directories on this node.
format Format the Kafka log directories on this node.
version-mapping Look up the corresponding features for a given metadata version. Using the command with no
``--release-version`` argument will return the mapping for the latest stable metadata version.
feature-dependencies Look up dependencies for a given feature version. If the feature is not known or the version
not yet defined, an error is thrown. Multiple features can be specified.
random-uuid Print a random UUID.
optional arguments:
--config CONFIG, -c CONFIG The Kafka configuration file to use.
--cluster-id CLUSTER_ID, -t CLUSTER_ID The cluster ID to use.
--add-scram ADD_SCRAM, -S ADD_SCRAM A SCRAM_CREDENTIAL to add to the __cluster_metadata log e.g.
'SCRAM-SHA-256=[name=alice,password=alice-secret]'
'SCRAM-SHA-512=[name=alice,iterations=8192,salt="N3E=",saltedpassword="YCE="]'
--ignore-formatted, -g When this option is passed, the format command will skip over already formatted
directories rather than failing.
--release-version The release version to use for the initial feature settings. The minimum is 3.3-IV3; the
RELEASE_VERSION, -r default is 4.0-IV3
RELEASE_VERSION
--feature FEATURE, The setting to use for a specific feature, in feature=level format. For example: ``kraft.version=1``.
-f FEATURE
--standalone, -s Used to initialize a controller as a single-node dynamic quorum.
--no-initial-controllers, -N Used to initialize a server without a dynamic quorum topology.
--initial-controllers Used to initialize a server with a specific dynamic quorum topology. The argument is a
INITIAL_CONTROLLERS, comma-separated list of id@hostname:port:directory. The same values must be used to
-I INITIAL_CONTROLLERS format all nodes. For example:
0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:MvDxzVmcRsaTz33bUuRU6A,
2@example.com:8084:07R5amHmR32VDA6jHkGbTA
-h, --help show this help message and exit
kafka-cluster.sh
Use the kafka-cluster tool to get the ID of a cluster or unregister a cluster. The following example shows how to retrieve the cluster ID, which requires a bootstrap-server argument.
bin/kafka-cluster.sh cluster-id --bootstrap-server localhost:9092
The output for this command might look like the following.
Cluster ID: WZEKwK-b123oT3ZOSU0dgw
Usage details
USAGE: kafka-cluster.sh [-h] {cluster-id,unregister,list-endpoints} ...
The Kafka cluster tool.
positional arguments:
{cluster-id,unregister,list-endpoints}
cluster-id Get information about the ID of a cluster.
unregister Unregister a broker.
list-endpoints List endpoints
optional arguments:
-h, --help show this help message and exit
kafka-features.sh
Use the kafka-features tool to manage feature flags and enable or disable functionality at runtime in Kafka. Pass the describe argument to list the currently active feature flags, upgrade to upgrade one or more feature flags, downgrade to downgrade one or more, and disable to disable one or more feature flags, which is the same as downgrading the version to zero.
Usage details
usage: kafka-features [-h] [--command-config COMMAND_CONFIG] (--bootstrap-server BOOTSTRAP_SERVER |
--bootstrap-controller BOOTSTRAP_CONTROLLER) {describe,upgrade,downgrade,disable,version-mapping,feature-dependencies} ...
This tool manages feature flags in Kafka.
positional arguments:
{describe,upgrade,downgrade,disable,version-mapping,feature-dependencies}
describe Describes the current active feature flags.
upgrade Upgrade one or more feature flags.
downgrade Downgrade one or more feature flags.
disable Disable one or more feature flags. This is the same as downgrading the version to zero.
version-mapping Look up the corresponding features for a given metadata version. Using the command with no
--release-version argument will return the mapping for the latest stable metadata version
feature-dependencies Look up dependencies for a given feature version. If the feature is not known or the version
not yet defined, an error is thrown. Multiple features can be specified.
optional arguments:
-h, --help show this help message and exit
--bootstrap-server BOOTSTRAP_SERVER
A comma-separated list of host:port pairs to use for establishing the connection to the
Kafka cluster.
--bootstrap-controller BOOTSTRAP_CONTROLLER
A comma-separated list of host:port pairs to use for establishing the connection to the
KRaft quorum.
--command-config COMMAND_CONFIG
Property file containing configurations to be passed to Admin Client.
kafka-broker-api-versions.sh
The kafka-broker-api-versions tool retrieves and displays broker information. For example, the following command outputs the version of Kafka that is running on the broker:
bin/kafka-broker-api-versions.sh --bootstrap-server host1:9092 --version
This command might have the following output:
3.3.1 (Commit:e23c59d00e687ff5)
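If a script needs just the version number, the commit suffix can be stripped with a parameter expansion. A minimal sketch, using the sample output above as a stand-in for a live call:

```shell
#!/usr/bin/env bash
# Live call (requires a running broker):
# raw="$(bin/kafka-broker-api-versions.sh --bootstrap-server host1:9092 --version)"
raw="3.3.1 (Commit:e23c59d00e687ff5)"   # sample output used as a stand-in

# Keep everything before the first space to drop the commit hash.
version="${raw%% *}"
echo "$version"   # 3.3.1
```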
Usage details
This tool helps to retrieve broker version information.
Option Description
------ -----------
--bootstrap-server <String: servers REQUIRED: The server to connect to.
to use for bootstrapping>
--command-config <String: command A property file containing configs to
config property file> be passed to Admin Client.
--help Print usage information.
--version Display Kafka version.
kafka-metadata-quorum.sh
Use the kafka-metadata-quorum tool to query the metadata quorum status. This tool is useful when you are debugging a cluster in KRaft mode. Pass the describe command to describe the current state of the metadata quorum.
The following code example displays a summary of the metadata quorum:
bin/kafka-metadata-quorum.sh --bootstrap-server host1:9092 describe --status
The output for this command might look like the following.
ClusterId: fMCL8kv1SWm87L_Md-I2hg
LeaderId: 3002
LeaderEpoch: 2
HighWatermark: 10
MaxFollowerLag: 0
MaxFollowerLagTimeMs: -1
CurrentVoters: [3000,3001,3002]
CurrentObservers: [0,1,2]
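Because the metadata quorum needs a majority of voters to stay available, it can be useful to count voters from the describe --status output. A sketch that parses a saved copy of the output above (a live run would pipe the tool's output instead):

```shell
#!/usr/bin/env bash
# Saved sample of `kafka-metadata-quorum.sh ... describe --status` output.
status='ClusterId: fMCL8kv1SWm87L_Md-I2hg
LeaderId: 3002
LeaderEpoch: 2
HighWatermark: 10
MaxFollowerLag: 0
MaxFollowerLagTimeMs: -1
CurrentVoters: [3000,3001,3002]
CurrentObservers: [0,1,2]'

# Extract the voter list, count its entries, and compute the majority size.
voters="$(printf '%s\n' "$status" | sed -n 's/^CurrentVoters:[[:space:]]*\[\(.*\)\]/\1/p')"
count="$(printf '%s\n' "$voters" | tr ',' '\n' | wc -l)"
count=$(( count ))                 # normalize any whitespace from wc
majority=$(( count / 2 + 1 ))
echo "voters=$count majority=$majority"   # voters=3 majority=2
```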
Usage details
usage: kafka-metadata-quorum [-h] [--command-config COMMAND_CONFIG] (--bootstrap-server BOOTSTRAP_SERVER |
--bootstrap-controller BOOTSTRAP_CONTROLLER) {describe,add-controller,remove-controller} ...
This tool describes kraft metadata quorum status.
positional arguments:
{describe,add-controller,remove-controller}
describe Describe the metadata quorum info
add-controller Add a controller to the KRaft controller cluster
remove-controller Remove a controller from the KRaft controller cluster
optional arguments:
-h, --help show this help message and exit
--bootstrap-server BOOTSTRAP_SERVER
A comma-separated list of host:port pairs to use for establishing the connection to the
Kafka cluster.
--bootstrap-controller BOOTSTRAP_CONTROLLER
A comma-separated list of host:port pairs to use for establishing the connection to the
Kafka controllers.
--command-config COMMAND_CONFIG
Property file containing configurations to be passed to Admin Client. For add-controller, the file is used to specify
the controller properties as well.
kafka-metadata-shell.sh
The kafka-metadata-shell tool enables you to interactively examine the metadata stored in a KRaft cluster.
To open the metadata shell, provide the path to your cluster’s metadata log directory using the --directory flag:
kafka-metadata-shell.sh --directory /tmp/kraft-combined-logs/_cluster-metadata-0/
To ensure kafka-metadata-shell.sh functions correctly, the --directory flag must point to your Kafka cluster’s metadata log directory. While examples often show /tmp/kraft-combined-logs/_cluster-metadata-0/, this path varies based on your Kafka configuration.
If your metadata files are not in their default or expected location, you have two options:
Specify the full, correct path to your metadata log directory.
Copy the metadata log directory to a temporary location (for example, /tmp), and then specify that temporary path.
After the shell loads, you can explore the contents of the metadata log and then exit. The following code shows an example of this.
Loading...
[ Kafka Metadata Shell ]
>> ls
image local
>> ls image
acls cells clientQuotas cluster clusterLinks configs delegationToken encryptor features producerIds provenance replicaExclusions scram tenants topics
>> ls image/topics/
byID byLinkId byName byTenant
>> ls image/topics/byName
test perf_test_124 othersourcetopic
>> cat /image/topics/byName/othersourcetopic/0/
{
"replicas": [1],
"observers": null,
"directories": ["U9TE_0zN-oReJ5XwsShc-w"],
"isr": [1],
"removingReplicas": null,
"addingReplicas": null,
"removingObservers": null,
"addingObservers": null,
"elr": null,
"lastKnownElr": null,
"leader": -1,
"leaderRecoveryState": "RECOVERED",
"leaderEpoch": 57,
"partitionEpoch": 9,
"linkedLeaderEpoch": 54,
"linkState": "ACTIVE"
}
>> exit
For more information, see the Kafka Wiki.
Usage details
usage: kafka-metadata-shell [-h] --snapshot SNAPSHOT [command [command ...]]
The Apache Kafka metadata shell
positional arguments:
command The command to run.
optional arguments:
-h, --help show this help message and exit
--snapshot SNAPSHOT, -s SNAPSHOT
The metadata snapshot file to read.
kafka-configs.sh
Use the kafka-configs tool to change and describe configuration settings for topics, clients, users, brokers, IPs, and groups, or to describe a KRaft controller. To describe or view a KRaft controller, use the --bootstrap-controller option, and do not specify a bootstrap-server.
To change a property, set the entity-type to the desired entity (topic, broker, user, and so on), and use the --alter option. The following example shows how you might add the delete.retention.ms configuration property for a topic with kafka-configs.
bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-default --alter --add-config delete.retention.ms=172800000
When you use the --add-config flag to add multiple values, use square brackets around the comma-separated list like the following example:
bin/kafka-configs.sh --bootstrap-server host1:9092 --alter --add-config max.connections.per.ip.overrides=[host1:50,host2:9] --entity-type brokers --entity-default
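One thing to watch: in some shells (zsh, or bash with failglob set), the unquoted square brackets in a value like max.connections.per.ip.overrides=[host1:50,host2:9] can trigger filename globbing, so it is safest to quote the whole key=value argument. A sketch, assuming bash, that builds the bracketed value from a list (host names are illustrative):

```shell
#!/usr/bin/env bash
# Per-host connection overrides (illustrative values).
overrides=("host1:50" "host2:9")

# Join the entries with commas and wrap them in the square brackets that
# kafka-configs uses to group values containing commas.
joined="$(IFS=,; printf '%s' "${overrides[*]}")"
config="max.connections.per.ip.overrides=[${joined}]"
echo "$config"   # max.connections.per.ip.overrides=[host1:50,host2:9]

# Quote the argument so the brackets reach the tool unexpanded (requires a cluster):
# bin/kafka-configs.sh --bootstrap-server host1:9092 --alter \
#   --add-config "$config" --entity-type brokers --entity-default
```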
The following example shows how you might check the cluster ID with the kafka-cluster tool by specifying the --bootstrap-controller option.
bin/kafka-cluster.sh cluster-id --bootstrap-controller localhost:9092
See Kafka Topic Operations for more examples of how to work with topics.
Usage details
This tool helps to manipulate and describe entity config for a topic, client, user, broker, ip, client-metrics or group
Option Description
------ -----------
--add-config <String> Key Value pairs of configs to add.
Square brackets can be used to group
values which contain commas: 'k1=v1,
k2=[v1,v2,v2],k3=v3'. The following
is a list of valid configurations:
For entity-type 'topics':
cleanup.policy
compression.gzip.level
compression.lz4.level
compression.type
compression.zstd.level
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
local.retention.bytes
local.retention.ms
max.compaction.lag.ms
max.message.bytes
message.timestamp.after.max.ms
message.timestamp.before.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
remote.log.copy.disable
remote.log.delete.on.disable
remote.storage.enable
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
For entity-type 'brokers':
background.threads
compression.gzip.level
compression.lz4.level
compression.type
compression.zstd.level
config.providers
follower.replication.throttled.rate
leader.replication.throttled.rate
listener.security.protocol.map
listeners
log.cleaner.backoff.ms
log.cleaner.dedupe.buffer.size
log.cleaner.delete.retention.ms
log.cleaner.io.buffer.load.factor
log.cleaner.io.buffer.size
log.cleaner.io.max.bytes.per.second
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleaner.min.compaction.lag.ms
log.cleaner.threads
log.cleanup.policy
log.flush.interval.messages
log.flush.interval.ms
log.index.interval.bytes
log.index.size.max.bytes
log.local.retention.bytes
log.local.retention.ms
log.message.timestamp.after.max.ms
log.message.timestamp.before.max.ms
log.message.timestamp.type
log.preallocate
log.retention.bytes
log.retention.ms
log.roll.jitter.ms
log.roll.ms
log.segment.bytes
log.segment.delete.delay.ms
max.connection.creation.rate
max.connections
max.connections.per.ip
max.connections.per.ip.overrides
message.max.bytes
metric.reporters
min.insync.replicas
num.io.threads
num.network.threads
num.recovery.threads.per.data.dir
num.replica.fetchers
principal.builder.class
producer.id.expiration.ms
remote.fetch.max.wait.ms
remote.list.offsets.request.timeout.ms
remote.log.index.file.cache.total.
size.bytes
remote.log.manager.copier.thread.pool.
size
remote.log.manager.copy.max.bytes.per.
second
remote.log.manager.expiration.thread.
pool.size
remote.log.manager.fetch.max.bytes.
per.second
remote.log.manager.follower.thread.
pool.size
remote.log.reader.threads
replica.alter.log.dirs.io.max.bytes.
per.second
sasl.enabled.mechanisms
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal.to.local.rules
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.
factor
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism.inter.broker.protocol
ssl.cipher.suites
ssl.client.auth
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.engine.factory.class
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.certificate.chain
ssl.keystore.key
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.certificates
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
transaction.partition.verification.
enable
unclean.leader.election.enable
For entity-type 'users':
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate
controller_mutation_rate
producer_byte_rate
request_percentage
For entity-type 'clients':
consumer_byte_rate
controller_mutation_rate
producer_byte_rate
request_percentage
For entity-type 'ips':
connection_creation_rate
For entity-type 'client-metrics':
interval.ms
match
metrics
For entity-type 'groups':
consumer.heartbeat.interval.ms
consumer.session.timeout.ms
share.auto.offset.reset
share.heartbeat.interval.ms
share.isolation.level
share.record.lock.duration.ms
share.session.timeout.ms
streams.heartbeat.interval.ms
streams.initial.rebalance.delay.ms
streams.num.standby.replicas
streams.session.timeout.ms
Entity types 'users' and 'clients' may
be specified together to update
config for clients of a specific
user.
--add-config-file <String> Path to a properties file with configs
to add. See add-config for a list of
valid configurations.
--all List all configs for the given entity,
including static configs if
available.
--alter Alter the configuration for the entity.
--bootstrap-controller <String: The Kafka controllers to connect to.
controller to connect to>
--bootstrap-server <String: server to The Kafka servers to connect to.
connect to>
--broker <String> The broker's ID.
--broker-defaults The config defaults for all brokers.
--broker-logger <String> The broker's ID for its logger config.
--client <String> The client's ID.
--client-defaults The config defaults for all clients.
--client-metrics <String> The client metrics config resource
name.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--delete-config <String> config keys to remove 'k1,k2'
--describe List configs for the given entity.
--entity-default Default entity name for
clients/users/brokers/ips (applies
to corresponding entity type)
--entity-name <String> Name of entity (topic name/client
id/user principal name/broker
id/ip/client metrics/group id)
--entity-type <String> Type of entity
(topics/clients/users/brokers/broker-
loggers/ips/client-metrics/groups)
--group <String> The group's ID.
--help Print usage information.
--ip <String> The IP address.
--ip-defaults The config defaults for all IPs.
--topic <String> The topic's name.
--user <String> The user's principal name.
--user-defaults The config defaults for all users.
--version Display Kafka version.
Manage topics, partitions, and replication
kafka-topics.sh
Use the kafka-topics tool to create or delete a topic. You can also use the tool to retrieve a list of topics associated with a Kafka cluster. For more information, see Kafka Topic Operations.
To change a topic, see kafka-configs.sh, or how to modify a topic.
Example:
bin/kafka-topics.sh --bootstrap-server host1:9092 --create --topic test-topic --partitions 3
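Kafka rejects topic names longer than 249 characters, names containing characters outside [a-zA-Z0-9._-], and the special names "." and "..". A small local pre-check before calling the tool might look like the following sketch (assuming bash; the function name is illustrative):

```shell
#!/usr/bin/env bash
# Return success only for names Kafka will accept: 1-249 characters drawn
# from [a-zA-Z0-9._-], and not the reserved names "." or "..".
valid_topic_name() {
  local name="$1"
  [[ "$name" != "." && "$name" != ".." ]] || return 1
  (( ${#name} >= 1 && ${#name} <= 249 )) || return 1
  [[ "$name" =~ ^[a-zA-Z0-9._-]+$ ]]
}

valid_topic_name "test-topic" && echo "ok: test-topic"
valid_topic_name "bad topic!" || echo "rejected: bad topic!"
```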
Usage details
This tool helps to create, delete, describe, or change a topic.
Option Description
------ -----------
--alter Alter the number of partitions and
replica assignment. (To alter topic
configurations, the kafka-configs
tool can be used.)
--at-min-isr-partitions If set when describing topics, only
show partitions whose isr count is
equal to the configured minimum.
--bootstrap-server <String: server to REQUIRED: The Kafka server to connect
connect to> to.
--command-config <String: command Property file containing configurations to be
config property file> passed to Admin Client.
--config <String: name=value> A topic configuration override for the
topic being created. The
following is a list of valid
configurations:
cleanup.policy
compression.gzip.level
compression.lz4.level
compression.type
compression.zstd.level
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
local.retention.bytes
local.retention.ms
max.compaction.lag.ms
max.message.bytes
message.timestamp.after.max.ms
message.timestamp.before.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
remote.log.copy.disable
remote.log.delete.on.disable
remote.storage.enable
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
See the Kafka documentation for full
details on the topic configs. It is
supported only in combination with --
create. (To alter topic
configurations, the kafka-configs
tool can be used.)
--create Create a new topic.
--delete Delete a topic.
--delete-config <String: name> This option is no longer supported and
has been deprecated since 4.0
--describe List details for the given topics.
--exclude-internal Exclude internal topics when listing
or describing topics. By default,
the internal topics are included.
--help Print usage information.
--if-exists If set when altering or deleting or
describing topics, the action will
only execute if the topic exists.
--if-not-exists If set when creating topics, the
create request will not fail if the
topic already exists, but the
request will still be sent.
--list List all available topics.
--partition-size-limit-per-response The maximum partition size to be
<Integer: maximum number of included in one
partitions per response> DescribeTopicPartitions response.
--partitions <Integer: # of partitions> The number of partitions for the topic
being created or altered. If not
supplied with --create, the topic
uses the cluster default. If
partitions are increased for a
topic that has a key, the partition
logic or ordering of the messages
will be affected.
--replica-assignment <String: A list of manual partition-to-broker
broker_id_for_part1_replica1 : assignments for the topic being
broker_id_for_part1_replica2 , created or altered.
broker_id_for_part2_replica1 :
broker_id_for_part2_replica2 , ...>
--replication-factor <Integer: The replication factor for each
replication factor> partition in the topic being
created. If not supplied, the topic
uses the cluster default.
--topic <String: topic> The topic to create, alter, describe
or delete. It also accepts a regular
expression, except for --create
option.
--topic-id <String: topic-id> The topic-id to describe.
--topics-with-overrides If set when describing topics, only
show topics that have overridden
configs.
--unavailable-partitions If set when describing topics, only
show partitions whose leader is not
available.
--under-min-isr-partitions If set when describing topics, only
show partitions whose isr count is
less than the configured minimum.
--under-replicated-partitions If set when describing topics, only
show under-replicated partitions.
--version Display Kafka version.
kafka-configs.sh
Use the kafka-configs tool to change and describe configuration settings for topics, clients, users, brokers, IPs, and groups, or to describe a KRaft controller. To describe or view a KRaft controller, use the --bootstrap-controller option, and do not specify a bootstrap-server.
To change a property, set the entity-type to the desired entity (topic, broker, user, and so on), and use the --alter option. The following example shows how you might add the delete.retention.ms configuration property for a topic with kafka-configs.
bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-default --alter --add-config delete.retention.ms=172800000
When you use the --add-config flag to add multiple values, use square brackets around the comma-separated list like the following example:
bin/kafka-configs.sh --bootstrap-server host1:9092 --alter --add-config max.connections.per.ip.overrides=[host1:50,host2:9] --entity-type brokers --entity-default
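One thing to watch: in some shells (zsh, or bash with failglob set), the unquoted square brackets in a value like max.connections.per.ip.overrides=[host1:50,host2:9] can trigger filename globbing, so it is safest to quote the whole key=value argument. A sketch, assuming bash, that builds the bracketed value from a list (host names are illustrative):

```shell
#!/usr/bin/env bash
# Per-host connection overrides (illustrative values).
overrides=("host1:50" "host2:9")

# Join the entries with commas and wrap them in the square brackets that
# kafka-configs uses to group values containing commas.
joined="$(IFS=,; printf '%s' "${overrides[*]}")"
config="max.connections.per.ip.overrides=[${joined}]"
echo "$config"   # max.connections.per.ip.overrides=[host1:50,host2:9]

# Quote the argument so the brackets reach the tool unexpanded (requires a cluster):
# bin/kafka-configs.sh --bootstrap-server host1:9092 --alter \
#   --add-config "$config" --entity-type brokers --entity-default
```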
The following example shows how you might check the cluster ID with the kafka-cluster tool by specifying the --bootstrap-controller option.
bin/kafka-cluster.sh cluster-id --bootstrap-controller localhost:9092
See Kafka Topic Operations for more examples of how to work with topics.
Usage details
This tool helps to manipulate and describe entity config for a topic, client, user, broker, ip, client-metrics or group
Option Description
------ -----------
--add-config <String> Key Value pairs of configs to add.
Square brackets can be used to group
values which contain commas: 'k1=v1,
k2=[v1,v2,v2],k3=v3'. The following
is a list of valid configurations:
For entity-type 'topics':
cleanup.policy
compression.gzip.level
compression.lz4.level
compression.type
compression.zstd.level
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
local.retention.bytes
local.retention.ms
max.compaction.lag.ms
max.message.bytes
message.timestamp.after.max.ms
message.timestamp.before.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
remote.log.copy.disable
remote.log.delete.on.disable
remote.storage.enable
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
For entity-type 'brokers':
background.threads
compression.gzip.level
compression.lz4.level
compression.type
compression.zstd.level
config.providers
follower.replication.throttled.rate
leader.replication.throttled.rate
listener.security.protocol.map
listeners
log.cleaner.backoff.ms
log.cleaner.dedupe.buffer.size
log.cleaner.delete.retention.ms
log.cleaner.io.buffer.load.factor
log.cleaner.io.buffer.size
log.cleaner.io.max.bytes.per.second
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleaner.min.compaction.lag.ms
log.cleaner.threads
log.cleanup.policy
log.flush.interval.messages
log.flush.interval.ms
log.index.interval.bytes
log.index.size.max.bytes
log.local.retention.bytes
log.local.retention.ms
log.message.timestamp.after.max.ms
log.message.timestamp.before.max.ms
log.message.timestamp.type
log.preallocate
log.retention.bytes
log.retention.ms
log.roll.jitter.ms
log.roll.ms
log.segment.bytes
log.segment.delete.delay.ms
max.connection.creation.rate
max.connections
max.connections.per.ip
max.connections.per.ip.overrides
message.max.bytes
metric.reporters
min.insync.replicas
num.io.threads
num.network.threads
num.recovery.threads.per.data.dir
num.replica.fetchers
principal.builder.class
producer.id.expiration.ms
remote.fetch.max.wait.ms
remote.list.offsets.request.timeout.ms
remote.log.index.file.cache.total.
size.bytes
remote.log.manager.copier.thread.pool.
size
remote.log.manager.copy.max.bytes.per.
second
remote.log.manager.expiration.thread.
pool.size
remote.log.manager.fetch.max.bytes.
per.second
remote.log.manager.follower.thread.
pool.size
remote.log.reader.threads
replica.alter.log.dirs.io.max.bytes.
per.second
sasl.enabled.mechanisms
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal.to.local.rules
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.
factor
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism.inter.broker.protocol
ssl.cipher.suites
ssl.client.auth
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.engine.factory.class
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.certificate.chain
ssl.keystore.key
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.certificates
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
transaction.partition.verification.
enable
unclean.leader.election.enable
For entity-type 'users':
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate
controller_mutation_rate
producer_byte_rate
request_percentage
For entity-type 'clients':
consumer_byte_rate
controller_mutation_rate
producer_byte_rate
request_percentage
For entity-type 'ips':
connection_creation_rate
For entity-type 'client-metrics':
interval.ms
match
metrics
For entity-type 'groups':
consumer.heartbeat.interval.ms
consumer.session.timeout.ms
share.auto.offset.reset
share.heartbeat.interval.ms
share.isolation.level
share.record.lock.duration.ms
share.session.timeout.ms
streams.heartbeat.interval.ms
streams.initial.rebalance.delay.ms
streams.num.standby.replicas
streams.session.timeout.ms
Entity types 'users' and 'clients' may
be specified together to update
config for clients of a specific
user.
--add-config-file <String> Path to a properties file with configs
to add. See add-config for a list of
valid configurations.
--all List all configs for the given entity,
including static configs if
available.
--alter Alter the configuration for the entity.
--bootstrap-controller <String: The Kafka controllers to connect to.
controller to connect to>
--bootstrap-server <String: server to The Kafka servers to connect to.
connect to>
--broker <String> The broker's ID.
--broker-defaults The config defaults for all brokers.
--broker-logger <String> The broker's ID for its logger config.
--client <String> The client's ID.
--client-defaults The config defaults for all clients.
--client-metrics <String> The client metrics config resource
name.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--delete-config <String> config keys to remove 'k1,k2'
--describe List configs for the given entity.
--entity-default Default entity name for
clients/users/brokers/ips (applies
to corresponding entity type)
--entity-name <String> Name of entity (topic name/client
id/user principal name/broker
id/ip/client metrics/group id)
--entity-type <String> Type of entity
(topics/clients/users/brokers/broker-
loggers/ips/client-metrics/groups)
--group <String> The group's ID.
--help Print usage information.
--ip <String> The IP address.
--ip-defaults The config defaults for all IPs.
--topic <String> The topic's name.
--user <String> The user's principal name.
--user-defaults The config defaults for all users.
--version Display Kafka version.
kafka-get-offsets.sh
Use the kafka-get-offsets tool to retrieve topic-partition offsets.
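The --time option also accepts a millisecond epoch timestamp, which lets you look up offsets relative to a point in time. A sketch of computing such a timestamp portably in shell; the kafka-get-offsets invocation is shown as a comment because it needs a running cluster, and the topic name is illustrative:

```shell
#!/usr/bin/env bash
# Epoch milliseconds for "one hour ago", built from plain `date +%s` so it
# works without GNU date extensions.
now_s="$(date +%s)"
one_hour_ago_ms=$(( (now_s - 3600) * 1000 ))
echo "$one_hour_ago_ms"

# Query offsets for that point in time (requires a running cluster):
# bin/kafka-get-offsets.sh --bootstrap-server host1:9092 \
#   --topic test-topic --time "$one_hour_ago_ms"
```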
Usage details
An interactive shell for getting topic-partition offsets.
Option Description
------ -----------
--bootstrap-server <String: HOST1: REQUIRED. The servers to connect to
PORT1,...,HOST3:PORT3> in the form HOST1:PORT1,HOST2:PORT2.
--command-config <String: config file> Property file containing configurations to be
passed to Admin Client.
--exclude-internal-topics By default, internal topics are
included. If specified, internal
topics are excluded.
--help Print usage information.
--partitions <String: partition ids> Comma separated list of partition ids
to get the offsets for. If not
present, all partitions of the
authorized topics are queried.
Cannot be used if --topic-partitions
is present.
--time <String: <timestamp> / -1 or timestamp of the offsets before that.
latest / -2 or earliest / -3 or max- [Note: No offset is returned, if the
timestamp / -4 or earliest-local / timestamp greater than recently
-5 or latest-tiered / -6 or earliest- committed record timestamp is
pending-upload> given.] (default: latest)
--topic <String: topic> The topic to get the offsets for. It
also accepts a regular expression.
If not present, all authorized
topics are queried. Cannot be used
if --topic-partitions is present.
--topic-partitions <String: topic1:1, Comma separated list of topic-
topic2:0-3,topic3,topic4:5-,topic5:-3 partition patterns to get the
> offsets for, with the format of
'([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)
-([0-9]*))))?'. The first group is
an optional regex for the topic
name, if omitted, it matches any
topic name. The section after ':'
describes a 'partition' pattern,
which can be: a number, a range in
the format of 'NUMBER-NUMBER' (lower
inclusive, upper exclusive), an
inclusive lower bound in the format
of 'NUMBER-', an exclusive upper
bound in the format of '-NUMBER' or
may be omitted to accept all
partitions.
--version Display Kafka version.
kafka-leader-election.sh
Use the kafka-leader-election tool to attempt to elect a new leader for a set of topic partitions.
Run this tool manually to restore leadership if the auto.leader.rebalance.enable property is set to false.
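For example, the following sketch triggers a preferred leader election for a single partition. The broker address, topic name, and partition number are placeholders:
```shell
# Attempt preferred leader election for partition 0 of "my-topic"
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 \
  --election-type preferred \
  --topic my-topic --partition 0
```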
Usage details
This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas.
Option Description
------ -----------
--admin.config <String: config file> DEPRECATED: Configuration properties
files to pass to the admin client.
This option will be removed in a
future version. Use --command-config
instead.
--all-topic-partitions Perform election on all of the
eligible topic partitions based on
the type of election (see the --
election-type flag). Not allowed if
--topic or --path-to-json-file is
specified.
--bootstrap-server <String: host:port> A hostname and port for the broker to
connect to, in the form host:port.
Multiple comma separated URLs can be
given. REQUIRED.
--command-config <String: Config file> Configuration properties file to pass to the
admin client.
--election-type <[PREFERRED,UNCLEAN]: Type of election to attempt. Possible
election type> values are "preferred" for preferred
leader election or "unclean" for
unclean leader election. If
preferred election is selected, the
election is only performed if the
current leader is not the preferred
leader for the topic partition. If
unclean election is selected, the
election is only performed if there
is no leader for the topic
partition. REQUIRED.
--help Print usage information.
--partition <Integer: partition id> Partition id for which to perform an
election. REQUIRED if --topic is
specified.
--path-to-json-file <String: Path to The JSON file with the list of
JSON file> partition for which leader elections
should be performed. This is an
example format.
{"partitions":
[{"topic": "foo", "partition": 1},
{"topic": "foobar", "partition": 2}]
}
Not allowed if --all-topic-partitions
or --topic flags are specified.
--topic <String: topic name> Name of topic for which to perform an
election. Not allowed if --path-to-
json-file or --all-topic-partitions
is specified.
--version Display Kafka version.
kafka-transactions.sh
Use the kafka-transactions tool to list and describe transactions, and to detect and abort hanging transactions. For more information, see Detect and Abort Hanging Transactions.
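For example, the following sketch lists transactions and scans one topic partition for hanging transactions. The broker address, topic, and partition are placeholders:
```shell
# List all transactions known to the cluster
bin/kafka-transactions.sh --bootstrap-server localhost:9092 list

# Look for hanging transactions in partition 0 of "my-topic"
bin/kafka-transactions.sh --bootstrap-server localhost:9092 find-hanging \
  --topic my-topic --partition 0
```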
Usage details
usage: kafka-transactions.sh [-h] [-v] [--command-config FILE] --bootstrap-server host:port COMMAND ...
This tool is used to analyze the transactional state of producers in the cluster. It can be used to detect and
recover from hanging transactions.
optional arguments:
-h, --help show this help message and exit
-v, --version show the version of this Kafka distribution and exit
--command-config FILE property file containing configurations to be passed to admin client
--bootstrap-server host:port
hostname and port for the broker to connect to, in the form `host:port`;
multiple comma-separated entries can be given
commands:
list list transactions
describe describe the state of an active transactional-id
describe-producers describe the states of active producers for a topic partition
abort abort a hanging transaction (requires administrative privileges)
find-hanging find hanging transactions
forceTerminateTransaction
Force abort an ongoing transaction on transactionalId (requires administrative privileges)
kafka-reassign-partitions.sh
Use the kafka-reassign-partitions tool to move topic partitions between replicas. You pass a JSON-formatted file that specifies the new replicas. To learn more, see Changing the replication factor in the Confluent documentation.
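A typical workflow is to generate a candidate assignment, execute it, and then verify it. The file names, topic, and broker list below are placeholders:
```shell
# 1. Generate a candidate assignment that moves the topics listed in
#    topics.json onto brokers 0, 1, and 2
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --topics-to-move-json-file topics.json --broker-list "0,1,2" --generate

# 2. Save the proposed assignment to reassignment.json, then execute it
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassignment.json --execute

# 3. Verify completion (this also removes any throttles once done)
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassignment.json --verify
```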
Usage details
This tool helps to move topic partitions between replicas.
Option Description
------ -----------
--additional Execute this reassignment in addition
to any other ongoing ones. This
option can also be used to change
the throttle of an ongoing
reassignment.
--bootstrap-controller The controller to use for reassignment.
[String: bootstrap controller By default, the tool will get the quorum controller. This
to connect to] option supports the actions --cancel and --list.
--bootstrap-server [String: Servers the servers to use for bootstrapping.
to use for bootstrapping]
--broker-list <String: brokerlist> The list of brokers to which the
partitions need to be reassigned in
the form "0,1,2". This is required
if --topics-to-move-json-file is
used to generate reassignment
configuration
--cancel Cancel an active reassignment.
--command-config <String: Admin client Property file containing configurations to be
property file> passed to Admin Client.
--disable-rack-aware Disable rack aware replica assignment
--disallow-replication-factor-change Denies the ability to change a
partition's replication factor as
part of this reassignment through
adding validation against it.
--execute Kick off the reassignment as specified
by the --reassignment-json-file
option.
--generate Generate a candidate partition
reassignment configuration. Note
that this only generates a candidate
assignment, it does not execute it.
--help Print usage information.
--list List all active partition
reassignments.
--preserve-throttles Do not modify broker or topic
throttles.
--reassignment-json-file <String: The JSON file with the partition
manual assignment json file path> reassignment configuration. The format
to use is -
{"partitions":
[{"topic": "foo",
"partition": 1,
"replicas": [1,2,3],
"log_dirs": ["dir1","dir2","dir3"]
}],
"version":1
}
Note that "log_dirs" is optional. When
it is specified, its length must
equal the length of the replicas
list. The value in this list can be
either "any" or the absolute path
of the log directory on the broker.
If absolute log directory path is
specified, the replica will be moved
to the specified log directory on
the broker.
--replica-alter-log-dirs-throttle The movement of replicas between log
<Long: replicaAlterLogDirsThrottle> directories on the same broker will
be throttled to this value
(bytes/sec). This option can be
included with --execute when a
reassignment is started, and it can
be altered by resubmitting the
current reassignment along with the
--additional flag. The throttle rate
should be at least 1 KB/s. (default:
-1)
--throttle <Long: throttle> The movement of partitions between
brokers will be throttled to this
value (bytes/sec). This option can
be included with --execute when a
reassignment is started, and it can
be altered by resubmitting the
current reassignment along with the
--additional flag. The throttle rate
should be at least 1 KB/s. (default:
-1)
--timeout <Long: timeout> The maximum time in ms to wait for log
directory replica assignment to
begin. (default: 10000)
--topics-to-move-json-file <String: Generate a reassignment configuration
topics to reassign json file path> to move the partitions of the
specified topics to the list of
brokers specified by the --broker-
list option. The format to use is -
{"topics":
[{"topic": "foo"},{"topic": "foo1"}],
"version":1
}
--verify Verify if the reassignment completed
as specified by the --reassignment-
json-file option. If there is a
throttle engaged for the replicas
specified, and the rebalance has
completed, the throttle will be
removed
--version Display Kafka version.
kafka-delete-records.sh
Use the kafka-delete-records tool to delete partition records. Use this if a topic receives bad data. Pass a JSON-formatted file that specifies the topic, partition, and offset for data deletion. Data will be deleted up to the offset specified. Example:
bin/kafka-delete-records.sh --bootstrap-server host1:9092 --offset-json-file deleteme.json
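The deleteme.json file in this example might look like the following, which deletes records in partition 0 of a topic up to (but not including) offset 3. The topic name and offset are illustrative:
```shell
# Create an offsets file for kafka-delete-records.sh
# ("my-topic", partition 0, and offset 3 are placeholder values)
cat > deleteme.json <<'EOF'
{"partitions":
  [{"topic": "my-topic", "partition": 0, "offset": 3}],
 "version": 1
}
EOF
```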
Usage details
This tool helps to delete records of the given partitions down to the specified offset.
Option Description
------ -----------
--bootstrap-server <String: servers REQUIRED: The server to connect to.
to use for bootstrapping>
--command-config <String: command A property file containing configs to
config property file path> be passed to Admin Client.
--help Print usage information.
--offset-json-file <String: Offset REQUIRED: The JSON file with offset
json file path> per partition. The format to use is:
{"partitions":
[{"topic": "foo", "partition": 1,
"offset": 1}],
"version":1
}
--version Display Kafka version.
kafka-log-dirs.sh
Use the kafka-log-dirs tool to get a list of replicas per log directory on a broker.
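For example, the following sketch describes log directory usage for two brokers, limited to a single topic. The broker address, broker IDs, and topic name are placeholders:
```shell
# Report log directory usage for brokers 0 and 1, for "my-topic" only
bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 \
  --describe --broker-list "0,1" --topic-list my-topic
```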
Usage details
This tool helps to query log directory usage on the specified brokers.
Option Description
------ -----------
--bootstrap-server <String: The server REQUIRED: the servers to use for
(s) to use for bootstrapping> bootstrapping
--broker-list <String: Broker list> The list of brokers to be queried in
the form "0,1,2". All brokers in the
cluster will be queried if no broker
list is specified (default: )
--command-config <String: Admin client Property file containing configurations to be
property file> passed to Admin Client.
--describe Describe the specified log directories
on the specified brokers.
--help Print usage information.
--topic-list <String: Topic list> The list of topics to be queried in
the form "topic1,topic2,topic3". All
topics will be queried if no topic
list is specified (default: )
--version Display Kafka version.
kafka-replica-verification.sh
Use the kafka-replica-verification tool to verify that all replicas of a topic contain the same data. Requires a broker-list parameter that contains a comma-separated list of <hostname:port> entries specifying the server/port to connect to.
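For example, the following sketch continuously checks replica consistency for topics matching a regular expression, reporting every 10 seconds. The broker address and topic pattern are placeholders:
```shell
# Verify that replicas of topics matching "my-.*" contain the same data
bin/kafka-replica-verification.sh --broker-list localhost:9092 \
  --topics-include "my-.*" --report-interval-ms 10000
```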
Usage details
This tool is deprecated and may be removed in a future major release.
Validate that all replicas for a set of topics have the same data.
Option Description
------ -----------
--broker-list <String: hostname: REQUIRED: The list of hostname and
port,...,hostname:port> port of the server to connect to.
--fetch-size <Integer: bytes> The fetch size of each request.
(default: 1048576)
--help Print usage information.
--max-wait-ms <Integer: ms> The max amount of time each fetch
request waits. (default: 1000)
--report-interval-ms <Long: ms> The reporting interval. (default:
30000)
--time <Long: timestamp/-1(latest)/-2 Timestamp for getting the initial
(earliest)> offsets. (default: -1)
--topics-include <String: Java regex List of topics to verify replica
(String)> consistency. (default: .*)
--version Display Kafka version.
kafka-mirror-maker.sh
DEPRECATED: For an alternative, see connect-mirror-maker.sh. Enables the creation of a replica of an existing Kafka cluster. Example: bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters secondary. To learn more, see Kafka mirroring.
Usage details
This tool helps to continuously copy data between two Kafka clusters.
Option Description
------ -----------
--abort.on.send.failure <String: Stop Configure the mirror maker to exit on
the entire mirror maker when a send a failed send. (default: true)
failure occurs>
--consumer.config <String: config file> Embedded consumer configuration for consuming
from the source cluster.
--consumer.rebalance.listener <String: The consumer rebalance listener to use
A custom rebalance listener of type for mirror maker consumer.
ConsumerRebalanceListener>
--help Print usage information.
--include <String: Java regex (String)> List of included topics to mirror.
--message.handler <String: A custom Message handler which will process
message handler of type every record in-between consumer and
MirrorMakerMessageHandler> producer.
--message.handler.args <String: Arguments used by custom message
Arguments passed to message handler handler for mirror maker.
constructor.>
--new.consumer DEPRECATED Use new consumer in mirror
maker (this is the default so this
option will be removed in a future
version).
--num.streams <Integer: Number of Number of consumption streams.
threads> (default: 1)
--offset.commit.interval.ms <Integer: Offset commit interval in ms.
offset commit interval in (default: 60000)
millisecond>
--producer.config <String: config file> Embedded producer configuration.
--rebalance.listener.args <String: Arguments used by custom rebalance
Arguments passed to custom rebalance listener for mirror maker consumer.
listener constructor as a string.>
--version Display Kafka version.
--whitelist <String: Java regex DEPRECATED, use --include instead;
(String)> ignored if --include specified. List
of included topics to mirror.
connect-mirror-maker.sh
Use the connect-mirror-maker tool to replicate topics from one cluster to another using the Connect framework. You must pass an mm2.properties MM2 configuration file. For more information, see KIP-382: MirrorMaker 2.0 or Getting up to speed with MirrorMaker 2.
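A minimal mm2.properties might replicate all topics from one cluster to another, as in the following sketch. The cluster aliases, hostnames, and topic pattern are placeholder values:
```shell
# Create an illustrative MM2 configuration that mirrors every topic
# from "primary" to "secondary" (all names and addresses are placeholders)
cat > mm2.properties <<'EOF'
clusters = primary, secondary
primary.bootstrap.servers = primary-host:9092
secondary.bootstrap.servers = secondary-host:9092
primary->secondary.enabled = true
primary->secondary.topics = .*
EOF

# Run the MirrorMaker 2.0 driver on the target cluster
bin/connect-mirror-maker.sh mm2.properties --clusters secondary
```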
Usage details
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.properties
MirrorMaker 2.0 driver
positional arguments:
mm2.properties MM2 configuration file.
optional arguments:
-h, --help show this help message and exit
--clusters CLUSTER [CLUSTER ...]
Target cluster to use for this node.
Client, producer, and consumer tools
kafka-client-metrics.sh
Use the kafka-client-metrics tool to manipulate and describe client metrics configurations for clusters where client metrics are enabled. This tool provides a simpler alternative to using kafka-configs.sh to configure client metrics.
For example, to list all of the client metrics configuration resources, use the following command:
kafka-client-metrics.sh --bootstrap-server HOST1:PORT1 --describe
To describe a specific configuration:
kafka-client-metrics.sh --bootstrap-server HOST1:PORT1 --describe --name MYMETRICS
You can use this tool to create a client metric configuration resource and generate a unique name. In this example, --generate-name is used to create a type-4 UUID to use as the client metrics configuration resource name:
kafka-client-metrics.sh --bootstrap-server HOST1:PORT1 --alter --generate-name \
--metrics org.apache.kafka.producer.node.request.latency.,org.apache.kafka.consumer.node.request.latency. \
--interval 60000
Usage details
Option Description
------ -----------
--alter Alter the configuration for the client
metrics resource.
--bootstrap-server <String: server to REQUIRED: The Kafka server to connect
connect to> to.
--command-config <String: command Property file containing configurations to be
config property file> passed to Admin Client.
--delete Delete the configuration for the
client metrics resource.
--describe List configurations for the client
metrics resource.
--generate-name Generate a UUID to use as the name.
--help Print usage information.
--interval <String: push interval> The metrics push interval in
milliseconds.
Leave empty to reset the interval.
--list List the client metrics resources.
--match <String: k1=v1,k2=v2> Matching selector 'k1=v1,k2=v2'. The
following is a list of valid
selector names:
client_id
client_instance_id
client_software_name
client_software_version
client_source_address
client_source_port
--metrics <String: m1,m2> Telemetry metric name prefixes 'm1,m2'.
--name <String: name> Name of client metrics configuration
resource.
--version Display Kafka version.
kafka-verifiable-consumer.sh
The kafka-verifiable-consumer tool consumes messages from a topic and emits consumer events, such as group rebalances, received messages, and committed offsets, as JSON objects to STDOUT. This tool is intended for internal testing.
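For example, the following sketch consumes 1000 messages and emits the consumer events as JSON. The broker address, topic, and group ID are placeholders:
```shell
# Consume 1000 messages from "test-topic" and print JSON events to STDOUT
bin/kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 \
  --topic test-topic --group-id verify-group --max-messages 1000
```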
Usage details
usage: verifiable-consumer [-h] --topic TOPIC [--group-protocol GROUP-PROTOCOL] [--group-remote-assignor GROUP-REMOTE-ASSIGNOR]
--group-id GROUP-ID [--group-instance-id GROUP-INSTANCE-ID] [--max-messages MAX-MESSAGES]
[--session-timeout TIMEOUT-MS] [--verbose] [--enable-autocommit] [--reset-policy RESET-POLICY]
[--assignment-strategy ASSIGNMENT-STRATEGY] [--consumer.config CONFIG-FILE] [--command-config CONFIG-FILE]
--bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]]
This tool consumes messages from a specific topic and emits consumer events such as group
rebalances, received messages, and offsets committed as JSON objects to STDOUT.
optional arguments:
-h, --help show this help message and exit
--topic TOPIC Consumes messages from this topic.
--group-protocol GROUP-PROTOCOL
Group protocol. Must be one of CLASSIC or CONSUMER. Default: classic.
--group-remote-assignor GROUP-REMOTE-ASSIGNOR
Group remote assignor; only used if the group protocol is CONSUMER
--group-id GROUP-ID The group id of the consumer group
--group-instance-id GROUP-INSTANCE-ID
A unique identifier of the consumer instance
--max-messages MAX-MESSAGES
Consume this many messages. If -1, the consumer will consume until the process is killed
externally. Default: -1.
--session-timeout TIMEOUT-MS
Set the consumer's session timeout, note that this configuration is not supported when group protocol is consumer
--verbose Enable to log individual consumed records. Default: false.
--enable-autocommit Enable offset auto-commit on consumer. Default: false.
--reset-policy RESET-POLICY
Set reset policy. Must be either 'earliest', 'latest', or 'none'. Default: earliest.
--assignment-strategy ASSIGNMENT-STRATEGY
Set assignment strategy such as org.apache.kafka.clients.consumer.RoundRobinAssignor; only used if the group
protocol is CLASSIC. Default: org.apache.kafka.clients.consumer.RangeAssignor.
--consumer.config CONFIG-FILE
DEPRECATED: Consumer configuration properties file. This option will be removed in a future version. Use --command-
config instead
--command-config CONFIG-FILE
Configuration properties file; configuration options shared with command line parameters will be overridden.
Connection Group:
Group of arguments for connection to brokers
--bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]]
The servers to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...
kafka-configs.sh
Use the kafka-configs tool to change and describe configuration settings for topics, clients, users, brokers, IPs, and groups, or for a KRaft controller. To describe or view a KRaft controller, use the --bootstrap-controller option, and do not specify a bootstrap-server.
To change a property, set the entity-type to the desired entity (topic, broker, user, and so on), and use the --alter option. The following example shows how you might add the delete.retention.ms configuration property for a topic with kafka-configs.
/bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-default --alter --add-config delete.retention.ms=172800000
When you use the --add-config flag to add multiple values, use square brackets around the comma-separated list like the following example:
/bin/kafka-configs.sh --bootstrap-server host1:9092 --alter --add-config max.connections.per.ip.overrides=[host1:50,host2:9] --entity-type brokers --entity-default
The following example shows how you might check the cluster ID by specifying the --bootstrap-controller option.
/bin/kafka-cluster.sh cluster-id --bootstrap-controller localhost:9092
See Kafka Topic Operations for more examples of how to work with topics.
Usage details
This tool helps to manipulate and describe entity config for a topic, client, user, broker, ip, client-metrics or group
Option Description
------ -----------
--add-config <String> Key Value pairs of configs to add.
Square brackets can be used to group
values which contain commas: 'k1=v1,
k2=[v1,v2,v2],k3=v3'. The following
is a list of valid configurations:
For entity-type 'topics':
cleanup.policy
compression.gzip.level
compression.lz4.level
compression.type
compression.zstd.level
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
local.retention.bytes
local.retention.ms
max.compaction.lag.ms
max.message.bytes
message.timestamp.after.max.ms
message.timestamp.before.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
remote.log.copy.disable
remote.log.delete.on.disable
remote.storage.enable
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
For entity-type 'brokers':
background.threads
compression.gzip.level
compression.lz4.level
compression.type
compression.zstd.level
config.providers
follower.replication.throttled.rate
leader.replication.throttled.rate
listener.security.protocol.map
listeners
log.cleaner.backoff.ms
log.cleaner.dedupe.buffer.size
log.cleaner.delete.retention.ms
log.cleaner.io.buffer.load.factor
log.cleaner.io.buffer.size
log.cleaner.io.max.bytes.per.second
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleaner.min.compaction.lag.ms
log.cleaner.threads
log.cleanup.policy
log.flush.interval.messages
log.flush.interval.ms
log.index.interval.bytes
log.index.size.max.bytes
log.local.retention.bytes
log.local.retention.ms
log.message.timestamp.after.max.ms
log.message.timestamp.before.max.ms
log.message.timestamp.type
log.preallocate
log.retention.bytes
log.retention.ms
log.roll.jitter.ms
log.roll.ms
log.segment.bytes
log.segment.delete.delay.ms
max.connection.creation.rate
max.connections
max.connections.per.ip
max.connections.per.ip.overrides
message.max.bytes
metric.reporters
min.insync.replicas
num.io.threads
num.network.threads
num.recovery.threads.per.data.dir
num.replica.fetchers
principal.builder.class
producer.id.expiration.ms
remote.fetch.max.wait.ms
remote.list.offsets.request.timeout.ms
remote.log.index.file.cache.total.
size.bytes
remote.log.manager.copier.thread.pool.
size
remote.log.manager.copy.max.bytes.per.
second
remote.log.manager.expiration.thread.
pool.size
remote.log.manager.fetch.max.bytes.
per.second
remote.log.manager.follower.thread.
pool.size
remote.log.reader.threads
replica.alter.log.dirs.io.max.bytes.
per.second
sasl.enabled.mechanisms
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal.to.local.rules
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.
factor
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism.inter.broker.protocol
ssl.cipher.suites
ssl.client.auth
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.engine.factory.class
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.certificate.chain
ssl.keystore.key
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.certificates
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
transaction.partition.verification.
enable
unclean.leader.election.enable
For entity-type 'users':
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate
controller_mutation_rate
producer_byte_rate
request_percentage
For entity-type 'clients':
consumer_byte_rate
controller_mutation_rate
producer_byte_rate
request_percentage
For entity-type 'ips':
connection_creation_rate
For entity-type 'client-metrics':
interval.ms
match
metrics
For entity-type 'groups':
consumer.heartbeat.interval.ms
consumer.session.timeout.ms
share.auto.offset.reset
share.heartbeat.interval.ms
share.isolation.level
share.record.lock.duration.ms
share.session.timeout.ms
streams.heartbeat.interval.ms
streams.initial.rebalance.delay.ms
streams.num.standby.replicas
streams.session.timeout.ms
Entity types 'users' and 'clients' may
be specified together to update
config for clients of a specific
user.
--add-config-file <String> Path to a properties file with configs
to add. See add-config for a list of
valid configurations.
--all List all configs for the given entity,
including static configs if
available.
--alter Alter the configuration for the entity.
--bootstrap-controller <String: The Kafka controllers to connect to.
controller to connect to>
--bootstrap-server <String: server to The Kafka servers to connect to.
connect to>
--broker <String> The broker's ID.
--broker-defaults The config defaults for all brokers.
--broker-logger <String> The broker's ID for its logger config.
--client <String> The client's ID.
--client-defaults The config defaults for all clients.
--client-metrics <String> The client metrics config resource
name.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--delete-config <String> config keys to remove 'k1,k2'
--describe List configs for the given entity.
--entity-default Default entity name for
clients/users/brokers/ips (applies
to corresponding entity type)
--entity-name <String> Name of entity (topic name/client
id/user principal name/broker
id/ip/client metrics/group id)
--entity-type <String> Type of entity
(topics/clients/users/brokers/broker-
loggers/ips/client-metrics/groups)
--group <String> The group's ID.
--help Print usage information.
--ip <String> The IP address.
--ip-defaults The config defaults for all IPs.
--topic <String> The topic's name.
--user <String> The user's principal name.
--user-defaults The config defaults for all users.
--version Display Kafka version.
kafka-verifiable-producer.sh
The kafka-verifiable-producer tool produces increasing integers to the specified topic and prints JSON metadata to STDOUT on each send request, showing which messages have been acked and which have not. This tool is intended for internal testing.
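For example, the following sketch produces 1000 integers at a throttled rate. The broker address and topic name are placeholders:
```shell
# Produce 1000 integers to "test-topic" at roughly 100 messages/sec,
# printing JSON metadata for each send
bin/kafka-verifiable-producer.sh --bootstrap-server localhost:9092 \
  --topic test-topic --max-messages 1000 --throughput 100
```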
Usage details
usage: verifiable-producer [-h] --topic TOPIC [--max-messages MAX-MESSAGES] [--throughput THROUGHPUT] [--acks ACKS]
[--producer.config CONFIG-FILE] [--message-create-time CREATE-TIME] [--value-prefix VALUE-PREFIX]
[--repeating-keys REPEATING-KEYS] [--command-config CONFIG-FILE]
--bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]]
This tool produces increasing integers to the specified topic and prints JSON metadata to
stdout on each "send" request, making externally visible which messages have been acked and
which have not.
optional arguments:
-h, --help show this help message and exit
--topic TOPIC Produce messages to this topic.
--max-messages MAX-MESSAGES
Produce this many messages. If -1, produce messages until the process is killed externally. Default: -1.
--throughput THROUGHPUT
If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Default: -1.
--acks ACKS Acks required on each produced message. See Kafka docs on acks for details. Default: -1.
--producer.config CONFIG-FILE
DEPRECATED: Producer configuration properties file. This option will be removed in a future version. Use --command-
config instead.
--message-create-time CREATE-TIME
Send messages with creation time starting at the arguments value, in milliseconds since epoch. Default: -1.
--value-prefix VALUE-PREFIX
If specified, each produced value will have this prefix with a dot separator
--repeating-keys REPEATING-KEYS
If specified, each produced record will have a key starting at 0 increment by 1 up to the number specified,
then the key is set to 0 again
--command-config CONFIG-FILE
Configuration properties file; configuration options shared with command line parameters will be overridden.
Connection Group:
Group of arguments for connection to brokers
--bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]]
REQUIRED: The servers to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:
PORT2,...
kafka-console-consumer.sh
Use the kafka-console-consumer tool to consume records from a topic. Requires a bootstrap-server parameter that contains a comma-separated list of <hostname:port> entries specifying the server/port to connect to.
Confluent Tip
If you are using Confluent, you can use the Confluent CLI and the kafka topic command to produce and consume from a topic.
Example:
bin/kafka-console-consumer.sh --bootstrap-server HOST1:PORT1,HOST2:PORT2 --consumer.config config.properties --topic testTopic --property "print.key=true"
Usage details
This tool helps to read data from Kafka topics and outputs it to standard output.
Option Description
------ -----------
--bootstrap-server <String: server to REQUIRED: The servers to connect to.
connect to>
--command-config <String: config file> Consumer configuration properties file. Note
that command-property takes
precedence over this configuration.
--command-property <String: Consumer configuration properties in the form
consumer_prop> key=value.
--consumer-property <String: DEPRECATED: Consumer configuration
consumer_prop> properties in the form key=value.
This option will be removed in a
future version. Use --command-
property instead.
--consumer.config <String: config file> DEPRECATED: Consumer configuration
properties file. Note that [command-
property] takes precedence over this
configuration. This option will be removed
in a future version. Use --command-
config instead.
--enable-systest-events Log lifecycle events of the consumer
in addition to logging consumed
messages. (This is specific for
system tests.)
--formatter <String: class> The name of a class to use for
formatting kafka messages for
display. (default: org.apache.kafka.
tools.consumer.
DefaultMessageFormatter)
--formatter-config <String: config Configuration properties file to initialize
file> the message formatter. Note that
formatter-property takes
precedence over this configuration.
--formatter-property <String: prop> The properties to initialize the
message formatter. Default
properties include:
print.timestamp=true|false
print.key=true|false
print.offset=true|false
print.epoch=true|false
print.partition=true|false
print.headers=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
headers.separator=<headers.separator>
null.literal=<null.literal>
key.deserializer=<key.deserializer>
value.deserializer=<value.
deserializer>
header.deserializer=<header.
deserializer>
Users can also pass in customized
properties for their formatter; more
specifically, users can pass in
properties keyed with 'key.
deserializer.', 'value.
deserializer.' and 'headers.
deserializer.' prefixes to configure
their deserializers.
--from-beginning If the consumer does not already have
an established offset to consume
from, start with the earliest
message present in the log rather
than the latest message.
--group <String: consumer group id> The consumer group id of the consumer.
--help Print usage information.
--include <String: Java regex (String)> Regular expression specifying list of
topics to include for consumption.
--isolation-level <String> Set to read_committed in order to
filter out transactional messages
which are not committed. Set to
read_uncommitted to read all
messages. (default: read_uncommitted)
--key-deserializer <String: The name of the class to use for
deserializer for keys> deserializing keys.
--max-messages <Integer: num_messages> The maximum number of messages to
consume before exiting. If not set,
consumption is continual.
--offset <String: consume offset> The offset to consume from (a non-
negative number), or 'earliest'
which means from beginning, or
'latest' which means from end
(default: latest)
--partition <Integer: partition> The partition to consume from.
Consumption starts from the end of
the partition unless '--offset' is
specified.
--property <String: prop> DEPRECATED: The properties to
initialize the message formatter.
Default properties include:
print.timestamp=true|false
print.key=true|false
print.offset=true|false
print.epoch=true|false
print.partition=true|false
print.headers=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
headers.separator=<headers.separator>
null.literal=<null.literal>
key.deserializer=<key.deserializer>
value.deserializer=<value.
deserializer>
header.deserializer=<header.
deserializer>
Users can also pass in customized
properties for their formatter; more
specifically, users can pass in
properties keyed with 'key.
deserializer.', 'value.
deserializer.' and 'headers.
deserializer.' prefixes to configure
their deserializers.
This option will be removed in a
future version. Use --formatter-
property instead.
--skip-message-on-error If there is an error when processing a
message, skip it instead of halting.
--timeout-ms <Long: timeout_ms> If specified, exit if no message is
available for consumption for the
specified interval.
--topic <String: topic> The topic to consume from.
--value-deserializer <String: The name of the class to use for
deserializer for values> deserializing values.
--version Display Kafka version.
kafka-console-producer.sh
Use the kafka-console-producer tool to produce records to a topic. It requires a --bootstrap-server parameter containing a comma-separated list of <hostname:port> entries that specify the brokers to connect to. Example:
kafka-console-producer.sh --bootstrap-server HOST1:PORT1,HOST2:PORT2 --command-config config.properties --topic testTopic --reader-property "parse.key=true" --reader-property "key.separator=:"
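With parse.key=true, each input line must contain the key and the value separated by the configured key.separator. A quick way to see the expected line shape (the user IDs and JSON values here are made up for illustration):

```shell
# Emit two lines in the key:value shape the console producer expects
# when parse.key=true and key.separator=:
printf 'user1:{"name":"alice"}\nuser2:{"name":"bob"}\n'
```

Piping this output into the kafka-console-producer.sh invocation above would produce two keyed records to testTopic.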
Confluent Tip
If you are using Confluent, you can use the Confluent CLI and the kafka topic command to produce and consume from a topic.
Usage details
This tool helps to read data from standard input and publish it to Kafka.
Option Description
------ -----------
--batch-size <Integer: size> The buffer size in bytes allocated for
a partition. When records are
received which are smaller than this
size the producer will attempt to
optimistically group them together
until this size is reached. This is
the option to control batch.size in
producer configs. Please note that
this option will be replaced if max-
partition-memory-bytes is also set.
Default: 16384.
--bootstrap-server <String: server to REQUIRED: The servers to connect to.
connect to> The broker list string in the form
HOST1:PORT1,HOST2:PORT2.
--command-config <String: config file> Producer configuration properties file. Note
that command-property takes
precedence over this configuration.
--command-property <String: Producer configuration properties in the form
producer_prop> key=value.
--compression-codec [String: The compression codec: either 'none',
compression-codec] 'gzip', 'snappy', 'lz4', or 'zstd'.
If specified without value, then it
defaults to 'gzip'
--help Print usage information.
--line-reader <String: reader_class> The class name of the class to use for
reading lines from standard in. By
default each line is read as a
separate message. (default: org.
apache.kafka.tools.LineMessageReader)
--max-block-ms <Long: max block on The max time that the producer will
send> block for during a send request.
Default: 60000.
--max-memory-bytes <Long: total memory The total memory used by the producer
in bytes> to buffer records waiting to be sent
to the server. This is the option to
control `buffer.memory` in producer
configs. Default: 33554432.
--max-partition-memory-bytes <Integer: DEPRECATED: The buffer size in bytes
memory in bytes per partition> allocated for a partition. When
records are received which are
smaller than this size the producer
will attempt to optimistically group
them together until this size is
reached. This is the option to
control batch.size in producer
configs. This option will be removed
in Apache Kafka 5.0. Use --batch-
size instead. Default: 16384.
--message-send-max-retries <Integer> Brokers can fail to receive a message
for multiple reasons, and being
unavailable transiently is just one
of them. This property specifies the
number of retries before the
producer gives up and drops the
message. This is the option to
control `retries` in producer
configs. Default: 3.
--metadata-expiry-ms <Long: metadata The period of time in milliseconds
expiration interval> after which we force a refresh of
metadata even if we haven't seen any
leadership changes. This is the
option to control `metadata.max.age.
ms` in producer configs. Default:
300000.
--producer-property <String: DEPRECATED: Producer configuration
producer_prop> properties in the form key=value.
This option will be removed in a
future version. Use --command-
property instead.
--producer.config <String: config file> DEPRECATED: Producer configuration
properties file. Note that command-
property takes precedence over this
configuration. This option will be removed
in a future version. Use --command-
config instead.
--property <String: prop> DEPRECATED: A mechanism to pass user-
defined properties in the form
key=value to the message reader.
This allows custom configuration for
a user-defined message reader.
Default properties include:
parse.key=false
parse.headers=false
ignore.error=false
key.separator=\t
headers.delimiter=\t
headers.separator=,
headers.key.separator=:
null.marker= When set, any fields
such as key, value and headers equal to
this will be replaced by null
Default parsing pattern when:
parse.headers=true and parse.key=true:
"h1:v1,h2:v2...\tkey\tvalue"
parse.key=true:
"key\tvalue"
parse.headers=true:
"h1:v1,h2:v2...\tvalue"
This option will be removed in a
future version. Use --reader-
property instead.
--reader-config <String: config file> Configuration properties file for the message
reader. Note that reader-property
takes precedence over this configuration.
--reader-property <String: prop> A mechanism to pass user-defined
properties in the form key=value to
the message reader. This allows
custom configuration for a user-
defined message reader.
Default properties include:
parse.key=false
parse.headers=false
ignore.error=false
key.separator=\t
headers.delimiter=\t
headers.separator=,
headers.key.separator=:
null.marker= When set, any fields
such as key, value and headers equal to
this will be replaced by null
Default parsing pattern when:
parse.headers=true and parse.key=true:
"h1:v1,h2:v2...\tkey\tvalue"
parse.key=true:
"key\tvalue"
parse.headers=true:
"h1:v1,h2:v2...\tvalue"
--request-required-acks <String: The required `acks` of the producer
request required acks> requests (default: -1)
--request-timeout-ms <Integer: request The ack timeout of the producer
timeout ms> requests. Value must be non-negative
and non-zero. (default: 1500)
--retry-backoff-ms <Long> Before each retry, the producer
refreshes the metadata of relevant
topics. Since leader election takes
a bit of time, this property
specifies the amount of time that
the producer waits before refreshing
the metadata. This is the option to
control `retry.backoff.ms` in
producer configs. Default: 100.
--socket-buffer-size <Integer: size> The size of the TCP receive buffer.
This is the option to control `send.
buffer.bytes` in producer configs.
Default: 102400.
--sync If set, message send requests to the
brokers are sent synchronously, one
at a time as they arrive.
--timeout <Long: timeout_ms> If set and the producer is running in
asynchronous mode, this gives the
maximum amount of time a message
will queue awaiting sufficient batch
size. The value is given in ms. This
is the option to control `linger.ms`
in producer configs. Default: 1000.
--topic <String: topic> REQUIRED: The topic name to produce
messages to.
--version Display Kafka version.
kafka-producer-perf-test.sh
The kafka-producer-perf-test tool enables you to produce a large quantity of data to test producer performance for the Kafka cluster.
Example:
bin/kafka-producer-perf-test.sh --topic topic-a --num-records 200000 --record-size 1000 --throughput 10000000 --producer-props bootstrap.servers=host1:9092
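As the usage details below note, transactions can be enabled by supplying a transactional id or a transaction duration. A hedged sketch (host and topic names are placeholders); because --transactional-id is omitted, the tool generates one of the form performance-producer- followed by a random uuid:

```shell
# Transactional performance run: commitTransaction is called every 3000 ms
bin/kafka-producer-perf-test.sh --topic topic-a --num-records 100000 \
  --record-size 1000 --throughput -1 \
  --transaction-duration-ms 3000 \
  --bootstrap-server host1:9092
```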
Usage details
usage: kafka-producer-perf-test
[-h] [--bootstrap-server BOOTSTRAP-SERVER] --topic TOPIC --num-records NUM-RECORDS [--payload-delimiter PAYLOAD-DELIMITER]
--throughput THROUGHPUT [--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]]
[--command-property PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]] [--producer.config CONFIG-FILE] [--command-config CONFIG-FILE]
[--print-metrics] [--transactional-id TRANSACTIONAL-ID] [--transaction-duration-ms TRANSACTION-DURATION]
[--warmup-records WARMUP-RECORDS] [--reporting-interval INTERVAL-MS] (--record-size RECORD-SIZE | --payload-file PAYLOAD-FILE |
--payload-monotonic)
This tool is used to verify the producer performance. To enable transactions, you can specify a transactional id or set a transaction
duration using --transaction-duration-ms. There are three ways to specify the transactional id: set transactional.id=<id> via --command-
property, set transactional.id=<id> in the config file via --command-config, or use --transactional-id <id>.
optional arguments:
-h, --help show this help message and exit
--bootstrap-server BOOTSTRAP-SERVER
The servers to connect to. This configuration takes precedence over bootstrap.servers specified via --command-
property or --command-config.
--topic TOPIC Produce records to this topic.
--num-records NUM-RECORDS
Number of records to produce.
--payload-delimiter PAYLOAD-DELIMITER
Provides the delimiter to be used when --payload-file is provided. Defaults to new line. Note that this
parameter will be ignored if --payload-file is not provided. (default: \n)
--throughput THROUGHPUT
Throttle maximum record throughput to *approximately* THROUGHPUT records/sec. Set this to -1 to disable
throttling.
--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]
DEPRECATED: Kafka producer related configuration properties like client.id. These configurations take precedence over
those passed via --command-config or --producer.config. This option will be removed in a future version. Use --
command-property instead.
--command-property PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]
Kafka producer related configuration properties like client.id. These configurations take precedence over those passed
via --command-config or --producer.config.
--producer.config CONFIG-FILE
DEPRECATED: Producer configuration properties file. This option will be removed in a future version. Use --command-
config instead.
--command-config CONFIG-FILE
Producer configuration properties file.
--print-metrics Print out metrics at the end of the test. (default: false)
--transactional-id TRANSACTIONAL-ID
The transactional id to use. This config takes precedence over the transactional.id specified via --command-
property or --command-config. Note that if the transactional id is not specified while --transaction-duration-ms
is provided, the default value for the transactional id will be performance-producer- followed by a random uuid.
--transaction-duration-ms TRANSACTION-DURATION
The maximum duration of each transaction. The commitTransaction will be called after this time has elapsed. The
value should be greater than 0. If the transactional id is specified via --command-property, --command-config or
--transactional-id but --transaction-duration-ms is not specified, the default value will be 3000.
--warmup-records WARMUP-RECORDS
The number of records to treat as warmup. These initial records will not be included in steady-state statistics.
An additional summary line will be printed describing the steady-state statistics. (default: 0)
--reporting-interval INTERVAL-MS
Interval in milliseconds at which to print progress info. (default: 5000)
Note that you must provide exactly one of --record-size, --payload-file or --payload-monotonic.
--record-size RECORD-SIZE
Record size in bytes. Note that you must provide exactly one of --record-size, --payload-file or --payload-
monotonic.
--payload-file PAYLOAD-FILE
File to read the record payloads from. This works only for UTF-8 encoded text files. Payloads will be read from
this file and a payload will be randomly selected when sending records. Note that you must provide exactly one
of --record-size, --payload-file or --payload-monotonic.
--payload-monotonic Payload is a monotonically increasing integer. Note that you must provide exactly one of --record-size, --
payload-file or --payload-monotonic. (default: false)
kafka-groups.sh
This tool helps to list groups of all types.
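For example, to list only the share groups in a cluster (assuming a broker listening on localhost:9092):

```shell
bin/kafka-groups.sh --bootstrap-server localhost:9092 --list --share
```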
Usage details
Option (* = required) Description
--------------------- -----------
* --bootstrap-server <String: server REQUIRED: The Kafka server to connect
to connect to> to.
--command-config <String: command Property file containing configurations to be
config property file> passed to Admin Client.
--consumer Filter the groups to show all kinds of
consumer groups, including classic
and simple consumer groups. This
matches group type 'consumer', and
group type 'classic' where the
protocol type is 'consumer' or empty.
--group-type <String: type> Filter the groups based on group type.
Valid types are: 'classic',
'consumer', 'share' and 'streams'.
--help Print usage information.
--list List the groups.
--protocol <String: protocol> Filter the groups based on protocol
type.
--share Filter the groups to show share groups.
--streams Filter the groups to show streams
groups.
--version Display Kafka version.
kafka-consumer-groups.sh
Use the kafka-consumer-groups tool to get a list of the active groups in the cluster.
For example, to show the position of all consumers in a group named user-group, you might use the following command.
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe --group user-group
This would result in output like the following (CONSUMER-ID entries truncated for readability).
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
user 0 2 4 2 consumer-1-... /127.0.0.1 consumer-1
user 1 2 3 1 consumer-1-... /127.0.0.1 consumer-1
user 2 2 3 1 consumer-2-... /127.0.0.1 consumer-2
For more examples, see View Consumer Group Info in Kafka.
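The reset specifications such as --to-datetime expect a timestamp in 'YYYY-MM-DDThh:mm:ss.sss' format. One way to build such a timestamp with date, plus a commented dry-run sketch that reuses the user-group example above (assumes a local broker):

```shell
# Build a timestamp in the 'YYYY-MM-DDThh:mm:ss.sss' format --to-datetime expects
ts=$(date -u +%Y-%m-%dT%H:%M:%S.000)
echo "$ts"
# Preview (without applying) a reset of user-group's offsets on topic user:
# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
#   --group user-group --topic user --reset-offsets --to-datetime "$ts" --dry-run
```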
Usage details
Option Description
------ -----------
--all-groups Apply to all consumer groups.
--all-topics Consider all topics assigned to a
group in the `reset-offsets` process.
--bootstrap-server <String: server to The servers to connect to. REQUIRED
connect to> for all options except for --
validate-regex.
--by-duration <String: duration> Reset offsets to offset by duration
from current timestamp. Format:
'PnDTnHnMnS'
--command-config <String: command Property file containing configurations to be
config property file> passed to Admin Client and Consumer.
--delete Pass in groups to delete topic
partition offsets and ownership
information over the entire consumer
group. For instance --group g1 --
group g2
--delete-offsets Delete offsets of consumer group.
Supports one consumer group at a
time, and multiple topics.
--describe Describe consumer group and list
offset lag (number of messages not
yet processed) related to given
group.
--dry-run Only show results without executing
changes on Consumer Groups.
Supported operations: reset-offsets.
--execute Execute operation. Supported
operations: reset-offsets.
--export Export operation execution to a CSV
file. Supported operations: reset-
offsets.
--from-file <String: path to CSV file> Reset offsets to values defined in CSV
file.
--group <String: consumer group> The consumer group we wish to act on.
--help Print usage information.
--list List all consumer groups.
--members Describe members of the group. This
option may be used with '--describe'
and '--bootstrap-server' options
only.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
members
--offsets Describe the group and list all topic
partitions in the group along with
their offset lag. This is the
default sub-action and may be used
with '--describe' and '--bootstrap-
server' options only.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
offsets
--reset-offsets Reset offsets of consumer group.
Supports one consumer group at a
time, and instances should be
inactive.
Has 2 execution options: --dry-run
(the default) to plan which offsets
to reset, and --execute to update
the offsets. Additionally, the --
export option is used to export the
results to a CSV format.
You must choose one of the following
reset specifications: --to-datetime,
--by-duration, --to-earliest, --to-
latest, --shift-by, --from-file, --
to-current, --to-offset.
To define the scope use --all-topics
or --topic. One scope must be
specified unless you use '--from-
file'.
--shift-by <Long: number-of-offsets> Reset offsets shifting current offset
by 'n', where 'n' can be positive or
negative.
--state [String] When specified with '--describe',
includes the state of the group.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
state
When specified with '--list', it
displays the state of all groups. It
can also be used to list groups with
specific states.
Example: --bootstrap-server localhost:
9092 --list --state stable,empty
This option may be used with '--
describe', '--list' and '--bootstrap-
server' options only.
--timeout <Long: timeout (ms)> The timeout that can be set for some
use cases. For example, it can be
used when describing the group to
specify the maximum amount of time
in milliseconds to wait before the
group stabilizes (when the group is
just created, or is going through
some changes). (default: 30000)
--to-current Reset offsets to current offset.
--to-datetime <String: datetime> Reset offsets to offset from datetime.
Format: 'YYYY-MM-DDThh:mm:ss.sss'
--to-earliest Reset offsets to earliest offset.
--to-latest Reset offsets to latest offset.
--to-offset <Long: offset> Reset offsets to a specific offset.
--topic <String: topic> The topic whose consumer group
information should be deleted, or
the topic to include in the reset-
offsets process. In the `reset-
offsets` case, partitions can be
specified using the format `topic1:
0,1,2`, where 0,1,2 are the
partitions to include in the
process. Reset-offsets also supports
multiple topic inputs.
--type [String] When specified with '--list', it
displays the types of all the
groups. It can also be used to list
groups with specific types.
Example: --bootstrap-server localhost:
9092 --list --type classic, consumer
This option may be used with the '--list'
option only.
--validate-regex <String: regex> Validate that the syntax of the
provided regular expression is valid
according to the RE2 format.
--verbose Provide additional information, if
any, when describing the group. This
option may be used with '--
offsets'/'--members'/'--state' and
'--bootstrap-server' options only.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
members --verbose
--version Display Kafka version.
kafka-consumer-perf-test.sh
This tool tests the consumer performance for the Kafka cluster.
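For example, a basic run against a hypothetical topic (--num-records is required):

```shell
bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 \
  --topic topic-a --num-records 100000 --print-metrics
```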
Usage details
Option Description
------ -----------
--bootstrap-server <String: server to REQUIRED: The servers to connect to.
connect to>
--command-config <String: config file> Configuration properties file.
--command-property <String: prop1=val1> Kafka consumer related configuration
properties like client.id. These
configurations take precedence over those
passed via --command-config or --
consumer.config.
--consumer.config <String: config file> DEPRECATED: Consumer configuration
properties file. This option will be
removed in a future version. Use --
command-config instead.
--date-format <String: date format> The date format to use for formatting
the time field. See java.text.
SimpleDateFormat for options.
Default: yyyy-MM-dd HH:mm:ss:SSS.
--fetch-size <Integer: size> The maximum amount of data to fetch
from a single partition per request.
Default: 1048576.
--from-latest If the consumer does not already have
an established offset to consume
from, start with the latest record
present in the log rather than the
earliest record.
--group <String: gid> The group id to consume on. Default:
perf-consumer-91646.
--help Print usage information.
--hide-header If set, skips printing the header for
the stats.
--include <String: Java regex (String)> Regular expression specifying list of
topics to include for consumption.
--messages <Long: count> DEPRECATED: The number of records to
consume. This option will be removed
in a future version. Use --num-
records instead.
--num-records <Long: count> REQUIRED: The number of records to
consume.
--print-metrics Print out the metrics.
--reporting-interval <Long: Interval in milliseconds at which to
interval_ms> print progress info. Default: 5000.
--show-detailed-stats If set, stats are reported for each
reporting interval as configured by
reporting-interval.
--socket-buffer-size <Integer: size> The size of the TCP receive buffer.
Default: 2097152.
--timeout [Long: milliseconds] The maximum allowed time in
milliseconds between returned
records. Default: 10000.
--topic <String: topic> The topic to consume from.
--version Display Kafka version.
Manage Kafka Connect
connect-distributed.sh
Use the connect-distributed tool to run Connect workers in distributed mode, meaning across multiple machines. Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both for the active tasks and for configuration and offset commit data.
Usage details
USAGE: ./connect-distributed.sh [-daemon] connect-distributed.properties
connect-standalone.sh
Use the connect-standalone tool to run Kafka Connect in standalone mode, meaning all work is performed in a single process. This is good for getting started, but it lacks fault tolerance. For more information, see Kafka Connect.
Usage details
USAGE: ./connect-standalone.sh [-daemon] connect-standalone.properties
connect-mirror-maker.sh
Use the connect-mirror-maker tool to replicate topics from one cluster to another using the Connect framework. You must pass an mm2.properties MM2 configuration file. For more information, see KIP-382: MirrorMaker 2.0 or Getting up to speed with MirrorMaker 2.
Usage details
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.properties
MirrorMaker 2.0 driver
positional arguments:
mm2.properties MM2 configuration file.
optional arguments:
-h, --help show this help message and exit
--clusters CLUSTER [CLUSTER ...]
Target cluster to use for this node.
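A minimal mm2.properties sketch, assuming two clusters named source and target (the bootstrap hosts and the topic pattern are illustrative):

```properties
# Clusters that MirrorMaker 2 will connect to
clusters = source, target
source.bootstrap.servers = source-host:9092
target.bootstrap.servers = target-host:9092

# Replicate from source to target; mirror every topic
source->target.enabled = true
source->target.topics = .*
```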
Manage Kafka Streams
kafka-streams-application-reset.sh
For Kafka Streams applications, the kafka-streams-application-reset tool resets the application and forces it to reprocess its data from the beginning. Useful for debugging and testing.
For example, the following command would reset the my-streams-app application:
kafka-streams-application-reset.sh --application-id my-streams-app \
--input-topics my-input-topic \
--intermediate-topics rekeyed-topic
For more information, see Kafka Streams Application Reset Tool in the Confluent documentation.
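Because the reset makes irreversible changes, it is worth previewing the plan first; a sketch using the same hypothetical application:

```shell
# Show what would be reset and which internal topics would be deleted,
# without executing anything
kafka-streams-application-reset.sh --application-id my-streams-app \
  --input-topics my-input-topic \
  --dry-run
```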
Usage details
This tool helps to quickly reset an application in order to reprocess its data from scratch.
* This tool resets offsets of input topics to the earliest available offset (by default), or to a specific
defined position.
* This tool deletes the internal topics that were created by Kafka Streams (topics starting with "<application.id>-").
The tool finds these internal topics automatically. If the topics flagged automatically for deletion by
the dry-run are unsuitable, you can specify a subset with the "--internal-topics" option.
* This tool will not delete output topics (if you want to delete them, you need to do it yourself with the bin/kafka-topics.sh command).
* This tool will not clean up the local state on the stream application instances (the persisted stores
used to cache aggregation results). You need to call KafkaStreams#cleanUp() in your application
or manually delete them from the directory specified by "state.dir"
configuration (${java.io.tmpdir}/kafka-streams/<application.id> by default).
* When a long session timeout has been configured, active members could take longer to expire on the
broker, which blocks the reset job from completing. Use the "--force" option to remove those
left-over members immediately. Make sure to stop all stream applications when this option
is specified to avoid unexpected disruptions.
*** Important! You will get incorrect output if you don't clean up the local stores after running the reset tool!
*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with "--dry-run" to preview your changes before making them.
Option (* = required) Description
--------------------- -----------
* --application-id <String: id> REQUIRED: The Kafka Streams
application ID (application.id).
--bootstrap-server <String: server to The servers to connect to. The
connect to> broker list string in the form HOST1:
PORT1,HOST2:PORT2. Default:
localhost:9092.
--by-duration <String> Reset offsets to offset by duration
from current timestamp. Format:
'PnDTnHnMnS'
--command-config <String: file name> Configuration properties file to be passed to
admin clients and embedded consumer.
--config-file <String: file name> DEPRECATED: Property file containing
configs to be passed to admin
clients and embedded consumer. This
option will be removed in a future
version. Use --command-config
instead.
--dry-run Display the actions that would be
performed without executing the reset
commands.
--force Force the removal of members of the
consumer group (intended to remove
stopped members if a long session
timeout was used). Make sure to shut
down all stream applications when this
option is specified to avoid unexpected
rebalances.
--from-file <String> Reset offsets to values defined in CSV
file.
--help Print usage information.
--input-topics <String: list> Comma-separated list of user input
topics. For these topics, the tool by
default will reset the offset to the
earliest available offset. Reset to
other offset position by appending
other reset offset option, ex: --input-
topics foo --shift-by 5
--intermediate-topics <String: list> [deprecated] Comma-separated list of
intermediate user topics (topics
that are input and output topics).
For these topics, the tool will skip
to the end.
--internal-topics <String: list> Comma-separated list of internal topics
to delete. Must be a subset of the
internal topics marked for deletion by
the default behaviour (do a dry-run
without this option to view these
topics).
--shift-by <Long: number-of-offsets> Reset offsets shifting current offset by
'n', where 'n' can be positive or
negative
--to-datetime <String> Reset offsets to offset from datetime.
Format: 'YYYY-MM-DDThh:mm:ss.sss'
--to-earliest Reset offsets to earliest offset.
--to-latest Reset offsets to latest offset.
--to-offset <Long> Reset offsets to a specific offset.
--version Display Kafka version.
kafka-streams-groups.sh
Use this tool to list or describe streams groups.
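For example, to describe a hypothetical streams group named my-streams-app on a local broker:

```shell
bin/kafka-streams-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-streams-app
```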
Usage details
This tool helps to list, or describe streams groups.
Option Description
------ -----------
--all-groups Apply to all streams groups.
--all-input-topics Consider all source topics used in the
topology of the group. Supported
operations: delete-offsets, reset-
offsets.
--bootstrap-server <String: server to REQUIRED: The servers to connect to.
connect to>
--by-duration <String: duration> Reset offsets to offset by duration
from current timestamp. Format:
'PnDTnHnMnS'
--command-config <String: command Property file containing configurations to be
config property file> passed to Admin Client.
--delete Pass in groups to delete topic
partition offsets and ownership
information over the entire streams
group. For instance --group g1 --
group g2
--delete-all-internal-topics Delete all internal topics linked to
the streams group. Supported
operations: reset-offsets, delete.
With reset-offsets, this option is
applicable only when --execute is
used.
--delete-internal-topic <String> Delete specified internal topic of the
streams group. Supported operations:
reset-offsets. This option is
applicable only when --execute is
used.
--delete-offsets Delete offsets of streams group.
Supports one streams group at a
time, and multiple topics.
--describe Describe streams group and list offset
lag related to given group.
--dry-run Only show results without executing
changes on streams group. Supported
operations: reset-offsets.
--execute Execute operation. Supported
operations: reset-offsets.
--export Export operation execution to a CSV
file. Supported operations: reset-
offsets.
--from-file <String: path to CSV file> Reset offsets to values defined in CSV
file.
--group <String: streams group> The streams group we wish to act on.
--help Print usage information.
--input-topic <String: topic> The input topic whose committed offset
should be deleted or reset. In
`reset-offsets` case, partitions can
be specified using this format:
`topic1:0,1,2`, where 0,1,2 are the
partition to be included in the
process. Multiple input topics can
be specified. Supported operations:
delete-offsets, reset-offsets.
--list List all streams groups.
--members Describe members of the group. This
option may be used with the '--
describe' option only.
--offsets Describe the group and list all topic
partitions in the group along with
their offset information. This is
the default sub-action and may be
used with the '--describe' option
only.
--reset-offsets Reset offsets of streams group. The
instances should be inactive.
Has 2 execution options: --dry-run to
plan which offsets to reset, and --
execute to update the offsets.
If you use --execute, all internal
topics linked to the group will also
be deleted.
You must choose one of the following
reset specifications: --to-datetime,
--by-duration, --to-earliest, --to-
latest, --shift-by, --from-file, --
to-current, --to-offset.
To define the scope use --all-input-
topics or --input-topic. One scope
must be specified unless you use '--
from-file'.
Fails if neither '--dry-run' nor '--
execute' is specified.
--shift-by <Long: number-of-offsets> Reset offsets shifting current offset
by 'n', where 'n' can be positive or
negative.
--state [String] When specified with '--list', it
displays the state of all groups. It
can also be used to list groups with
specific states. Valid values are
Empty, NotReady, Stable, Assigning,
Reconciling, and Dead.
--timeout <Long: timeout (ms)> The timeout that can be set for some
use cases. For example, it can be
used when describing the group to
specify the maximum amount of time
in milliseconds to wait before the
group stabilizes. Default: 30000.
--to-current Reset offsets to current offset.
--to-datetime <String: datetime> Reset offsets to offset from datetime.
Format: 'YYYY-MM-DDThh:mm:ss.sss'
--to-earliest Reset offsets to earliest offset.
--to-latest Reset offsets to latest offset.
--to-offset <Long: offset> Reset offsets to a specific offset.
--verbose Use with --describe --state to show
group epoch and target assignment
epoch.
Use with --describe --members to show
for each member the member epoch,
target assignment epoch, current
assignment, target assignment, and
whether member is still using the
classic rebalance protocol.
Use with --describe --offsets and --
describe to show leader epochs for
each partition.
--version Display Kafka version.
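As a sketch of how the reset options above fit together, the following hedged example previews, then applies, an offset reset for a streams group. The tool name `kafka-streams-groups.sh` and the group name `my-streams-app` are assumptions for illustration; substitute your own group and scope.

```shell
# Preview which offsets would be reset across all input topics (no changes made).
bin/kafka-streams-groups.sh --bootstrap-server localhost:9092 \
  --reset-offsets --group my-streams-app \
  --all-input-topics --to-earliest --dry-run

# Apply the reset. The group's instances must be inactive, and internal
# topics linked to the group will also be deleted.
bin/kafka-streams-groups.sh --bootstrap-server localhost:9092 \
  --reset-offsets --group my-streams-app \
  --all-input-topics --to-earliest --execute
```

Running with `--dry-run` first is the safer workflow, since `--execute` both moves offsets and deletes the group's internal topics.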
Manage security
kafka-acls
Use the kafka-acls tool to add, remove, and list ACLs. For example, to grant two principals, Jose and Jane, read and write permissions on the user topic from specific IP addresses, you could use a command like the following:
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Jose --allow-principal User:Jane --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic user
Usage details
This tool helps to manage acls on kafka.
Option Description
------ -----------
--add Indicates you are trying to add ACLs.
--allow-host <String: allow-host> Host from which principals listed in --
allow-principal will have access. If
you have specified --allow-principal
then the default for this option
will be set to '*' which allows
access from all hosts.
--allow-principal <String: allow- principal is in principalType:name
principal> format. Note that principalType must
be supported by the Authorizer being
used. For example, User:'*' is the
wild card indicating all users.
--bootstrap-controller <String: A list of host/port pairs to use for
controller to connect to> establishing the connection to the
Kafka cluster. This list should be
in the form host1:port1,host2:
port2,... This config is required
for acl management using admin
client API.
--bootstrap-server <String: server to A list of host/port pairs to use for
connect to> establishing the connection to the
Kafka cluster. This list should be
in the form host1:port1,host2:
port2,... This config is required
for acl management using admin
client API.
--cluster Add/Remove cluster ACLs.
--command-config [String: command- A property file containing configs to
config] be passed to Admin Client.
--consumer Convenience option to add/remove ACLs
for consumer role. This will
generate ACLs that allows READ,
DESCRIBE on topic and READ on group.
--delegation-token <String: delegation- Delegation token to which ACLs should
token> be added or removed. A value of '*'
indicates ACL should apply to all
tokens.
--deny-host <String: deny-host> Host from which principals listed in --
deny-principal will be denied
access. If you have specified --deny-
principal then the default for this
option will be set to '*' which
denies access from all hosts.
--deny-principal <String: deny- principal is in principalType:name
principal> format. By default anyone not added
through --allow-principal is denied
access. You only need to use this
option as negation to already
allowed set. Note that principalType
must be supported by the Authorizer
being used. For example if you
wanted to allow access to all users
in the system but not test-user you
can define an ACL that allows access
to User:'*' and specify --deny-
principal=User:test@EXAMPLE.COM. AND
PLEASE REMEMBER DENY RULES TAKES
PRECEDENCE OVER ALLOW RULES.
--force Assume Yes to all queries and do not
prompt.
--group <String: group> Consumer Group to which the ACLs
should be added or removed. A value
of '*' indicates the ACLs should
apply to all groups.
--help Print usage information.
--idempotent Enable idempotence for the producer.
This should be used in combination
with the --producer option. Note
that idempotence is enabled
automatically if the producer is
authorized to a particular
transactional-id.
--list List ACLs for the specified resource,
use --topic <topic> or --group
<group> or --cluster to specify a
resource.
--operation <String> Operation that is being allowed or
denied. Valid operation names are:
Create
DescribeTokens
Delete
DescribeConfigs
IdempotentWrite
Alter
TwoPhaseCommit
ClusterAction
Write
Describe
All
AlterConfigs
Read
CreateTokens
(default: All)
--principal [String: principal] List ACLs for the specified principal.
principal is in principalType:name
format. Note that principalType must
be supported by the Authorizer being
used. Multiple --principal option
can be passed.
--producer Convenience option to add/remove ACLs
for producer role. This will
generate ACLs that allows WRITE,
DESCRIBE and CREATE on topic.
--remove Indicates you are trying to remove
ACLs.
--resource-pattern-type The type of the resource pattern or
<ANY|MATCH|LITERAL|PREFIXED> pattern filter. When adding acls,
this should be a specific pattern
type, e.g. 'literal' or 'prefixed'.
When listing or removing acls, a
specific pattern type can be used to
list or remove acls from specific
resource patterns, or use the filter
values of 'any' or 'match', where
'any' will match any pattern type,
but will match the resource name
exactly, where as 'match' will
perform pattern matching to list or
remove all acls that affect the
supplied resource(s). WARNING:
'match', when used in combination
with the '--remove' switch, should
be used with care. (default: LITERAL)
--topic <String: topic> topic to which ACLs should be added or
removed. A value of '*' indicates
ACL should apply to all topics.
--transactional-id <String: The transactionalId to which ACLs
transactional-id> should be added or removed. A value
of '*' indicates the ACLs should
apply to all transactionalIds.
--user-principal <String: user- Specifies a user principal as a
principal> resource in relation with the
operation. For instance one could
grant CreateTokens or DescribeTokens
permission on a given user principal.
--version Display Kafka version.
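The `--consumer` and `--producer` convenience options described above bundle the ACLs a typical client needs. The following hedged sketch grants the consumer-role ACLs (READ and DESCRIBE on the topic, READ on the group) and then lists the resulting ACLs; the topic and group names are illustrative.

```shell
# Grant consumer-role ACLs to User:Jane on topic "user" and group "my-group".
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add \
  --allow-principal User:Jane --consumer --topic user --group my-group

# List the ACLs now attached to the topic.
bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic user
```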
kafka-delegation-tokens.sh
Use the kafka-delegation-tokens tool to create, renew, expire and describe delegation tokens. Delegation tokens are shared secrets between Kafka brokers and clients, and are a lightweight authentication mechanism meant to complement existing SASL/SSL methods. For more information, see Authentication using Delegation Tokens in the Confluent Documentation.
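For example, the following sketch creates a token on a secured cluster. The `client.properties` file is an assumed security configuration passed via `--command-config` (token operations are allowed in secure mode only), and `User:user1` is an illustrative renewer principal.

```shell
# Create a delegation token; -1 defers the max lifetime to the broker's
# delegation.token.max.lifetime.ms setting.
bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 \
  --create --max-life-time-period -1 \
  --command-config client.properties --renewer-principal User:user1
```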
Usage details
This tool helps to create, renew, expire, or describe delegation tokens.
Option Description
------ -----------
--bootstrap-server <String> REQUIRED: servers to use for bootstrapping.
--command-config <String> REQUIRED: A property file containing configs to
be passed to Admin Client. Token management
operations are allowed in secure mode only.
This config file is used to pass security
related configs.
--create Create a new delegation token. Use --renewer-
principal option to pass renewer principals.
--describe Describe delegation tokens for the given
principals. Use --owner-principal to pass
owner/renewer principals. If --owner-principal
option is not supplied, all the user-owned
tokens and tokens where the user has Describe
permissions will be returned.
--expire Expire delegation token. Use --expiry-time-
period option to expire the token.
--expiry-time-period [Long] Expiry time period in milliseconds. If the value
is -1, then the token will get invalidated
immediately.
--help Print usage information.
--hmac [String] HMAC of the delegation token
--max-life-time-period [Long] Max life period for the token in milliseconds.
If the value is -1, then token max life time
will default to the server side config value
of (delegation.token.max.lifetime.ms).
--owner-principal [String] owner is a Kafka principal. They should be in
principalType:name format.
--renew Renew delegation token. Use --renew-time-period
option to set renew time period.
--renew-time-period [Long] Renew time period in milliseconds. If the value
is -1, then the renew time period will default
to the server side config value of (delegation.
token.expiry.time.ms).
--renewer-principal [String] renewer is a Kafka principal. They should be in
principalType:name format.
--version Display Kafka version.
Test and troubleshoot
This section contains tools you can use for testing and troubleshooting your applications.
kafka-e2e-latency.sh
The kafka-e2e-latency tool is a performance testing tool used to measure end-to-end latency in Kafka. It works by sending messages to a Kafka topic and then consuming those messages with a consumer. The tool calculates the time difference between when a message was produced and when it was consumed, giving you a measure of end-to-end latency for your Kafka cluster. This tool is useful for testing the performance of your cluster and identifying bottlenecks or issues that may be affecting latency.
For more about end-to-end latency, see Configure Kafka to Minimize Latency.
For example:
kafka-e2e-latency.sh --bootstrap-server localhost:9092 --topic test --num-records 10000 --producer-acks 1 --record-size 20
Usage details
This tool measures end-to-end latency in Kafka by sending messages and timing their reception.
Option Description
------ -----------
--bootstrap-server <String: bootstrap- REQUIRED: The Kafka broker list string
server> in the form HOST1:PORT1,HOST2:PORT2.
--command-config [String: config-file] Optional: A property file for Kafka
producer/consumer/admin client
configuration.
--help Print usage information.
--num-headers [Integer: count] Optional: The number of headers to
include in each message. (default: 0)
--num-records <Integer: count> REQUIRED: The number of messages to
send.
--producer-acks <String: producer-acks> REQUIRED: Producer acknowledgements.
Must be '1' or 'all'.
--record-header-key-size [Integer: Optional: The size of the message
bytes] header key in bytes. Used together
with record-header-size. (default: 0)
--record-header-size [Integer: bytes] Optional: The size of message header
value in bytes. Use -1 for null
header value. (default: 0)
--record-key-size [Integer: bytes] Optional: The size of the message key
in bytes. If not set, messages are
sent without a key. (default: 0)
--record-size <Integer: bytes> REQUIRED: The size of each message
payload in bytes.
--topic <String: topic-name> REQUIRED: The topic to use for the
test.
--version Display Kafka version.
kafka-dump-log.sh
The kafka-dump-log tool can be used in KRaft mode to parse a metadata log file and output its contents to the console. It requires a comma-separated list of log files, which it scans to decode the metadata records.
The following example shows using the cluster-metadata-decoder argument to decode the metadata records in a log segment.
bin/kafka-dump-log.sh --cluster-metadata-decoder --files tmp/kraft-combined-logs/_cluster_metadata-0/00000000000000023946.log
Usage details
This tool helps to parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.
Option Description
------ -----------
--cluster-metadata-decoder If set, log data will be parsed as cluster
metadata records.
--deep-iteration If set, uses deep instead of shallow
iteration. Automatically set if print-
data-log is enabled.
--files <String: file1, file2, ...> REQUIRED: The comma separated list of data
and index log files to be dumped.
--help Print usage information.
--index-sanity-check If set, just checks the index sanity
without printing its content. This is
the same check that is executed on
broker startup to determine if an index
needs rebuilding or not.
--key-decoder-class [String] If set, used to deserialize the keys. This
class should implement org.apache.kafka.
tools.api.Decoder trait. Custom jar
should be available in kafka/libs
directory. (default: org.apache.kafka.
tools.api.StringDecoder)
--max-bytes <Integer: size> Limit the amount of total batches read in
bytes avoiding reading the whole .log
file(s). (default: 2147483647)
--max-message-size <Integer: size> Size of largest message. (default: 5242880)
--offsets-decoder If set, log data will be parsed as offset
data from the __consumer_offsets topic.
--print-data-log If set, printing the messages content when
dumping data logs. Automatically set if
any decoder option is specified.
--remote-log-metadata-decoder If set, log data will be parsed as
TopicBasedRemoteLogMetadataManager
(RLMM) metadata records. Instead, the
value-decoder-class option can be used
if a custom RLMM implementation is
configured.
--share-group-state-decoder If set, log data will be parsed as share
group state data from the
__share_group_state topic.
--skip-record-metadata Skip metadata when printing records. This
flag also skips control records.
--transaction-log-decoder If set, log data will be parsed as
transaction metadata from the
__transaction_state topic.
--value-decoder-class [String] If set, used to deserialize the messages.
This class should implement org.apache.
kafka.tools.api.Decoder trait. Custom
jar should be available in kafka/libs
directory. (default: org.apache.kafka.
tools.api.StringDecoder)
--verify-index-only If set, just verify the index log without
printing its content.
--version Display Kafka version.
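Beyond cluster metadata, the decoder options above let you inspect Kafka's internal topics. The following hedged example decodes a segment of the `__consumer_offsets` topic; the log directory and segment file name are illustrative and will differ on your broker.

```shell
# Decode consumer group offset records from an internal-topic segment.
bin/kafka-dump-log.sh --offsets-decoder \
  --files /tmp/kafka-logs/__consumer_offsets-0/00000000000000000000.log
```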
kafka-jmx.sh
The kafka-jmx tool enables you to read JMX metrics from a given endpoint. This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
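For example, the following sketch polls a broker metric once. It assumes remote JMX is enabled on port 9999 of a local broker; the object name is the standard broker throughput metric, but any JMX object name (including wildcards) can be queried.

```shell
# Read the one-minute incoming-message rate from a local broker, once.
bin/kafka-jmx.sh \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec \
  --attributes OneMinuteRate --one-time true
```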
Usage details
Dump JMX values to standard output.
Option Description
------ -----------
--attributes <String: name> The list of attributes to include in
the query. This is a comma-separated
list. If no attributes are specified
all objects will be queried.
--date-format <String: format> The date format to use for formatting
the time field. See java.text.
SimpleDateFormat for options.
--help Print usage information.
--jmx-auth-prop <String: jmx-auth-prop> A mechanism to pass property in the
form 'username=password' when
enabling remote JMX with password
authentication.
--jmx-ssl-enable <Boolean: ssl-enable> Flag to enable remote JMX with SSL.
(default: false)
--jmx-url <String: service-url> The url to connect to poll JMX data.
See Oracle javadoc for JMXServiceURL
for details. (default: service:jmx:
rmi:///jndi/rmi://:9999/jmxrmi)
--object-name <String: name> A JMX object name to use as a query.
This can contain wild cards, and
this option can be given multiple
times to specify more than one
query. If no objects are specified
all objects will be queried.
--one-time [Boolean: one-time] Flag to indicate run once only.
(default: false)
--report-format <String: report-format> output format name: either 'original',
'properties', 'csv', 'tsv'
(default: original)
--reporting-interval <Integer: ms> Interval in MS with which to poll jmx
stats; default value is 2 seconds.
Value of -1 equivalent to setting
one-time to true (default: 2000)
--version Display Kafka version.
--wait Wait for requested JMX objects to
become available before starting
output. Only supported when the list
of objects is non-empty and contains
no object name patterns.
trogdor.sh
Trogdor is a test framework for Kafka. Trogdor can run benchmarks and other workloads. Trogdor can also inject faults in order to stress test the system. For more information, see Trogdor and TROGDOR.
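As a hedged sketch of the actions listed below, the following starts an agent and a coordinator and then submits a task to the coordinator. The config path, node name, coordinator port, and spec file are illustrative and follow the examples in the upstream Trogdor documentation.

```shell
# Start a Trogdor agent and coordinator (config paths are illustrative).
./bin/trogdor.sh agent -c ./config/trogdor.conf -n node0 &
./bin/trogdor.sh coordinator -c ./config/trogdor.conf -n node0 &

# Submit a produce benchmark task to the coordinator via the client.
./bin/trogdor.sh client createTask -t localhost:8889 -i produce0 \
  --spec ./tests/spec/simple_produce_bench.json
```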
Usage details
The Trogdor fault injector.
Usage:
./trogdor.sh [action] [options]
Actions:
agent: Run the trogdor agent.
coordinator: Run the trogdor coordinator.
client: Run the client which communicates with the trogdor coordinator.
agent-client: Run the client which communicates with the trogdor agent.
help: This help message.
kafka-run-class.sh
The kafka-run-class tool is a thin wrapper around the Kafka Java class runner. It is called by the other tools and should not be run or modified directly.
Usage details
USAGE: ./kafka-run-class.sh [-daemon] [-name servicename] [-loggc] classname [opts]