Self-Balancing Cluster Configuration and Command Reference for Confluent Platform¶
This section covers Self-Balancing configuration options and commands, and provides examples.
See Tutorial: Add and Remove Brokers with Self-Balancing in Confluent Platform for an example of how to set up and run a quick test of Self-Balancing by removing a broker and monitoring the rebalance. The example provides guidance on proper configuration of replication factors and demonstrates the Self-Balancing-specific command, kafka-remove-brokers.
Self-Balancing configurations on the brokers¶
The following configuration options specific to Self-Balancing Clusters are available in:
- ZooKeeper mode: $CONFLUENT_HOME/etc/kafka/server.properties
- KRaft mode: $CONFLUENT_HOME/etc/kafka/kraft/broker.properties, $CONFLUENT_HOME/etc/kafka/kraft/controller.properties, $CONFLUENT_HOME/etc/kafka/kraft/server.properties
If you are using Self-Balancing in combination with Configure Multi-Region Clusters in Confluent Platform, you must also specify the rack location for each broker with broker.rack in each respective server.properties file.
To learn more, see Replica placement and multi-region clusters.
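For example, a minimal sketch of the rack setting in one broker's server.properties (the rack name is a placeholder; use your own region or rack identifier):
# Assign this broker to the "east" rack/region (example value)
broker.rack=east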
confluent.balancer.enable¶
Enables Self-Balancing, meaning load across the Kafka cluster is measured and data is rebalanced as needed, based on multiple goals and factors.
If this property is set to false, or is not specified at all in the properties file, the value defaults to false and Self-Balancing is off.
- ZooKeeper mode: In the example server.properties file for ZooKeeper mode, confluent.balancer.enable is set to true, which means Self-Balancing is on. The same is true for Docker images that run in ZooKeeper mode. If you want Self-Balancing on in ZooKeeper mode, you do not need to make any changes.
- KRaft mode: In the example server.properties, controller.properties, and broker.properties files for KRaft mode, confluent.balancer.enable=true is commented out, which means Self-Balancing is off. You must uncomment or add confluent.balancer.enable=true in the properties file for every broker and controller in the cluster to turn on Self-Balancing. In addition, you must set inter.broker.listener.name for each broker and controller in the properties file, as shown in the example below. For more about configuring KRaft mode, see Configuration options.
The Self-Balancing feature is dynamic, meaning you can toggle Self-Balancing on or off while the cluster is running.
Do not disable Self-Balancing while an add or remove broker operation is in progress; wait until the add or remove completes. Disabling Self-Balancing during either operation will fence off the broker from future partition distribution.
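For example, a minimal sketch of the relevant lines in a KRaft broker or controller properties file (the listener name PLAINTEXT is an assumption; use the listener name defined in your own listeners configuration):
# Turn on Self-Balancing (uncomment or add on every broker and controller)
confluent.balancer.enable=true
# Required for Self-Balancing in KRaft mode (example listener name)
inter.broker.listener.name=PLAINTEXT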
- Type: boolean
- Default: false
- Importance: high
confluent.balancer.heal.uneven.load.trigger¶
Sets the conditions upon which to trigger a rebalance. Valid values are:
- EMPTY_BROKER: Move data to rebalance the cluster only when an empty broker (one with no partitions on it) is added to the cluster.
- ANY_UNEVEN_LOAD: Balance the load across the cluster whenever an imbalance is detected. This includes empty brokers and any other uneven load condition caused by ongoing cluster activity. For example, load peaks on specific partitions could trigger a rebalance.
This is a dynamic option, meaning you can change the trigger condition for Self-Balancing while the cluster is running.
- Type: string
- Default:
EMPTY_BROKER
- Importance: high
confluent.balancer.heal.broker.failure.threshold.ms¶
Sets the amount of time to wait before declaring a broker permanently failed.
To adjust this threshold, edit $CONFLUENT_HOME/etc/kafka/server.properties. Uncomment the line for confluent.balancer.heal.broker.failure.threshold.ms and change its value to a positive number of your choosing. To turn off broker failure detection, set confluent.balancer.heal.broker.failure.threshold.ms to -1.
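For example, a sketch of the two variants in server.properties (the two-hour value is an arbitrary example):
# Declare a broker permanently failed after 2 hours
confluent.balancer.heal.broker.failure.threshold.ms=7200000
# Or, turn off broker failure detection entirely
# confluent.balancer.heal.broker.failure.threshold.ms=-1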
When a broker disappears from the cluster, Self-Balancing waits this length of time before declaring the broker permanently failed and rebalancing its data onto other brokers. This happens regardless of the confluent.balancer.heal.uneven.load.trigger setting.
A broker that Self-Balancing marks as permanently failed after being down for longer than confluent.balancer.heal.broker.failure.threshold.ms, and that later recovers, is not considered in Self-Balancing operations for 14 days (or until the controller changes).
The default is 1 hour (3600000 ms).
- Type: int
- Default: 3600000
- Importance: medium
confluent.balancer.throttle.bytes.per.second¶
Specifies the maximum network bandwidth available for use by data balancing operations. The value is specified in bytes/sec/broker. This is a dynamic option; you can adjust the throttle while the cluster is running.
The default is 10MB per second.
- Type: int
- Default: 10485760 (10MB/sec)
- Importance: medium
How to choose a throttle setting¶
Self-Balancing uses the Apache Kafka® replication throttle to limit the rate at
which data is moved when reassigning partitions as part of a rebalance. Because
each cluster’s workload and hardware capabilities are different, it is difficult to
provide generally applicable guidance for choosing a throttle. Normal
replication traffic counts against this throttle, so in order for a rebalance to
make progress, the throttle must be larger than the highest inbound or outbound
replication rate of any broker in the cluster. These values are exposed as the
kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec
and
kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec
broker metrics.
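As a sketch of one way to sample these metrics from the command line, assuming JMX is enabled on the broker (port 9999 here is an example) and that your Kafka version ships the kafka.tools.JmxTool class:
kafka-run-class kafka.tools.JmxTool --object-name kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi --reporting-interval 5000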
A safe method to choose a throttle is to start with a small multiple (3 to 5x) of this maximum replication traffic, and then dynamically increase the throttle in small increments, while monitoring cluster health. Produce request latency is a good metric to monitor, because it will increase if too much network throughput is used for reassignment traffic or if too much data is being written to disk as partitions are reassigned. This starting value is a conservative one, and for some clusters, a much larger throttle may be safe.
Self-Balancing automatically removes the replication throttle when it is not reassigning partitions, so as a cluster administrator you do not need to manually remove it.
confluent.balancer.disk.max.load¶
Specifies the maximum fraction of the log disk (0-1.0) that can be used before Self-Balancing triggers a rebalance.
The default is 85% of the log disk.
- Type: double
- Default: 0.85
- Importance: medium
confluent.balancer.disk.max.replicas¶
Specifies the maximum number of replicas allowed per broker.
The default is 10,000 replicas, which is also the maximum allowed. The recommended value for this option is 4,000 replicas/partitions per broker.
- Type: int
- Default: 10000
- Importance: medium
confluent.balancer.exclude.topic.names¶
Specifies topics that should not be moved by Self-Balancing. You can specify multiple topics in a comma-separated list.
- Removal operations always move topics, regardless of this setting.
- As a best practice, keep excluded topics to a minimum, and use this configuration only as necessary. Excluding too many topics can interfere with Self-Balancing. To learn more, see Too many excluded topics causes problems with Self-Balancing in Troubleshooting.
For example, the following configuration would exclude the topics pizza and popcorn:
confluent.balancer.exclude.topic.names=pizza,popcorn
- Type: string
- Default: “”
- Importance: medium
confluent.balancer.exclude.topic.prefixes¶
Specifies topics that should not be moved by Self-Balancing based on topic prefixes. You can specify multiple topic prefixes in a comma-separated list.
- Removal operations always move topics, regardless of this setting.
- As a best practice, keep excluded topics to a minimum, and use this configuration only as necessary. Excluding too many topics can interfere with Self-Balancing. To learn more, see Too many excluded topics causes problems with Self-Balancing in Troubleshooting.
For example, the following configuration would exclude all topics prefixed with pizza.sales.ny and pizza.sales.tx:
confluent.balancer.exclude.topic.prefixes=pizza.sales.ny,pizza.sales.tx
- Type: string
- Default: “”
- Importance: medium
confluent.balancer.topic.replication.factor¶
Specifies the replication factor for the topics Self-Balancing uses to store internal state.
- For anything other than development testing, a value greater than 1 is recommended for confluent.balancer.topic.replication.factor to ensure availability.
- Replication factors can never be greater than the total number of brokers (regardless of Self-Balancing).
- When Self-Balancing is enabled, the value for confluent.balancer.topic.replication.factor and all other instances of replication.factor in server.properties must be set to a value greater than 1, but less than the total number of brokers in the cluster. If the value for a replication factor is equal to the total number of brokers deployed and a broker goes down or is removed, the replication factor will prevent rebalancing.
The default value is 3.
- Type: int
- Default: 3
- Importance: medium
Self-Balancing internal topics¶
Self-Balancing creates the following system or internal topic, which it uses to manage cluster auto-balancing.
- _confluent_balancer_api_state: Stores state information used by the Self-Balancing APIs.
Additionally, Self-Balancing requires metrics on cluster performance from Telemetry Reporter (see Configure Telemetry Reporter for Confluent Platform), which is enabled by default (confluent.reporters.telemetry.auto.enable=true) when Self-Balancing is enabled.
Metrics needed specifically for Self-Balancing on processing loads, message queue size, response times, replication data, and more are sent to the internal topic _confluent-telemetry-metrics. Self-Balancing reads from this internal topic and uses the metrics in its balancing algorithm. With Self-Balancing running, _confluent-telemetry-metrics can collect a large amount of data in a short time. The default retention period for internal topics is 3 days; however, you can use kafka-topics --config to modify this. A retention period of 1 day is typically sufficient for this topic with regard to Self-Balancing.
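For example, a sketch of lowering retention on this topic to one day using kafka-configs (an alternative to kafka-topics --config for altering an existing topic; the one-day value and local bootstrap server are assumptions):
kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name _confluent-telemetry-metrics --alter --add-config retention.ms=86400000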
By convention, Confluent Platform system topics are prefixed with an underscore. The Self-Balancing topic is prefixed with _confluent_balancer_.
You can get a list of existing topics (user-created and system topics) as follows:
kafka-topics --list --bootstrap-server localhost:9092
Get detailed information on all topics or a specific topic with the --describe option:
kafka-topics --describe --bootstrap-server localhost:9092
kafka-topics --describe --topic <topic> --bootstrap-server localhost:9092
For example, run kafka-topics --describe on the _confluent_balancer_api_state topic to view details.
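Following the pattern above, and assuming a local bootstrap server:
kafka-topics --describe --topic _confluent_balancer_api_state --bootstrap-server localhost:9092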
Required Configurations for Control Center¶
Self-Balancing requires embedded Confluent REST Proxy to communicate with Confluent Control Center monitoring.
Configure REST Endpoints in the Control Center properties file¶
If you want to use Control Center with Self-Balancing for configuration and monitoring, you must configure the Control Center cluster with REST endpoints to enable HTTP servers on the brokers. If this is not configured properly for all brokers, Self-Balancing will not be accessible from Confluent Control Center.
In the appropriate Control Center properties file, use confluent.controlcenter.streams.cprest.url to define the REST endpoints for controlcenter.cluster. The default is http://localhost:8090, as shown below.
# Kafka REST endpoint URL
confluent.controlcenter.streams.cprest.url="http://localhost:8090"
Identify the associated URL for each broker. If you have multiple brokers in the cluster, use a comma-separated list.
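For example, a sketch for a three-broker cluster (the hostnames are hypothetical placeholders):
confluent.controlcenter.streams.cprest.url=http://broker1.example.com:8090,http://broker2.example.com:8090,http://broker3.example.com:8090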
See also
- An example of configuring this on a five-broker cluster in Configure Control Center with REST endpoints and advertised listeners in the Self-Balancing Tutorial
- confluent.controlcenter.streams.cprest.url in the Control Center Configuration Reference
- Self-Balancing options do not show up on Control Center in Troubleshooting
Configure authentication for REST endpoints on Kafka brokers (Secure Setup)¶
Tip
- Self-Balancing does not require the Metadata Service (MDS) or security to run, but if you want to configure security, you can get started with the following example, which shows an MDS client configuration for RBAC.
- You can use confluent.metadata.server.listeners (which will enable the Metadata Service) instead of confluent.http.server.listeners to listen for API requests. Use either confluent.metadata.server.listeners or confluent.http.server.listeners, but not both. If a listener uses HTTPS, then appropriate TLS configuration parameters must also be set. To learn more, see Admin REST APIs Configuration Options for Confluent Server.
To run Self-Balancing in a secure setup, you must configure authentication for REST endpoints in each of the Kafka broker server.properties files.
If the Kafka broker files are missing these configs, Control Center will not be able to access Self-Balancing in a secure setup.
At a minimum, you will need the following configurations.
# EmbeddedKafkaRest: HTTP Auth Configuration
kafka.rest.kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension
kafka.rest.rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler
Here is an example of an MDS client configuration for Kafka RBAC in a broker server.properties file.
# EmbeddedKafkaRest: Kafka Client Configuration
kafka.rest.bootstrap.servers=<host:port>,<host:port>,<host:port>
kafka.rest.client.security.protocol=SASL_PLAINTEXT
# EmbeddedKafkaRest: HTTP Auth Configuration
kafka.rest.kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension
kafka.rest.rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler
kafka.rest.public.key.path=<rbac_enabled_public_pem_path>
# EmbeddedKafkaRest: MDS Client configuration
kafka.rest.confluent.metadata.bootstrap.server.urls=<host:port>,<host:port>,<host:port>
kafka.rest.ssl.truststore.location=<truststore_location>
kafka.rest.ssl.truststore.password=<password>
kafka.rest.confluent.metadata.http.auth.credentials.provider=BASIC
kafka.rest.confluent.metadata.basic.auth.user.info=<user:password>
kafka.rest.confluent.metadata.server.urls.max.age.ms=60000
kafka.rest.client.confluent.metadata.server.urls.max.age.ms=60000
See also
- Security considerations
- Configure Security for the Admin REST APIs for Confluent Server
- Deploy Secure Standalone REST Proxy in Confluent Platform
- Scripted Confluent Platform Demo, On-Prem Tutorial, Security section, which provides examples of different types of configurations.
Examples: Update broker configurations on the fly¶
To update most Self-Balancing settings, you must stop the brokers and shut down the cluster. However, the following cluster default settings are dynamic, meaning they can be changed while the cluster is running.
You can modify the values for these dynamic properties through Confluent Control Center or at the command line with the kafka-configs command and --entity-default
flag, as shown in the examples below.
Tip
To run the examples, you must specify a bootstrap <host:port>. If your cluster is running on a local host, the default for --bootstrap-server is localhost:9092.
Enable or disable Self-Balancing¶
Use confluent.balancer.enable to turn Self-Balancing on or off. Do not disable Self-Balancing while an add or remove broker operation is in progress; wait until the add or remove completes. Disabling Self-Balancing during either operation will fence off the broker from future partition distribution.
To turn Self-Balancing on:
kafka-configs --bootstrap-server <host:port> --entity-type brokers --entity-default --alter --add-config confluent.balancer.enable=true
To turn Self-Balancing off:
kafka-configs --bootstrap-server <host:port> --entity-type brokers --entity-default --alter --add-config confluent.balancer.enable=false
Set trigger condition for rebalance¶
Use confluent.balancer.heal.uneven.load.trigger to rebalance only when brokers are added (or removed) or anytime for any uneven load.
Set Self-Balancing to rebalance on any uneven load (including a change in available brokers):
kafka-configs --bootstrap-server <host:port> --entity-type brokers --entity-default --alter --add-config confluent.balancer.heal.uneven.load.trigger=ANY_UNEVEN_LOAD
Set Self-Balancing to rebalance only when brokers are added or removed:
kafka-configs --bootstrap-server <host:port> --entity-type brokers --entity-default --alter --add-config confluent.balancer.heal.uneven.load.trigger=EMPTY_BROKER
Set or remove a custom throttle¶
Use confluent.balancer.throttle.bytes.per.second to set a custom throttle for maximum network bandwidth available for Self-Balancing or to remove a custom throttle.
To override the default throttle value for reassignment operations (10485760 bytes, or 10MB/sec) with a custom, fixed value, use the following command. Dynamic configurations can also be changed through the Confluent REST Proxy API (see API Reference for Confluent REST Proxy).
kafka-configs --bootstrap-server <host:port> --entity-type brokers --entity-default --alter --add-config confluent.balancer.throttle.bytes.per.second=70485760
To disable a custom throttle:
kafka-configs --bootstrap-server <host:port> --entity-type brokers --entity-default --alter --delete-config confluent.balancer.throttle.bytes.per.second
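To verify which cluster-wide dynamic defaults are currently in effect, you can describe the default broker entity (a sketch; output varies by cluster):
kafka-configs --bootstrap-server <host:port> --entity-type brokers --entity-default --describe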
Monitoring the balancer with kafka-rebalance-cluster¶
Starting in Confluent Enterprise 6.2.0, APIs are provided to gain visibility into the status of Self-Balancing and its operations. You can use the APIs to retrieve these statuses programmatically.
A new Kafka command, kafka-rebalance-cluster, leverages the APIs to retrieve the same information from the CLI.
Get the balancer status¶
The balancer status API provides visibility into the state of the Self-Balancing component itself.
Use kafka-rebalance-cluster with the --status flag to query the status of the balancer.
Following are example status messages you may receive as output.
$ bin/kafka-rebalance-cluster.sh --bootstrap-server localhost:9089 --status
Balancer status: ENABLED
$ bin/kafka-rebalance-cluster.sh --bootstrap-server localhost:9089 --status
Balancer status: ERROR
Error description: SBC configured with multiple log directories
Get the workload optimization status (AnyUnevenLoad)¶
If the balancer is set to rebalance on ANY_UNEVEN_LOAD, it will automatically detect and act upon uneven distribution of workloads within a cluster. The even-cluster-load API provides visibility into whether a goal violation for workload distribution has been detected and what Self-Balancing is currently doing about it. It provides more context in case balancing fails due to some internal error or user intervention.
Run kafka-rebalance-cluster with the --describe flag to get the fine-grained status of these balancing optimizations.
$ bin/kafka-rebalance-cluster.sh --bootstrap-server localhost:9089 --describe
Following are example outputs for this command in various scenarios.
When the balancer is set to EMPTY_BROKER (not ANY_UNEVEN_LOAD):
Uneven load balance status: Current: DISABLED
After startup, but before any uneven load balancing operation has run:
Uneven load balance status: Current: STARTING
When a goal violation detection has succeeded:
Current: BALANCED
Last Update Time: 2021-02-26_23:09:23 UTC
Previous: BALANCED
Last Update Time: 2021-02-26_23:07:23 UTC
During rebalancing:
Current: BALANCING
Last Update Time: 2021-02-26_23:25:23 UTC
Previous: BALANCED
Last Update Time: 2021-02-26_23:19:23 UTC
kafka-remove-brokers¶
Confluent Enterprise 6.0.0 and newer releases include the Kafka command, kafka-remove-brokers.
Self-Balancing must be enabled for this command to work.
Note the following about removing brokers:
- In cases where metrics collection is not yet complete, an attempt to remove a broker will fail almost immediately due to insufficient metrics for generating a rebalancing plan. If broker removal fails, wait for 30 minutes, then retry. Once Self-Balancing has initialized and had time to collect metrics, the operation should succeed. To learn more, see Broker removal attempt fails during Self-Balancing initialization in Troubleshooting.
- You can also remove a broker through the Control Center UI.
- Do not disable Self-balancing while an add or remove broker operation is in progress; wait until the add or remove completes. Disabling Self-Balancing during either operation will fence off the broker from future partition distribution.
- Starting with Confluent Platform 7.3.0, Self-Balancing Clusters supports Apache Kafka Raft Metadata mode (KRaft); see KRaft Overview for Confluent Platform. If you are using Self-Balancing with Confluent for Kubernetes (CFK), and using CFK to shut down the broker, no further action is needed. However, if you are running Confluent Platform outside of CFK on a KRaft-supported version of Confluent Platform, and separately calling kafka-remove-brokers, you must take an additional action: call the UnregisterBroker API through kafka-cluster.sh after the broker removal is complete. If you call kafka-remove-brokers with shouldShutdown=false, you must shut down the broker manually; if you set shouldShutdown=true, Self-Balancing will shut down the broker for you. In either case, on KRaft mode Confluent Platform deployments, you must call UnregisterBroker after broker shutdown is complete. This cleanly removes all traces of the broker from the cluster. In non-KRaft Confluent Platform, this step is not required; but in KRaft mode, simply calling kafka-remove-brokers is not enough to remove all lingering metadata internally within the controllers. The UnregisterBroker API does this cleanup.
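For example, a minimal sketch of the cleanup step after broker removal and shutdown are complete (broker ID 2 and the local bootstrap server are placeholder values):
kafka-cluster.sh unregister --bootstrap-server localhost:9092 --id 2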
Flags¶
The kafka-remove-brokers command provides the following required and optional flags.
--bootstrap-server
The connection string for the cluster’s broker(s) in the form host:port. You can specify multiple URLs, separated by commas, to allow for failover should a broker node go down. (REQUIRED)
- Type: string
- Default: empty string
- Importance: high
--delete
Remove one broker from the cluster.
- Type: string
- Default: “”
- Importance: high
--describe
Describe the status of broker removal.
- Type: string
- Default: “”
- Importance: medium
--broker-id
The ID of the broker to remove. (REQUIRED)
- Type: string
- Default: “”
- Importance: low
--broker-ids
The IDs of the brokers to remove.
- Type: string
- Default: “”
- Importance: low
--command-config
Specifies a property file containing configurations to be passed to the Admin Client. This option is used only with the --bootstrap-server option.
- Type: string
- Default: “”
- Importance: low
--no-shutdown
Prevents shutdown of brokers as part of the removal operation. By default, the brokers are shut down as part of broker removal.
- Type: boolean
- Default: false
- Importance: low
--version
Display Kafka version.
- Type: string
- Default: “”
- Importance: low
Examples¶
The following command removes broker 1 and moves its data onto the remaining brokers. (This example uses 5 total brokers.)
./bin/kafka-remove-brokers --bootstrap-server localhost:9092,localhost:9093,localhost:9094,localhost:9095,localhost:9096 --broker-id 1 --delete 1>&2 | grep -v SLF4J
The following command provides the status of this rebalance operation. (The --describe flag is substituted for --delete.)
./bin/kafka-remove-brokers --bootstrap-server localhost:9092,localhost:9093,localhost:9094,localhost:9095,localhost:9096 --broker-id 1 --describe 1>&2 | grep -v SLF4J
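Starting with Confluent Platform 7.0, multiple brokers can be removed at once (see the Tip under Broker removal phases below). A sketch using the --broker-ids flag, assuming broker IDs 3 and 4 and that the flag accepts a comma-separated list:
./bin/kafka-remove-brokers --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --broker-ids 3,4 --delete 1>&2 | grep -v SLF4J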
Broker removal phases¶
Understanding the broker removal process can be useful for monitoring and troubleshooting Self-Balancing. To accomplish broker removal, Self-Balancing performs the following consecutive tasks:
- Validates that broker metrics are available on the controller (lead broker) running Self-Balancing (a type of dry run).
- Places replica exclusions on the brokers to be removed. This prevents new replicas from being placed on those brokers during the removal process.
- Computes the partition reassignment (rebalancing) plan.
- Executes the plan to reassign partitions, and moves topic data.
- Shuts down the broker(s).
- Removes the exclusions.
The last two phases (broker shutdown and remove exclusions) are optional. These are performed only if shouldShutdown=true, which is the default.
Tip
- Starting with Confluent Platform 7.0, broker removal has been improved under the hood. As a result, Self-Balancing now supports removing multiple brokers at once, no under-replicated partitions persist during broker removal, and broker shutdown occurs only after all replicas have been moved off the broker.
- If a broker removal fails due to insufficient metrics, the cluster may have under-replicated partitions. This resolves automatically once Self-Balancing initializes.
Broker removal can fail if attempted while Self-Balancing is still initializing, as described in these scenarios:
Scenario 1: Any attempt to remove a broker during initialization will fail if there are insufficient metrics (at phase 1). The solution is to wait for Self-Balancing to initialize (about 30 minutes), and retry the broker removal.
Scenario 2: An attempt to remove the controller during initialization can fail at phase 3. In this scenario, Self-Balancing has run long enough to collect sufficient metrics to compute a plan on the lead broker, so it passes phases 1 and 2 and shuts down the broker you want to remove. But Self-Balancing metrics collection has not progressed long enough to provide Self-Balancing metrics on all the other brokers, one of which will be the failover controller. This causes the remove operation to stall out and, since the controller was shut down (phase 2), it will not be available on Control Center. The solution here is similar to scenario 1: wait and retry the broker removal, but from the command line.
For full details, see Broker removal attempt fails during Self-Balancing initialization.
Self-Balancing initialization¶
When you enable Self-Balancing on a running cluster, or start a cluster with Self-Balancing already enabled, it requires about 30 minutes to initialize. Initialization consists of metrics collection by the lead broker across all brokers in the cluster. Self-Balancing must have sufficient metrics on the cluster in order to compute rebalancing plans.
Therefore, metrics collection is a prerequisite to performing Self-Balancing tasks. If broker removal is attempted during the initialization process, the action will fail. The solution is to retry the broker removal after Self-Balancing is initialized. You may have to do this from the command line if the broker you tried to remove is the controller because it may not be available on Control Center. For full details, see Broker removal phases and Broker removal attempt fails during Self-Balancing initialization.
Broker removal task priority¶
Self-Balancing elevates the relative priority of a broker removal request above other tasks like adding a broker or performing a normal rebalance:
- Priority 1: Remove broker
- Priority 2: Add broker
- Priority 3: Normal rebalance
An ongoing broker removal request takes priority over a follow-on request. Self-Balancing will reject a new “remove broker” request while another broker removal task is in progress.
If a new “add broker” request is received while another “add broker” task is in progress, Self-Balancing will merge the new request with the in-progress task.