Quick Start for Auto Data Balancing in Confluent Platform¶
This Quick Start shows how to use the Auto Data Balancer to generate, run, and verify a data rebalancing plan on a multi-broker cluster.
Requirements and Limitations¶
To compute the rebalance plan, the tool relies on metrics collected from the Apache Kafka® cluster. This data is published by the Confluent Metrics Reporter to a configurable Kafka topic (_confluent-metrics by default) in a configurable Kafka cluster. To enable the Metrics Reporter, see the installation instructions.
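Once the Metrics Reporter is enabled and the brokers have been running for a short while, you can confirm that metrics are being published by checking that the metrics topic exists. This is a sketch that assumes the default topic name:
# The topic should exist and have partitions assigned once the reporter is active
./bin/kafka-topics --describe --topic _confluent-metrics --bootstrap-server localhost:9092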
Also, keep in mind the following limitations and notes.
- Starting in Confluent Platform 6.0.0, the Manage Self-Balancing Kafka Clusters in Confluent Platform feature is the preferred alternative to Auto Data Balancer. For a detailed feature comparison, see Self-Balancing vs. Auto Data Balancer.
- Auto Data Balancer and Self-Balancing cannot be used together. If you want to run Auto Data Balancer, you must first make sure that Self-Balancing is off (see the sketch after this list).
- If Configure Multi-Region Clusters in Confluent Platform is enabled and a topic is created with a replica placement policy, Auto Data Balancer will redistribute the preferred leaders among all racks that have replicas.
- As of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments. To learn more about running Kafka in KRaft mode, see KRaft Overview for Confluent Platform, the KRaft steps in the Platform Quick Start, and Settings for other Kafka and Confluent Platform components.
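If you are not sure whether Self-Balancing has been enabled on your brokers, a minimal sketch of turning it off is shown below. It assumes that the confluent.balancer.enable broker property controls Self-Balancing and uses the broker properties files created later in this quick start:
# Ensure Self-Balancing is disabled on every broker before running Auto Data Balancer
echo "confluent.balancer.enable=false" >> /tmp/server0.properties
echo "confluent.balancer.enable=false" >> /tmp/server1.properties
echo "confluent.balancer.enable=false" >> /tmp/server2.properties
echo "confluent.balancer.enable=false" >> /tmp/server3.properties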
Confluent Auto Data Balancer Quick Start¶
Start a Kafka cluster¶
Start a ZooKeeper server. In this example, assume services run on localhost.
# Start ZooKeeper. Run this command in its own terminal.
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
Tip
These instructions are based on the assumption that you are installing Confluent Platform by using ZIP or TAR archives. For more information, see Install Confluent Platform On-Premises.
Copy the broker configuration file to a temporary location, enable the metrics reporter, and duplicate the config for additional brokers.
# Copy the configuration files to /tmp
cp ./etc/kafka/server.properties /tmp/server0.properties
# Add metrics reporter configurations (alternatively, you could uncomment the configurations)
echo "" >> /tmp/server0.properties
echo "metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter" >> /tmp/server0.properties
echo "confluent.metrics.reporter.bootstrap.servers=localhost:9092" >> /tmp/server0.properties
echo "confluent.metrics.reporter.topic.replicas=1" >> /tmp/server0.properties
# properties for broker.id=1
cp /tmp/server0.properties /tmp/server1.properties
sed -i'' -e "s/broker.id=0/broker.id=1/g" /tmp/server1.properties
sed -i'' -e "s/9092/9082/g" /tmp/server1.properties
sed -i'' -e "s/#listen/listen/g" /tmp/server1.properties
sed -i'' -e "s/kafka-logs/kafka-logs-1/g" /tmp/server1.properties
# properties for broker.id=2
cp /tmp/server0.properties /tmp/server2.properties
sed -i'' -e "s/broker.id=0/broker.id=2/g" /tmp/server2.properties
sed -i'' -e "s/9092/9072/g" /tmp/server2.properties
sed -i'' -e "s/#listen/listen/g" /tmp/server2.properties
sed -i'' -e "s/kafka-logs/kafka-logs-2/g" /tmp/server2.properties
# properties for broker.id=3
cp /tmp/server0.properties /tmp/server3.properties
sed -i'' -e "s/broker.id=0/broker.id=3/g" /tmp/server3.properties
sed -i'' -e "s/9092/9062/g" /tmp/server3.properties
sed -i'' -e "s/#listen/listen/g" /tmp/server3.properties
sed -i'' -e "s/kafka-logs/kafka-logs-3/g" /tmp/server3.properties
Start the Kafka brokers:
# Start Kafka. Run these commands in a separate terminal:
./bin/kafka-server-start /tmp/server0.properties &
./bin/kafka-server-start /tmp/server1.properties &
./bin/kafka-server-start /tmp/server2.properties &
./bin/kafka-server-start /tmp/server3.properties &
For complete details on getting these services up and running, see the quickstart for Confluent Platform.
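Before moving on, you can optionally confirm that all four brokers are reachable. This sketch uses the kafka-broker-api-versions tool against each of the listener ports configured above:
# Each command prints the APIs supported by the corresponding broker if it is up
./bin/kafka-broker-api-versions --bootstrap-server localhost:9092 > /dev/null && echo "broker 0 is up"
./bin/kafka-broker-api-versions --bootstrap-server localhost:9082 > /dev/null && echo "broker 1 is up"
./bin/kafka-broker-api-versions --bootstrap-server localhost:9072 > /dev/null && echo "broker 2 is up"
./bin/kafka-broker-api-versions --bootstrap-server localhost:9062 > /dev/null && echo "broker 3 is up"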
Create topics and produce data¶
Create two topics, each with 4 partitions and a replication factor of 2. The replica assignments are intentionally unbalanced:
./bin/kafka-topics --create --topic topic-a --replica-assignment 0:1,0:1,0:1,0:1 --bootstrap-server localhost:9092
./bin/kafka-topics --create --topic topic-b --replica-assignment 1:0,2:1,1:2,2:1 --bootstrap-server localhost:9092
Examine how they look:
./bin/kafka-topics --describe --topic topic-a --bootstrap-server localhost:9092
Topic:topic-a PartitionCount:4 ReplicationFactor:2 Configs:
Topic: topic-a Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: topic-a Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: topic-a Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: topic-a Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0,1
./bin/kafka-topics --describe --topic topic-b --bootstrap-server localhost:9092
Topic:topic-b PartitionCount:4 ReplicationFactor:2 Configs:
Topic: topic-b Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: topic-b Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic-b Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: topic-b Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
Now produce some data:
./bin/kafka-producer-perf-test --topic topic-a --num-records 200000 --record-size 1000 --throughput 10000000 --producer-props bootstrap.servers=localhost:9092
./bin/kafka-producer-perf-test --topic topic-b --num-records 800000 --record-size 1000 --throughput 10000000 --producer-props bootstrap.servers=localhost:9092
And finally, force the creation of the offsets topic by running a consumer:
./bin/kafka-consumer-perf-test --topic topic-a --broker-list localhost:9092 --messages 10
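At this point the data is deliberately skewed across brokers. As a quick sanity check, you can compare the size of each broker's log directory. This sketch assumes the default log directories configured earlier; broker 1 should hold the most data and broker 3 almost none:
du -sh /tmp/kafka-logs /tmp/kafka-logs-1 /tmp/kafka-logs-2 /tmp/kafka-logs-3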
Execute the rebalancer¶
Before starting, note that running ./bin/confluent-rebalancer with no arguments outputs a list of supported commands along with a description of each.
Begin with the execute command. Specify the bootstrap servers for the Kafka cluster containing the metrics topic and the maximum bandwidth (in bytes per second) allocated to moving replicas. The --verbose flag includes per-broker statistics in the CLI output, which is useful unless the number of brokers is very high.
./bin/confluent-rebalancer execute --bootstrap-server localhost:9092 --metrics-bootstrap-server localhost:9092 --throttle 10000000 --verbose
You are presented with a rebalancing plan, which will be slightly different depending on whether the Kafka brokers are configured to use a single log directory (as shown in our example) or multiple log directories. In the former case, the rebalancer ensures that the volume containing the log directory has at least the specified percentage of free space during and after the rebalance (20% is the default).
Computing the rebalance plan (this may take a while) ...
You are about to move 17 replica(s) for 14 partitions to 4 broker(s) with total size 827.2 MB.
The preferred leader for 14 partition(s) will be changed.
In total, the assignment for 15 partitions will be changed.
The minimum free volume space is set to 20.0%.
The following brokers will have less than 40% of free volume space during the rebalance:
Broker Current Size (MB) Size During Rebalance (MB) Free % During Rebalance Size After Rebalance (MB) Free % After Rebalance
0 413.6 620.4 30.1 519.6 30.5
2 620.4 723.8 30.1 520.8 30.5
3 0 517 30.1 520.8 30.5
1 1,034 1,034 30.1 519.6 30.5
Min/max stats for brokers (before -> after):
Type Leader Count Replica Count Size (MB)
Min 12 (id: 3) -> 17 (id: 0) 37 (id: 3) -> 43 (id: 3) 0 (id: 3) -> 517 (id: 1)
Max 21 (id: 0) -> 17 (id: 0) 51 (id: 1) -> 45 (id: 0) 1,034 (id: 1) -> 517 (id: 3)
No racks are defined.
Broker stats (before -> after):
Broker Leader Count Replica Count Size (MB) Free Space (%)
0 21 -> 17 48 -> 45 413.6 -> 517 30.5 -> 30.5
1 20 -> 17 51 -> 44 1,034 -> 517 30.5 -> 30.5
2 15 -> 17 40 -> 44 620.4 -> 517 30.5 -> 30.5
3 12 -> 17 37 -> 43 0 -> 517 30.5 -> 30.5
Would you like to continue? (y/n):
Because all brokers in this example share the same volume, the free space numbers do not change after the rebalance. If you configure multiple log directories, pay close attention to the additional disk space required during and after the rebalance to ensure that the move does not cause a broker to run out of disk space.
In addition, the min/max stats provide a quick summary of the improvement in data balance after the rebalance completes. The goal is for the min and max values to be closer to each other after the rebalance. In this case, near-optimal balance is achieved, so the numbers are virtually identical.
If you proceed, you will see the following (don’t ignore the warning):
Rebalance started, its status can be checked via the status command.
Warning: You must run the status or finish command periodically, until the rebalance completes, to ensure the throttle is removed. You can also alter the throttle by re-running the execute command passing a new value.
At this point, the tool will exit and the rebalance will take place in the background (driven by the Kafka Controller).
Check status, finish and cancel¶
Check the status of the rebalance:
./bin/confluent-rebalancer status --bootstrap-server localhost:9092
Partitions being rebalanced:
Topic topic-a: 1,2
Topic topic-b: 0
Eventually, the rebalance will complete (finish and status are similar; the latter has more concise output and returns a 0 exit status code when the rebalance is finished):
./bin/confluent-rebalancer finish --bootstrap-server localhost:9092
The rebalance has completed and throttling has been disabled
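If you prefer to script this step, a minimal sketch that polls until the rebalance completes and then removes the throttle is shown below. It assumes, per the note above, that status exits with a non-zero code while the rebalance is still in progress:
# Poll every 30 seconds until the rebalance completes, then remove the throttle
until ./bin/confluent-rebalancer status --bootstrap-server localhost:9092; do sleep 30; done
./bin/confluent-rebalancer finish --bootstrap-server localhost:9092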
Verify that the replica assignment is balanced:
./bin/kafka-topics --describe --topic topic-a --bootstrap-server localhost:9092
Topic:topic-a PartitionCount:4 ReplicationFactor:2 Configs:
Topic: topic-a Partition: 0 Leader: 3 Replicas: 3,2 Isr: 2,3
Topic: topic-a Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: topic-a Partition: 2 Leader: 0 Replicas: 1,0 Isr: 0,1
Topic: topic-a Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0,1
./bin/kafka-topics --describe --topic topic-b --bootstrap-server localhost:9092
Topic:topic-b PartitionCount:4 ReplicationFactor:2 Configs:
Topic: topic-b Partition: 0 Leader: 3 Replicas: 3,0 Isr: 0,3
Topic: topic-b Partition: 1 Leader: 0 Replicas: 0,3 Isr: 0,3
Topic: topic-b Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: topic-b Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
Partition sizes are communicated asynchronously (every 15 seconds by default, configurable with the confluent.metrics.reporter.publish.ms setting), so there may be a delay before the tool reports the correct information after a rebalance completes. This is particularly noticeable when conducting local tests such as this one.
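For local experiments like this one, you could shorten that interval before starting the brokers so that size information refreshes sooner. A sketch (the value is in milliseconds; apply it to every broker's properties file):
# Publish metrics every 5 seconds instead of every 15 seconds
echo "confluent.metrics.reporter.publish.ms=5000" >> /tmp/server0.properties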
You can use the cancel command to stop any reassignment currently in progress:
./bin/confluent-rebalancer cancel --bootstrap-server localhost:9092
Leader balance¶
If auto.leader.rebalance.enable is disabled on your brokers, run the preferred leader election tool after the rebalance completes. This will ensure that the actual leaders are balanced (not just the preferred leaders).
./bin/kafka-leader-election --election-type PREFERRED --bootstrap-server localhost:9092
Created preferred replica election path with _confluent-metrics-7,_confluent-metrics-2,_confluent-metrics-5,_confluent-metrics-11,_confluent-metrics-10,topic-b-2,topic-a-1,_confluent-metrics-1,topic-a-3,__confluent.support.metrics-0,topic-a-0,_confluent-metrics-9,_confluent-metrics-0,topic-a-2,topic-b-3,_confluent-metrics-3,_confluent-metrics-4,_confluent-metrics-6,topic-b-1,_confluent-metrics-8,topic-b-0
Successfully started preferred replica election for partitions Set(_confluent-metrics-7, _confluent-metrics-2, _confluent-metrics-5, _confluent-metrics-11, _confluent-metrics-10, topic-b-2, topic-a-1, _confluent-metrics-1, topic-a-3, __confluent.support.metrics-0, topic-a-0, _confluent-metrics-9, _confluent-metrics-0, topic-a-2, topic-b-3, _confluent-metrics-3, _confluent-metrics-4, _confluent-metrics-6, topic-b-1, _confluent-metrics-8, topic-b-0)
# run again to see the completion
./bin/kafka-leader-election --election-type PREFERRED --bootstrap-server localhost:9092
Successfully completed preferred replica election for partitions topic-b-3, topic-b-0, _confluent-metrics-1, _confluent-metrics-5, _confluent-metrics-9, topic-a-1, _confluent-metrics-2, topic-b-1, _confluent-metrics-6, __confluent.support.metrics-0, _confluent-metrics-10, topic-a-2, _confluent-metrics-3, topic-b-2, topic-a-3, _confluent-metrics-7, _confluent-metrics-4, _confluent-metrics-11, _confluent-metrics-8, _confluent-metrics-0, topic-a-0
Decommissioning brokers¶
The confluent-rebalancer tool automatically generates a rebalance plan for decommissioning brokers using the --remove-broker-ids option. The cluster is balanced in the same execution.
./bin/confluent-rebalancer execute --bootstrap-server localhost:9092 --metrics-bootstrap-server localhost:9092 --throttle 100000 --remove-broker-ids 1
Computing the rebalance plan (this may take a while) ...
You are about to move 48 replica(s) for 48 partitions to 3 broker(s) with total size 775.5 MB.
The preferred leader for 20 partition(s) will be changed.
In total, the assignment for 49 partitions will be changed.
You have requested all replicas to be moved out of 1 broker(s) with ID(s): 1.
After the rebalance, these broker(s) will have no replicas.
The following brokers will require more disk space during the rebalance and, in some cases, after the rebalance:
Broker Current (MB) During Rebalance (MB) After Rebalance (MB)
0 517 775.5 568.7
2 517 775.5 723.8
3 517 775.5 775.5
Min/max stats for brokers (before -> after):
Type Leader Count Replica Count Size (MB)
Min 17 (id: 0) -> 0 (id: 1) 43 (id: 3) -> 0 (id: 1) 517 (id: 2) -> 0 (id: 1)
Max 17 (id: 0) -> 23 (id: 0) 45 (id: 0) -> 59 (id: 0) 517 (id: 1) -> 775.5 (id: 3)
No racks are defined.
Would you like to continue? (y/n):
Given that the 8 replicas of topic-b hold roughly the same amount of data and account for most of the cluster's data, it is not possible for the 3 remaining brokers to end up with the same amount of data. The broker that receives only 2 of those replicas (broker 0 in the plan above) therefore holds less data than the other two.
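After this rebalance finishes (remember to run status or finish so the throttle is removed), you can confirm that broker 1 no longer hosts any replicas before shutting it down. A quick sketch of such a check:
# Inspect the replica lists for every topic; broker ID 1 should not appear in any of them
./bin/kafka-topics --describe --bootstrap-server localhost:9092 | grep "Replicas:"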
Limiting bandwidth usage during data migration¶
Use the --throttle option to limit the amount of bandwidth used for replication. It is possible to update the value while a rebalance is in progress by simply rerunning the tool:
./bin/confluent-rebalancer execute --bootstrap-server localhost:9092 --metrics-bootstrap-server localhost:9092 --throttle 100000
The throttle rate was updated to 100000 bytes/sec.
A rebalance is currently in progress for:
Topic topic-b: 0,1
See Throttling for more details.
Limiting the scope of the reassignment¶
You can use the --topics and --exclude-internal-topics flags to limit the set of topics that are eligible for reassignment.
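For example, a sketch that restricts the rebalance to the two topics created earlier in this quick start (assuming --topics accepts a comma-separated list of topic names):
./bin/confluent-rebalancer execute --bootstrap-server localhost:9092 --metrics-bootstrap-server localhost:9092 --throttle 100000 --topics topic-a,topic-b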
You can use --replica-placement-only to perform reassignment only on partitions that do not satisfy the replica placement constraints. Assuming that topic topic-c was configured to use replica placement constraints, the following command makes the necessary reassignments to satisfy the constraints:
./bin/confluent-rebalancer execute --bootstrap-server localhost:9092 --metrics-bootstrap-server localhost:9092 --throttle 100000 --topics topic-c --replica-placement-only
For more details on replica placement constraints, see Replica placement.
Incremental reassignment¶
For Kafka clusters with a large number of topic partitions, performing a reassignment in one batch can result in degraded performance or availability. To limit the maximum number of reassignments per leader, use the --incremental flag and the max.concurrent.moves.per.leader configuration parameter:
./bin/confluent-rebalancer execute --incremental --bootstrap-server localhost:9092 --metrics-bootstrap-server localhost:9092 --throttle 100000
Use JMX for monitoring¶
Authentication is disabled for Java Management Extensions (JMX) by default in Kafka. You must use environment variables to override these defaults. Use KAFKA_JMX_OPTS for processes started using the CLI, or set the appropriate Java system properties. For each platform component, the options to override the JMX defaults take the form <component-name>_JMX_OPTS.
For the confluent-rebalancer, use the environment variable REBALANCER_JMX_OPTS for processes started using the CLI, or set the appropriate Java system properties.
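For example, you could export REBALANCER_JMX_OPTS with the standard JMX system properties before invoking the CLI. The following is a sketch that enables password-based JMX authentication; the port and file paths are placeholders:
# Expose JMX for the rebalancer process with authentication enabled
export REBALANCER_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.port=9999 \
  -Dcom.sun.management.jmxremote.authenticate=true \
  -Dcom.sun.management.jmxremote.password.file=/path/to/jmxremote.password \
  -Dcom.sun.management.jmxremote.access.file=/path/to/jmxremote.access \
  -Dcom.sun.management.jmxremote.ssl=false"
./bin/confluent-rebalancer status --bootstrap-server localhost:9092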
Licensing¶
A license was not specified as part of this quick start. If you have a Confluent license, you can specify it using the confluent.license configuration. For more details about licenses, see Manage Confluent Platform Licenses Using Control Center.
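For example, a sketch of supplying a license key through the broker properties files used in this quick start (placing the setting in the broker configuration and the key value shown are assumptions; adjust for your deployment):
# Add your license key to each broker's properties file, then restart the brokers
echo "confluent.license=<your-license-key>" >> /tmp/server0.properties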
Suggested Reading¶
- Tutorial: Automatic Data Balancing on Docker
- Auto Data Balancer Configuration and Command Reference for Confluent Platform
- Scaling the cluster (adding a node to a Kafka cluster)
- Monitor and Track Metrics in Docker with JMX in Confluent Platform, especially the section on how to override JMX defaults for the Rebalancer