Tutorial: Add and Remove Brokers with Self-Balancing in Confluent Platform¶
This example describes how to set up a local, multi-broker cluster running with proper configurations for replication factors, and test Self-Balancing by removing a broker and monitoring the rebalance.
Following broker removal, you add a broker to the cluster and monitor the redistribution of data to fill the empty broker.
Most tasks are performed on the command line but Confluent Control Center is used to verify where the controller is running, and as an additional view into the progress of the rebalance.
You can use this same workflow to set up a cluster on your cloud provider.
Simply set the host:port names to represent your cloud nodes as appropriate.
KRaft and ZooKeeper¶
Important
As of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments. To learn more about running Kafka in KRaft mode, see KRaft Overview for Confluent Platform, the KRaft steps in the Platform Quick Start, and Settings for other components.
This tutorial provides examples for both KRaft mode and ZooKeeper mode.
For KRaft, the examples show an isolated mode configuration for a multi-broker cluster managed by a single controller. This maps to the deprecated ZooKeeper configuration, which uses one ZooKeeper and multiple brokers in a single cluster. To learn more about KRaft, see KRaft Overview for Confluent Platform and KRaft mode under Configure Confluent Platform for production.
In addition to some other differences noted in the steps below, note that:
- For KRaft mode, you will use $CONFLUENT_HOME/etc/kafka/kraft/broker.properties and $CONFLUENT_HOME/etc/kafka/kraft/controller.properties.
- For ZooKeeper mode, you will use $CONFLUENT_HOME/etc/kafka/server.properties and $CONFLUENT_HOME/etc/kafka/zookeeper.properties.
Prerequisites¶
Before proceeding with the example, verify that you have the following installed on your local machine:
- Confluent Platform 6.0.0 or later
- Confluent CLI
- Java 8, 11, or 17 (recommended) to run Confluent Platform (For details on Java requirements, see Java in System Requirements for Confluent Platform.)
Configure Kafka brokers¶
This example demos a cluster with five brokers. These steps guide you through two passes on the broker properties files, first to configure some basics in the default properties file that will apply to all brokers, then to create four additional properties files based on the original and set up a multi-broker cluster. When you have completed the setup, you will have a total of five properties files, one per broker.
Tip
You can use this same model for a cluster with more or fewer brokers. Using more than 3 brokers simplifies configuration for a scenario where you will remove a broker, since many of the Confluent Platform internal topics default to a replication factor of 3.
In $CONFLUENT_HOME/etc/kafka/kraft/broker.properties (KRaft) or $CONFLUENT_HOME/etc/kafka/server.properties (ZooKeeper mode), make the following changes, and save the file.
Enable the Metrics Reporter for Control Center¶
When the Metrics Reporter is enabled, it populates the Brokers Overview page in Control Center with metrics on all brokers, along with a clickable list where you can drill down on each individual broker to get a detail view of statistics and an option to remove a broker. If this setting is not enabled, Control Center will not display broker metrics or management options. To learn more, see Monitor Kafka with Metrics Reporter in Confluent Platform.
When uncommented, the following lines allow the brokers to send metrics to Control Center, and specify that the metrics cluster has a single broker. This configuration can apply to all brokers in the cluster. There is no need to change the port numbers for the Metrics Reporter to match the listener port for each broker (although that configuration also works).
metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=localhost:9092
confluent.metrics.reporter.topic.replicas=1
Run these commands to update Metrics Reporter configurations in KRaft mode.
sed -i '' -e "s/#metric.reporters=/metric.reporters=/g" $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
sed -i '' -e "s/#confluent.metrics.reporter/confluent.metrics.reporter/g" $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
Run these commands to update Metrics Reporter configurations in ZooKeeper mode.
sed -i '' -e "s/#metric.reporters=/metric.reporters=/g" $CONFLUENT_HOME/etc/kafka/server.properties
sed -i '' -e "s/#confluent.metrics.reporter/confluent.metrics.reporter/g" $CONFLUENT_HOME/etc/kafka/server.properties
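The `sed -i ''` form used in these commands is the BSD/macOS syntax; GNU sed expects any backup suffix to be attached directly to `-i`. The following is a minimal sketch of a variant that should run with either implementation. It assumes a scratch file as a stand-in for the real properties file, and the sample content is illustrative only.

```shell
set -eu

# Stand-in for broker.properties or server.properties (illustrative sample content).
props="$(mktemp)"
cat > "$props" <<'EOF'
#metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
#confluent.metrics.reporter.bootstrap.servers=localhost:9092
#confluent.metrics.reporter.topic.replicas=1
EOF

# -i.bak (suffix attached, no space) is accepted by both GNU and BSD sed.
sed -i.bak \
  -e 's/^#metric\.reporters=/metric.reporters=/' \
  -e 's/^#confluent\.metrics\.reporter/confluent.metrics.reporter/' \
  "$props"
rm -f "$props.bak"

# Count the Metrics Reporter lines that are now active (uncommented).
active_lines="$(grep -c -E '^(metric\.reporters|confluent\.metrics\.reporter)' "$props")"
echo "active Metrics Reporter lines: $active_lines"
```

To apply the same idea to your installation, replace the scratch file with the path to your properties file and keep the `.bak` copy until you have confirmed the result.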
Configure replication factors for Self-Balancing¶
For Self-Balancing to work when you remove a broker, replication factors must be less than the number of brokers in the cluster, but greater than 1 so that replication can take place. For example, you can’t have a topic with a replication factor of 1, because if the topic is on a broker that gets deleted there won’t be a replica to use for the rebalancing. You will get an error when you try to delete the broker (Error while executing broker removal).
The following steps show you how to reset replication factors and replicas to 2, and uncomment the properties if needed so that your changes go into effect.
When you complete these steps, your file should show the following configs:
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
confluent.license.topic.replication.factor=2
confluent.metadata.topic.replication.factor=2
confluent.balancer.topic.replication.factor=2
This tutorial does not demonstrate running Self-Balancing Clusters with security protocols enabled. If you want to run Self-Balancing in secure mode, see Security considerations.
Update replication factors and replicas for the broker.
Run these commands to update replication configurations in KRaft mode.
sed -i '' -e "s/replication.factor=1/replication.factor=2/g" $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
sed -i '' -e "s/#confluent.metrics.reporter.topic.replicas=1/confluent.metrics.reporter.topic.replicas=1/g" $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
Run these commands to update replication configurations in ZooKeeper mode.
sed -i '' -e "s/replication.factor=1/replication.factor=2/g" $CONFLUENT_HOME/etc/kafka/server.properties
sed -i '' -e "s/#confluent.metrics.reporter.topic.replicas=1/confluent.metrics.reporter.topic.replicas=1/g" $CONFLUENT_HOME/etc/kafka/server.properties
(Optional) If you want to run Connect, change replication factors in that properties file also. Search $CONFLUENT_HOME/etc/kafka/connect-distributed.properties for all instances of replication.factor and set these values to a number that is less than the number of brokers but greater than 1. For this cluster, set every replication.factor to 2:
offset.storage.replication.factor=2
config.storage.replication.factor=2
status.storage.replication.factor=2
Run this command to update replication configurations for Connect as shown above.
sed -i '' -e "s/replication.factor=1/replication.factor=2/g" $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
Tip
- When you create your topics, make sure that they also have the needed replication factor, depending on the number of brokers. This is described in subsequent sections.
- Limiting replicas and replication factors to 2 provides the option to shrink the cluster to as few as 2 brokers and expand back up to 5.
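To double-check the result of these substitutions, the following sketch scans a properties file for active replication.factor settings outside the range that allows broker removal (greater than 1, less than the broker count). The sample file content is made up for illustration, with one value deliberately left at 1; point `props` at your own file instead.

```shell
set -eu

brokers=5   # cluster size used in this tutorial

# Illustrative sample; one setting is left at 1 and one is still commented out.
props="$(mktemp)"
cat > "$props" <<'EOF'
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
confluent.license.topic.replication.factor=2
confluent.metadata.topic.replication.factor=1
#confluent.balancer.topic.replication.factor=2
EOF

# Print active settings whose value is not strictly between 1 and the broker count.
bad_settings="$(awk -F= -v n="$brokers" \
  '/^[^#].*replication\.factor=/ && ($2 <= 1 || $2 >= n) { print $1 "=" $2 }' "$props")"

echo "settings to fix: ${bad_settings:-none}"
```

Commented-out lines are skipped by this check, so a property that is still commented falls back to its default and is worth reviewing separately.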
Verify that Self-Balancing is enabled¶
Verify that Self-Balancing is enabled. The value for this property must be specified as true and the line must be uncommented:
confluent.balancer.enable=true
If confluent.balancer.enable is not explicitly specified or if this line is commented out, it will default to false (off).
Run this command to enable Self-Balancing in KRaft mode.
sed -i '' -e "s/#confluent.balancer.enable=true/confluent.balancer.enable=true/g" $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
Run this command to enable Self-Balancing in ZooKeeper mode.
sed -i '' -e "s/#confluent.balancer.enable=true/confluent.balancer.enable=true/g" $CONFLUENT_HOME/etc/kafka/server.properties
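A quick way to confirm the setting took effect is to look for an uncommented line with the value true. The sketch below wraps that check in a helper named `balancer_enabled` (a hypothetical name, not part of Confluent Platform) and exercises it against a scratch file.

```shell
set -eu

# Succeeds only when the file has an uncommented confluent.balancer.enable=true line.
balancer_enabled() {
  grep -q -E '^[[:space:]]*confluent\.balancer\.enable[[:space:]]*=[[:space:]]*true[[:space:]]*$' "$1"
}

props="$(mktemp)"

# Case 1: the line is still commented out.
echo '#confluent.balancer.enable=true' > "$props"
if balancer_enabled "$props"; then before="enabled"; else before="disabled"; fi

# Case 2: the line has been uncommented.
echo 'confluent.balancer.enable=true' > "$props"
if balancer_enabled "$props"; then after="enabled"; else after="disabled"; fi

echo "commented: $before, uncommented: $after"
```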
Save the file¶
You now have an updated version of broker.properties (KRaft mode) or server.properties (ZooKeeper mode) to use as the basis for your brokers.
If you updated the file using the copy-paste commands, you are ready to go. If you updated the file manually, save it.
Create a basic configuration for a five-broker cluster¶
You will start with the broker/server properties file you updated for Metrics Reporter, replication factors, and Self-Balancing in the previous steps, then copy it and modify the configurations as shown below, renaming the new files to represent the other four brokers.
Configuration snapshot preview¶
The table below shows a summary of the configurations you will specify for each of these files, as a reference to check against if needed. The steps in the next sections guide you through a quick way to set up these files, using the existing broker.properties file (KRaft) or server.properties file (ZooKeeper) as a basis for your specialized ones. To get started, skip to the next section: Configure the servers.
File | Configurations |
---|---|
controller.properties | You will update the values for these basic properties to make them unique for the controller:
|
broker.properties | You will update the values for these basic properties to make them unique:
Add the following listener configuration to specify the REST endpoint for this broker:
|
broker-1.properties | You will update the values for these basic properties to make them unique:
Provide the listener configuration to specify the REST endpoint unique to this broker:
|
broker-2.properties | You will update the values for these basic properties to make them unique:
Provide the listener configuration to specify the REST endpoint unique to this broker:
|
broker-3.properties | You will update the values for these basic properties to make them unique:
Provide the listener configuration to specify the REST endpoint unique to this broker:
|
broker-4.properties | You will update the values for these basic properties to make them unique:
Provide the listener configuration to specify the REST endpoint unique to this broker:
|
File | Configurations |
---|---|
zookeeper.properties | ZooKeeper keeps all defaults, no changes will be needed:
|
server.properties | With replication factors properly set in the previous step, no further changes are needed for this file. Use the defaults for these basics:
Add the following listener configuration to specify the REST endpoint for this broker:
|
server-1.properties | You will update the values for these basic properties to make them unique. (Be sure to uncomment
Provide the listener configuration to specify the REST endpoint unique to this broker:
|
server-2.properties | You will update the values for these basic properties to make them unique. (Be sure to uncomment
Provide the listener configuration to specify the REST endpoint unique to this broker:
|
server-3.properties | You will update the values for these basic properties to make them unique. (Be sure to uncomment
Provide the listener configuration to specify the REST endpoint unique to this broker:
|
server-4.properties | You will update the values for these basic properties to make them unique. (Be sure to uncomment
Provide the listener configuration to specify the REST endpoint unique to this broker:
|
Configure the servers¶
In KRaft mode, start with the broker.properties file you updated in the previous sections with regard to replication factors and enabling Self-Balancing Clusters. You will make a few more changes to this file, then use it as the basis for the other servers.
Update the node ID, controller quorum voters and port for the first broker, and then add the REST endpoint listener configuration for this broker at the end of the file:
sed -i '' -e "s/node.id=2/node.id=0/g" $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
sed -i '' -e "s/1@localhost:9093/5@localhost:9097/g" $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
echo "confluent.http.server.listeners=http://localhost:8090" >> $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
Copy the properties file for the first broker to use as a basis for the other four:
cp $CONFLUENT_HOME/etc/kafka/kraft/broker.properties $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
cp $CONFLUENT_HOME/etc/kafka/kraft/broker.properties $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
cp $CONFLUENT_HOME/etc/kafka/kraft/broker.properties $CONFLUENT_HOME/etc/kafka/kraft/broker-3.properties
cp $CONFLUENT_HOME/etc/kafka/kraft/broker.properties $CONFLUENT_HOME/etc/kafka/kraft/broker-4.properties
Update the node ID, listener, and data directories for broker-1, and then update the REST endpoint listener for this broker:
sed -i '' -e "s/node.id=0/node.id=1/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
sed -i '' -e "s/9092/9093/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
sed -i '' -e "s/kraft-broker-logs/kraft-broker-logs-1/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
sed -i '' -e "s/8090/8091/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
Update the node ID, listener, and data directories for broker-2, and then update the REST endpoint listener for this broker:
sed -i '' -e "s/node.id=0/node.id=2/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
sed -i '' -e "s/9092/9094/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
sed -i '' -e "s/kraft-broker-logs/kraft-broker-logs-2/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
sed -i '' -e "s/8090/8092/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
Update the node ID, listener, and data directories for broker-3, and then update the REST endpoint listener for this broker:
sed -i '' -e "s/node.id=0/node.id=3/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-3.properties
sed -i '' -e "s/9092/9095/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-3.properties
sed -i '' -e "s/kraft-broker-logs/kraft-broker-logs-3/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-3.properties
sed -i '' -e "s/8090/8093/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-3.properties
Update the node ID, listener, and data directories for broker-4, and then update the REST endpoint listener for this broker:
sed -i '' -e "s/node.id=0/node.id=4/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-4.properties
sed -i '' -e "s/9092/9096/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-4.properties
sed -i '' -e "s/kraft-broker-logs/kraft-broker-logs-4/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-4.properties
sed -i '' -e "s/8090/8094/g" $CONFLUENT_HOME/etc/kafka/kraft/broker-4.properties
Finally, update the controller node ID, quorum voters, and port:
sed -i '' -e "s/node.id=1/node.id=5/g" $CONFLUENT_HOME/etc/kafka/kraft/controller.properties
sed -i '' -e "s/9093/9097/g" $CONFLUENT_HOME/etc/kafka/kraft/controller.properties
sed -i '' -e "s/1@localhost/5@localhost/g" $CONFLUENT_HOME/etc/kafka/kraft/controller.properties
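The four per-broker edits above follow one pattern: the node ID, listener port, log directory suffix, and REST port each shift by the broker number. A minimal sketch of the same substitutions as a loop is shown below. It works in a scratch directory, and the contents of the node 0 file are a simplified assumption rather than the full default file.

```shell
set -eu

# Scratch directory with a stand-in for the node 0 file (assumed, simplified content).
kraft_dir="$(mktemp -d)"
cat > "$kraft_dir/broker.properties" <<'EOF'
node.id=0
controller.quorum.voters=5@localhost:9097
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kraft-broker-logs
confluent.http.server.listeners=http://localhost:8090
EOF

for i in 1 2 3 4; do
  target="$kraft_dir/broker-$i.properties"
  # Same substitutions as the per-broker commands, with the offsets computed from i.
  sed -e "s/node.id=0/node.id=$i/" \
      -e "s/9092/$((9092 + i))/g" \
      -e "s/kraft-broker-logs/kraft-broker-logs-$i/g" \
      -e "s/8090/$((8090 + i))/g" \
      "$kraft_dir/broker.properties" > "$target"
done

# Show one generated file as a spot check.
cat "$kraft_dir/broker-3.properties"
```

Because the loop writes new files from the node 0 file instead of editing in place, it can be re-run safely after you change broker.properties.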
In ZooKeeper mode, start with the server.properties file you updated in the previous sections with regard to replication factors and enabling Self-Balancing. You will make a few more changes to this file, then use it as the basis for the other servers.
Uncomment the listener, and then add the REST endpoint listener configuration at the end of the file:
sed -i '' -e "s/#listeners=/listeners=/g" $CONFLUENT_HOME/etc/kafka/server.properties
echo "confluent.http.server.listeners=http://localhost:8090" >> $CONFLUENT_HOME/etc/kafka/server.properties
Copy the properties file for the first server to use as a basis for the other four servers. This is the file you updated in the previous sections with regard to replication factors and enabling Self-Balancing.
cp $CONFLUENT_HOME/etc/kafka/server.properties $CONFLUENT_HOME/etc/kafka/server-1.properties
cp $CONFLUENT_HOME/etc/kafka/server.properties $CONFLUENT_HOME/etc/kafka/server-2.properties
cp $CONFLUENT_HOME/etc/kafka/server.properties $CONFLUENT_HOME/etc/kafka/server-3.properties
cp $CONFLUENT_HOME/etc/kafka/server.properties $CONFLUENT_HOME/etc/kafka/server-4.properties
Update the broker ID, listener, and data directories for server-1, and then update the REST endpoint listener for this broker:
sed -i '' -e "s/broker.id=0/broker.id=1/g" $CONFLUENT_HOME/etc/kafka/server-1.properties
sed -i '' -e "s/9092/9093/g" $CONFLUENT_HOME/etc/kafka/server-1.properties
sed -i '' -e "s/kafka-logs/kafka-logs-1/g" $CONFLUENT_HOME/etc/kafka/server-1.properties
sed -i '' -e "s/8090/8091/g" $CONFLUENT_HOME/etc/kafka/server-1.properties
Update the broker ID, listener, and data directories for server-2, and then update the REST endpoint listener for this broker:
sed -i '' -e "s/broker.id=0/broker.id=2/g" $CONFLUENT_HOME/etc/kafka/server-2.properties
sed -i '' -e "s/9092/9094/g" $CONFLUENT_HOME/etc/kafka/server-2.properties
sed -i '' -e "s/kafka-logs/kafka-logs-2/g" $CONFLUENT_HOME/etc/kafka/server-2.properties
sed -i '' -e "s/8090/8092/g" $CONFLUENT_HOME/etc/kafka/server-2.properties
Update the broker ID, listener, and data directories for server-3, and then update the REST endpoint listener for this broker:
sed -i '' -e "s/broker.id=0/broker.id=3/g" $CONFLUENT_HOME/etc/kafka/server-3.properties
sed -i '' -e "s/9092/9095/g" $CONFLUENT_HOME/etc/kafka/server-3.properties
sed -i '' -e "s/kafka-logs/kafka-logs-3/g" $CONFLUENT_HOME/etc/kafka/server-3.properties
sed -i '' -e "s/8090/8093/g" $CONFLUENT_HOME/etc/kafka/server-3.properties
Update the broker ID, listener, and data directories for server-4, and then update the REST endpoint listener for this broker:
sed -i '' -e "s/broker.id=0/broker.id=4/g" $CONFLUENT_HOME/etc/kafka/server-4.properties
sed -i '' -e "s/9092/9096/g" $CONFLUENT_HOME/etc/kafka/server-4.properties
sed -i '' -e "s/kafka-logs/kafka-logs-4/g" $CONFLUENT_HOME/etc/kafka/server-4.properties
sed -i '' -e "s/8090/8094/g" $CONFLUENT_HOME/etc/kafka/server-4.properties
When you have completed this step, you will have five properties files that match the configurations shown in the Configuration snapshot preview:
- broker.properties (KRaft) or server.properties (ZooKeeper), which corresponds to node/broker 0
- broker-1.properties (KRaft) or server-1.properties (ZooKeeper), which corresponds to node/broker 1
- broker-2.properties (KRaft) or server-2.properties (ZooKeeper), which corresponds to node/broker 2
- broker-3.properties (KRaft) or server-3.properties (ZooKeeper), which corresponds to node/broker 3
- broker-4.properties (KRaft) or server-4.properties (ZooKeeper), which corresponds to node/broker 4
Run this command to list the files in KRaft mode:
ls $CONFLUENT_HOME/etc/kafka/kraft/
Run this command to list the files in ZooKeeper mode:
ls $CONFLUENT_HOME/etc/kafka/server*
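Listing the files confirms they exist, but not that each one has a unique ID. The sketch below shows one way to spot a duplicated broker.id or node.id across a set of properties files. The scratch files are hypothetical and deliberately contain a duplicate so that the check has something to report.

```shell
set -eu

conf_dir="$(mktemp -d)"
# Hypothetical minimal files; server-2 deliberately reuses broker.id=1.
printf 'broker.id=0\nlisteners=PLAINTEXT://:9092\n' > "$conf_dir/server.properties"
printf 'broker.id=1\nlisteners=PLAINTEXT://:9093\n' > "$conf_dir/server-1.properties"
printf 'broker.id=1\nlisteners=PLAINTEXT://:9094\n' > "$conf_dir/server-2.properties"

# Collect the ID lines from every file, then print any value that appears more than once.
duplicate_ids="$(grep -h -E '^(broker|node)\.id=' "$conf_dir"/server*.properties | sort | uniq -d)"

echo "duplicate IDs: ${duplicate_ids:-none}"
```

The same pipeline can be pointed at the listeners or log.dirs lines to check those for collisions.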
Configure Control Center with REST endpoints and advertised listeners¶
Control Center will be useful for verifying your starting configuration and monitoring the progress of the rebalance, in addition to command line output. You must tell Control Center about the REST endpoints for all brokers in your cluster, and the advertised listeners for the other components you may want to run. Without these configurations, the brokers and components will not show up on Control Center.
Make the following changes to $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties and save the file.
Open the file in an editor; for example, in vi:
vi $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties
Configure REST endpoints for the brokers.
In $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties, replace the default value for the Kafka REST endpoint URL with the following lines to match your multi-broker configuration:
# Kafka REST endpoint URL
confluent.controlcenter.streams.cprest.url=http://localhost:8090,http://localhost:8091,http://localhost:8092,http://localhost:8093,http://localhost:8094
Replace the configurations for Kafka Connect, ksqlDB, and Schema Registry to provide Control Center with the default advertised URLs for the component clusters. You can delete the original configs and copy-paste the following into the file.
# A comma separated list of Connect host names
confluent.controlcenter.connect.cluster=http://localhost:8083
# KSQL cluster URL
confluent.controlcenter.ksql.ksqlDB.url=http://localhost:8088
# Schema Registry cluster URL
confluent.controlcenter.schema.registry.url=http://localhost:8081
Save the file.
Tip
- Self-Balancing doesn’t need embedded REST Proxy, but Control Center depends on it to manage Self-Balancing. To learn more, see Required Configurations for Control Center in Self-Balancing Configuration Options and confluent.controlcenter.streams.cprest.url in the Control Center Configuration Reference.
- If you are running Self-Balancing with role-based access control (RBAC) or other security protocols, additional configuration is needed in the Kafka brokers beyond what this tutorial shows. To learn more, see Security considerations.
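If you change the number of brokers, the comma-separated REST endpoint list has to change with it. As a small sketch, the value can be generated from the first REST port and the broker count, assuming the ports are consecutive as they are in this tutorial.

```shell
set -eu

first_port=8090   # REST port of broker 0 in this tutorial
brokers=5

urls=""
i=0
while [ "$i" -lt "$brokers" ]; do
  # Prefix each entry after the first with a comma.
  urls="${urls:+$urls,}http://localhost:$((first_port + i))"
  i=$((i + 1))
done

cprest_line="confluent.controlcenter.streams.cprest.url=$urls"
echo "$cprest_line"
```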
Start Confluent Platform, create topics, and generate test data¶
Self-Balancing Clusters can take up to 30 minutes to initialize and collect metrics. Attempting to remove brokers during this startup process will result in an error due to insufficient metrics. Therefore, once you have started all the components, allow some time for Self-Balancing to initialize before beginning the tutorial steps below. To learn more, see Broker removal attempt fails during Self-Balancing initialization in the Troubleshooting section.
Follow these steps to start the servers in separate command windows, create topics, and generate data to the topics.
Start the controller and brokers¶
In KRaft mode, you must run the following commands from $CONFLUENT_HOME to generate a random cluster ID, and format log directories for the controller and each broker in dedicated command windows. You will then start the controller and brokers from those same dedicated windows.
The kafka-storage command is run only once per broker/controller. You cannot use the kafka-storage command to update an existing cluster. If you make a mistake in configurations at that point, you must recreate the directories from scratch, and work through the steps again.
Controller
In a new dedicated command window, change directories into $CONFLUENT_HOME to run the KRaft setup commands and start the controller.
cd $CONFLUENT_HOME
Generate a random-uuid for the cluster using the kafka-storage tool.
KAFKA_CLUSTER_ID="$(bin/kafka-storage random-uuid)"
Get the value for KAFKA_CLUSTER_ID and add it to your .bash_profile, .bashrc, .zshrc, or similar so that it is available to you in new command windows for running the brokers. You will use this same cluster ID for all brokers.
echo $KAFKA_CLUSTER_ID
Format the log directories for the controller:
./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c $CONFLUENT_HOME/etc/kafka/kraft/controller.properties --ignore-formatted
Start the controller:
kafka-server-start $CONFLUENT_HOME/etc/kafka/kraft/controller.properties
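Making the cluster ID available in new command windows amounts to adding one export line to a shell profile. The sketch below shows an idempotent way to do that. The helper name `persist_cluster_id` is hypothetical, the profile is a scratch file standing in for something like ~/.bash_profile, and the ID is a placeholder rather than real kafka-storage output.

```shell
set -eu

# Append an export line to a profile file unless one is already there.
persist_cluster_id() {
  target_file="$1"
  id_value="$2"
  if ! grep -q '^export KAFKA_CLUSTER_ID=' "$target_file" 2>/dev/null; then
    printf 'export KAFKA_CLUSTER_ID="%s"\n' "$id_value" >> "$target_file"
  fi
}

profile="$(mktemp)"                     # stand-in for ~/.bash_profile
sample_id="EXAMPLE-CLUSTER-ID-0001"     # placeholder, not a real kafka-storage UUID

persist_cluster_id "$profile" "$sample_id"
persist_cluster_id "$profile" "$sample_id"   # second call is a no-op

# Load the profile the way a new shell would, then confirm the variable is set.
. "$profile"
echo "KAFKA_CLUSTER_ID=$KAFKA_CLUSTER_ID"
```

In practice you would pass the value printed by echo $KAFKA_CLUSTER_ID in the controller window instead of the placeholder.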
broker.properties (node 0)
In a new command window dedicated to running node 0, change directories into $CONFLUENT_HOME to run the KRaft setup commands and start your first broker.
cd $CONFLUENT_HOME
Make sure that the KAFKA_CLUSTER_ID you generated for the controller is available in this shell as an environment variable.
(Optional) For example, if you added the value for KAFKA_CLUSTER_ID to your .bash_profile:
source ~/.bash_profile
echo $KAFKA_CLUSTER_ID
Format the log directories for this broker:
./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c $CONFLUENT_HOME/etc/kafka/kraft/broker.properties --ignore-formatted
Start the broker:
kafka-server-start $CONFLUENT_HOME/etc/kafka/kraft/broker.properties
broker-1.properties (node 1)
In a new command window dedicated to running node 1, change directories into $CONFLUENT_HOME to run the KRaft setup commands and start broker-1.
cd $CONFLUENT_HOME
Make sure that the KAFKA_CLUSTER_ID you generated for the controller is available in this shell as an environment variable.
(Optional) For example, if you added the value for KAFKA_CLUSTER_ID to your .bash_profile:
source ~/.bash_profile
echo $KAFKA_CLUSTER_ID
Format the log directories for broker-1:
./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties --ignore-formatted
Start the broker:
kafka-server-start $CONFLUENT_HOME/etc/kafka/kraft/broker-1.properties
broker-2.properties (node 2)
In a new command window dedicated to running node 2, change directories into $CONFLUENT_HOME to run the KRaft setup commands and start broker-2.
cd $CONFLUENT_HOME
Make sure that the KAFKA_CLUSTER_ID you generated for the controller is available in this shell as an environment variable.
(Optional) For example, if you added the value for KAFKA_CLUSTER_ID to your .bash_profile:
source ~/.bash_profile
echo $KAFKA_CLUSTER_ID
Format the log directories for broker-2:
./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties --ignore-formatted
Start the broker:
kafka-server-start $CONFLUENT_HOME/etc/kafka/kraft/broker-2.properties
broker-3.properties (node 3)
In a new command window dedicated to running node 3, change directories into $CONFLUENT_HOME to run the KRaft setup commands and start broker-3.
cd $CONFLUENT_HOME
Make sure that the KAFKA_CLUSTER_ID you generated for the controller is available in this shell as an environment variable.
(Optional) For example, if you added the value for KAFKA_CLUSTER_ID to your .bash_profile:
source ~/.bash_profile
echo $KAFKA_CLUSTER_ID
Format the log directories for broker-3:
./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c $CONFLUENT_HOME/etc/kafka/kraft/broker-3.properties --ignore-formatted
Start the broker:
kafka-server-start $CONFLUENT_HOME/etc/kafka/kraft/broker-3.properties
broker-4.properties (node 4)
In a new command window dedicated to running node 4, change directories into $CONFLUENT_HOME to run the KRaft setup commands and start broker-4.
cd $CONFLUENT_HOME
Make sure that the KAFKA_CLUSTER_ID you generated for the controller is available in this shell as an environment variable.
(Optional) For example, if you added the value for KAFKA_CLUSTER_ID to your .bash_profile:
source ~/.bash_profile
echo $KAFKA_CLUSTER_ID
Format the log directories for broker-4:
./bin/kafka-storage format -t $KAFKA_CLUSTER_ID -c $CONFLUENT_HOME/etc/kafka/kraft/broker-4.properties --ignore-formatted
Start the broker:
kafka-server-start $CONFLUENT_HOME/etc/kafka/kraft/broker-4.properties
In ZooKeeper mode, start ZooKeeper in its own command window:
zookeeper-server-start $CONFLUENT_HOME/etc/kafka/zookeeper.properties
Start each of the brokers in separate command windows:
kafka-server-start $CONFLUENT_HOME/etc/kafka/server.properties
kafka-server-start $CONFLUENT_HOME/etc/kafka/server-1.properties
kafka-server-start $CONFLUENT_HOME/etc/kafka/server-2.properties
kafka-server-start $CONFLUENT_HOME/etc/kafka/server-3.properties
kafka-server-start $CONFLUENT_HOME/etc/kafka/server-4.properties
Start the other components¶
Start these other Confluent Platform components in separate windows.
- Kafka REST
kafka-rest-start $CONFLUENT_HOME/etc/kafka-rest/kafka-rest.properties
- Control Center
control-center-start $CONFLUENT_HOME/etc/confluent-control-center/control-center-dev.properties
- (Optional) Configure and start these components.
If you want to run the following components, first edit their properties files to search and replace any replication.factor values to either 2 or 3 (to work with your five-broker cluster). If replication.factor values are set to less than 2 or greater than 4, this will result in system topics with replication factors that prevent graceful broker removal with Self-Balancing, as shown in the next sections.
(Optional) Kafka Connect
connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
(Optional) ksqlDB
ksql-server-start $CONFLUENT_HOME/etc/ksqldb/ksql-server.properties
(Optional) Schema Registry
schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
Create one or more topics with 3 partitions and a replication factor of 2.
kafka-topics --create --topic my-sbc-test --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
You should get a confirmation that the topic was successfully created. Also, you can get a list of existing topics as follows:
kafka-topics --list --bootstrap-server localhost:9092
Get detailed information on a particular topic with the --describe option:
kafka-topics --describe --topic my-sbc-test --bootstrap-server localhost:9092
Or, get detailed information on all topics with the --describe option:
kafka-topics --describe --bootstrap-server localhost:9092
Tip
- For Self-Balancing to work, topics must have a replication factor that maps properly to the number of brokers in the cluster. The replication factor must be greater than 1, but less than the total number of brokers. A five broker cluster requires a replication factor of between 2 and 4.
- Create topics with multiple partitions to distribute the data across all five brokers.
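The replication factor rule in this tip reduces to a simple range check: greater than 1 and less than the broker count. A short sketch follows; the function name `valid_rf` is made up for illustration.

```shell
set -eu

# Succeeds when 1 < replication factor < number of brokers.
valid_rf() {
  [ "$1" -gt 1 ] && [ "$1" -lt "$2" ]
}

brokers=5
valid_list=""
for rf in 1 2 3 4 5; do
  if valid_rf "$rf" "$brokers"; then
    valid_list="$valid_list $rf"
  fi
done

echo "valid replication factors for $brokers brokers:$valid_list"
```

Setting `brokers=3` shows why a three-broker cluster leaves only a replication factor of 2 available.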
Generate data to topics.
In a separate command window, use the kafka-producer-perf-test command to produce data to the topic my-sbc-test.
kafka-producer-perf-test \
  --producer-props bootstrap.servers=localhost:9092 \
  --topic my-sbc-test \
  --record-size 1000 \
  --throughput 1000 \
  --num-records 3600000
If you created additional topics, you can use this command to send data to those topics, also.
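The values passed to the producer above determine roughly how long data generation runs and how much data lands on the cluster. The following back-of-the-envelope sketch treats --throughput as a cap in records per second, so the computed duration is a lower bound rather than a measured figure.

```shell
set -eu

num_records=3600000
throughput=1000      # records per second (upper bound)
record_size=1000     # bytes per record

duration_s=$((num_records / throughput))
duration_min=$((duration_s / 60))
# Divide before multiplying to keep intermediate values small.
total_mb=$((num_records / 1000 * record_size / 1000))

echo "run time: at least ${duration_s}s (${duration_min} min), payload: about ${total_mb} MB before replication"
```

With a replication factor of 2, the brokers store about twice that payload, which gives Self-Balancing a meaningful amount of data to move.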
At this point you are ready to monitor and test Self-Balancing by removing and then adding back in a broker, and monitoring the rebalance during these operations. The following sections provide two different ways of doing so (command line or Control Center). If you want to try both, you can proceed through these steps in order or use Confluent Control Center and the command line interchangeably, but the expectation is that you will prefer either the command line or Control Center as your primary method of working with Self-Balancing.
Use the command line¶
The following sections describe how to remove a broker and monitor the progress of the rebalance using the command kafka-remove-brokers. If you restart the broker (essentially, “adding” a broker), Self-Balancing redistributes the data again across all nodes.
Verify status of brokers and topic data¶
Use the command line to verify the current status of the deployment, including topics, topic data distribution, and number of brokers.
Use kafka-broker-api-versions and grep for id to view the brokers online.
kafka-broker-api-versions --bootstrap-server localhost:9092 | grep 'id: '
Your output should resemble:
localhost:9095 (id: 3 rack: null) -> (
localhost:9093 (id: 1 rack: null) -> (
localhost:9096 (id: 4 rack: null) -> (
localhost:9094 (id: 2 rack: null) -> (
localhost:9092 (id: 0 rack: null) -> (
Use kafka-topics --describe to view information about the test topic you created.
kafka-topics --bootstrap-server localhost:9092 --topic my-sbc-test --describe
Your output should resemble:
Topic: my-sbc-test PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: my-sbc-test Partition: 0 Leader: 0 Replicas: 0,4 Isr: 0,4 Offline:
Topic: my-sbc-test Partition: 1 Leader: 4 Replicas: 4,1 Isr: 4,1 Offline:
Topic: my-sbc-test Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline:
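Before removing a broker, it helps to know which brokers are online and which partitions keep a replica on the broker you plan to remove. The sketch below extracts both from the sample output shown in this section, embedded as strings so that it runs without a cluster. In practice you would pipe the live command output into the same sed and awk filters.

```shell
set -eu

# Sample output from the two commands in this section.
api_versions_out='localhost:9095 (id: 3 rack: null) -> (
localhost:9093 (id: 1 rack: null) -> (
localhost:9096 (id: 4 rack: null) -> (
localhost:9094 (id: 2 rack: null) -> (
localhost:9092 (id: 0 rack: null) -> ('

describe_out='Topic: my-sbc-test PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: my-sbc-test Partition: 0 Leader: 0 Replicas: 0,4 Isr: 0,4 Offline:
Topic: my-sbc-test Partition: 1 Leader: 4 Replicas: 4,1 Isr: 4,1 Offline:
Topic: my-sbc-test Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2 Offline:'

# Sorted list of broker IDs that answered.
broker_ids="$(printf '%s\n' "$api_versions_out" \
  | sed -n 's/.*(id: \([0-9][0-9]*\) .*/\1/p' | sort -n | paste -sd ' ' -)"

# Partitions whose replica list includes the broker we plan to remove.
target=1
affected="$(printf '%s\n' "$describe_out" | awk -v b="$target" '
  /Partition: / {
    for (i = 1; i <= NF; i++) {
      if ($i == "Partition:") p = $(i + 1)
      if ($i == "Replicas:")  r = $(i + 1)
    }
    if (("," r ",") ~ ("," b ",")) print p
  }' | paste -sd ' ' -)"

echo "online brokers: $broker_ids"
echo "partitions with a replica on broker $target: $affected"
```

After the rebalance completes, running the same filter against fresh --describe output should report no partitions for the removed broker.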
Remove a broker¶
With Self-Balancing enabled, and Confluent Platform up and running, use kafka-remove-brokers to delete a broker and monitor the rebalancing.
Before you start on these steps, make sure that Confluent Platform and Self-Balancing have been running for at least 20 minutes to give Self-Balancing time to initialize. (To learn more, see Broker removal attempt fails during Self-Balancing initialization in Troubleshooting.)
Also, for this example, do not remove the controller, which in this case is broker ID 0.
Important
- In practice, you can remove a lead broker. It may cause a short delay in cluster balancing, which is why we suggest not doing so for this example. To learn more, see What happens if the lead broker (controller) is removed or lost?
- If the broker you attempt to remove contains the only replica for a topic, the broker removal will fail. To learn more, see Limitations.
- Starting with Confluent Platform 7.3.0, Self-Balancing Clusters supports Apache Kafka® Raft Metadata mode (KRaft). See KRaft: Apache Kafka without ZooKeeper.
If you are using Self-Balancing with Confluent for Kubernetes (CFK) and using CFK to shut down the broker, no further action is needed. However, if you are running Confluent Platform outside of CFK on a KRaft-supported version of Confluent Platform and separately calling kafka-remove-brokers, you must also call the UnregisterBroker API through kafka-cluster.sh after the broker removal is complete. If you call kafka-remove-brokers with shouldShutdown=false, you must manually remove the broker; if you set shouldShutdown=true, SBC will shut down the broker for you. In either case, on KRaft mode Confluent Platform deployments, you must call UnregisterBroker after broker shutdown is complete. This cleanly removes all traces of the broker from the cluster. In non-KRaft Confluent Platform, this step is not required; in KRaft mode, calling kafka-remove-brokers alone does not remove all lingering metadata held by the controllers. The UnregisterBroker API does this cleanup.
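As a hedged illustration of the UnregisterBroker step, the sketch below prints (without running) a kafka-cluster command for a given broker. It assumes the unregister subcommand with the --bootstrap-server and --id options as found in Apache Kafka's kafka-cluster.sh, and assumes the script is on your path as kafka-cluster. The helper name unregister_broker_cmd is hypothetical. Confirm the subcommand and options against your installed version with --help before running anything.

```shell
# Print (do not run) the kafka-cluster command that unregisters a broker.
# Arguments: bootstrap server, broker ID.
# Assumption: your installation exposes the script as `kafka-cluster`
# (Apache Kafka distributions name it kafka-cluster.sh).
unregister_broker_cmd() {
  printf 'kafka-cluster unregister --bootstrap-server %s --id %s\n' "$1" "$2"
}

# Review the command for broker 1, then run it yourself once the
# broker removal and shutdown are complete.
unregister_broker_cmd localhost:9092 1
```

Printing the command first keeps the destructive step manual, which suits a step that should only run after shutdown has finished.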
Remove a broker.
For example, the following command removes broker 1 and moves its data to the remaining brokers in the cluster.
kafka-remove-brokers --bootstrap-server localhost:9092,localhost:9093,localhost:9094,localhost:9095,localhost:9096 \
--broker-id 1 --delete 1>&2 | grep -v SLF4J
Self-Balancing acknowledges the command and provides feedback similar to the following.
Initiating remove broker call...
Started remove broker task for broker 1.
You can check its status by calling this command again with the `--describe` option.
Monitor the progress of the rebalance from the command line.
You can track the shutdown and rebalance operation by using the --describe option in the above command in place of --delete:
kafka-remove-brokers --bootstrap-server localhost:9092,localhost:9093,localhost:9094,localhost:9095,localhost:9096 \
--broker-id 1 --describe 1>&2 | grep -v SLF4J
On an in-progress rebalance, you will get feedback similar to the following.
Broker 1 removal status:
Partition Reassignment: IN_PROGRESS
Broker Shutdown: COMPLETE
When broker removal is complete, the --describe command will show the following.
Broker 1 removal status:
Partition Reassignment: COMPLETE
Broker Shutdown: COMPLETE
Note
If you get the following error, it is likely that Self-Balancing is still initializing, which can take up to 30 minutes. If this happens, retry broker removal after several minutes, and it should succeed.
Broker 1 removal status:
Partition Reassignment: FAILED
Broker Shutdown: CANCELED
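Rather than rerunning --describe by hand, you could poll it. The following is a minimal sketch, assuming the status text matches the samples above. The function names removal_state and wait_for_removal, the state labels they print, and the 30-second interval are all hypothetical choices for illustration.

```shell
# Classify `kafka-remove-brokers --describe` output read on stdin.
# Prints one of: complete, failed, in-progress, unknown.
removal_state() {
  status=$(cat)
  case "$status" in
    *"removal status:"*) ;;               # looks like a status report
    *) echo unknown; return 0 ;;          # for example, a connection error
  esac
  case "$status" in
    *": FAILED"*|*": CANCELED"*) echo failed ;;
    *"Partition Reassignment: COMPLETE"*"Broker Shutdown: COMPLETE"*) echo complete ;;
    *) echo in-progress ;;
  esac
}

# Poll until the removal leaves the in-progress state.
# Returns success only if the final state is "complete".
wait_for_removal() {
  broker_id=$1
  while :; do
    state=$(kafka-remove-brokers --bootstrap-server localhost:9092 \
      --broker-id "$broker_id" --describe 2>&1 | removal_state)
    echo "broker $broker_id: $state"
    [ "$state" = "in-progress" ] || break
    sleep 30
  done
  [ "$state" = "complete" ]
}

# Usage (requires a running cluster):
#   wait_for_removal 1
```

If the loop ends in the failed state shortly after startup, the note above applies: wait for Self-Balancing to finish initializing, then retry the removal.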
Rerun kafka-broker-api-versions to view the brokers online.
kafka-broker-api-versions --bootstrap-server localhost:9092 | grep 'id: '
Your output should resemble:
localhost:9092 (id: 0 rack: null) -> (
localhost:9096 (id: 4 rack: null) -> (
localhost:9094 (id: 2 rack: null) -> (
localhost:9095 (id: 3 rack: null) -> (
You can see that broker 1 is now offline.
You can also rerun kafka-topics --describe on all topics or a specific topic with the following commands. These may or may not show changes related to the rebalance, but will verify that topics and topic data are still available.
kafka-topics --describe --bootstrap-server localhost:9092
kafka-topics --bootstrap-server localhost:9092 --topic my-sbc-test --describe
Add a broker (restart)¶
You can restart the broker after the broker removal operation (previous section) is completed. (This provides an example of “adding a broker”, just using the same broker you removed to simplify the walkthrough.)
Restart the broker (for example broker 1) and watch the rebalance.
The easiest way to do this is to return to the command window where you started broker 1. You should see that the broker has been stopped. Hit the up arrow on your keyboard, and then press return to rerun the same command you started this with originally:
./bin/kafka-server-start etc/kafka/server-1.properties
Self-Balancing acknowledges the command and provides feedback similar to the following, as the broker reboots.
[2020-06-26 17:45:44,986] INFO BROKER Aggregator rolled out 1 new windows, reset 1 windows, current window range [1593213000000, 1593219000000], abandon 0 samples. (com.linkedin.cruisecontrol.monitor.sampling.aggregator.MetricSampleAggregator)
[2020-06-26 17:46:06,314] INFO DataBalancer: Scheduling DataBalanceEngine broker addition: [1] (io.confluent.databalancer.ConfluentDataBalanceEngine)
[2020-06-26 17:46:06,314] INFO DataBalancer: Starting addBrokers call (io.confluent.databalancer.ConfluentDataBalanceEngine)
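Broker startup output is verbose, so the Self-Balancing lines are easy to miss. As a small sketch, assuming log lines formatted like the sample above, the following filter keeps only the DataBalancer entries and trims the trailing logger class name. The function name balancer_events and the file name broker-1.log are hypothetical.

```shell
# Keep only Self-Balancing (DataBalancer) log lines read on stdin and
# strip the trailing "(io.confluent...)" logger name from each one.
balancer_events() {
  grep -F 'DataBalancer:' | sed 's/ (io\.confluent[^)]*)$//'
}

# Usage, assuming you saved the broker console output to broker-1.log:
#   balancer_events < broker-1.log
```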
When the broker is up, rerun kafka-broker-api-versions to view all brokers online.
kafka-broker-api-versions --bootstrap-server localhost:9092 | grep 'id: '
Your output should resemble:
localhost:9095 (id: 3 rack: null) -> (
localhost:9093 (id: 1 rack: null) -> (
localhost:9096 (id: 4 rack: null) -> (
localhost:9094 (id: 2 rack: null) -> (
localhost:9092 (id: 0 rack: null) -> (
You can see that broker 1 is back online.
Use Control Center¶
The following sections describe how to remove a broker and monitor the progress of the rebalance using the Control Center. If you restart the broker, Self-Balancing redistributes the data again across all nodes.
To learn more about working with Self-Balancing on Control Center, see Self-balancing in the Control Center guide.
Verify status of brokers and topic data¶
Use Control Center to verify the current status of the deployment, including Self-Balancing settings, lead broker (controller), topic data, and number of brokers. For a local deployment, Control Center is available at http://localhost:9021/ in your web browser.
To verify where the controller is running, go to Control Center, select the cluster, and click Brokers. In this example, the controller is running on broker 0.
Tip
If broker metrics and the list of brokers are not showing on the Brokers overview page, verify that the Metrics Reporter is enabled, as described in Enable the Metrics Reporter for Control Center. If not, stop the brokers, edit the files, and restart.
To view the status of Self-Balancing broker tasks, click Brokers, then click the Self-Balancing card.
Tip
The Self-Balancing card on the Brokers overview indicates whether Self-Balancing is on, and also the status of the workload optimizer:
- When Self-Balancing is set to trigger Only when brokers are added or removed (the default), the Self-Balancing card shows “Workload optimizer” as “Disabled”.
- When Self-Balancing is set to trigger Anytime, the Self-Balancing card shows “Workload optimizer” as “Balanced” if no rebalancing is in progress, or one of the work-in-progress statuses during a rebalance.
To learn more, see Self-balancing in the Control Center guide.
Self-Balancing shows the task status for each broker in the cluster. (You’ll see this in action in the next sections on removing a broker and adding one back in.)
To view all brokers online, scroll to the bottom of the Brokers page to see the broker list, or click Cluster settings > Broker defaults tab.
To verify that Self-Balancing is enabled, click Cluster settings > Self-balancing tab.
To view the generated messages for a topic, select Topics > my-sbc-test > Messages tab.
Remove a broker¶
With Self-Balancing enabled, and Confluent Platform up and running, delete a broker and monitor the rebalancing. For this example, make sure that you do not delete the controller, which in this example is broker ID 0.
Important
- In practice, you can remove a lead broker. It may cause a short delay in cluster balancing, which is why we suggest not doing so for this example. To learn more, see What happens if the lead broker (controller) is removed or lost?
- If the broker you attempt to remove contains the only replica for a topic, the broker removal will fail. To learn more, see Limitations.
- Starting with Confluent Platform 7.3.0, Self-Balancing Clusters supports Apache Kafka® Raft Metadata mode (KRaft). See KRaft: Apache Kafka without ZooKeeper.
If you are using Self-Balancing with Confluent for Kubernetes (CFK) and using CFK to shut down the broker, no further action is needed. However, if you are running Confluent Platform outside of CFK on a KRaft-supported version of Confluent Platform and separately calling kafka-remove-brokers, you must also call the UnregisterBroker API through kafka-cluster.sh after the broker removal is complete. If you call kafka-remove-brokers with shouldShutdown=false, you must manually remove the broker; if you set shouldShutdown=true, SBC will shut down the broker for you. In either case, on KRaft mode Confluent Platform deployments, you must call UnregisterBroker after broker shutdown is complete. This cleanly removes all traces of the broker from the cluster. In non-KRaft Confluent Platform, this step is not required; in KRaft mode, calling kafka-remove-brokers alone does not remove all lingering metadata held by the controllers. The UnregisterBroker API does this cleanup.
Remove a broker using the Control Center option on the Brokers overview page.
Select Brokers, then scroll to the bottom of the Overview page to view the list of brokers currently online.
Click the broker you want to remove. (Clicking a broker drills down to broker details and also provides a remove option).
At the bottom of the broker details page, click Remove broker, then type REMOVE in the input field to verify that you want to take this action.
Click Continue to start the remove broker task.
Note
If you get an error message that broker removal failed due to insufficient metrics, Self-Balancing is still initializing, which can take up to 30 minutes. If this happens, retry broker removal after several minutes, and it should succeed.
Use the Control Center to monitor the rebalance.
On Control Center, click Brokers > Self-balancing to track the progress.
Self-Balancing shows the detailed status for each broker in the cluster. In this case, broker 1 shows an in-progress status under Remove broker tasks.
While the remove operation is in progress, the broker being removed shows a red “failed” indicator on the brokers list at the bottom of the Brokers overview page.
When the rebalance is complete, both the Brokers overview page and Cluster settings > Broker defaults will show only 4 brokers in the list: 0, 2, 3 and 4.
Add a broker (restart)¶
Restart the broker (for example broker 1) and watch the rebalance. (This provides an example of “adding a broker”, just using the same broker you removed to simplify the walkthrough.)
To restart a broker, you must use the command line. Return to the command window where you started broker 1. You should see that the broker has been stopped. Hit the up arrow on your keyboard, and then press return to rerun the same command you started this with originally:
kafka-server-start etc/kafka/server-1.properties
Self-Balancing acknowledges the command and provides feedback similar to the following.
[2020-06-26 17:45:44,986] INFO BROKER Aggregator rolled out 1 new windows, reset 1 windows, current window range [1593213000000, 1593219000000], abandon 0 samples. (com.linkedin.cruisecontrol.monitor.sampling.aggregator.MetricSampleAggregator)
[2020-06-26 17:46:06,314] INFO DataBalancer: Scheduling DataBalanceEngine broker addition: [1] (io.confluent.databalancer.ConfluentDataBalanceEngine)
[2020-06-26 17:46:06,314] INFO DataBalancer: Starting addBrokers call (io.confluent.databalancer.ConfluentDataBalanceEngine)
Use Control Center to monitor the progress of Self-Balancing (Brokers > Self-balancing).
For example, as the broker is being added back in, you should see an in-progress indicator under Add broker tasks.
When the rebalance is complete, navigate to the broker list at the bottom of the Brokers page to verify that broker 1 is back online, for a total of five brokers.
Shutdown and cleanup tasks¶
Run the following shutdown and cleanup tasks.
- Stop the kafka-producer-perf-test with Ctrl-C in its respective command window.
- Stop all of the other components with Ctrl-C in their respective command windows, preferably in the reverse of the order in which you started them. For example, stop Control Center first, then other components, followed by Kafka brokers, and finally ZooKeeper.
- Remove log directories from /tmp.