Tutorial: Set Up a Multi-Broker Kafka Cluster¶
The following tutorial shows how to run a multi-broker cluster and provides example configurations for both KRaft mode and ZooKeeper mode. This tutorial will also show you how to do basic command-line operations and how to use Confluent Control Center to view your cluster.
Important
As of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments. To learn more about running Kafka in KRaft mode, see KRaft Overview for Confluent Platform, the KRaft steps in the Platform Quick Start, and Settings for other Kafka and Confluent Platform components.
For KRaft, the examples show an isolated mode configuration for a multi-broker cluster managed by a single controller. This maps to the deprecated ZooKeeper configuration, which uses one ZooKeeper instance and multiple brokers in a single cluster. To learn more about KRaft, see KRaft Overview for Confluent Platform and KRaft mode under Configure Confluent Platform for production.
In addition to some other differences noted in the steps below, note that:
- For KRaft mode, you will use $CONFLUENT_HOME/etc/kafka/kraft/broker.properties and $CONFLUENT_HOME/etc/kafka/kraft/controller.properties.
- For ZooKeeper mode, you will use $CONFLUENT_HOME/etc/kafka/server.properties and $CONFLUENT_HOME/etc/kafka/zookeeper.properties.
To run the examples, you need Confluent Platform 7.0.0 or later installed on your local machine, along with the prerequisites listed below.
Prerequisites¶
- Internet connectivity.
- Operating system currently supported by Confluent Platform. For a supported list, see Operating systems.
- A supported version of Java downloaded and installed. Java 17 and Java 11 are supported in this version of Confluent Platform. Java 8 has been deprecated, and Java 9 and 10 are not supported. For more information, see Java supported versions. For a full list of system requirements, see Confluent Platform System Requirements.
- Confluent Platform 7.0.0 or later installed on your local machine. For more details on installation, see Install Confluent Platform using ZIP and TAR Archives.
What you will configure¶
In this tutorial, you will configure three brokers and one controller, either a KRaft controller or ZooKeeper node.
To run a single cluster with multiple brokers you need:
- 1 controller properties file (KRaft mode) or 1 ZooKeeper properties file (ZooKeeper mode)
- 3 Kafka broker properties files with unique broker IDs, listener ports (to surface details for all brokers on Control Center), and log file directories.
- Control Center properties file with the REST endpoints for controlcenter.cluster mapped to your brokers.
- Metrics Reporter JAR file installed and enabled on the brokers. (If you start Confluent Platform as described below, from $CONFLUENT_HOME/bin/, the Metrics Reporter is automatically installed on the broker. Otherwise, you would need to add the path to the Metrics Reporter JAR file to your CLASSPATH.)
- Properties files for any other Confluent Platform components you want to run, with default settings to start with.
All of this is described in the sections that follow.
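As a quick reference, the per-broker values used in the KRaft steps below can be derived from the broker number. The following is a minimal sketch and not part of Confluent Platform: the function name `broker_layout` is hypothetical, and the base ports (9092 for the Kafka listener, 8090 for the REST endpoint) and the `/tmp/kraft-broker-logs` directory are taken from the KRaft examples in this tutorial.

```shell
#!/bin/sh
# Hypothetical helper: print the node ID, listener port, REST port, and log
# directory that this tutorial assigns to broker N (0, 1, or 2) in KRaft mode.
broker_layout() {
  n="$1"
  kafka_port=$((9092 + n))   # 9092, 9093, 9094
  rest_port=$((8090 + n))    # 8090, 8091, 8092
  if [ "$n" -eq 0 ]; then
    log_dir="/tmp/kraft-broker-logs"       # the first broker keeps the default name
  else
    log_dir="/tmp/kraft-broker-logs-$n"
  fi
  echo "node.id=$n listener=localhost:$kafka_port rest=http://localhost:$rest_port log.dirs=$log_dir"
}

# Print the layout for all three brokers.
for n in 0 1 2; do
  broker_layout "$n"
done
```

The sketch only prints values; it does not read or change any properties file.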
Configure replication factors¶
The broker.properties (KRaft) and server.properties (ZooKeeper) files that ship with Confluent Platform have replication factors set to 1 on several system topics to support development test environments and Quick Start for Confluent Platform scenarios. For real-world scenarios, however, a replication factor greater than 1 is preferable to support fail-over and auto-balancing capabilities on both system and user-created topics.
For the purposes of this example, set the replication factors to 2, which is one less than the number of brokers (3).
When you create your topics, make sure that they also have the needed replication factor, depending on the number of brokers.
Run these commands to update replication configurations in KRaft mode.
sed -i '' -e "s/replication.factor=1/replication.factor=2/g" $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
sed -i '' -e "s/#confluent.metrics.reporter.topic.replicas=1/confluent.metrics.reporter.topic.replicas=1/g" $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
Run these commands to update replication configurations in ZooKeeper mode.
sed -i '' -e "s/replication.factor=1/replication.factor=2/g" $CONFLUENT_HOME/etc/kafka/server.properties
sed -i '' -e "s/#confluent.metrics.reporter.topic.replicas=1/confluent.metrics.reporter.topic.replicas=1/g" $CONFLUENT_HOME/etc/kafka/server.properties
When you complete these steps, your file should show the following configs:
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
confluent.license.topic.replication.factor=2
confluent.metadata.topic.replication.factor=2
confluent.balancer.topic.replication.factor=2
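The `sed -i ''` form used in the commands above is the BSD (macOS) syntax. GNU sed on Linux treats the separate empty string as a file name and reports an error for it. The following is a minimal sketch of a portable alternative that writes to a temporary file instead of editing in place. The function name `set_replication_factor` is hypothetical, the sketch changes only uncommented lines (unlike the commands above), and the demonstration runs against a throwaway sample file rather than a real properties file.

```shell
#!/bin/sh
# Hypothetical helper: set every uncommented "...replication.factor=<n>"
# line in a properties file to a new value without relying on "sed -i".
set_replication_factor() {
  file="$1"; new="$2"
  tmp="$(mktemp)" || return 1
  # Keep the key (captured as \1) and replace only the numeric value.
  sed -e "s/^\([^#]*replication\.factor\)=[0-9][0-9]*/\1=$new/" "$file" > "$tmp" &&
    cat "$tmp" > "$file"    # overwrite the contents, keeping file permissions
  rm -f "$tmp"
}

# Demonstration on a sample file (not a real broker configuration).
sample="$(mktemp)"
printf '%s\n' \
  'offsets.topic.replication.factor=1' \
  'transaction.state.log.replication.factor=1' \
  '#confluent.metrics.reporter.topic.replicas=1' > "$sample"
set_replication_factor "$sample" 2
cat "$sample"
```

To apply the same idea to a real file, you would pass the path of your properties file as the first argument, after making a backup copy.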
Configuration snapshot preview: Basic configuration for a three-broker cluster¶
The following table shows a summary of the configurations to specify for each of these files, as a reference to check against if needed.
The steps in the next sections guide you through a quick way to set up these files, using the existing broker.properties file (KRaft) or server.properties file (ZooKeeper) as a basis for your specialized ones.
Ready to get started? Skip to Configure the servers.
File | Configurations |
---|---|
controller.properties | The values for these basic properties must be unique for the controller:
|
broker.properties | The values for these basic properties must be unique per broker (except, all use the same Controller quorum):
Add the following listener configuration to specify the REST endpoint for this broker:
|
broker-1.properties | The values for these basic properties must be unique per broker (except, all use the same Controller quorum):
Provide the listener configuration to specify the REST endpoint unique to this broker:
|
broker-2.properties | The values for these basic properties must be unique per broker (except, all use the same Controller quorum):
Provide the listener configuration to specify the REST endpoint unique to this broker:
|
File | Configurations |
---|---|
zookeeper.properties | ZooKeeper keeps all defaults, no changes will be needed:
|
server.properties | With replication factors properly set in the previous step, no further changes are needed for this file.
Add the following listener configuration to specify the REST endpoint for this broker:
|
server-1.properties | The values for these basic properties must be unique per broker:
Provide the listener configuration to specify the REST endpoint unique to this broker:
|
server-2.properties | The values for these basic properties must be unique per broker:
Provide the listener configuration to specify the REST endpoint unique to this broker:
|
Tip
In server.properties and other configuration files, properties that are commented out or not listed at all take their default values. For example, the commented out line for listeners on broker 0 has the effect of setting a single listener to PLAINTEXT://:9092.
Configure the servers¶
In KRaft mode, start with the broker.properties file you updated in the previous sections with regard to replication factors and enabling Self-Balancing Clusters.
You will make a few more changes to this file, then use it as the basis for the other servers.
Update the node ID, controller quorum voters and port for the first broker, and then add the REST endpoint listener configuration for this broker at the end of the file:
sed -i '' -e "s/node.id=2/node.id=0/g" $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
sed -i '' -e "s/1@localhost:9093/5@localhost:9097/g" $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
echo "confluent.http.server.listeners=http://localhost:8090" >> $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
Copy the properties file for the first broker to use as a basis for the other two:
cp $CONFLUENT_HOME/etc/kafka/kraft/broker.properties $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
cp $CONFLUENT_HOME/etc/kafka/kraft/broker.properties $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
Update the node ID, listener, and data directories for broker-1, and then update the REST endpoint listener for this broker:
sed -i '' -e "s/node.id=0/node.id=1/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
sed -i '' -e "s/9092/9093/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
sed -i '' -e "s/kraft-broker-logs/kraft-broker-logs-1/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
sed -i '' -e "s/8090/8091/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
Update the node ID, listener, and data directories for broker-2, and then update the REST endpoint listener for this broker:
sed -i '' -e "s/node.id=0/node.id=2/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
sed -i '' -e "s/9092/9094/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
sed -i '' -e "s/kraft-broker-logs/kraft-broker-logs-2/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
sed -i '' -e "s/8090/8092/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
Finally, update the controller node ID, quorum voters, and port:
sed -i '' -e "s/node.id=1/node.id=5/g" $CONFLUENT_HOME/etc/kafka/kraft/controller.properties
sed -i '' -e "s/9093/9097/g" $CONFLUENT_HOME/etc/kafka/kraft/controller.properties
sed -i '' -e "s/1@localhost/5@localhost/g" $CONFLUENT_HOME/etc/kafka/kraft/controller.properties
In ZooKeeper mode, start with the server.properties file you updated in the previous sections with regard to replication factors and enabling Self-Balancing.
You will make a few more changes to this file, then use it as the basis for the other servers.
Uncomment the listener, and then add the REST endpoint listener configuration at the end of the file:
sed -i '' -e "s/#listeners=/listeners=/g" $CONFLUENT_HOME/etc/kafka/server.properties
echo "confluent.http.server.listeners=http://localhost:8090" >> $CONFLUENT_HOME/etc/kafka/server.properties
Copy the properties file for the first server to use as a basis for the other two servers. This is the file you updated in the previous sections with regard to replication factors and enabling Self-Balancing.
cp $CONFLUENT_HOME/etc/kafka/server.properties $CONFLUENT_HOME/etc/kafka/server-1.properties
cp $CONFLUENT_HOME/etc/kafka/server.properties $CONFLUENT_HOME/etc/kafka/server-2.properties
Update the broker ID, listener, and data directories for server-1, and then update the REST endpoint listener for this broker:
sed -i '' -e "s/broker.id=0/broker.id=1/g" $CONFLUENT_HOME/etc/kafka/server-1.properties
sed -i '' -e "s/9092/9093/g" $CONFLUENT_HOME/etc/kafka/server-1.properties
sed -i '' -e "s/kafka-logs/kafka-logs-1/g" $CONFLUENT_HOME/etc/kafka/server-1.properties
sed -i '' -e "s/8090/8091/g" $CONFLUENT_HOME/etc/kafka/server-1.properties
Update the broker ID, listener, and data directories for server-2, and then update the REST endpoint listener for this broker:
sed -i '' -e "s/broker.id=0/broker.id=2/g" $CONFLUENT_HOME/etc/kafka/server-2.properties
sed -i '' -e "s/9092/9094/g" $CONFLUENT_HOME/etc/kafka/server-2.properties
sed -i '' -e "s/kafka-logs/kafka-logs-2/g" $CONFLUENT_HOME/etc/kafka/server-2.properties
sed -i '' -e "s/8090/8092/g" $CONFLUENT_HOME/etc/kafka/server-2.properties
When you have completed this step, you will have three properties files that match the configurations shown in the Configuration snapshot preview: Basic configuration for a three-broker cluster:
- broker.properties (KRaft) or server.properties (ZooKeeper), which corresponds to node/broker 0
- broker-1.properties (KRaft) or server-1.properties (ZooKeeper), which corresponds to node/broker 1
- broker-2.properties (KRaft) or server-2.properties (ZooKeeper), which corresponds to node/broker 2
Run this command to list the files in KRaft mode:
ls $CONFLUENT_HOME/etc/kafka/kraft/
Run this command to list the files in ZooKeeper mode:
ls $CONFLUENT_HOME/etc/kafka/server*
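Because the broker files are produced by copying and editing one base file, it can help to confirm that the values that must differ really do differ. The following is a minimal sketch of such a check. The function name `check_unique` is hypothetical, and the demonstration uses small sample files created in a scratch directory rather than your real configuration.

```shell
#!/bin/sh
# Hypothetical helper: print "ok" if the given key has a different value in
# every listed properties file, or "duplicate" otherwise.
check_unique() {
  key="$1"; shift
  total=0
  values=""
  for f in "$@"; do
    # Use the last uncommented assignment, since a later line overrides an earlier one.
    v="$(sed -n "s/^$key=//p" "$f" | tail -n 1)"
    values="$values$v
"
    total=$((total + 1))
  done
  # Count the distinct values and compare with the number of files.
  distinct="$(printf '%s' "$values" | sort -u | wc -l | tr -d ' ')"
  if [ "$distinct" -eq "$total" ]; then echo "ok"; else echo "duplicate"; fi
}

# Demonstration with three sample files in a scratch directory.
dir="$(mktemp -d)"
for n in 0 1 2; do
  printf 'node.id=%s\nlisteners=PLAINTEXT://:%s\n' "$n" "$((9092 + n))" > "$dir/broker-$n.properties"
done
check_unique node.id "$dir"/broker-*.properties
check_unique listeners "$dir"/broker-*.properties
```

Against the real files, you might run it for keys such as node.id (or broker.id in ZooKeeper mode), listeners, log.dirs, and confluent.http.server.listeners.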
(Optional) Configure Control Center with REST endpoints¶
This is an optional step, only needed if you want to use Confluent Control Center. It gives you a similar starting point as you get in the Quick Start for Confluent Platform, and an alternate way to work with and verify the topics and data you will create on the command line with kafka-topics.
You must tell Control Center about the REST endpoints for all brokers in your cluster, and the advertised listeners for the other components you may want to run. Without these configurations, the brokers and components will not show up on Control Center.
Make the following changes to $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties and save the file.
Open the file in an editor; for example, in vi:
vi $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties
Configure REST endpoints for the brokers.
In $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties, replace the default value for the Kafka REST endpoint URL with the following lines to match your multi-broker configuration:
# Kafka REST endpoint URL
confluent.controlcenter.streams.cprest.url=http://localhost:8090,http://localhost:8091,http://localhost:8092
See also
Required Configurations for Control Center in Self-Balancing Configuration Options and confluent.controlcenter.streams.cprest.url in the Control Center Configuration Reference.
Replace the configurations for Kafka Connect, ksqlDB, and Schema Registry to provide Control Center with the default advertised URLs for the component clusters. You can delete the original configs and copy-paste the following into the file.
# A comma separated list of Connect host names
confluent.controlcenter.connect.cluster=http://localhost:8083
# KSQL cluster URL
confluent.controlcenter.ksql.ksqlDB.url=http://localhost:8088
# Schema Registry cluster URL
confluent.controlcenter.schema.registry.url=http://localhost:8081
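The REST endpoint list for Control Center contains one URL per broker, using the REST ports configured earlier (8090, 8091, and 8092). The following is a minimal sketch of building that value for a given number of brokers. The function name `cprest_urls` is hypothetical, and the sketch assumes all brokers run on localhost with consecutive REST ports.

```shell
#!/bin/sh
# Hypothetical helper: build the comma-separated REST endpoint list for
# COUNT brokers whose REST ports start at BASE_PORT on localhost.
cprest_urls() {
  base_port="$1"; count="$2"
  i=0
  urls=""
  while [ "$i" -lt "$count" ]; do
    # Add a comma only when the list already has an entry.
    urls="${urls:+$urls,}http://localhost:$((base_port + i))"
    i=$((i + 1))
  done
  echo "$urls"
}

# Print the property line for the three brokers in this tutorial.
echo "confluent.controlcenter.streams.cprest.url=$(cprest_urls 8090 3)"
```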
(Optional) Install the Datagen Connector¶
Install the Kafka Connect Datagen source connector using the confluent connect plugin install command, or by using Confluent Hub. This connector generates mock data for demonstration purposes and is not suitable for production.
To install with the confluent connect plugin install command:
confluent connect plugin install confluentinc/kafka-connect-datagen:latest
Confluent Hub provides an online library of pre-packaged and ready-to-install extensions or add-ons for Confluent Platform and Kafka. To install with Confluent Hub:
confluent-hub install \
--no-prompt confluentinc/kafka-connect-datagen:latest
This is an optional step, but useful, as it gives you a similar starting point as you get in the Quick Start for Confluent Platform.
Start the controller and brokers¶
In KRaft mode, you must run the following commands from $CONFLUENT_HOME to generate a random cluster ID, and format log directories for the controller and each broker in dedicated command windows. You will then start the controller and brokers from those same dedicated windows.
The kafka-storage command is run only once per broker/controller. You cannot use the kafka-storage command to update an existing cluster. If you make a mistake in configurations at that point, you must recreate the directories from scratch, and work through the steps again.
Controller
In a new dedicated command window, change directories into $CONFLUENT_HOME to run the KRaft setup commands and start the controller.
cd $CONFLUENT_HOME
Generate a random-uuid for the cluster using the kafka-storage tool.
KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"
Get the value for KAFKA_CLUSTER_ID and add it to your .bash_profile, .bashrc, .zshrc, or similar so that it is available to you in new command windows for running the brokers. You will use this same cluster ID for all brokers.
echo $KAFKA_CLUSTER_ID
Format the log directories for the controller:
./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c $CONFLUENT_HOME/etc/kafka/kraft/controller.properties --ignore-formatted
Start the controller:
kafka-server-start $CONFLUENT_HOME/etc/kafka/kraft/controller.properties
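Since every broker window needs the same cluster ID, one option is to store it in a small file that each window can source. The following is a minimal sketch under stated assumptions: the function names `is_cluster_id` and `save_cluster_id` are hypothetical, the format check assumes the 22-character URL-safe base64 form that kafka-storage random-uuid prints, and the ID in the demonstration is a sample value rather than one generated for your cluster.

```shell
#!/bin/sh
# Hypothetical helper: succeed only if the argument looks like a Kafka
# cluster ID, assumed here to be 22 URL-safe base64 characters.
is_cluster_id() {
  printf '%s\n' "$1" | grep -Eq '^[A-Za-z0-9_-]{22}$'
}

# Hypothetical helper: write the ID to a file that other windows can source.
save_cluster_id() {
  id="$1"; file="$2"
  is_cluster_id "$id" || { echo "not a cluster ID: $id" >&2; return 1; }
  printf 'export KAFKA_CLUSTER_ID=%s\n' "$id" > "$file"
}

# Demonstration with a sample ID; in practice pass "$KAFKA_CLUSTER_ID".
demo_file="$(mktemp)"
save_cluster_id "MkU3OEVBNTcwNTJENDM2Qk" "$demo_file" && cat "$demo_file"
```

In each broker window, you would then source the saved file before running kafka-storage format.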
broker.properties (node 0)
In a new command window dedicated to running node 0, change directories into $CONFLUENT_HOME to run the KRaft setup commands and start your first broker.
cd $CONFLUENT_HOME
Make sure that the KAFKA_CLUSTER_ID you generated for the controller is available in this shell as an environment variable.
(Optional) For example, if you added the value for KAFKA_CLUSTER_ID to your .bash_profile:
source ~/.bash_profile
echo $KAFKA_CLUSTER_ID
Format the log directories for this broker:
./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c $CONFLUENT_HOME/etc/kafka/kraft/broker.properties --ignore-formatted
Start the broker:
kafka-server-start $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
broker-1.properties (node 1)
In a new command window dedicated to running node 1, change directories into $CONFLUENT_HOME to run the KRaft setup commands and start broker-1.
cd $CONFLUENT_HOME
Make sure that the KAFKA_CLUSTER_ID you generated for the controller is available in this shell as an environment variable.
(Optional) For example, if you added the value for KAFKA_CLUSTER_ID to your .bash_profile:
source ~/.bash_profile
echo $KAFKA_CLUSTER_ID
Format the log directories for broker-1:
./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties --ignore-formatted
Start the broker:
kafka-server-start $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
broker-2.properties (node 2)
In a new command window dedicated to running node 2, change directories into $CONFLUENT_HOME to run the KRaft setup commands and start broker-2.
cd $CONFLUENT_HOME
Make sure that the KAFKA_CLUSTER_ID you generated for the controller is available in this shell as an environment variable.
(Optional) For example, if you added the value for KAFKA_CLUSTER_ID to your .bash_profile:
source ~/.bash_profile
echo $KAFKA_CLUSTER_ID
Format the log directories for broker-2:
./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties --ignore-formatted
Start the broker:
kafka-server-start $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
In ZooKeeper mode, start ZooKeeper in its own command window:
zookeeper-server-start $CONFLUENT_HOME/etc/kafka/zookeeper.properties
Start each of the brokers in separate command windows:
kafka-server-start $CONFLUENT_HOME/etc/kafka/server.properties
kafka-server-start $CONFLUENT_HOME/etc/kafka/server-1.properties
kafka-server-start $CONFLUENT_HOME/etc/kafka/server-2.properties
Start the other components¶
Start each of these components in separate windows.
Tip
For this example, it is not necessary to start all of these. At a minimum, you will need the controller (KRaft mode) or ZooKeeper, the brokers (already started), and Kafka REST. However, it is useful to have all components running if you are just getting started with the platform, and want to explore everything. This gives you a similar starting point as you get in Quick Start for Confluent Platform, and enables you to work through the examples in that Quick Start in addition to the Kafka command examples provided here.
Start Kafka REST
kafka-rest-start $CONFLUENT_HOME/etc/kafka-rest/kafka-rest.properties
(Optional) Start Kafka Connect
connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
(Optional) Start ksqlDB
ksql-server-start $CONFLUENT_HOME/etc/ksqldb/ksql-server.properties
(Optional) Start Schema Registry
schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
(Optional) Finally, start Control Center in a separate command window.
control-center-start $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties
(Optional) Explore Control Center¶
Confluent Control Center is a web-based tool for managing and monitoring Kafka in Confluent Platform. To view your cluster running locally in Control Center, open a browser and navigate to http://localhost:9021/.
- To learn about managing clusters with Confluent Control Center, see Manage Kafka Clusters Using Control Center for Confluent Platform
- To view brokers in Confluent Control Center, see Manage Kafka Brokers Using Control Center for Confluent Platform
- To manage topics in Confluent Control Center, see Manage Topics Using Control Center for Confluent Platform
Click either the Brokers card or Brokers on the menu to view broker metrics. From the brokers list at the bottom of the page, you can view detailed metrics and drill down on each broker.
Click Topics on the navigation menu.
Note that only system (internal) topics are available at this point because you haven’t created any topics of your own yet. The default_ksql_processing_log will show up as a topic if you configured and started ksqlDB.
There is a lot more to Confluent Control Center, but it is not the focus of this tutorial. To complete similar steps using Confluent Control Center, see the Quick Start for Confluent Platform.
Use the command line tools¶
After you have Confluent Platform running, the next step is to learn some basic Kafka command-line operations to create topics and work with producers and consumers. These provide a means of testing and working with basic functionality, as well as configuring and monitoring deployments.
A few things to note:
- Confluent Platform ships with Kafka tools and utilities in $CONFLUENT_HOME/bin. This bin/ directory includes both Confluent proprietary and open source Kafka utilities. A full list is provided in CLI Tools Shipped With Confluent Platform. Those in the list that begin with kafka- are the Kafka open source command utilities. A reference for Confluent proprietary commands is provided in CLI Tools for Confluent Platform.
- With Confluent Platform installed and running on your system, you can run Kafka commands from anywhere; for example, from your $HOME (~/) directory. You do not have to run these from within $CONFLUENT_HOME.
- Command line help is available by typing any of the commands with no arguments; for example, kafka-topics or kafka-producer-perf-test.
To help get you started, the sections below provide examples for some of the most fundamental and widely-used Kafka scripts.
Create, list and describe topics¶
You can use kafka-topics for operations on topics (create, list, describe, alter, delete, and so forth).
In a command window, run the following commands to experiment with topics.
Create three topics, cool-topic, warm-topic, and hot-topic.
kafka-topics --create --topic cool-topic --bootstrap-server localhost:9092
kafka-topics --create --topic warm-topic --bootstrap-server localhost:9092
kafka-topics --create --topic hot-topic --partitions 2 --replication-factor 2 --bootstrap-server localhost:9092
List all topics.
kafka-topics --list --bootstrap-server localhost:9092
Tip
Internal system topics are prefaced by an underscore in the output. The topics you created are listed at the end.
Describe a topic.
This shows partitions, replication factor, and in-sync replicas for the topic.
kafka-topics --describe --topic cool-topic --bootstrap-server localhost:9092
Your output should resemble the following:
Topic: cool-topic PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: cool-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Offline:
Tip
If you run kafka-topics --describe with no specified topic, you get a detailed description of every topic on the cluster (system and user topics).
Describe another topic, using one of the other brokers in the cluster as the bootstrap server.
kafka-topics --describe --topic hot-topic --bootstrap-server localhost:9094
Here is that example output:
Topic: hot-topic PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: hot-topic Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0 Offline:
Topic: hot-topic Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2 Offline:
You can connect to any of the brokers in the cluster to run these commands because every broker has the same cluster metadata.
Alter a topic’s configuration.
For this example, change the partition count on hot-topic from 2 to 9.
kafka-topics --alter --topic hot-topic --partitions 9 --bootstrap-server localhost:9092
Tip
Dynamic topic modification is limited by the current configurations. For example, you cannot decrease the number of partitions or modify the replication factor for a topic, as that would require partition reassignment.
Rerun --describe on the same topic.
kafka-topics --describe --topic hot-topic --bootstrap-server localhost:9092
Here is example output. Verify that the partition count is updated to 9:
Topic: hot-topic PartitionCount: 9 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: hot-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline:
Topic: hot-topic Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0 Offline:
Topic: hot-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline:
Topic: hot-topic Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline:
Topic: hot-topic Partition: 4 Leader: 0 Replicas: 0,2 Isr: 0,2 Offline:
Topic: hot-topic Partition: 5 Leader: 1 Replicas: 1,0 Isr: 1,0 Offline:
Topic: hot-topic Partition: 6 Leader: 2 Replicas: 2,0 Isr: 2,0 Offline:
Topic: hot-topic Partition: 7 Leader: 0 Replicas: 0,1 Isr: 0,1 Offline:
Topic: hot-topic Partition: 8 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline:
Delete a topic.
kafka-topics --delete --topic warm-topic --bootstrap-server localhost:9092
List all topics.
kafka-topics --list --bootstrap-server localhost:9092
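The --describe output for hot-topic also shows how partition leadership is spread across the three brokers. The following is a minimal sketch that counts the partitions led by each broker. The function name `leaders_per_broker` is hypothetical, and the input is the nine-partition example output for hot-topic shown earlier in this section, so your own cluster will likely report a different distribution.

```shell
#!/bin/sh
# Hypothetical helper: read "kafka-topics --describe" output on stdin and
# print "<broker id> <number of partitions it leads>", one line per broker.
leaders_per_broker() {
  awk '{
         # The value after the "Leader:" field is the leading broker ID.
         for (i = 1; i < NF; i++)
           if ($i == "Leader:") count[$(i + 1)]++
       }
       END { for (b in count) print b, count[b] }' | sort -n
}

# Partition lines copied from the hot-topic example output.
leaders_per_broker <<'EOF'
Topic: hot-topic PartitionCount: 9 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: hot-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline:
Topic: hot-topic Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0 Offline:
Topic: hot-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline:
Topic: hot-topic Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline:
Topic: hot-topic Partition: 4 Leader: 0 Replicas: 0,2 Isr: 0,2 Offline:
Topic: hot-topic Partition: 5 Leader: 1 Replicas: 1,0 Isr: 1,0 Offline:
Topic: hot-topic Partition: 6 Leader: 2 Replicas: 2,0 Isr: 2,0 Offline:
Topic: hot-topic Partition: 7 Leader: 0 Replicas: 0,1 Isr: 0,1 Offline:
Topic: hot-topic Partition: 8 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline:
EOF
```

For the sample input, broker 0 leads 2 partitions, broker 1 leads 4, and broker 2 leads 3. Against a running cluster, you could pipe the output of kafka-topics --describe --topic hot-topic --bootstrap-server localhost:9092 into the function instead of using the sample lines.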
Run producers and consumers to send and read messages¶
The command utilities kafka-console-producer and kafka-console-consumer allow you to manually produce messages to and consume messages from a topic.
Open two new command windows, one for a producer, and the other for a consumer.
Run a producer to produce to cool-topic.
kafka-console-producer --topic cool-topic --bootstrap-server localhost:9092
Send some messages.
Type your messages at the prompt (>), and hit Return after each one.
Your command window will resemble the following:
$ kafka-console-producer --bootstrap-server localhost:9092 --topic cool-topic
>hi cool topic
>did you get this message?
>first
>second
>third
>yes! I love you cool topic
>
Tip
You can use the --broker-list flag in place of --bootstrap-server for the producer; it is typically used to send data to specific brokers.
In the other command window, run a consumer to read messages from cool-topic. Specify that you want to start consuming from the beginning, as shown.
kafka-console-consumer --topic cool-topic --from-beginning --bootstrap-server localhost:9092
Your output will resemble the following:
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic cool-topic
hi cool topic
did you get this message?
first
second
third
yes! I love you cool topic
When you want to stop the producer and consumer, type Ctl-C in their respective command windows. However, you may want to leave at least the producer running for now, to view the topics with Control Center, which is described in the (Optional) Revisit Control Center section.
Produce auto-generated message data to topics¶
You can use kafka-producer-perf-test in its own command window to generate test data to topics.
For example, open a new command window and type the following command to send data to hot-topic, with the specified throughput and record size.
kafka-producer-perf-test \
   --producer-props bootstrap.servers=localhost:9092 \
   --topic hot-topic \
   --record-size 1000 \
   --throughput 1000 \
   --num-records 3600000
The command provides status output on messages sent, as shown:
4999 records sent, 999.8 records/sec (0.95 MB/sec), 1.1 ms avg latency, 240.0 ms max latency.
5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.5 ms avg latency, 4.0 ms max latency.
5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 5.0 ms max latency.
5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.3 ms avg latency, 3.0 ms max latency.
5001 records sent, 1000.0 records/sec (0.95 MB/sec), 0.3 ms avg latency, 4.0 ms max latency.
5000 records sent, 1000.0 records/sec (0.95 MB/sec), 0.8 ms avg latency, 24.0 ms max latency.
5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 3.0 ms max latency.
...
Open a new command window to consume the messages from hot-topic as they are sent (not from the beginning).
kafka-console-consumer --topic hot-topic --bootstrap-server localhost:9092
Type Ctl-C to stop the consumer.
Tip
You may want to leave the producer running for a moment, as you are about to revisit Topics on the Control Center.
To learn more, check out Benchmark Commands, Let’s Load test, Kafka!, and How to do Performance testing of Kafka Cluster
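The figures in the kafka-producer-perf-test status lines follow from the command arguments: 1000 records per second at 1000 bytes each is about 0.95 MB/sec when a megabyte is counted as 1024 * 1024 bytes, and 3600000 records at that rate take 3600 seconds (one hour) to send. The following is a minimal sketch of that arithmetic. The function name `perf_estimate` is hypothetical, and it assumes the producer sustains the requested throughput.

```shell
#!/bin/sh
# Hypothetical helper: given record size (bytes), throughput (records/sec),
# and record count, print the expected data rate and run time.
perf_estimate() {
  awk -v size="$1" -v rate="$2" -v total="$3" 'BEGIN {
    # Data rate in MB/sec (1 MB = 1024 * 1024 bytes) and duration in seconds.
    printf "%.2f MB/sec for %d seconds\n", size * rate / (1024 * 1024), total / rate
  }'
}

# Values from the kafka-producer-perf-test command in this section.
perf_estimate 1000 1000 3600000
```

With these values the sketch prints 0.95 MB/sec for 3600 seconds, so you can lower --num-records if you want a shorter run.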
(Optional) Revisit Control Center¶
Now that you have created some topics and produced message data to a topic (both manually and with auto-generated data), you can optionally inspect topics using Control Center.
Open a web browser and go to http://localhost:9021/, the default URL for Control Center on a local system.
Select the cluster, and click Topics from the menu.
Choose cool-topic, then select the Messages tab.
Select Jump to offset and type 1, 2, or 3 to display previous messages.
These messages do not show in the order they were sent because the consumer here is not reading --from-beginning.
Try manually typing some more messages to cool-topic with your command line producer, and watch them show up here.
Navigate to Topics > hot-topic > Messages tab.
Auto-generated messages from your kafka-producer-perf-test are shown here as they arrive.
Shutdown and cleanup tasks¶
Run the following shutdown and cleanup tasks.
Stop the kafka-producer-perf-test with Ctl-C in its respective command window.
Stop all of the other components with Ctl-C in their respective command windows, in the reverse of the order in which you started them. For example, stop Control Center first, then other components, followed by Kafka brokers, and finally the controller (KRaft mode) or ZooKeeper.
Remove log files from /tmp. For example, if you were running in KRaft mode:
ls /tmp
rm -rf /tmp/kraft-broker-logs*
rm -rf /tmp/kraft-controller-logs
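Before deleting anything under /tmp, you may want to try the cleanup in a scratch directory first. The following is a minimal sketch: the function name `clean_kraft_logs` is hypothetical, and the directory names assume the KRaft log locations used in this tutorial. It uses the pattern kraft-broker-logs*, which matches kraft-broker-logs, kraft-broker-logs-1, and kraft-broker-logs-2.

```shell
#!/bin/sh
# Hypothetical helper: remove the KRaft broker and controller log
# directories found under the given parent directory.
clean_kraft_logs() {
  parent="$1"
  # The glob covers the default directory and the -1 and -2 variants.
  rm -rf "$parent"/kraft-broker-logs* "$parent"/kraft-controller-logs
}

# Demonstration in a scratch directory instead of the real /tmp.
scratch="$(mktemp -d)"
mkdir "$scratch/kraft-broker-logs" "$scratch/kraft-broker-logs-1" \
      "$scratch/kraft-broker-logs-2" "$scratch/kraft-controller-logs" \
      "$scratch/unrelated"
clean_kraft_logs "$scratch"
ls "$scratch"    # only the "unrelated" directory remains
```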
Run multiple clusters¶
Another option to experiment with is a multi-cluster deployment. This is relevant for trying out features like Replicator, Cluster Linking, and multi-cluster Schema Registry, where you want to share or replicate topic data across two clusters, often modeled as the origin and the destination cluster.
These configurations can be used for data sharing across data centers and regions. An example configuration for cluster linking is shown in the diagram below. (A full guide to this setup is available in the Tutorial: Share Data Across Topics Using Cluster Linking for Confluent Platform.)
Multi-cluster configurations are described in context under the relevant use cases. Since these configurations will vary depending on what you want to accomplish, the best way to test out multi-cluster is to choose a use case, and follow the feature-specific tutorial. The specifics of these configurations vary depending on whether you are using KRaft in combined or isolated mode, or ZooKeeper.
- Tutorial: Share Data Across Topics Using Cluster Linking for Confluent Platform (requires Confluent Platform 6.0.0 or newer, recommended as the best getting started example)
- Tutorial: Replicate Data Across Kafka Clusters in Confluent Platform
- Enabling Multi-Cluster Schema Registry
Code examples and demo apps¶
Following are links to examples of Confluent Platform distributed applications that use Kafka topics, along with producers and consumers that subscribe to those topics, in an event subscription model. The idea is to complete the picture of how Kafka and Confluent Platform can be used to accomplish a task or provide a service.