Kafka Command-Line Interface (CLI) Tools¶
Apache Kafka® provides a suite of command-line interface (CLI) tools that are located in the bin directory after you download and extract the Kafka files. These tools offer a range of capabilities, including starting and stopping Kafka, managing topics, and handling partitions. To learn how to use each tool, run it with no arguments or pass the --help argument for detailed instructions.
Ready to get started?
- Sign up for Confluent Cloud, the fully managed, cloud-native service for Apache Kafka®, and get started for free using the Cloud quick start.
- Download Confluent Platform, the self-managed, enterprise-grade distribution of Apache Kafka, and get started using the Confluent Platform quick start.
The following sections group the tools by function and provide basic usage information. In some cases, a tool is listed in more than one section.
Confluent Tip
These tools are also available in the $CONFLUENT_HOME/bin folder when you install Confluent Platform, along with additional Confluent tools. Confluent has dropped the .sh extensions, so you do not need to use the extensions when calling the Confluent versions of these tools. In addition, when you pass a properties file, remember that Confluent Platform properties files are stored under the $CONFLUENT_HOME/etc directory. For more information, see CLI Tools for Confluent Platform.
Manage Kafka and configure metadata¶
This section contains tools for starting Kafka in either ZooKeeper or KRaft mode, and for managing brokers.
kafka-server-start.sh¶
Use the kafka-server-start
tool to start a Kafka server. You must pass the path to the properties file you want to use.
If you are using ZooKeeper for metadata management, you must start ZooKeeper first.
For KRaft mode, you must first generate a cluster ID and format the storage with that ID (see kafka-storage.sh).
For an example of how to start Kafka, see the Kafka quickstart.
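For example, assuming you run the command from the Kafka installation directory and use the properties file bundled with the distribution, a start command might look like the following; the optional --override flag overrides a single broker property on the command line:
bin/kafka-server-start.sh config/server.properties --override broker.id=1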
Usage details
USAGE: ./kafka-server-start.sh [-daemon] server.properties [--override property=value]*
Option Description
------ -----------
--override <String> Optional property that should override values set in
server.properties file
--version Print version information and exit.
kafka-server-stop.sh¶
Use the kafka-server-stop
tool to stop the running Kafka server.
Running this tool does not require any arguments, but starting with Kafka 3.7 you can optionally specify either a process-role value of broker or controller, or a node-id value indicating the node you want to stop.
For example, to stop all brokers, you would use the following command:
./bin/kafka-server-stop.sh --process-role=broker
To stop node 1234, you would use the following command:
./bin/kafka-server-stop.sh --node-id=1234
Usage details
USAGE: ./kafka-server-stop.sh {[--process-role=value] | [--node-id=value]}
zookeeper-server-start.sh¶
Use the zookeeper-server-start
tool to start the ZooKeeper server. ZooKeeper is the default method for metadata management for Kafka versions prior to 3.4.
To run this tool, you must pass the path to the
ZooKeeper properties file. For an example of how to use this tool, see the Kafka quickstart.
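For example, assuming you use the zookeeper.properties file bundled with the Kafka distribution, the command might look like the following; the optional -daemon flag runs ZooKeeper in the background:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties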
Usage details
USAGE: ./zookeeper-server-start.sh [-daemon] zookeeper.properties
zookeeper-server-stop.sh¶
Use the zookeeper-server-stop
tool to stop the ZooKeeper server. Running this tool does not require arguments.
Usage details
USAGE: ./zookeeper-server-stop.sh
kafka-storage.sh¶
Use the kafka-storage
tool to generate a Cluster UUID and format storage with the generated UUID when running Kafka in KRaft mode.
You must explicitly create a cluster ID for a KRaft cluster, and format the storage specifying that ID.
For example, the following command generates a cluster ID and stores it in a variable named KAFKA_CLUSTER_ID. The next command formats storage with that ID.
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
For another example of how to use this tool, see the Kafka quickstart.
Usage details
USAGE: kafka-storage.sh [-h] {info,format,random-uuid}
The Kafka storage tool.
positional arguments:
{info,format,random-uuid}
info Get information about the Kafka log directories on this node.
format Format the Kafka log directories on this node.
random-uuid Print a random UUID.
optional arguments:
-h, --help show this help message and exit
kafka-cluster.sh¶
Use the kafka-cluster
tool to get the ID of a cluster or unregister a cluster. The following example shows how to retrieve the cluster
ID, which requires a bootstrap-server
argument.
bin/kafka-cluster.sh cluster-id --bootstrap-server localhost:9092
The output for this command might look like the following.
Cluster ID: WZEKwK-b123oT3ZOSU0dgw
Usage details
USAGE: kafka-cluster.sh [-h] {cluster-id,unregister}
The Kafka cluster tool.
positional arguments:
{cluster-id,unregister}
cluster-id Get information about the ID of a cluster.
unregister Unregister a broker.
optional arguments:
-h, --help show this help message and exit
zookeeper-shell.sh¶
Use the zookeeper-shell
tool to connect to the interactive ZooKeeper shell.
Following is an example of how to connect to the ZooKeeper shell:
bin/zookeeper-shell.sh localhost:2181
Your results might look like the following:
Welcome to ZooKeeper!
JLine support is disabled
Usage details
USAGE: ./zookeeper-shell.sh zookeeper_host:port[/path] [-zk-tls-config-file file] [args...]
Once the shell has started, you can run the following commands:
ZooKeeper -server host:port [-zk-tls-config-file <file>] cmd args
addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE
addauth scheme auth
close
config [-c] [-w] [-s]
connect host:port
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
delete [-v version] path
deleteall path [-b batch size]
delquota [-n|-b] path
get [-s] [-w] path
getAcl [-s] path
getAllChildrenNumber path
getEphemerals path
history
listquota path
ls [-s] [-w] [-R] path
printwatches on|off
quit
reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
redo cmdno
removewatches path [-c|-d|-a] [-l]
set [-s] [-v version] path data
setAcl [-s] [-v version] [-R] path acl
setquota -n|-b val path
stat [-w] path
sync path
version
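For example, once connected to a ZooKeeper-backed cluster you might list the registered broker IDs. Assuming a single broker with ID 0 is running, the output might look like the following:
ls /brokers/ids
[0]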
kafka-features.sh¶
Use the kafka-features tool to manage feature flags, which enable or disable functionality at runtime in Kafka.
Pass the describe argument to describe the currently active feature flags, upgrade to upgrade one or more feature flags, downgrade to downgrade one or more, and disable to disable one or more feature flags, which is the same as downgrading the version to zero.
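For example, to describe the currently active feature flags on a cluster reachable at localhost:9092 (the host and port are illustrative), you might run:
bin/kafka-features.sh --bootstrap-server localhost:9092 describe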
Usage details
usage: kafka-features [-h] [--command-config COMMAND_CONFIG] (--bootstrap-server BOOTSTRAP_SERVER |
--bootstrap-controller BOOTSTRAP_CONTROLLER) {describe,upgrade,downgrade,disable} ...
This tool manages feature flags in Kafka.
positional arguments:
{describe,upgrade,downgrade,disable}
describe Describes the current active feature flags.
upgrade Upgrade one or more feature flags.
downgrade Downgrade one or more feature flags.
disable Disable one or more feature flags. This is the same as downgrading the version to zero.
optional arguments:
-h, --help show this help message and exit
--bootstrap-server BOOTSTRAP_SERVER
A comma-separated list of host:port pairs to use for establishing the connection to the
Kafka cluster.
--bootstrap-controller BOOTSTRAP_CONTROLLER
A comma-separated list of host:port pairs to use for establishing the connection to the
KRaft quorum.
--command-config COMMAND_CONFIG
Property file containing configs to be passed to Admin Client.
kafka-broker-api-versions.sh¶
The kafka-broker-api-versions
tool retrieves and displays broker information.
For example, the following command outputs the version of Kafka that is running on the broker:
bin/kafka-broker-api-versions.sh --bootstrap-server host1:9092 --version
This command might have the following output:
3.3.1 (Commit:e23c59d00e687ff5)
Usage details
This tool helps to retrieve broker version information.
Option Description
------ -----------
--bootstrap-server <String: server(s) REQUIRED: The server to connect to.
to use for bootstrapping>
--command-config <String: command A property file containing configs to
config property file> be passed to Admin Client.
--help Print usage information.
--version Display Kafka version.
kafka-metadata-quorum.sh¶
Use the kafka-metadata-quorum
tool to query the metadata quorum status. This tool is useful when you are debugging a cluster in KRaft mode.
Pass the describe
command to describe the current state of the metadata quorum.
The following code example displays a summary of the metadata quorum:
bin/kafka-metadata-quorum.sh --bootstrap-server host1:9092 describe --status
The output for this command might look like the following.
ClusterId: fMCL8kv1SWm87L_Md-I2hg
LeaderId: 3002
LeaderEpoch: 2
HighWatermark: 10
MaxFollowerLag: 0
MaxFollowerLagTimeMs: -1
CurrentVoters: [3000,3001,3002]
CurrentObservers: [0,1,2]
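To see per-replica details rather than the summary, you can pass the --replication flag to the describe command, as in the following sketch:
bin/kafka-metadata-quorum.sh --bootstrap-server host1:9092 describe --replication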
Usage details
usage: kafka-metadata-quorum [-h] [--command-config COMMAND_CONFIG] (--bootstrap-server BOOTSTRAP_SERVER |
--bootstrap-controller BOOTSTRAP_CONTROLLER) {describe} ...
This tool describes kraft metadata quorum status.
positional arguments:
{describe}
describe Describe the metadata quorum info
optional arguments:
-h, --help show this help message and exit
--bootstrap-server BOOTSTRAP_SERVER
A comma-separated list of host:port pairs to use for establishing the connection to the
Kafka cluster.
--bootstrap-controller BOOTSTRAP_CONTROLLER
A comma-separated list of host:port pairs to use for establishing the connection to the
Kafka controllers.
--command-config COMMAND_CONFIG
Property file containing configs to be passed to Admin Client.
kafka-metadata-shell.sh¶
The kafka-metadata-shell
tool enables you to interactively examine the metadata stored in a KRaft cluster.
The following example shows how to open the shell:
kafka-metadata-shell.sh --directory tmp/kraft-combined-logs/_cluster-metadata-0/
After the shell loads, you can explore the contents of the metadata log, and exit. The following code shows an example of this.
Loading...
[ Kafka Metadata Shell ]
>> ls
brokers configs features linkIds links shell topicIds topics
>> ls /topics
test
>> cat /topics/test/0/data
{
"partitionId" : 0,
"topicId" : "5zoAlv-xEh9xRANKXt1Lbg",
"replicas" : [ 1 ],
"isr" : [ 1 ],
"removingReplicas" : null,
"addingReplicas" : null,
"leader" : 1,
"leaderEpoch" : 0,
"partitionEpoch" : 0
}
>> exit
For more information, see the Kafka Wiki.
Usage details
usage: metadata-shell-tool.sh [-h] [--cluster-id CLUSTER_ID] [--offset OFFSET] (--directory DIRECTORY |
--controllers CONTROLLERS) [command [command ...]]
The Apache Kafka metadata tool
positional arguments:
command The command to run.
optional arguments:
-h, --help show this help message and exit
--cluster-id CLUSTER_ID, -t CLUSTER_ID
The cluster id. Required when using --controllers
--directory DIRECTORY, -d DIRECTORY
The __cluster_metadata-0 directory to read.
--controllers CONTROLLERS, -q CONTROLLERS
The controller.quorum.voters.
--offset OFFSET, -o OFFSET
The offset to read up to
kafka-configs.sh¶
Use the kafka-configs tool to change and describe topic, client, user, broker, or IP configuration settings, or to describe a KRaft controller.
To describe or view a KRaft controller, use the --bootstrap-controller option, and do not specify a bootstrap-server.
To change a property, set the entity-type to the desired entity (topic, broker, user, and so on), and use the --alter option. The following example shows how you might add the delete.retention.ms configuration property for a topic with kafka-configs.
/bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-default --alter --add-config delete.retention.ms=172800000
When you use the --add-config flag to add multiple values, use square brackets around the comma-separated list, as in the following example:
/bin/kafka-configs.sh --bootstrap-server host1:9092 --alter --add-config max.connections.per.ip.overrides=[host1:50,host2:9] --entity-type brokers --entity-default
The following example shows how you might check the cluster ID by specifying the --bootstrap-controller option:
/bin/kafka-cluster.sh cluster-id --bootstrap-controller localhost:9092
See Topic Operations for more examples of how to work with topics.
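To view the current configuration of an entity, use the --describe option. For example, the following command (with a hypothetical topic name) describes the overrides set on a single topic:
bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-name my-topic --describe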
Usage details
This tool helps to manipulate and describe entity config for a topic, client, user, broker, ip or client-metrics
Option Description
------ -----------
--add-config <String> Key Value pairs of configs to add.
Square brackets can be used to group
values which contain commas: 'k1=v1,
k2=[v1,v2,v2],k3=v3'. The following
is a list of valid configurations:
For entity-type 'topics':
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
local.retention.bytes
local.retention.ms
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.after.max.ms
message.timestamp.before.max.ms
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
remote.storage.enable
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
For entity-type 'brokers':
advertised.listeners
background.threads
compression.type
follower.replication.throttled.rate
leader.replication.throttled.rate
listener.security.protocol.map
listeners
log.cleaner.backoff.ms
log.cleaner.dedupe.buffer.size
log.cleaner.delete.retention.ms
log.cleaner.io.buffer.load.factor
log.cleaner.io.buffer.size
log.cleaner.io.max.bytes.per.second
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleaner.min.compaction.lag.ms
log.cleaner.threads
log.cleanup.policy
log.flush.interval.messages
log.flush.interval.ms
log.index.interval.bytes
log.index.size.max.bytes
log.local.retention.bytes
log.local.retention.ms
log.message.downconversion.enable
log.message.timestamp.after.max.ms
log.message.timestamp.before.max.ms
log.message.timestamp.difference.max.
ms
log.message.timestamp.type
log.preallocate
log.retention.bytes
log.retention.ms
log.roll.jitter.ms
log.roll.ms
log.segment.bytes
log.segment.delete.delay.ms
max.connection.creation.rate
max.connections
max.connections.per.ip
max.connections.per.ip.overrides
message.max.bytes
metric.reporters
min.insync.replicas
num.io.threads
num.network.threads
num.recovery.threads.per.data.dir
num.replica.fetchers
principal.builder.class
producer.id.expiration.ms
remote.log.index.file.cache.total.
size.bytes
replica.alter.log.dirs.io.max.bytes.
per.second
sasl.enabled.mechanisms
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal.to.local.rules
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.
factor
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism.inter.broker.protocol
ssl.cipher.suites
ssl.client.auth
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.engine.factory.class
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.certificate.chain
ssl.keystore.key
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.certificates
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
transaction.partition.verification.
enable
unclean.leader.election.enable
For entity-type 'users':
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate
controller_mutation_rate
producer_byte_rate
request_percentage
For entity-type 'clients':
consumer_byte_rate
controller_mutation_rate
producer_byte_rate
request_percentage
For entity-type 'ips':
connection_creation_rate
For entity-type 'client-metrics':
interval.ms
match
metrics
Entity types 'users' and 'clients' may
be specified together to update
config for clients of a specific
user.
--add-config-file <String> Path to a properties file with configs
to add. See add-config for a list of
valid configurations.
--all List all configs for the given topic,
broker, or broker-logger entity
(includes static configuration when
the entity type is brokers)
--alter Alter the configuration for the entity.
--bootstrap-controller <String: The Kafka controllers to connect to.
controller to connect to>
--bootstrap-server <String: server to The Kafka servers to connect to.
connect to>
--broker <String> The broker's ID.
--broker-defaults The config defaults for all brokers.
--broker-logger <String> The broker's ID for its logger config.
--client <String> The client's ID.
--client-defaults The config defaults for all clients.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--delete-config <String> config keys to remove 'k1,k2'
--describe List configs for the given entity.
--entity-default Default entity name for
clients/users/brokers/ips (applies
to corresponding entity type in
command line)
--entity-name <String> Name of entity (topic name/client
id/user principal name/broker
id/ip/client metrics)
--entity-type <String> Type of entity
(topics/clients/users/brokers/broker-
loggers/ips/client-metrics)
--force Suppress console prompts
--help Print usage information.
--ip <String> The IP address.
--ip-defaults The config defaults for all IPs.
--topic <String> The topic's name.
--user <String> The user's principal name.
--user-defaults The config defaults for all users.
--version Display Kafka version.
--zk-tls-config-file <String: Identifies the file where ZooKeeper
ZooKeeper TLS configuration> client TLS connectivity properties
are defined. Any properties other
than zookeeper.clientCnxnSocket,
zookeeper.ssl.cipher.suites,
zookeeper.ssl.client.enable,
zookeeper.ssl.crl.enable, zookeeper.
ssl.enabled.protocols, zookeeper.ssl.
endpoint.identification.algorithm,
zookeeper.ssl.keystore.location,
zookeeper.ssl.keystore.password,
zookeeper.ssl.keystore.type,
zookeeper.ssl.ocsp.enable, zookeeper.
ssl.protocol, zookeeper.ssl.
truststore.location, zookeeper.ssl.
truststore.password, zookeeper.ssl.
truststore.type are ignored.
--zookeeper <String: urls> DEPRECATED. The connection string for
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over. Required
when configuring SCRAM credentials
for users or dynamic broker configs
when the relevant broker(s) are
down. Not allowed otherwise.
zookeeper-security-migration.sh¶
Use the zookeeper-security-migration
tool to restrict or provide access to ZooKeeper metadata. The tool updates the ACLs of znodes.
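For example, to secure the Kafka znodes in a ZooKeeper ensemble running at localhost:2181 (the address is illustrative), you might run:
bin/zookeeper-security-migration.sh --zookeeper.acl secure --zookeeper.connect localhost:2181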
Usage details
ZooKeeper Migration Tool Help. This tool updates the ACLs of znodes as part of the process of setting up ZooKeeper authentication.
Option Description
------ -----------
--enable.path.check Checks if all the root paths exist in
ZooKeeper before migration. If not,
exit the command.
--help Print usage information.
--version Display Kafka version.
--zk-tls-config-file <String: Identifies the file where ZooKeeper
ZooKeeper TLS configuration> client TLS connectivity properties
are defined. Any properties other
than zookeeper.ssl.keystore.
password, zookeeper.ssl.truststore.
location, zookeeper.ssl.truststore.
type, zookeeper.ssl.ocsp.enable,
zookeeper.ssl.crl.enable, zookeeper.
ssl.truststore.password, zookeeper.
ssl.protocol, zookeeper.ssl.keystore.
type, zookeeper.ssl.client.enable,
zookeeper.ssl.cipher.suites,
zookeeper.clientCnxnSocket,
zookeeper.ssl.endpoint.
identification.algorithm, zookeeper.
ssl.enabled.protocols, zookeeper.ssl.
keystore.location are ignored.
--zookeeper.acl <String> Indicates whether to make the Kafka
znodes in ZooKeeper secure or
unsecure. The options are 'secure'
and 'unsecure'
--zookeeper.connect <String> Sets the ZooKeeper connect string
(ensemble). This parameter takes a
comma-separated list of host:port
pairs. (default: localhost:2181)
--zookeeper.connection.timeout Sets the ZooKeeper connection timeout.
<Integer> (default: 30000)
--zookeeper.session.timeout <Integer> Sets the ZooKeeper session timeout.
(default: 30000)
Manage topics, partitions, and replication¶
kafka-topics.sh¶
Use the kafka-topics
tool to create or delete a topic. You can also use the tool to retrieve a list of topics associated with a Kafka cluster.
For more information, see Topic Operations.
To change a topic configuration, see kafka-configs.sh, or learn how to modify a topic.
For example, the following command creates a topic named test-topic with three partitions:
bin/kafka-topics.sh --bootstrap-server host1:9092 --topic test-topic --partitions 3 --create
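To inspect the topic after it is created, you can describe it with the same tool:
bin/kafka-topics.sh --bootstrap-server host1:9092 --topic test-topic --describe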
Usage details
This tool helps to create, delete, describe, or change a topic.
Option Description
------ -----------
--alter Alter the number of partitions and
replica assignment. Update the
configuration of an existing topic
via --alter is no longer supported
here (the kafka-configs CLI supports
altering topic configs with a --
bootstrap-server option).
--at-min-isr-partitions if set when describing topics, only
show partitions whose isr count is
equal to the configured minimum.
--bootstrap-server <String: server to REQUIRED: The Kafka server to connect
connect to> to.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--config <String: name=value> A topic configuration override for the
topic being created or altered. The
following is a list of valid
configurations:
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
local.retention.bytes
local.retention.ms
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
remote.storage.enable
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
See the Kafka documentation for full
details on the topic configs. It is
supported only in combination with --
create if --bootstrap-server option
is used (the kafka-configs CLI
supports altering topic configs with
a --bootstrap-server option).
--create Create a new topic.
--delete Delete a topic
--delete-config <String: name> A topic configuration override to be
removed for an existing topic (see
the list of configurations under the
--config option). Not supported with
the --bootstrap-server option.
--describe List details for the given topics.
--exclude-internal exclude internal topics when running
list or describe command. The
internal topics will be listed by
default
--help Print usage information.
--if-exists if set when altering or deleting or
describing topics, the action will
only execute if the topic exists.
--if-not-exists if set when creating topics, the
action will only execute if the
topic does not already exist.
--list List all available topics.
--partitions <Integer: # of partitions> The number of partitions for the topic
being created or altered (WARNING:
If partitions are increased for a
topic that has a key, the partition
logic or ordering of the messages
will be affected). If not supplied
for create, defaults to the cluster
default.
--replica-assignment <String: A list of manual partition-to-broker
broker_id_for_part1_replica1 : assignments for the topic being
broker_id_for_part1_replica2 , created or altered.
broker_id_for_part2_replica1 :
broker_id_for_part2_replica2 , ...>
--replication-factor <Integer: The replication factor for each
replication factor> partition in the topic being
created. If not supplied, defaults
to the cluster default.
--topic <String: topic> The topic to create, alter, describe
or delete. It also accepts a regular
expression, except for --create
option. Put topic name in double
quotes and use the '\' prefix to
escape regular expression symbols; e.
g. "test\.topic".
--topic-id <String: topic-id> The topic-id to describe.This is used
only with --bootstrap-server option
for describing topics.
--topics-with-overrides if set when describing topics, only
show topics that have overridden
configs
--unavailable-partitions if set when describing topics, only
show partitions whose leader is not
available
--under-min-isr-partitions if set when describing topics, only
show partitions whose isr count is
less than the configured minimum.
--under-replicated-partitions if set when describing topics, only
show under replicated partitions
--version Display Kafka version.
kafka-configs.sh¶
Use the kafka-configs tool to change and describe topic, client, user, broker, or IP configuration settings, or to describe a KRaft controller.
To describe or view a KRaft controller, use the --bootstrap-controller option, and do not specify a bootstrap-server.
To change a property, set the entity-type to the desired entity (topic, broker, user, and so on), and use the --alter option. The following example shows how you might add the delete.retention.ms configuration property for a topic with kafka-configs.
/bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-default --alter --add-config delete.retention.ms=172800000
When you use the --add-config flag to add multiple values, use square brackets around the comma-separated list, as in the following example:
/bin/kafka-configs.sh --bootstrap-server host1:9092 --alter --add-config max.connections.per.ip.overrides=[host1:50,host2:9] --entity-type brokers --entity-default
The following example shows how you might check the cluster ID by specifying the --bootstrap-controller option:
/bin/kafka-cluster.sh cluster-id --bootstrap-controller localhost:9092
See Topic Operations for more examples of how to work with topics.
Usage details
This tool helps to manipulate and describe entity config for a topic, client, user, broker, ip or client-metrics
Option Description
------ -----------
--add-config <String> Key Value pairs of configs to add.
Square brackets can be used to group
values which contain commas: 'k1=v1,
k2=[v1,v2,v2],k3=v3'. The following
is a list of valid configurations:
For entity-type 'topics':
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
local.retention.bytes
local.retention.ms
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.after.max.ms
message.timestamp.before.max.ms
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
remote.storage.enable
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
For entity-type 'brokers':
advertised.listeners
background.threads
compression.type
follower.replication.throttled.rate
leader.replication.throttled.rate
listener.security.protocol.map
listeners
log.cleaner.backoff.ms
log.cleaner.dedupe.buffer.size
log.cleaner.delete.retention.ms
log.cleaner.io.buffer.load.factor
log.cleaner.io.buffer.size
log.cleaner.io.max.bytes.per.second
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleaner.min.compaction.lag.ms
log.cleaner.threads
log.cleanup.policy
log.flush.interval.messages
log.flush.interval.ms
log.index.interval.bytes
log.index.size.max.bytes
log.local.retention.bytes
log.local.retention.ms
log.message.downconversion.enable
log.message.timestamp.after.max.ms
log.message.timestamp.before.max.ms
log.message.timestamp.difference.max.
ms
log.message.timestamp.type
log.preallocate
log.retention.bytes
log.retention.ms
log.roll.jitter.ms
log.roll.ms
log.segment.bytes
log.segment.delete.delay.ms
max.connection.creation.rate
max.connections
max.connections.per.ip
max.connections.per.ip.overrides
message.max.bytes
metric.reporters
min.insync.replicas
num.io.threads
num.network.threads
num.recovery.threads.per.data.dir
num.replica.fetchers
principal.builder.class
producer.id.expiration.ms
remote.log.index.file.cache.total.
size.bytes
replica.alter.log.dirs.io.max.bytes.
per.second
sasl.enabled.mechanisms
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal.to.local.rules
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.
factor
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism.inter.broker.protocol
ssl.cipher.suites
ssl.client.auth
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.engine.factory.class
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.certificate.chain
ssl.keystore.key
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.certificates
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
transaction.partition.verification.
enable
unclean.leader.election.enable
For entity-type 'users':
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate
controller_mutation_rate
producer_byte_rate
request_percentage
For entity-type 'clients':
consumer_byte_rate
controller_mutation_rate
producer_byte_rate
request_percentage
For entity-type 'ips':
connection_creation_rate
For entity-type 'client-metrics':
interval.ms
match
metrics
Entity types 'users' and 'clients' may
be specified together to update
config for clients of a specific
user.
--add-config-file <String> Path to a properties file with configs
to add. See add-config for a list of
valid configurations.
--all List all configs for the given topic,
broker, or broker-logger entity
(includes static configuration when
the entity type is brokers)
--alter Alter the configuration for the entity.
--bootstrap-controller <String: The Kafka controllers to connect to.
controller to connect to>
--bootstrap-server <String: server to The Kafka servers to connect to.
connect to>
--broker <String> The broker's ID.
--broker-defaults The config defaults for all brokers.
--broker-logger <String> The broker's ID for its logger config.
--client <String> The client's ID.
--client-defaults The config defaults for all clients.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--delete-config <String> config keys to remove 'k1,k2'
--describe List configs for the given entity.
--entity-default Default entity name for
clients/users/brokers/ips (applies
to corresponding entity type in
command line)
--entity-name <String> Name of entity (topic name/client
id/user principal name/broker
id/ip/client metrics)
--entity-type <String> Type of entity
(topics/clients/users/brokers/broker-
loggers/ips/client-metrics)
--force Suppress console prompts
--help Print usage information.
--ip <String> The IP address.
--ip-defaults The config defaults for all IPs.
--topic <String> The topic's name.
--user <String> The user's principal name.
--user-defaults The config defaults for all users.
--version Display Kafka version.
--zk-tls-config-file <String: Identifies the file where ZooKeeper
ZooKeeper TLS configuration> client TLS connectivity properties
are defined. Any properties other
than zookeeper.clientCnxnSocket,
zookeeper.ssl.cipher.suites,
zookeeper.ssl.client.enable,
zookeeper.ssl.crl.enable, zookeeper.
ssl.enabled.protocols, zookeeper.ssl.
endpoint.identification.algorithm,
zookeeper.ssl.keystore.location,
zookeeper.ssl.keystore.password,
zookeeper.ssl.keystore.type,
zookeeper.ssl.ocsp.enable, zookeeper.
ssl.protocol, zookeeper.ssl.
truststore.location, zookeeper.ssl.
truststore.password, zookeeper.ssl.
truststore.type are ignored.
--zookeeper <String: urls> DEPRECATED. The connection string for
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over. Required
when configuring SCRAM credentials
for users or dynamic broker configs
when the relevant broker(s) are
down. Not allowed otherwise.
kafka-get-offsets.sh¶
Use the kafka-get-offsets
tool to retrieve topic-partition offsets.
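For example, the following command (with a hypothetical topic name) retrieves the earliest offset of each partition of a topic; the output lists each partition and its offset:
bin/kafka-get-offsets.sh --bootstrap-server localhost:9092 --topic my-topic --time earliest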
Usage details
An interactive shell for getting topic-partition offsets.
Option Description
------ -----------
--bootstrap-server <String: HOST1: REQUIRED. The server(s) to connect to
PORT1,...,HOST3:PORT3> in the form HOST1:PORT1,HOST2:PORT2.
--broker-list <String: HOST1:PORT1,..., DEPRECATED, use --bootstrap-server
HOST3:PORT3> instead; ignored if --bootstrap-
server is specified. The server(s)
to connect to in the form HOST1:
PORT1,HOST2:PORT2.
--command-config <String: config file> Property file containing configs to be
passed to Admin Client.
--exclude-internal-topics By default, internal topics are
included. If specified, internal
topics are excluded.
--partitions <String: partition ids> Comma separated list of partition ids
to get the offsets for. If not
present, all partitions of the
authorized topics are queried.
Cannot be used if --topic-partitions
is present.
--time <String: <timestamp> / -1 or timestamp of the offsets before that.
latest / -2 or earliest / -3 or max- [Note: No offset is returned, if the
timestamp> timestamp greater than recently
committed record timestamp is
given.] (default: latest)
--topic <String: topic> The topic to get the offsets for. It
also accepts a regular expression.
If not present, all authorized
topics are queried. Cannot be used
if --topic-partitions is present.
--topic-partitions <String: topic1:1, Comma separated list of topic-
topic2:0-3,topic3,topic4:5-,topic5:-3 partition patterns to get the
> offsets for, with the format of:
([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)
-([0-9]*))))?. The first group is
an optional regex for the topic
name, if omitted, it matches any
topic name. The section after ':'
describes a partition pattern,
which can be: a number, a range in
the format of NUMBER-NUMBER (lower
inclusive, upper exclusive), an
inclusive lower bound in the format
of NUMBER-, an exclusive upper
bound in the format of -NUMBER or
may be omitted to accept all
partitions.
kafka-leader-election.sh¶
Use the kafka-leader-election
tool to attempt to elect a new leader for a set of topic partitions.
Run this tool manually to restore leadership if the auto.leader.rebalance.enable property is set to false.
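For example, to trigger a preferred leader election for every eligible topic partition in the cluster, you might run the following command (the host and port are illustrative):
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions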
Usage details
This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas.
Option Description
------ -----------
--admin.config <String: config file> Configuration properties files to pass
to the admin client
--all-topic-partitions Perform election on all of the
eligible topic partitions based on
the type of election (see the --
election-type flag). Not allowed if
--topic or --path-to-json-file is
specified.
--bootstrap-server <String: host:port> A hostname and port for the broker to
connect to, in the form host:port.
Multiple comma separated URLs can be
given. REQUIRED.
--election-type <[PREFERRED,UNCLEAN]: Type of election to attempt. Possible
election type> values are "preferred" for preferred
leader election or "unclean" for
unclean leader election. If
preferred election is selection, the
election is only performed if the
current leader is not the preferred
leader for the topic partition. If
unclean election is selected, the
election is only performed if there
are no leader for the topic
partition. REQUIRED.
--help Print usage information.
--partition <Integer: partition id> Partition id for which to perform an
election. REQUIRED if --topic is
specified.
--path-to-json-file <String: Path to The JSON file with the list of
JSON file> partition for which leader elections
should be performed. This is an
example format.
{"partitions":
[{"topic": "foo", "partition": 1},
{"topic": "foobar", "partition": 2}]
}
Not allowed if --all-topic-partitions
or --topic flags are specified.
--topic <String: topic name> Name of topic for which to perform an
election. Not allowed if --path-to-
json-file or --all-topic-partitions
is specified.
--version Display Kafka version.
kafka-transactions.sh¶
Use the kafka-transactions tool to list and describe transactions, and to detect and abort hanging transactions.
For more information, see Detect and Abort Hanging Transactions.
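For example, to list the transactions known to the cluster, you might run the following command (the host and port are illustrative):
bin/kafka-transactions.sh --bootstrap-server localhost:9092 list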
Usage details
usage: kafka-transactions.sh [-h] [-v] [--command-config FILE] --bootstrap-server host:port COMMAND ...
This tool is used to analyze the transactional state of producers in the cluster. It can be used to detect and
recover from hanging transactions.
optional arguments:
-h, --help show this help message and exit
-v, --version show the version of this Kafka distribution and exit
--command-config FILE property file containing configs to be passed to admin client
--bootstrap-server host:port
hostname and port for the broker to connect to, in the form `host:port`
(multiple comma-separated entries can be given)
commands:
list list transactions
describe describe the state of an active transactional-id
describe-producers describe the states of active producers for a topic partition
abort abort a hanging transaction (requires administrative privileges)
find-hanging find hanging transactions
kafka-reassign-partitions.sh¶
Use the kafka-reassign-partitions tool to move topic partitions between replicas.
You pass a JSON-formatted file to specify the new replicas. To learn more, see Changing the replication factor in the Confluent documentation.
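For example, a minimal reassignment.json file (topic name, broker IDs, and file name are illustrative) and the commands to start and then verify the reassignment might look like the following:
{"version": 1,
"partitions": [{"topic": "test-topic", "partition": 0, "replicas": [1,2,3]}]}
bin/kafka-reassign-partitions.sh --bootstrap-server host1:9092 --reassignment-json-file reassignment.json --execute
bin/kafka-reassign-partitions.sh --bootstrap-server host1:9092 --reassignment-json-file reassignment.json --verify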
Usage details
This tool helps to move topic partitions between replicas.
Option Description
------ -----------
--additional Execute this reassignment in addition
to any other ongoing ones. This
option can also be used to change
the throttle of an ongoing
reassignment.
--bootstrap-server <String: Server(s) REQUIRED: the server(s) to use for
to use for bootstrapping> bootstrapping.
--broker-list <String: brokerlist> The list of brokers to which the
partitions need to be reassigned in
the form "0,1,2". This is required
if --topics-to-move-json-file is
used to generate reassignment
configuration
--cancel Cancel an active reassignment.
--command-config <String: Admin client Property file containing configs to be
property file> passed to Admin Client.
--disable-rack-aware Disable rack aware replica assignment
--execute Kick off the reassignment as specified
by the --reassignment-json-file
option.
--generate Generate a candidate partition
reassignment configuration. Note
that this only generates a candidate
assignment, it does not execute it.
--help Print usage information.
--list List all active partition
reassignments.
--preserve-throttles Do not modify broker or topic
throttles.
--reassignment-json-file <String: The JSON file with the partition
manual assignment json file path> reassignment configuration. The format
to use is -
{"partitions":
[{"topic": "foo",
"partition": 1,
"replicas": [1,2,3,4],
"observers":[3,4],
"log_dirs": ["dir1","dir2","dir3","
dir4"] }],
"version":1
}
Note that "log_dirs" is optional. When
it is specified, its length must
equal the length of the replicas
list. The value in this list can be
either "any" or the absolution path
of the log directory on the broker.
If absolute log directory path is
specified, the replica will be moved
to the specified log directory on
the broker.
Note that "observers" is optional.
When it is specified it must be a
suffix of the replicas list.
--replica-alter-log-dirs-throttle The movement of replicas between log
<Long: replicaAlterLogDirsThrottle> directories on the same broker will
be throttled to this value
(bytes/sec). This option can be
included with --execute when a
reassignment is started, and it can
be altered by resubmitting the
current reassignment along with the
--additional flag. The throttle rate
should be at least 1 KB/s. (default:
-1)
--throttle <Long: throttle> The movement of partitions between
brokers will be throttled to this
value (bytes/sec). This option can
be included with --execute when a
reassignment is started, and it can
be altered by resubmitting the
current reassignment along with the
--additional flag. The throttle rate
should be at least 1 KB/s. (default:
-1)
--timeout <Long: timeout> The maximum time in ms to wait for log
directory replica assignment to
begin. (default: 10000)
--topics-to-move-json-file <String: Generate a reassignment configuration
topics to reassign json file path> to move the partitions of the
specified topics to the list of
brokers specified by the --broker-
list option. The format to use is -
{"topics":
[{"topic": "foo"},{"topic": "foo1"}],
"version":1
}
--verify Verify if the reassignment completed
as specified by the --reassignment-
json-file option. If there is a
throttle engaged for the replicas
specified, and the rebalance has
completed, the throttle will be
removed
--version Display Kafka version.
kafka-delete-records.sh¶
Use the kafka-delete-records
tool to delete partition records. Use this if a topic receives bad data.
Pass a JSON-formatted file that specifies the topic, partition, and offset for data deletion. Data will be deleted up to the offset specified.
Example:
bin/kafka-delete-records.sh --bootstrap-server host1:9092 --offset-json-file deleteme.json
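The deleteme.json file referenced above might look like the following, which deletes the records of partition 0 of a hypothetical topic named test-topic up to offset 3:
{"partitions":
[{"topic": "test-topic", "partition": 0, "offset": 3}],
"version": 1
}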
Usage details
This tool helps to delete records of the given partitions down to the specified offset.
Option Description
------ -----------
--bootstrap-server <String: server(s) REQUIRED: The server to connect to.
to use for bootstrapping>
--command-config <String: command A property file containing configs to
config property file path> be passed to Admin Client.
--help Print usage information.
--offset-json-file <String: Offset REQUIRED: The JSON file with offset
json file path> per partition. The format to use is:
{"partitions":
[{"topic": "foo", "partition": 1,
"offset": 1}],
"version":1
}
--version Display Kafka version.
kafka-log-dirs.sh¶
Use the kafka-log-dirs
tool to get a list of replicas per log directory on a broker.
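For example, the following command (with an illustrative topic name) describes the log directories that hold replicas of a single topic:
bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list test-topic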
Usage details
This tool helps to query log directory usage on the specified brokers.
Option Description
------ -----------
--bootstrap-server <String: The server REQUIRED: the server(s) to use for
(s) to use for bootstrapping> bootstrapping
--broker-list <String: Broker list> The list of brokers to be queried in
the form "0,1,2". All brokers in the
cluster will be queried if no broker
list is specified
--command-config <String: Admin client Property file containing configs to be
property file> passed to Admin Client.
--describe Describe the specified log directories
on the specified brokers.
--help Print usage information.
--topic-list <String: Topic list> The list of topics to be queried in
the form "topic1,topic2,topic3". All
topics will be queried if no topic
list is specified (default: )
--version Display Kafka version.
kafka-replica-verification.sh¶
Use the kafka-replica-verification
tool to verify that all replicas of a topic contain the same data.
This tool requires a broker-list parameter that contains a comma-separated list of hostname:port entries specifying the servers to connect to.
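For example, the following command (broker addresses and topic name are illustrative) checks replica consistency for a single topic:
bin/kafka-replica-verification.sh --broker-list host1:9092,host2:9092 --topics-include test-topic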
Usage details
Validate that all replicas for a set of topics have the same data.
Option Description
------ -----------
--broker-list <String: hostname: REQUIRED: The list of hostname and
port,...,hostname:port> port of the server to connect to.
--fetch-size <Integer: bytes> The fetch size of each request.
(default: 1048576)
--help Print usage information.
--max-wait-ms <Integer: ms> The max amount of time each fetch
request waits. (default: 1000)
--report-interval-ms <Long: ms> The reporting interval. (default:
30000)
--time <Long: timestamp/-1(latest)/-2 Timestamp for getting the initial
(earliest)> offsets. (default: -1)
--topic-white-list <String: Java regex DEPRECATED use --topics-include
(String)> instead; ignored if --topics-include
specified. List of topics to verify
replica consistency. Defaults to '.
*' (all topics) (default: .*)
--topics-include <String: Java regex List of topics to verify replica
(String)> consistency. Defaults to '.*' (all
topics) (default: .*)
--version Print version information and exit.
kafka-mirror-maker.sh¶
DEPRECATED: For an alternative, see connect-mirror-maker.sh. The kafka-mirror-maker tool enables the creation of a replica of an existing Kafka cluster.
Example: bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters secondary
To learn more, see Kafka mirroring.
Usage details
This tool helps to continuously copy data between two Kafka clusters.
Option Description
------ -----------
--abort.on.send.failure <String: Stop Configure the mirror maker to exit on
the entire mirror maker when a send a failed send. (default: true)
failure occurs>
--consumer.config <String: config file> Embedded consumer config for consuming
from the source cluster.
--consumer.rebalance.listener <String: The consumer rebalance listener to use
A custom rebalance listener of type for mirror maker consumer.
ConsumerRebalanceListener>
--help Print usage information.
--include <String: Java regex (String)> List of included topics to mirror.
--message.handler <String: A custom Message handler which will process
message handler of type every record in-between consumer and
MirrorMakerMessageHandler> producer.
--message.handler.args <String: Arguments used by custom message
Arguments passed to message handler handler for mirror maker.
constructor.>
--new.consumer DEPRECATED Use new consumer in mirror
maker (this is the default so this
option will be removed in a future
version).
--num.streams <Integer: Number of Number of consumption streams.
threads> (default: 1)
--offset.commit.interval.ms <Integer: Offset commit interval in ms.
offset commit interval in (default: 60000)
millisecond>
--producer.config <String: config file> Embedded producer config.
--rebalance.listener.args <String: Arguments used by custom rebalance
Arguments passed to custom rebalance listener for mirror maker consumer.
listener constructor as a string.>
--version Display Kafka version.
--whitelist <String: Java regex DEPRECATED, use --include instead;
(String)> ignored if --include specified. List
of included topics to mirror.
connect-mirror-maker.sh¶
Use the connect-mirror-maker
tool to replicate topics from one cluster to another using the Connect framework.
You must pass an mm2.properties MM2 configuration file. For more information, see
KIP-382: MirrorMaker 2.0
or Getting up to speed with MirrorMaker 2.
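As a rough sketch, a minimal mm2.properties file that mirrors all topics from a primary to a secondary cluster might look like the following; the cluster aliases and bootstrap addresses are illustrative:
clusters = primary, secondary
primary.bootstrap.servers = primary-host:9092
secondary.bootstrap.servers = secondary-host:9092
primary->secondary.enabled = true
primary->secondary.topics = .*
You would then start the driver with a command such as bin/connect-mirror-maker.sh mm2.properties.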
Usage details
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.properties
MirrorMaker 2.0 driver
positional arguments:
mm2.properties MM2 configuration file.
optional arguments:
-h, --help show this help message and exit
--clusters CLUSTER [CLUSTER ...]
Target cluster to use for this node.
Client, producer, and consumer tools¶
kafka-client-metrics.sh¶
Use the kafka-client-metrics
tool to manipulate and describe client metrics configurations for clusters
where client metrics are enabled. This tool
provides a simpler alternative to using kafka-configs.sh
to configure client metrics.
For example, to describe all of the client metrics configuration resources, use the following command:
kafka-client-metrics.sh --bootstrap-server HOST1:PORT1 --describe
To describe a specific configuration:
kafka-client-metrics.sh --bootstrap-server HOST1:PORT1 --describe --name MYMETRICS
You can use this tool to create a client metric configuration resource and generate a unique name.
In this example, --generate-name
is used to create a type-4 UUID to use as
the client metrics configuration resource name:
kafka-client-metrics.sh --bootstrap-server HOST1:PORT1 --alter --generate-name \
--metrics org.apache.kafka.producer.node.request.latency.,org.apache.kafka.consumer.node.request.latency. \
--interval 60000
Usage details
Option Description
------ -----------
--alter Alter the configuration for the client
metrics resource.
--bootstrap-server <String: server to REQUIRED: The Kafka server to connect
connect to> to.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client.
--delete Delete the configuration for the
client metrics resource.
--describe List configurations for the client
metrics resource.
--generate-name Generate a UUID to use as the name.
--help Print usage information.
--interval <Integer: push interval> The metrics push interval in
milliseconds.
--list List the client metrics resources.
--match <String: k1=v1,k2=v2> Matching selector 'k1=v1,k2=v2'. The
following is a list of valid
selector names:
client_id
client_instance_id
client_software_name
client_software_version
client_source_address
client_source_port
--metrics <String: m1,m2> Telemetry metric name prefixes 'm1,m2'.
--name <String: name> Name of client metrics configuration
resource.
--version Display Kafka version.
kafka-verifiable-consumer.sh¶
The kafka-verifiable-consumer tool consumes messages from a topic and emits consumer events (for example, group rebalances, received messages, and committed offsets) as JSON objects to STDOUT.
This tool is intended for internal testing.
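For example, the following command (topic and group names are illustrative) consumes ten messages and then exits:
bin/kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --group-id test-group --max-messages 10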
Usage details
usage: verifiable-consumer [-h] --topic TOPIC --group-id GROUP_ID
[--group-instance-id GROUP_INSTANCE_ID]
[--max-messages MAX-MESSAGES] [--session-timeout TIMEOUT_MS]
[--verbose] [--enable-autocommit] [--send-offset-for-times-data]
[--reset-policy RESETPOLICY]
[--assignment-strategy ASSIGNMENTSTRATEGY]
[--consumer.config CONFIG_FILE]
(--bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]] |
--broker-list HOST1:PORT1[,HOST2:PORT2[...]])
This tool consumes messages from a specific topic and emits consumer events (e.g. group
rebalances, received messages, and offsets committed) as JSON objects to STDOUT.
optional arguments:
-h, --help show this help message and exit
--topic TOPIC Consumes messages from this topic.
--group-id GROUP_ID The groupId shared among members of the consumer group
--group-instance-id GROUP_INSTANCE_ID
A unique identifier of the consumer instance
--max-messages MAX-MESSAGES
Consume this many messages. If -1 (the default), the consumer will
consume until the process is killed externally (default: -1)
--session-timeout TIMEOUT_MS
Set the consumer's session timeout (default: 30000)
--verbose Enable to log individual consumed records (default: false)
--enable-autocommit Enable offset auto-commit on consumer (default: false)
--send-offset-for-times-data
Consumer sends offsetForTimes() information for all the partitions it
has subscribed to. Use when version = DEV_BRANCH (default: false)
--reset-policy RESETPOLICY
Set reset policy (must be either 'earliest', 'latest', or 'none'
(default: earliest)
--assignment-strategy ASSIGNMENTSTRATEGY
Set assignment strategy (e.g. org.apache.kafka.clients.consumer.
RoundRobinAssignor) (default: org.apache.kafka.clients.consumer.
RangeAssignor)
--consumer.config CONFIG_FILE
Consumer config properties file (config options shared with command
line parameters will be overridden).
Connection Group:
Group of arguments for connection to brokers
--bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]]
REQUIRED unless --broker-list(deprecated) is specified. The server(s)
to connect to. Comma-separated list of Kafka brokers in the form
HOST1:PORT1,HOST2:PORT2,...
--broker-list HOST1:PORT1[,HOST2:PORT2[...]]
DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-
server is specified. Comma-separated list of Kafka brokers in the
form HOST1:PORT1,HOST2:PORT2,...
kafka-configs.sh¶
Use the kafka-configs tool to change and describe topic, client, user, broker, or IP configuration settings, or to describe a KRaft controller.
To describe or view a KRaft controller, use the --bootstrap-controller option, and do not specify a bootstrap-server.
To change a property, set the entity-type to the desired entity (topic, broker, user, and so on), and use the --alter option. The following example shows how you might add the delete.retention.ms configuration property for a topic with kafka-configs.
/bin/kafka-configs.sh --bootstrap-server host1:9092 --entity-type topics --entity-default --alter --add-config delete.retention.ms=172800000
When you use the --add-config flag to add multiple values, use square brackets around the comma-separated list, as in the following example:
/bin/kafka-configs.sh --bootstrap-server host1:9092 --alter --add-config max.connections.per.ip.overrides=[host1:50,host2:9] --entity-type brokers --entity-default
The following example shows how you might check the cluster ID by specifying the --bootstrap-controller option:
/bin/kafka-cluster.sh cluster-id --bootstrap-controller localhost:9092
See Topic Operations for more examples of how to work with topics.
Usage details
This tool helps to manipulate and describe entity config for a topic, client, user, broker, ip or client-metrics
Option Description
------ -----------
--add-config <String> Key Value pairs of configs to add.
Square brackets can be used to group
values which contain commas: 'k1=v1,
k2=[v1,v2,v2],k3=v3'. The following
is a list of valid configurations:
For entity-type 'topics':
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
local.retention.bytes
local.retention.ms
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.after.max.ms
message.timestamp.before.max.ms
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
remote.storage.enable
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
For entity-type 'brokers':
advertised.listeners
background.threads
compression.type
follower.replication.throttled.rate
leader.replication.throttled.rate
listener.security.protocol.map
listeners
log.cleaner.backoff.ms
log.cleaner.dedupe.buffer.size
log.cleaner.delete.retention.ms
log.cleaner.io.buffer.load.factor
log.cleaner.io.buffer.size
log.cleaner.io.max.bytes.per.second
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleaner.min.compaction.lag.ms
log.cleaner.threads
log.cleanup.policy
log.flush.interval.messages
log.flush.interval.ms
log.index.interval.bytes
log.index.size.max.bytes
log.local.retention.bytes
log.local.retention.ms
log.message.downconversion.enable
log.message.timestamp.after.max.ms
log.message.timestamp.before.max.ms
log.message.timestamp.difference.max.
ms
log.message.timestamp.type
log.preallocate
log.retention.bytes
log.retention.ms
log.roll.jitter.ms
log.roll.ms
log.segment.bytes
log.segment.delete.delay.ms
max.connection.creation.rate
max.connections
max.connections.per.ip
max.connections.per.ip.overrides
message.max.bytes
metric.reporters
min.insync.replicas
num.io.threads
num.network.threads
num.recovery.threads.per.data.dir
num.replica.fetchers
principal.builder.class
producer.id.expiration.ms
remote.log.index.file.cache.total.
size.bytes
replica.alter.log.dirs.io.max.bytes.
per.second
sasl.enabled.mechanisms
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal.to.local.rules
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.
factor
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism.inter.broker.protocol
ssl.cipher.suites
ssl.client.auth
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.engine.factory.class
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.certificate.chain
ssl.keystore.key
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.certificates
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
transaction.partition.verification.
enable
unclean.leader.election.enable
For entity-type 'users':
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate
controller_mutation_rate
producer_byte_rate
request_percentage
For entity-type 'clients':
consumer_byte_rate
controller_mutation_rate
producer_byte_rate
request_percentage
For entity-type 'ips':
connection_creation_rate
For entity-type 'client-metrics':
interval.ms
match
metrics
Entity types 'users' and 'clients' may
be specified together to update
config for clients of a specific
user.
--add-config-file <String> Path to a properties file with configs
to add. See add-config for a list of
valid configurations.
--all List all configs for the given topic,
broker, or broker-logger entity
(includes static configuration when
the entity type is brokers)
--alter Alter the configuration for the entity.
--bootstrap-controller <String: The Kafka controllers to connect to.
controller to connect to>
--bootstrap-server <String: server to The Kafka servers to connect to.
connect to>
--broker <String> The broker's ID.
--broker-defaults The config defaults for all brokers.
--broker-logger <String> The broker's ID for its logger config.
--client <String> The client's ID.
--client-defaults The config defaults for all clients.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--delete-config <String> config keys to remove 'k1,k2'
--describe List configs for the given entity.
--entity-default Default entity name for
clients/users/brokers/ips (applies
to corresponding entity type in
command line)
--entity-name <String> Name of entity (topic name/client
id/user principal name/broker
id/ip/client metrics)
--entity-type <String> Type of entity
(topics/clients/users/brokers/broker-
loggers/ips/client-metrics)
--force Suppress console prompts
--help Print usage information.
--ip <String> The IP address.
--ip-defaults The config defaults for all IPs.
--topic <String> The topic's name.
--user <String> The user's principal name.
--user-defaults The config defaults for all users.
--version Display Kafka version.
--zk-tls-config-file <String: Identifies the file where ZooKeeper
ZooKeeper TLS configuration> client TLS connectivity properties
are defined. Any properties other
than zookeeper.clientCnxnSocket,
zookeeper.ssl.cipher.suites,
zookeeper.ssl.client.enable,
zookeeper.ssl.crl.enable, zookeeper.
ssl.enabled.protocols, zookeeper.ssl.
endpoint.identification.algorithm,
zookeeper.ssl.keystore.location,
zookeeper.ssl.keystore.password,
zookeeper.ssl.keystore.type,
zookeeper.ssl.ocsp.enable, zookeeper.
ssl.protocol, zookeeper.ssl.
truststore.location, zookeeper.ssl.
truststore.password, zookeeper.ssl.
truststore.type are ignored.
--zookeeper <String: urls> DEPRECATED. The connection string for
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over. Required
when configuring SCRAM credentials
for users or dynamic broker configs
when the relevant broker(s) are
down. Not allowed otherwise.
kafka-verifiable-producer.sh¶
The kafka-verifiable-producer
tool produces increasing integers to the specified topic and prints JSON metadata to STDOUT on each send
request, showing which messages have been acked and which have not. This tool is intended for internal testing.
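For example, a command like the following (a sketch that assumes a local broker on localhost:9092 and a hypothetical topic named test-topic) would produce 1000 increasing integers and print the acknowledgement metadata for each send:
bin/kafka-verifiable-producer.sh --bootstrap-server localhost:9092 --topic test-topic --max-messages 1000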
Usage details
usage: verifiable-producer [-h] --topic TOPIC [--max-messages MAX-MESSAGES]
[--throughput THROUGHPUT] [--acks ACKS]
[--producer.config CONFIG_FILE] [--message-create-time CREATETIME]
[--value-prefix VALUE-PREFIX] [--repeating-keys REPEATING-KEYS]
(--bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]] |
--broker-list HOST1:PORT1[,HOST2:PORT2[...]])
This tool produces increasing integers to the specified topic and prints JSON metadata to
stdout on each "send" request, making externally visible which messages have been acked and
which have not.
optional arguments:
-h, --help show this help message and exit
--topic TOPIC Produce messages to this topic.
--max-messages MAX-MESSAGES
Produce this many messages. If -1, produce messages until the process
is killed externally. (default: -1)
--throughput THROUGHPUT
If set >= 0, throttle maximum message throughput to *approximately*
THROUGHPUT messages/sec. (default: -1)
--acks ACKS Acks required on each produced message. See Kafka docs on acks for
details. (default: -1)
--producer.config CONFIG_FILE
Producer config properties file.
--message-create-time CREATETIME
Send messages with creation time starting at the arguments value, in
milliseconds since epoch (default: -1)
--value-prefix VALUE-PREFIX
If specified, each produced value will have this prefix with a dot
separator
--repeating-keys REPEATING-KEYS
If specified, each produced record will have a key starting at 0
increment by 1 up to the number specified (exclusive), then the key
is set to 0 again
Connection Group:
Group of arguments for connection to brokers
--bootstrap-server HOST1:PORT1[,HOST2:PORT2[...]]
REQUIRED: The server(s) to connect to. Comma-separated list of Kafka
brokers in the form HOST1:PORT1,HOST2:PORT2,...
--broker-list HOST1:PORT1[,HOST2:PORT2[...]]
DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-
server is specified. Comma-separated list of Kafka brokers in the
form HOST1:PORT1,HOST2:PORT2,...
kafka-console-consumer.sh¶
Use the kafka-console-consumer
tool to consume records from a topic.
Requires a bootstrap-server
parameter that contains a comma-separated list of <hostname:port>
entries specifying the server/port to connect to.
Confluent Tip
If you are using Confluent, you can use the Confluent CLI
and the kafka topic
command to produce and consume from a topic.
Example:
bin/kafka-console-consumer.sh --bootstrap-server HOST1:PORT1,HOST2:PORT2 --consumer.config config.properties --topic testTopic --property "print.key=true"
Usage details
This tool helps to read data from Kafka topics and outputs it to standard output.
Option Description
------ -----------
--bootstrap-server <String: server to REQUIRED: The server(s) to connect to.
connect to>
--consumer-property <String: A mechanism to pass user-defined
consumer_prop> properties in the form key=value to
the consumer.
--consumer.config <String: config file> Consumer config properties file. Note
that [consumer-property] takes
precedence over this config.
--enable-systest-events Log lifecycle events of the consumer
in addition to logging consumed
messages. (This is specific for
system tests.)
--formatter <String: class> The name of a class to use for
formatting kafka messages for
display. (default: kafka.tools.
DefaultMessageFormatter)
--from-beginning If the consumer does not already have
an established offset to consume
from, start with the earliest
message present in the log rather
than the latest message.
--group <String: consumer group id> The consumer group id of the consumer.
--help Print usage information.
--include <String: Java regex (String)> Regular expression specifying list of
topics to include for consumption.
--isolation-level <String> Set to read_committed in order to
filter out transactional messages
which are not committed. Set to
read_uncommitted to read all
messages. (default: read_uncommitted)
--key-deserializer <String:
deserializer for key>
--max-messages <Integer: num_messages> The maximum number of messages to
consume before exiting. If not set,
consumption is continual.
--offset <String: consume offset> The offset to consume from (a non-
negative number), or 'earliest'
which means from beginning, or
'latest' which means from end
(default: latest)
--partition <Integer: partition> The partition to consume from.
Consumption starts from the end of
the partition unless '--offset' is
specified.
--property <String: prop> The properties to initialize the
message formatter. Default
properties include:
print.timestamp=true|false
print.key=true|false
print.offset=true|false
print.partition=true|false
print.headers=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
headers.separator=<line.separator>
null.literal=<null.literal>
key.deserializer=<key.deserializer>
value.deserializer=<value.
deserializer>
header.deserializer=<header.
deserializer>
Users can also pass in customized
properties for their formatter; more
specifically, users can pass in
properties keyed with 'key.
deserializer.', 'value.
deserializer.' and 'headers.
deserializer.' prefixes to configure
their deserializers.
--skip-message-on-error If there is an error when processing a
message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms> If specified, exit if no message is
available for consumption for the
specified interval.
--topic <String: topic> The topic to consume on.
--value-deserializer <String:
deserializer for values>
--version Display Kafka version.
--whitelist <String: Java regex DEPRECATED, use --include instead;
(String)> ignored if --include specified.
Regular expression specifying list
of topics to include for consumption.
kafka-console-producer.sh¶
Use the kafka-console-producer
tool to produce records to a topic.
Requires a bootstrap-server
parameter that contains a comma-separated list of <hostname:port> entries specifying the server/port to connect to.
Example:
kafka-console-producer.sh --bootstrap-server HOST1:PORT1,HOST2:PORT2 --producer.config config.properties --topic testTopic --property "parse.key=true" --property "key.separator=:"
Confluent Tip
If you are using Confluent, you can use the Confluent CLI
and the kafka topic
command to produce and consume from a topic.
Usage details
This tool helps to read data from standard input and publish it to Kafka.
Option Description
------ -----------
--batch-size <Integer: size> Number of messages to send in a single
batch if they are not being sent
synchronously. please note that this
option will be replaced if max-
partition-memory-bytes is also set
(default: 16384)
--bootstrap-server <String: server to REQUIRED unless --broker-list
connect to> (deprecated) is specified. The server
(s) to connect to. The broker list
string in the form HOST1:PORT1,HOST2:
PORT2.
--broker-list <String: broker-list> DEPRECATED, use --bootstrap-server
instead; ignored if --bootstrap-
server is specified. The broker
list string in the form HOST1:PORT1,
HOST2:PORT2.
--compression-codec [String: The compression codec: either 'none',
compression-codec] 'gzip', 'snappy', 'lz4', or 'zstd'.
If specified without value, then it
defaults to 'gzip'
--help Print usage information.
--line-reader <String: reader_class> The class name of the class to use for
reading lines from standard in. By
default each line is read as a
separate message. (default: kafka.
tools.
ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on The max time that the producer will
send> block for during a send request.
(default: 60000)
--max-memory-bytes <Long: total memory The total memory used by the producer
in bytes> to buffer records waiting to be sent
to the server. This is the option to
control `buffer.memory` in producer
configs. (default: 33554432)
--max-partition-memory-bytes <Integer: The buffer size allocated for a
memory in bytes per partition> partition. When records are received
which are smaller than this size the
producer will attempt to
optimistically group them together
until this size is reached. This is
the option to control `batch.size`
in producer configs. (default: 16384)
--message-send-max-retries <Integer> Brokers can fail receiving the message
for multiple reasons, and being
unavailable transiently is just one
of them. This property specifies the
number of retries before the
producer give up and drop this
message. This is the option to
control `retries` in producer
configs. (default: 3)
--metadata-expiry-ms <Long: metadata The period of time in milliseconds
expiration interval> after which we force a refresh of
metadata even if we haven't seen any
leadership changes. This is the
option to control `metadata.max.age.
ms` in producer configs. (default:
300000)
--producer-property <String: A mechanism to pass user-defined
producer_prop> properties in the form key=value to
the producer.
--producer.config <String: config file> Producer config properties file. Note
that [producer-property] takes
precedence over this config.
--property <String: prop> A mechanism to pass user-defined
properties in the form key=value to
the message reader. This allows
custom configuration for a user-
defined message reader.
Default properties include:
parse.key=false
parse.headers=false
ignore.error=false
key.separator=\t
headers.delimiter=\t
headers.separator=,
headers.key.separator=:
null.marker= When set, any fields
(key, value and headers) equal to
this will be replaced by null
Default parsing pattern when:
parse.headers=true and parse.key=true:
"h1:v1,h2:v2...\tkey\tvalue"
parse.key=true:
"key\tvalue"
parse.headers=true:
"h1:v1,h2:v2...\tvalue"
--request-required-acks <String: The required `acks` of the producer
request required acks> requests (default: -1)
--request-timeout-ms <Integer: request The ack timeout of the producer
timeout ms> requests. Value must be non-negative
and non-zero. (default: 1500)
--retry-backoff-ms <Long> Before each retry, the producer
refreshes the metadata of relevant
topics. Since leader election takes
a bit of time, this property
specifies the amount of time that
the producer waits before refreshing
the metadata. This is the option to
control `retry.backoff.ms` in
producer configs. (default: 100)
--socket-buffer-size <Integer: size> The size of the tcp RECV size. This is
the option to control `send.buffer.
bytes` in producer configs.
(default: 102400)
--sync If set message send requests to the
brokers are synchronously, one at a
time as they arrive.
--timeout <Long: timeout_ms> If set and the producer is running in
asynchronous mode, this gives the
maximum amount of time a message
will queue awaiting sufficient batch
size. The value is given in ms. This
is the option to control `linger.ms`
in producer configs. (default: 1000)
--topic <String: topic> REQUIRED: The topic id to produce
messages to.
--version Display Kafka version.
kafka-producer-perf-test.sh¶
The kafka-producer-perf-test
tool enables you to produce a large quantity of data to test producer performance for the Kafka cluster.
Example:
bin/kafka-producer-perf-test.sh --topic topic-a --num-records 200000 --record-size 1000 --throughput 10000000 --producer-props bootstrap.servers=host1:9092
Usage details
usage: producer-perf-test [-h] --topic TOPIC --num-records NUM-RECORDS
[--payload-delimiter PAYLOAD-DELIMITER] --throughput THROUGHPUT
[--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]]
[--producer.config CONFIG-FILE] [--print-metrics]
[--transactional-id TRANSACTIONAL-ID]
[--transaction-duration-ms TRANSACTION-DURATION]
(--record-size RECORD-SIZE | --payload-file PAYLOAD-FILE)
This tool is used to verify the producer performance.
optional arguments:
-h, --help show this help message and exit
--topic TOPIC produce messages to this topic
--num-records NUM-RECORDS
number of messages to produce
--payload-delimiter PAYLOAD-DELIMITER
provides delimiter to be used when --payload-file is provided.
Defaults to new line. Note that this parameter will be ignored if --
payload-file is not provided. (default: \n)
--throughput THROUGHPUT
throttle maximum message throughput to *approximately* THROUGHPUT
messages/sec. Set this to -1 to disable throttling.
--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]
kafka producer related configuration properties like bootstrap.
servers, client.id etc. These configs take precedence over those
passed via --producer.config.
--producer.config CONFIG-FILE
producer config properties file.
--print-metrics print out metrics at the end of the test. (default: false)
--transactional-id TRANSACTIONAL-ID
The transactionalId to use if transaction-duration-ms is > 0. Useful
when testing the performance of concurrent transactions. (default:
performance-producer-default-transactional-id)
--transaction-duration-ms TRANSACTION-DURATION
The max age of each transaction. The commitTransaction will be called
after this time has elapsed. Transactions are only enabled if this
value is positive. (default: 0)
either --record-size or --payload-file must be specified but not both.
--record-size RECORD-SIZE
message size in bytes. Note that you must provide exactly one of --
record-size or --payload-file.
--payload-file PAYLOAD-FILE
file to read the message payloads from. This works only for UTF-8
encoded text files. Payloads will be read from this file and a
payload will be randomly selected when sending messages. Note that
you must provide exactly one of --record-size or --payload-file.
kafka-consumer-groups.sh¶
Use the kafka-consumer-groups
tool to list the active consumer groups in the cluster, describe their members and offsets, delete groups, or reset group offsets.
For example, to show the position of all consumers in a group named user-group
, you might use the following command.
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe --group user-group
This would result in output like the following (CONSUMER-ID
entries truncated for readability).
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
user 0 2 4 2 consumer-1-... /127.0.0.1 consumer-1
user 1 2 3 1 consumer-1-... /127.0.0.1 consumer-1
user 2 2 3 1 consumer-2-... /127.0.0.1 consumer-2
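You can also use the tool to reset a group's offsets. As a sketch (reusing the same user-group group and user topic, and assuming all consumers in the group are stopped), the following command would preview a reset to the earliest available offsets without applying it:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group user-group --topic user --reset-offsets --to-earliest --dry-run
Replace --dry-run with --execute to apply the reset.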
For more examples, see View Consumer Group Info.
Usage details
Option Description
------ -----------
--all-groups Apply to all consumer groups.
--all-topics Consider all topics assigned to a
group in the `reset-offsets` process.
--bootstrap-server <String: server to REQUIRED: The server(s) to connect to.
connect to>
--by-duration <String: duration> Reset offsets to offset by duration
from current timestamp. Format:
'PnDTnHnMnS'
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client and Consumer.
--delete Pass in groups to delete topic
partition offsets and ownership
information over the entire consumer
group. For instance --group g1 --
group g2
--delete-offsets Delete offsets of consumer group.
Supports one consumer group at the
time, and multiple topics.
--describe Describe consumer group and list
offset lag (number of messages not
yet processed) related to given
group.
--dry-run Only show results without executing
changes on Consumer Groups.
Supported operations: reset-offsets.
--execute Execute operation. Supported
operations: reset-offsets.
--export Export operation execution to a CSV
file. Supported operations: reset-
offsets.
--from-file <String: path to CSV file> Reset offsets to values defined in CSV
file.
--group <String: consumer group> The consumer group we wish to act on.
--help Print usage information.
--list List all consumer groups.
--members Describe members of the group. This
option may be used with '--describe'
and '--bootstrap-server' options
only.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
members
--offsets Describe the group and list all topic
partitions in the group along with
their offset lag. This is the
default sub-action of and may be
used with '--describe' and '--
bootstrap-server' options only.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
offsets
--reset-offsets Reset offsets of consumer group.
Supports one consumer group at the
time, and instances should be
inactive
Has 2 execution options: --dry-run
(the default) to plan which offsets
to reset, and --execute to update
the offsets. Additionally, the --
export option is used to export the
results to a CSV format.
You must choose one of the following
reset specifications: --to-datetime,
--by-period, --to-earliest, --to-
latest, --shift-by, --from-file, --
to-current.
To define the scope use --all-topics
or --topic. One scope must be
specified unless you use '--from-
file'.
--shift-by <Long: number-of-offsets> Reset offsets shifting current offset
by 'n', where 'n' can be positive or
negative.
--state [String] When specified with '--describe',
includes the state of the group.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
state
When specified with '--list', it
displays the state of all groups. It
can also be used to list groups with
specific states.
Example: --bootstrap-server localhost:
9092 --list --state stable,empty
This option may be used with '--
describe', '--list' and '--bootstrap-
server' options only.
--timeout <Long: timeout (ms)> The timeout that can be set for some
use cases. For example, it can be
used when describing the group to
specify the maximum amount of time
in milliseconds to wait before the
group stabilizes (when the group is
just created, or is going through
some changes). (default: 5000)
--to-current Reset offsets to current offset.
--to-datetime <String: datetime> Reset offsets to offset from datetime.
Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest Reset offsets to earliest offset.
--to-latest Reset offsets to latest offset.
--to-offset <Long: offset> Reset offsets to a specific offset.
--topic <String: topic> The topic whose consumer group
information should be deleted or
topic whose should be included in
the reset offset process. In `reset-
offsets` case, partitions can be
specified using this format: `topic1:
0,1,2`, where 0,1,2 are the
partition to be included in the
process. Reset-offsets also supports
multiple topic inputs.
--verbose Provide additional information, if
any, when describing the group. This
option may be used with '--
offsets'/'--members'/'--state' and
'--bootstrap-server' options only.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
members --verbose
--version Display Kafka version.
kafka-consumer-perf-test.sh¶
Use the kafka-consumer-perf-test
tool to test consumer performance for the Kafka cluster.
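For example, a command like the following (a sketch assuming a local broker and a hypothetical topic named test-topic) would consume 100000 messages and report throughput statistics:
bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic test-topic --messages 100000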
Usage details
Option Description
------ -----------
--bootstrap-server <String: server to REQUIRED unless --broker-list
connect to> (deprecated) is specified. The server
(s) to connect to.
--broker-list <String: broker-list> DEPRECATED, use --bootstrap-server
instead; ignored if --bootstrap-
server is specified. The broker
list string in the form HOST1:PORT1,
HOST2:PORT2.
--consumer.config <String: config file> Consumer config properties file.
--date-format <String: date format> The date format to use for formatting
the time field. See java.text.
SimpleDateFormat for options.
(default: yyyy-MM-dd HH:mm:ss:SSS)
--fetch-size <Integer: size> The amount of data to fetch in a
single request. (default: 1048576)
--from-latest If the consumer does not already have
an established offset to consume
from, start with the latest message
present in the log rather than the
earliest message.
--group <String: gid> The group id to consume on. (default:
perf-consumer-50334)
--help Print usage information.
--hide-header If set, skips printing the header for
the stats
--messages <Long: count> REQUIRED: The number of messages to
send or consume
--num-fetch-threads <Integer: count> DEPRECATED AND IGNORED: Number of
fetcher threads. (default: 1)
--print-metrics Print out the metrics.
--reporting-interval <Integer: Interval in milliseconds at which to
interval_ms> print progress info. (default: 5000)
--show-detailed-stats If set, stats are reported for each
reporting interval as configured by
reporting-interval
--socket-buffer-size <Integer: size> The size of the tcp RECV size.
(default: 2097152)
--threads <Integer: count> DEPRECATED AND IGNORED: Number of
processing threads. (default: 10)
--timeout [Long: milliseconds] The maximum allowed time in
milliseconds between returned
records. (default: 10000)
--topic <String: topic> REQUIRED: The topic to consume from.
--version Display Kafka version.
Manage Kafka Connect¶
connect-distributed.sh¶
Use the connect-distributed
tool to run Connect workers in Distributed mode, meaning across multiple, distributed machines.
Distributed mode handles automatic balancing of work, allows you to scale up
(or down) dynamically, and offers fault tolerance both for the active tasks and for the configuration and offset commit data.
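For example, assuming the sample worker configuration file that ships with Apache Kafka under config/, you might start a distributed worker like this:
./bin/connect-distributed.sh config/connect-distributed.properties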
Usage details
USAGE: ./connect-distributed.sh [-daemon] connect-distributed.properties
connect-standalone.sh¶
Use the connect-standalone
tool to run Kafka Connect in standalone mode, meaning all work is performed in a single process. This is good for
getting started, but it lacks fault tolerance.
For more information, see Kafka Connect.
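For example, assuming the sample worker and file-source connector configuration files that ship with Apache Kafka under config/, you might start a standalone worker like this:
./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties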
Usage details
USAGE: ./connect-standalone.sh [-daemon] connect-standalone.properties
connect-mirror-maker.sh¶
Use the connect-mirror-maker
tool to replicate topics from one cluster to another using the Connect framework.
You must pass an mm2.properties
MM2 configuration file. For more information, see
KIP-382: MirrorMaker 2.0
or Getting up to speed with MirrorMaker 2.
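For example, assuming you have written an mm2.properties file that defines your source and target clusters, you might start MirrorMaker 2 like this:
./bin/connect-mirror-maker.sh mm2.properties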
Usage details
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.properties
MirrorMaker 2.0 driver
positional arguments:
mm2.properties MM2 configuration file.
optional arguments:
-h, --help show this help message and exit
--clusters CLUSTER [CLUSTER ...]
Target cluster to use for this node.
Manage Kafka Streams¶
kafka-streams-application-reset.sh¶
For Kafka Streams applications, the kafka-streams-application-reset
tool resets the application and forces it to reprocess its data from the beginning. This is useful for debugging and testing.
For example, the following command would reset the my-streams-app
application:
kafka-streams-application-reset.sh --application-id my-streams-app \
--input-topics my-input-topic \
--intermediate-topics rekeyed-topic
For more information, see Kafka Streams Application Reset Tool in the Confluent documentation.
Usage details
This tool helps to quickly reset an application in order to reprocess its data from scratch.
* This tool resets offsets of input topics to the earliest available offset (by default), or to a specific
defined position and it skips to the end of intermediate topics (topics that are input and output topics,
e.g., used by deprecated through() method).
* This tool deletes the internal topics that were created by Kafka Streams (topics starting with "<application.id>-").
The tool finds these internal topics automatically. If the topics flagged automatically for deletion by
the dry-run are unsuitable, you can specify a subset with the "--internal-topics" option.
* This tool will not delete output topics (if you want to delete them, you need to do it yourself with the bin/kafka-topics.sh command).
* This tool will not clean up the local state on the stream application instances (the persisted stores
used to cache aggregation results). You need to call KafkaStreams#cleanUp() in your application
or manually delete them from the directory specified by "state.dir"
configuration (${java.io.tmpdir}/kafka-streams/<application.id> by default).
* When long session timeout has been configured, active members could take longer to get expired on the
broker thus blocking the reset job to complete. Use the "--force" option could remove those
left-over members immediately. Make sure to stop all stream applications when this option
is specified to avoid unexpected disruptions.
*** Important! You will get wrong output if you don't clean up the local stores after running the reset tool!
*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this first with "--dry-run" to preview your changes before making them.
Option (* = required) Description
--------------------- -----------
* --application-id <String: id> The Kafka Streams application ID
(application.id).
--bootstrap-servers <String: urls> Comma-separated list of broker urls with
format: HOST1:PORT1,HOST2:PORT2
(default: localhost:9092)
--by-duration <String> Reset offsets to offset by duration from
current timestamp. Format: 'PnDTnHnMnS'
--config-file <String: file name> Property file containing configs to be
passed to admin clients and embedded
consumer.
--dry-run Display the actions that would be
performed without executing the reset
commands.
--force Force the removal of members of the
consumer group (intended to remove
stopped members if a long session
timeout was used). Make sure to shut
down all stream applications when this
option is specified to avoid unexpected
rebalances.
--from-file <String> Reset offsets to values defined in CSV
file.
--help Print usage information.
--input-topics <String: list> Comma-separated list of user input
topics. For these topics, the tool by
default will reset the offset to the
earliest available offset. Reset to
other offset position by appending
other reset offset option, ex: --input-
topics foo --shift-by 5
--intermediate-topics <String: list> Comma-separated list of intermediate user
topics (topics that are input and
output topics, e.g., used in the
deprecated through() method). For these
topics, the tool will skip to the end.
--internal-topics <String: list> Comma-separated list of internal topics
to delete. Must be a subset of the
internal topics marked for deletion by
the default behaviour (do a dry-run
without this option to view these
topics).
--shift-by <Long: number-of-offsets> Reset offsets shifting current offset by
'n', where 'n' can be positive or
negative
--to-datetime <String> Reset offsets to offset from datetime.
Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest Reset offsets to earliest offset.
--to-latest Reset offsets to latest offset.
--to-offset <Long> Reset offsets to a specific offset.
--version Print version information and exit.
Manage security¶
kafka-acls.sh¶
Use the kafka-acls
tool to add, remove, and list ACLs. For example, if you wanted to
grant two principal users, Jose and Jane, read and write permissions on
the user
topic from specific IP addresses, you could use a command like the following:
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Jose --allow-principal User:Jane --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic user
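Similarly, to view the ACLs currently set on the user topic, you might use a command like the following:
bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic user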
For more information, see ACL Command-line interface.
Usage details
This tool helps to manage acls on kafka.
Option Description
------ -----------
--add Indicates you are trying to add ACLs.
--allow-host <String: allow-host> Host from which principals listed in --
allow-principal will have access. If
you have specified --allow-principal
then the default for this option
will be set to '*' which allows
access from all hosts.
--allow-principal <String: allow- principal is in principalType:name
principal> format. Note that principalType must
be supported by the Authorizer being
used. For example, User:'*' is the
wild card indicating all users.
--authorizer <String: authorizer> DEPRECATED: Fully qualified class name
of the authorizer, which defaults to
kafka.security.authorizer.
AclAuthorizer if --bootstrap-server
is not provided. Warning: support
for ACL configuration directly
through the authorizer is deprecated
and will be removed in a future
release. Please use --bootstrap-
server instead to set ACLs through
the admin client.
--authorizer-properties <String: DEPRECATED: The properties required to
authorizer-properties> configure an instance of the
Authorizer specified by --
authorizer. These are key=val pairs.
For the default authorizer, example
values are: zookeeper.
connect=localhost:2181. Warning:
support for ACL configuration
directly through the authorizer is
deprecated and will be removed in a
future release. Please use --
bootstrap-server instead to set ACLs
through the admin client.
--bootstrap-server <String: server to A list of host/port pairs to use for
connect to> establishing the connection to the
Kafka cluster. This list should be
in the form host1:port1,host2:
port2,... This config is required
for acl management using admin
client API.
--cluster Add/Remove cluster ACLs.
--command-config [String: command- A property file containing configs to
config] be passed to Admin Client.
--consumer Convenience option to add/remove ACLs
for consumer role. This will
generate ACLs that allows READ,
DESCRIBE on topic and READ on group.
--delegation-token <String: delegation- Delegation token to which ACLs should
token> be added or removed. A value of '*'
indicates ACL should apply to all
tokens.
--deny-host <String: deny-host> Host from which principals listed in --
deny-principal will be denied
access. If you have specified --deny-
principal then the default for this
option will be set to '*' which
denies access from all hosts.
--deny-principal <String: deny- principal is in principalType:name
principal> format. By default anyone not added
through --allow-principal is denied
access. You only need to use this
option as negation to already
allowed set. Note that principalType
must be supported by the Authorizer
being used. For example if you
wanted to allow access to all users
in the system but not test-user you
can define an ACL that allows access
to User:'*' and specify --deny-
principal=User:test@EXAMPLE.COM. AND
PLEASE REMEMBER DENY RULES TAKES
PRECEDENCE OVER ALLOW RULES.
--force Assume Yes to all queries and do not
prompt.
--group <String: group> Consumer Group to which the ACLs
should be added or removed. A value
of '*' indicates the ACLs should
apply to all groups.
--help Print usage information.
--idempotent Enable idempotence for the producer.
This should be used in combination
with the --producer option. Note
that idempotence is enabled
automatically if the producer is
authorized to a particular
transactional-id.
--list List ACLs for the specified resource,
use --topic <topic> or --group
<group> or --cluster to specify a
resource.
--operation <String> Operation that is being allowed or
denied. Valid operation names are:
Describe
DescribeConfigs
Alter
IdempotentWrite
Read
Delete
Create
ClusterAction
All
CreateTokens
DescribeTokens
Write
AlterConfigs
(default: All)
--principal [String: principal] List ACLs for the specified principal.
principal is in principalType:name
format. Note that principalType must
be supported by the Authorizer being
used. Multiple --principal option
can be passed.
--producer Convenience option to add/remove ACLs
for producer role. This will
generate ACLs that allows WRITE,
DESCRIBE and CREATE on topic.
--remove Indicates you are trying to remove
ACLs.
--resource-pattern-type The type of the resource pattern or
<ANY|MATCH|LITERAL|PREFIXED> pattern filter. When adding acls,
this should be a specific pattern
type, e.g. 'literal' or 'prefixed'.
When listing or removing acls, a
specific pattern type can be used to
list or remove acls from specific
resource patterns, or use the filter
values of 'any' or 'match', where
'any' will match any pattern type,
but will match the resource name
exactly, where as 'match' will
perform pattern matching to list or
remove all acls that affect the
supplied resource(s). WARNING:
'match', when used in combination
with the '--remove' switch, should
be used with care. (default: LITERAL)
--topic <String: topic> topic to which ACLs should be added or
removed. A value of '*' indicates
ACL should apply to all topics.
--transactional-id <String: The transactionalId to which ACLs
transactional-id> should be added or removed. A value
of '*' indicates the ACLs should
apply to all transactionalIds.
--user-principal <String: user- Specifies a user principal as a
principal> resource in relation with the
operation. For instance one could
grant CreateTokens or DescribeTokens
permission on a given user principal.
--version Display Kafka version.
--zk-tls-config-file <String: DEPRECATED: Identifies the file where
Authorizer ZooKeeper TLS ZooKeeper client TLS connectivity
configuration> properties are defined for the
default authorizer kafka.security.
authorizer.AclAuthorizer. Any
properties other than the following
(with or without an "authorizer."
prefix) are ignored: zookeeper.
clientCnxnSocket, zookeeper.ssl.
cipher.suites, zookeeper.ssl.client.
enable, zookeeper.ssl.crl.enable,
zookeeper.ssl.enabled.protocols,
zookeeper.ssl.endpoint.
identification.algorithm, zookeeper.
ssl.keystore.location, zookeeper.ssl.
keystore.password, zookeeper.ssl.
keystore.type, zookeeper.ssl.ocsp.
enable, zookeeper.ssl.protocol,
zookeeper.ssl.truststore.location,
zookeeper.ssl.truststore.password,
zookeeper.ssl.truststore.type. Note
that if SASL is not configured and
zookeeper.set.acl is supposed to be
true due to mutual certificate
authentication being used then it is
necessary to explicitly specify --
authorizer-properties zookeeper.set.
acl=true. Warning: support for ACL
configuration directly through the
authorizer is deprecated and will be
removed in a future release. Please
use --bootstrap-server instead to
set ACLs through the admin client.
kafka-delegation-tokens.sh¶
Use the kafka-delegation-tokens
tool to create, renew, expire and describe delegation tokens. Delegation tokens
are shared secrets between Kafka brokers and clients, and are a lightweight authentication mechanism meant to complement
existing SASL/SSL methods.
For more information, see Authentication using Delegation Tokens in the
Confluent Documentation.
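As a sketch (assuming a hypothetical client.properties file containing the required security settings), creating a token that User:user1 is allowed to renew might look like this:
bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --command-config client.properties --create --max-life-time-period -1 --renewer-principal User:user1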
Usage details
This tool helps to create, renew, expire, or describe delegation tokens.
Option Description
------ -----------
--bootstrap-server <String> REQUIRED: server(s) to use for bootstrapping.
--command-config <String> REQUIRED: A property file containing configs to
be passed to Admin Client. Token management
operations are allowed in secure mode only.
This config file is used to pass security
related configs.
--create Create a new delegation token. Use --renewer-
principal option to pass renewers principals.
--describe Describe delegation tokens for the given
principals. Use --owner-principal to pass
owner/renewer principals. If --owner-principal
option is not supplied, all the user owned
tokens and tokens where user have Describe
permission will be returned.
--expire Expire delegation token. Use --expiry-time-
period option to expire the token.
--expiry-time-period [Long] Expiry time period in milliseconds. If the value
is -1, then the token will get invalidated
immediately.
--help Print usage information.
--hmac [String] HMAC of the delegation token
--max-life-time-period [Long] Max life period for the token in milliseconds.
If the value is -1, then token max life time
will default to a server side config value
(delegation.token.max.lifetime.ms).
--owner-principal [String] owner is a kafka principal. It is should be in
principalType:name format.
--renew Renew delegation token. Use --renew-time-period
option to set renew time period.
--renew-time-period [Long] Renew time period in milliseconds. If the value
is -1, then the renew time period will default
to a server side config value (delegation.
token.expiry.time.ms).
--renewer-principal [String] renewer is a kafka principal. It is should be in
principalType:name format.
--version Display Kafka version.
zookeeper-security-migration.sh¶
Use the zookeeper-security-migration
tool to restrict or provide access to ZooKeeper metadata. The tool updates the ACLs of znodes.
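For example, assuming a ZooKeeper ensemble at localhost:2181, the following command would secure the Kafka znodes:
bin/zookeeper-security-migration.sh --zookeeper.connect localhost:2181 --zookeeper.acl secure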
Usage details
ZooKeeper Migration Tool Help. This tool updates the ACLs of znodes as part of the process of setting up ZooKeeper authentication.
Option Description
------ -----------
--enable.path.check Checks if all the root paths exist in
ZooKeeper before migration. If not,
exit the command.
--help Print usage information.
--version Display Kafka version.
--zk-tls-config-file <String: Identifies the file where ZooKeeper
ZooKeeper TLS configuration> client TLS connectivity properties
are defined. Any properties other
than zookeeper.ssl.keystore.
password, zookeeper.ssl.truststore.
location, zookeeper.ssl.truststore.
type, zookeeper.ssl.ocsp.enable,
zookeeper.ssl.crl.enable, zookeeper.
ssl.truststore.password, zookeeper.
ssl.protocol, zookeeper.ssl.keystore.
type, zookeeper.ssl.client.enable,
zookeeper.ssl.cipher.suites,
zookeeper.clientCnxnSocket,
zookeeper.ssl.endpoint.
identification.algorithm, zookeeper.
ssl.enabled.protocols, zookeeper.ssl.
keystore.location are ignored.
--zookeeper.acl <String> Indicates whether to make the Kafka
znodes in ZooKeeper secure or
unsecure. The options are 'secure'
and 'unsecure'
--zookeeper.connect <String> Sets the ZooKeeper connect string
(ensemble). This parameter takes a
comma-separated list of host:port
pairs. (default: localhost:2181)
--zookeeper.connection.timeout Sets the ZooKeeper connection timeout.
<Integer> (default: 30000)
--zookeeper.session.timeout <Integer> Sets the ZooKeeper session timeout.
(default: 30000)
Test and troubleshoot¶
This section contains tools you can use for testing and troubleshooting your applications.
kafka-e2e-latency.sh¶
The kafka-e2e-latency
tool is a performance testing tool used to measure end-to-end latency in Kafka.
It works by sending messages to a Kafka topic and then consuming those messages with a Kafka consumer.
The tool calculates the time difference between when a message was produced and when it was consumed,
giving you an idea of the end-to-end latency for your Kafka cluster. This tool is useful for testing the
performance of your Kafka cluster and identifying any bottlenecks or issues that may be affecting latency.
To run the tool, you provide details such as the message size, number of messages and acks
setting for the producer.
For more about end-to-end latency, see Configure Kafka to Minimize Latency.
The following arguments are required:
- broker_list: The location of the bootstrap broker for both the producer and the consumer
- topic: The topic name used by both the producer and the consumer to send/receive messages
- num_messages: The number of messages to send
- producer_acks: The producer setting for acks
- message_size_bytes: The size of each message in bytes
For example:
kafka-e2e-latency.sh localhost:9092 test 10000 1 20
Usage details
USAGE: java org.apache.kafka.tools.EndToEndLatency broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file
kafka-dump-log.sh¶
The kafka-dump-log
tool can be used in KRaft mode to parse a metadata log file and output its contents to the console.
Requires a comma-separated list of log files. The tool will scan the provided files and decode the metadata records.
The following example shows using the cluster-metadata-decoder
argument to decode
the metadata records in a log segment.
bin/kafka-dump-log.sh --cluster-metadata-decoder --files tmp/kraft-combined-logs/_cluster_metadata-0/00000000000000023946.log
Usage details
This tool helps to parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.
Option Description
------ -----------
--cluster-metadata-decoder if set, log data will be parsed as cluster
metadata records.
--deep-iteration if set, uses deep instead of shallow
iteration. Automatically set if print-
data-log is enabled.
--files <String: file1, file2, ...> REQUIRED: The comma separated list of data
and index log files to be dumped.
--help Print usage information.
--index-sanity-check if set, just checks the index sanity
without printing its content. This is
the same check that is executed on
broker startup to determine if an index
needs rebuilding or not.
--key-decoder-class [String] if set, used to deserialize the keys. This
class should implement kafka.serializer.
Decoder trait. Custom jar should be
available in kafka/libs directory.
(default: kafka.serializer.StringDecoder)
--max-bytes <Integer: size> Limit the amount of total batches read in
bytes avoiding reading the whole .log
file(s). (default: 2147483647)
--max-message-size <Integer: size> Size of largest message. (default: 5242880)
--offsets-decoder if set, log data will be parsed as offset
data from the __consumer_offsets topic.
--print-data-log if set, printing the messages content when
dumping data logs. Automatically set if
any decoder option is specified.
--skip-record-metadata whether to skip printing metadata for each
record.
--transaction-log-decoder if set, log data will be parsed as
transaction metadata from the
__transaction_state topic.
--value-decoder-class [String] if set, used to deserialize the messages.
This class should implement kafka.
serializer.Decoder trait. Custom jar
should be available in kafka/libs
directory. (default: kafka.serializer.
StringDecoder)
--verify-index-only if set, just verify the index log without
printing its content.
--version Display Kafka version.
kafka-jmx.sh¶
The kafka-jmx
tool enables you to read JMX metrics from a given endpoint.
This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool.
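For example, a command like the following (a sketch assuming a broker with remote JMX enabled on port 9999 of localhost) would poll the MessagesInPerSec broker metric, printing values every two seconds by default:
bin/kafka-jmx.sh --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec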
Usage details
Dump JMX values to standard output.
Option Description
------ -----------
--attributes <String: name> The list of attributes to include in
the query. This is a comma-separated
list. If no attributes are specified
all objects will be queried.
--date-format <String: format> The date format to use for formatting
the time field. See java.text.
SimpleDateFormat for options.
--help Print usage information.
--jmx-auth-prop <String: jmx-auth-prop> A mechanism to pass property in the
form 'username=password' when
enabling remote JMX with password
authentication.
--jmx-ssl-enable <Boolean: ssl-enable> Flag to enable remote JMX with SSL.
(default: false)
--jmx-url <String: service-url> The url to connect to poll JMX data.
See Oracle javadoc for JMXServiceURL
for details. (default: service:jmx:
rmi:///jndi/rmi://:9999/jmxrmi)
--object-name <String: name> A JMX object name to use as a query.
This can contain wild cards, and
this option can be given multiple
times to specify more than one
query. If no objects are specified
all objects will be queried.
--one-time [Boolean: one-time] Flag to indicate run once only.
(default: false)
--report-format <String: report-format> output format name: either 'original',
'properties', 'csv', 'tsv'
(default: original)
--reporting-interval <Integer: ms> Interval in MS with which to poll jmx
stats; default value is 2 seconds.
Value of -1 equivalent to setting
one-time to true (default: 2000)
--version Display Kafka version.
--wait Wait for requested JMX objects to
become available before starting
output. Only supported when the list
of objects is non-empty and contains
no object name patterns.
trogdor.sh¶
Trogdor is a test framework for Kafka. Trogdor can run benchmarks and other workloads. Trogdor can also inject faults in order to stress test the system. For more information, see Trogdor and TROGDOR.
Usage details
The Trogdor fault injector.
Usage:
./trogdor.sh [action] [options]
Actions:
agent: Run the trogdor agent.
coordinator: Run the trogdor coordinator.
client: Run the client which communicates with the trogdor coordinator.
agent-client: Run the client which communicates with the trogdor agent.
help: This help message.
kafka-run-class.sh¶
The kafka-run-class
tool is a thin wrapper that sets up the Kafka classpath and JVM options and runs the specified Kafka class.
It is called by other tools and should not be run or modified directly.
Usage details
USAGE: ./kafka-run-class.sh [-daemon] [-name servicename] [-loggc] classname [opts]
Note
This website includes content developed at the Apache Software Foundation under the terms of the Apache License v2.