Configuration Options and Commands for Self-Balancing Clusters

This section covers Self-Balancing configuration options and commands, and provides examples.

See Self-Balancing Tutorial for an example of how to set up and run a quick test of Self-Balancing by removing a broker and monitoring the rebalance. The example provides guidance on proper configuration of replication factors and demonstrates the Self-Balancing-specific command, kafka-remove-brokers.

Self-Balancing Configurations on the Brokers

The following configuration options specific to Self-Balancing Clusters are available in $CONFLUENT_HOME/etc/kafka/server.properties.

If you are using Self-Balancing in combination with Multi-Region Clusters, you must also specify the rack location for each broker with broker.rack in each respective server.properties file. To learn more, see Replica Placement and Multi-Region Clusters.

confluent.balancer.enable

Enables or disables Self-Balancing. When enabled, Self-Balancing measures the load across the Kafka cluster and rebalances data as needed, depending on multiple goals and factors.

  • Type: boolean
  • Default: false
  • Importance: high

Tip

  • In the example server.properties file that ships with Confluent Platform, confluent.balancer.enable is set to true, which means Self-Balancing is on. So if you want Self-Balancing on, you do not need to reconfigure this. (If this is specified as false, or not explicitly specified at all in the properties file, the value is inferred to be false, off.)
  • Similarly to the first point, in the Confluent Platform Docker images which you can use to install Confluent Platform, confluent.balancer.enable is set to true, which means Self-Balancing is on.
  • This is a dynamic option. You can toggle Self-Balancing on or off while the cluster is running.
  • Do not disable Self-Balancing while an add or remove broker operation is in progress; wait until the add or remove completes. Disabling Self-Balancing during either operation will fence off the broker from future partition distribution.

confluent.balancer.heal.uneven.load.trigger

Sets the conditions upon which to trigger a rebalance.

Valid values are:

  • EMPTY_BROKER: Move data to rebalance the cluster only when an empty broker (one with no partitions on it) is added to the cluster.
  • ANY_UNEVEN_LOAD: Balance the load across the cluster whenever an imbalance is detected. This includes empty brokers and any other uneven load condition caused by ongoing cluster activity. For example, load peaks on specific partitions could trigger a rebalance.

  • Type: string
  • Default: EMPTY_BROKER
  • Importance: high

Tip

This is a dynamic option. You can change the trigger condition for Self-Balancing while the cluster is running.

confluent.balancer.heal.broker.failure.threshold.ms

Sets the default time to declare a broker permanently failed.

When a broker disappears from the cluster, Self-Balancing waits this length of time before declaring the broker permanently failed and rebalancing its data onto other brokers. This behavior is independent of the value set for confluent.balancer.heal.uneven.load.trigger.

The default is 1 hour (3600000 ms).

  • To turn off broker failure detection, edit $CONFLUENT_HOME/etc/kafka/server.properties to uncomment the line that sets confluent.balancer.heal.broker.failure.threshold.ms to -1.
  • To adjust the threshold for the duration of time before a broker is declared “failed”, uncomment the line for confluent.balancer.heal.broker.failure.threshold.ms and change its value to a positive number of your choosing.

  • Type: int
  • Default: 3600000
  • Importance: medium
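Because the threshold is expressed in milliseconds, it can help to compute the value rather than type it by hand. The following is a minimal sketch; the helper name threshold_ms and the 30-minute value are illustrative assumptions, not part of Confluent Platform.

```shell
#!/bin/sh
# threshold_ms MINUTES
# Convert a failure threshold given in minutes to the millisecond value
# expected by confluent.balancer.heal.broker.failure.threshold.ms.
# (Hypothetical helper, for illustration only.)
threshold_ms() {
  minutes=$1
  echo $((minutes * 60 * 1000))
}

# Print the server.properties line for a 30-minute threshold (illustrative value).
echo "confluent.balancer.heal.broker.failure.threshold.ms=$(threshold_ms 30)"
```

Passing 60 to the helper reproduces the documented default of 3600000.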

confluent.balancer.throttle.bytes.per.second

Specifies the maximum network bandwidth available for use by data balancing operations. The value is specified in bytes/sec/broker.

The default is 10MB per second.

  • Type: int
  • Default: 10485760 (10MB/sec)
  • Importance: medium

Tip

This is a dynamic option. You can adjust the throttle while the cluster is running.

How to choose a throttle setting

Self-Balancing uses the Apache Kafka® replication throttle to limit the rate at which data is moved when reassigning partitions as part of a rebalance. Because each cluster’s workload and hardware capabilities are different, it is difficult to provide generally applicable guidance for choosing a throttle. Normal replication traffic counts against this throttle, so in order for a rebalance to make progress, the throttle must be larger than the highest inbound or outbound replication rate of any broker in the cluster. These values are exposed as the kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec and kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec broker metrics.

A safe method to choose a throttle is to start with a small multiple (3 to 5x) of this maximum replication traffic, and then dynamically increase the throttle in small increments, while monitoring cluster health. Produce request latency is a good metric to monitor, because it will increase if too much network throughput is used for reassignment traffic or if too much data is being written to disk as partitions are reassigned. This starting value is a conservative one, and for some clusters, a much larger throttle may be safe.

Self-Balancing automatically removes the replication throttle when it is not reassigning partitions, so as a cluster administrator you do not need to manually remove it.
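The starting-point rule described above (a small multiple of the highest replication rate) is simple arithmetic. The sketch below assumes you have already read the peak value from the ReplicationBytesInPerSec and ReplicationBytesOutPerSec metrics; the helper name starting_throttle and the 4 MB/sec measurement are hypothetical. The kafka-configs command is printed rather than executed.

```shell
#!/bin/sh
# starting_throttle MAX_REPLICATION_BYTES_PER_SEC [MULTIPLIER]
# Returns a conservative starting throttle: a small multiple (default 3x)
# of the highest replication rate observed on any broker.
# (Hypothetical helper, for illustration only.)
starting_throttle() {
  max_rate=$1
  multiplier=${2:-3}
  echo $((max_rate * multiplier))
}

# Hypothetical measurement: the busiest broker replicates at 4 MB/sec (4194304 bytes/sec).
throttle=$(starting_throttle 4194304 3)

# Print the command that would apply the value as a cluster-wide default.
echo "kafka-configs --bootstrap-server <host:port> --entity-type brokers --entity-default --alter --add-config confluent.balancer.throttle.bytes.per.second=$throttle"
```

From this starting value, you would raise the throttle in small steps while watching produce request latency, as described above.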

confluent.balancer.disk.max.load

Specifies the fraction of the log disk (from 0 to 1.0) that can be used before rebalancing is triggered.

The default is 85% of the log disk.

  • Type: double
  • Default: 0.85
  • Importance: medium
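To make the fraction concrete, the following sketch compares a disk's usage against a limit such as 0.85. It only illustrates the arithmetic; it is not how Self-Balancing evaluates disk load internally, and the helper name disk_over_limit and the byte counts are hypothetical.

```shell
#!/bin/sh
# disk_over_limit USED_BYTES TOTAL_BYTES MAX_LOAD
# Prints "yes" if used/total exceeds MAX_LOAD (a fraction from 0 to 1.0), else "no".
# (Hypothetical helper, for illustration only.)
disk_over_limit() {
  awk -v used="$1" -v total="$2" -v max="$3" \
    'BEGIN { if (used / total > max) print "yes"; else print "no" }'
}

disk_over_limit 900 1000 0.85   # 90% used, above the limit
disk_over_limit 500 1000 0.85   # 50% used, below the limit
```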

confluent.balancer.disk.max.replicas

Specifies a maximum number of replicas allowed per broker.

The default is 10,000 replicas, which is also the maximum allowed. The recommended value for this option is 4,000 replicas/partitions per broker.

  • Type: int
  • Default: 10000
  • Importance: medium

confluent.balancer.exclude.topic.names

Specifies topics that should not be moved by Self-Balancing. You can specify multiple topics in a comma-separated list.

For example, the following configuration would exclude the topics pizza and popcorn:

confluent.balancer.exclude.topic.names=pizza,popcorn

  • Type: string
  • Default: “”
  • Importance: medium

Tip

  • Removal operations always move topics, regardless of this setting.
  • As a best practice, keep excluded topics to a minimum, and use this configuration only as necessary. Excluding too many topics can interfere with Self-Balancing. To learn more, see Too many excluded topics causes problems with Self-Balancing in Troubleshooting.

confluent.balancer.exclude.topic.prefixes

Specifies topics that should not be moved by Self-Balancing based on topic prefixes. You can specify multiple topic prefixes in a comma-separated list.

For example, the following configuration would exclude all topics prefixed with pizza.sales.ny and pizza.sales.tx:

confluent.balancer.exclude.topic.prefixes=pizza.sales.ny,pizza.sales.tx

  • Type: string
  • Default: “”
  • Importance: medium

Tip

  • Removal operations always move topics, regardless of this setting.
  • As a best practice, keep excluded topics to a minimum, and use this configuration only as necessary. Excluding too many topics can interfere with Self-Balancing. To learn more, see Too many excluded topics causes problems with Self-Balancing in Troubleshooting.
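To check which topics a given pair of exclusion settings would cover, a small script can apply the name and prefix lists to a topic name. This is a sketch under the assumption that names match exactly and prefixes match case-sensitively from the start of the topic name; the helper is_excluded is hypothetical and does not query the cluster.

```shell
#!/bin/sh
# is_excluded TOPIC NAMES PREFIXES
# NAMES and PREFIXES are comma-separated lists in the same form as
# confluent.balancer.exclude.topic.names and confluent.balancer.exclude.topic.prefixes.
# Prints "excluded" or "balanced". Assumes exact, case-sensitive matching.
# (Hypothetical helper, for illustration only.)
is_excluded() {
  topic=$1
  old_ifs=$IFS
  IFS=,
  # Exact-name matches.
  for name in $2; do
    if [ "$topic" = "$name" ]; then IFS=$old_ifs; echo excluded; return; fi
  done
  # Prefix matches.
  for prefix in $3; do
    case $topic in
      "$prefix"*) IFS=$old_ifs; echo excluded; return ;;
    esac
  done
  IFS=$old_ifs
  echo balanced
}

is_excluded pizza "pizza,popcorn" "pizza.sales.ny,pizza.sales.tx"
is_excluded pizza.sales.ny.2021 "pizza,popcorn" "pizza.sales.ny,pizza.sales.tx"
is_excluded orders "pizza,popcorn" "pizza.sales.ny,pizza.sales.tx"
```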

confluent.balancer.topic.replication.factor

Specifies the replication factor for the topics that Self-Balancing uses to store internal state.

The default value is 3.

  • Type: int
  • Default: 3
  • Importance: medium

Note

  • For anything other than development testing, a value greater than 1 is recommended for confluent.balancer.topic.replication.factor to ensure availability.
  • Replication factors can never be greater than the total number of brokers (regardless of Self-Balancing).
  • When Self-Balancing is enabled, the value for confluent.balancer.topic.replication.factor and all other instances of replication.factor in server.properties must be set to a value greater than 1, but less than the total number of brokers in the cluster. If the value for a replication factor is equal to the total number of brokers deployed and a broker goes down or is removed, the replication factor will prevent rebalancing.
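The rule in the note above (greater than 1, but less than the total number of brokers) can be checked before you deploy. The following sketch encodes only that stated rule; the helper name check_replication_factor is hypothetical.

```shell
#!/bin/sh
# check_replication_factor RF BROKER_COUNT
# Applies the rule stated above: greater than 1 and less than the broker count.
# (Hypothetical helper, for illustration only.)
check_replication_factor() {
  rf=$1
  brokers=$2
  if [ "$rf" -le 1 ]; then
    echo "too low"
  elif [ "$rf" -ge "$brokers" ]; then
    echo "too high"
  else
    echo "ok"
  fi
}

check_replication_factor 3 5   # a 5-broker cluster with the default factor
check_replication_factor 5 5   # equal to the broker count, which blocks rebalancing
```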

Self-Balancing Internal Topics

Self-Balancing creates the following system or internal topics, which it uses to manage cluster auto-balancing.

  • _confluent_balancer_api_state: Stores state information used by the Self-Balancing APIs.
  • _confluent_balancer_partition_samples: Stores partition metrics.
  • _confluent_balancer_broker_samples: Stores broker metrics.

Additionally, Self-Balancing requires metrics on cluster performance from the Confluent Telemetry Reporter, which is enabled by default (confluent.reporters.telemetry.auto.enable = true) when Self-Balancing is enabled. Metrics needed specifically for Self-Balancing on processing loads, message queue size, response times, replication data, and more are sent to the internal topic _confluent-telemetry-metrics. Self-Balancing reads from this internal topic and uses the metrics in its balancing algorithm. With Self-Balancing running, _confluent-telemetry-metrics can collect a large amount of data in a short time. The default retention period for internal topics is 3 days; however, you can use kafka-topics --config to modify this. A retention period of 1 day is typically sufficient for this topic with regard to Self-Balancing.
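As an illustration of shortening the retention period to 1 day, the sketch below computes the millisecond value and prints a command that would apply it. It assumes the kafka-configs tool with --entity-type topics as one way to alter a topic-level setting on an existing topic, and a broker at localhost:9092; the helper days_to_ms is hypothetical. Confirm the tool and flags against your Confluent Platform version before running the printed command.

```shell
#!/bin/sh
# days_to_ms DAYS
# Convert a retention period in days to milliseconds.
# (Hypothetical helper, for illustration only.)
days_to_ms() {
  echo $(($1 * 24 * 60 * 60 * 1000))
}

retention_ms=$(days_to_ms 1)

# Printed rather than executed so the sketch is safe to run anywhere.
echo "kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name _confluent-telemetry-metrics --alter --add-config retention.ms=$retention_ms"
```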

By convention, Confluent Platform system topics are prefixed with an underscore. Self-Balancing topics are prefixed with _confluent_balancer_. You can get a list of existing topics (user-created and system topics) as follows:

kafka-topics --list --bootstrap-server localhost:9092

Get detailed information on all topics or a specific topic with the --describe option:

kafka-topics --describe --bootstrap-server localhost:9092
kafka-topics --describe --topic <topic> --bootstrap-server localhost:9092

For example, running kafka-topics --describe on the _confluent_balancer_partition_samples topic results in output similar to the following.

Topic: _confluent_balancer_partition_samples PartitionCount: 32      ReplicationFactor: 2    Configs: cleanup.policy=delete,segment.bytes=1073741824,retention.ms=3600000
Topic: _confluent_balancer_partition_samples Partition: 0    Leader: 0       Replicas: 0,2   Isr: 0,2        Offline:
Topic: _confluent_balancer_partition_samples Partition: 1    Leader: 4       Replicas: 4,3   Isr: 4,3        Offline:
Topic: _confluent_balancer_partition_samples Partition: 2    Leader: 1       Replicas: 1,0   Isr: 1,0        Offline:
Topic: _confluent_balancer_partition_samples Partition: 3    Leader: 2       Replicas: 2,4   Isr: 2,4        Offline:
Topic: _confluent_balancer_partition_samples Partition: 4    Leader: 3       Replicas: 3,1   Isr: 3,1        Offline:
Topic: _confluent_balancer_partition_samples Partition: 5    Leader: 0       Replicas: 0,3   Isr: 0,3        Offline:
Topic: _confluent_balancer_partition_samples Partition: 6    Leader: 4       Replicas: 4,0   Isr: 4,0        Offline:
Topic: _confluent_balancer_partition_samples Partition: 7    Leader: 1       Replicas: 1,4   Isr: 1,4        Offline:
...
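Output in this format is easy to post-process. As a sketch, the following filter reads --describe output on standard input and prints the partitions whose Isr list is shorter than their Replicas list. It assumes the whitespace-separated layout shown above; the function name under_replicated is hypothetical.

```shell
#!/bin/sh
# under_replicated: read `kafka-topics --describe` output on stdin and print
# the partition numbers whose Isr list is shorter than their Replicas list.
# Lines without a "Partition:" field (such as the topic summary) are skipped.
# (Hypothetical helper, for illustration only.)
under_replicated() {
  awk '{
    part = ""; replicas = ""; isr = ""
    for (i = 1; i < NF; i++) {
      if ($i == "Partition:") part = $(i + 1)
      if ($i == "Replicas:")  replicas = $(i + 1)
      if ($i == "Isr:")       isr = $(i + 1)
    }
    if (part != "" && split(isr, a, ",") < split(replicas, b, ",")) print part
  }'
}

# Example with two sample lines; in the second, broker 3 has dropped out of the ISR.
printf '%s\n' \
  'Topic: t Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2 Offline:' \
  'Topic: t Partition: 1 Leader: 4 Replicas: 4,3 Isr: 4 Offline:' | under_replicated
```

Against a live cluster, you would pipe the kafka-topics --describe command shown above into the function instead of the sample lines.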

Required Configurations for Control Center

Self-Balancing requires embedded Confluent REST Proxy to communicate with Confluent Control Center monitoring.

Configure REST Endpoints in the Control Center properties file

If you want to use Control Center with Self-Balancing for configuration and monitoring, you must configure the Control Center cluster with REST endpoints to enable HTTP servers on the brokers. If this is not configured properly for all brokers, Self-Balancing will not be accessible from Confluent Control Center.

In the appropriate Control Center properties file, use confluent.controlcenter.streams.cprest.url to define the REST endpoints for controlcenter.cluster. The default is http://localhost:8090, as shown below.

# Kafka REST endpoint URL
confluent.controlcenter.streams.cprest.url="http://localhost:8090"

Identify the associated URL for each broker. If you have multiple brokers in the cluster, use a comma-separated list.
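For a multi-broker cluster, the comma-separated list can be generated from the broker host names. The sketch below is illustrative only: the host names broker1 through broker3 and the helper cprest_urls are hypothetical, and port 8090 is taken from the default shown above.

```shell
#!/bin/sh
# cprest_urls PORT HOST...
# Join broker hosts into a comma-separated list of REST endpoint URLs.
# (Hypothetical helper, for illustration only.)
cprest_urls() {
  port=$1
  shift
  urls=""
  for host in "$@"; do
    # Prepend a comma only when the list already has an entry.
    urls="${urls:+$urls,}http://$host:$port"
  done
  echo "$urls"
}

# Hypothetical broker host names.
echo "confluent.controlcenter.streams.cprest.url=$(cprest_urls 8090 broker1 broker2 broker3)"
```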

Configure authentication for REST endpoints on Kafka brokers (Secure Setup)

Tip

  • Self-Balancing does not require the Metadata Service (MDS) or security to run, but if you want to configure security, you can get started with the following example, which shows an MDS client configuration for RBAC.
  • You can use confluent.metadata.server.listeners (which will enable the Metadata Service) instead of confluent.http.server.listeners to listen for API requests. Use either confluent.metadata.server.listeners or confluent.http.server.listeners, but not both. If a listener uses HTTPS, then appropriate TLS/SSL configuration parameters must also be set. To learn more, see Admin REST APIs Configuration Options.

To run Self-Balancing in a secure setup, you must configure authentication for REST endpoints in each of the Kafka broker server.properties files. If the Kafka broker files are missing these configs, Control Center will not be able to access Self-Balancing in a secure setup.

At a minimum, you will need the following configurations.

# EmbeddedKafkaRest: HTTP Auth Configuration
kafka.rest.kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension
kafka.rest.rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler

Here is an example of an MDS client configuration for Kafka RBAC in a broker server.properties file.

# EmbeddedKafkaRest: Kafka Client Configuration
kafka.rest.bootstrap.servers=<host:port>,<host:port>,<host:port>
kafka.rest.client.security.protocol=SASL_PLAINTEXT

# EmbeddedKafkaRest: HTTP Auth Configuration
kafka.rest.kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension
kafka.rest.rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler
kafka.rest.public.key.path=<rbac_enabled_public_pem_path>

# EmbeddedKafkaRest: MDS Client configuration
kafka.rest.confluent.metadata.bootstrap.server.urls=<host:port>,<host:port>,<host:port>
kafka.rest.ssl.truststore.location=<truststore_location>
kafka.rest.ssl.truststore.password=<password>
kafka.rest.confluent.metadata.http.auth.credentials.provider=BASIC
kafka.rest.confluent.metadata.basic.auth.user.info=<user:password>
kafka.rest.confluent.metadata.server.urls.max.age.ms=60000
kafka.rest.client.confluent.metadata.server.urls.max.age.ms=60000

Examples: Update broker configurations on the fly

To update most Self-Balancing settings, you must stop the brokers and shut down the cluster. However, the following cluster default settings are dynamic, meaning they can be changed while the cluster is running.

You can modify the values for these dynamic properties through Confluent Control Center or at the command line with the kafka-configs command and --entity-default flag, as shown in the examples below.

Tip

To run the examples, you must specify a bootstrap <host:port>. If your cluster is running on a local host, the default for --bootstrap-server is localhost:9092.

Enable or disable Self-Balancing

Use confluent.balancer.enable (turns Self-Balancing on or off).

To turn Self-Balancing on:

kafka-configs --bootstrap-server <host:port> --entity-type brokers --entity-default --alter --add-config confluent.balancer.enable=true

To turn Self-Balancing off:

kafka-configs --bootstrap-server <host:port> --entity-type brokers --entity-default --alter --add-config confluent.balancer.enable=false

Tip

Do not disable Self-Balancing while an add or remove broker operation is in progress; wait until the add or remove completes. Disabling Self-Balancing during either operation will fence off the broker from future partition distribution.

Set trigger condition for rebalance

Use confluent.balancer.heal.uneven.load.trigger to rebalance only when brokers are added or removed, or anytime for any uneven load.

Set Self-Balancing to rebalance on any uneven load (including a change in available brokers):

kafka-configs --bootstrap-server <host:port> --entity-type brokers --entity-default --alter --add-config confluent.balancer.heal.uneven.load.trigger=ANY_UNEVEN_LOAD

Set Self-Balancing to rebalance only when brokers are added or removed:

kafka-configs --bootstrap-server <host:port> --entity-type brokers --entity-default --alter --add-config confluent.balancer.heal.uneven.load.trigger=EMPTY_BROKER

Set or remove a custom throttle

Use confluent.balancer.throttle.bytes.per.second to set a custom throttle for maximum network bandwidth available for Self-Balancing or to remove a custom throttle.

To override the default throttle value for reassignment operations (10485760 or 10MB/sec) with a custom, fixed value:

kafka-configs --bootstrap-server <host:port> --entity-type brokers --entity-default --alter --add-config confluent.balancer.throttle.bytes.per.second=70485760

To disable a custom throttle:

kafka-configs --bootstrap-server <host:port> --entity-type brokers --entity-default --alter --delete-config confluent.balancer.throttle.bytes.per.second
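The commands in this section share the same shape, so a small wrapper can build them consistently. The following sketch only prints the kafka-configs command for review and does not run it; the function name balancer_config_cmd and the localhost:9092 address are assumptions for illustration.

```shell
#!/bin/sh
# balancer_config_cmd BOOTSTRAP set KEY VALUE
# balancer_config_cmd BOOTSTRAP unset KEY
# Prints (does not run) the kafka-configs command for a cluster-wide default.
# (Hypothetical helper, for illustration only.)
balancer_config_cmd() {
  base="kafka-configs --bootstrap-server $1 --entity-type brokers --entity-default --alter"
  case $2 in
    set)   echo "$base --add-config $3=$4" ;;
    unset) echo "$base --delete-config $3" ;;
    *)     echo "usage: balancer_config_cmd BOOTSTRAP set|unset KEY [VALUE]" >&2; return 1 ;;
  esac
}

balancer_config_cmd localhost:9092 set confluent.balancer.enable true
balancer_config_cmd localhost:9092 unset confluent.balancer.throttle.bytes.per.second
```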

Tip

Dynamic configurations can also be changed through the Confluent REST Proxy API. To learn more, see the Confluent REST Proxy API Reference.

Monitoring the balancer with kafka-rebalance-cluster

Starting in Confluent Enterprise 6.2.0, APIs are provided to gain visibility into the status of Self-Balancing and its operations. You can use the APIs to retrieve these statuses programmatically.

A new Kafka command, kafka-rebalance-cluster, leverages the APIs to retrieve the same information from the CLI.

Get the balancer status

The balancer status API provides visibility into the state of the Self-Balancing component itself.

Use kafka-rebalance-cluster with the --status flag to query the status of the balancer. Following are example status messages you may receive as output.

$ bin/kafka-rebalance-cluster.sh  --bootstrap-server localhost:9089 --status
Balancer status: ENABLED
$ bin/kafka-rebalance-cluster.sh  --bootstrap-server localhost:9089 --status
Balancer status: ERROR
Error description: SBC configured with multiple log directories
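For scripting, you may want only the status word from this output. The sketch below assumes the "Balancer status:" line format shown in the examples above; the function name balancer_status is hypothetical.

```shell
#!/bin/sh
# balancer_status: read `kafka-rebalance-cluster --status` output on stdin
# and print only the status word (for example ENABLED or ERROR).
# (Hypothetical helper, for illustration only.)
balancer_status() {
  sed -n 's/^Balancer status: *//p'
}

# Example with sample output; against a live cluster you would pipe in:
#   bin/kafka-rebalance-cluster.sh --bootstrap-server localhost:9089 --status
printf 'Balancer status: ERROR\nError description: SBC configured with multiple log directories\n' | balancer_status
```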

Get the workload optimization status (AnyUnevenLoad)

If the balancer is set to rebalance on ANY_UNEVEN_LOAD, it will automatically detect and act upon uneven distribution of workloads within a cluster. The even-cluster-load API provides visibility into whether a goal violation for workload distribution has been met and what Self-Balancing is currently doing about it. It provides more context in case balancing fails due to some internal error or user intervention.

Run kafka-rebalance-cluster with the --describe flag to get the fine-grained status of these balancing optimizations.

$ bin/kafka-rebalance-cluster.sh  --bootstrap-server localhost:9089 --describe

Following are example outputs for this command in various scenarios.

  • When the balancer is set to EMPTY_BROKER (not ANY_UNEVEN_LOAD):

    Uneven load balance status:
     Current: DISABLED
    
  • After startup, but before any uneven load balancing operation has run:

    Uneven load balance status:
     Current: STARTING
    
  • When a goal violation detection has succeeded:

    Current: BALANCED
    Last Update Time: 2021-02-26_23:09:23 UTC
    Previous: BALANCED
    Last Update Time: 2021-02-26_23:07:23 UTC
    
  • During rebalancing:

    Current: BALANCING
    Last Update Time: 2021-02-26_23:25:23 UTC
    Previous: BALANCED
    Last Update Time: 2021-02-26_23:19:23 UTC
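To wait for a rebalance to finish from a script, you can extract the current state from the --describe output. The sketch below assumes the "Current:" line format shown in the examples above; the function name current_balance_state is hypothetical, and the polling loop is left as a comment because it needs a live cluster.

```shell
#!/bin/sh
# current_balance_state: read `kafka-rebalance-cluster --describe` output on stdin
# and print the value of the first "Current:" line.
# (Hypothetical helper, for illustration only.)
current_balance_state() {
  awk '$1 == "Current:" { print $2; exit }'
}

# Example with sample output.
printf '%s\n' \
  'Current: BALANCING' \
  'Last Update Time: 2021-02-26_23:25:23 UTC' \
  'Previous: BALANCED' \
  'Last Update Time: 2021-02-26_23:19:23 UTC' | current_balance_state

# Possible polling loop (commented out because it requires a running cluster):
# while [ "$(bin/kafka-rebalance-cluster.sh --bootstrap-server localhost:9089 --describe | current_balance_state)" = "BALANCING" ]; do
#   sleep 60
# done
```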
    

kafka-remove-brokers

Confluent Enterprise 6.0.0 and newer releases include the Kafka command, kafka-remove-brokers.

Self-Balancing must be enabled for this command to work.

Flags

The kafka-remove-brokers command provides the following required and optional flags.

--bootstrap-server

The connection string for the cluster’s broker(s) in the form host:port. You can specify multiple URLs, separated by commas, to allow for failover should a broker node go down. (REQUIRED)

  • Type: string
  • Default: empty string
  • Importance: high

--delete

Remove one broker from the cluster.

  • Type: string
  • Default: “”
  • Importance: high

--describe

Describe the status of broker removal.

  • Type: string
  • Default: “”
  • Importance: medium

--broker-id

The ID of the broker to remove. (REQUIRED)

  • Type: string
  • Default: “”
  • Importance: low

--broker-ids

The IDs of the brokers to remove.

  • Type: string
  • Default: “”
  • Importance: low

--command-config

Specifies a property file containing configurations to be passed to the Admin Client. This option is used only with the --bootstrap-server option.

  • Type: string
  • Default: “”
  • Importance: low

--no-shutdown

Prevents shutdown of brokers as part of the removal operation. By default, the brokers are shut down as part of broker removal.

  • Type: boolean
  • Default: false
  • Importance: low

--version

Display Kafka version.

  • Type: string
  • Default: “”
  • Importance: low

Examples

  • The following command removes broker 1 and moves its data to the remaining brokers. (This example uses 5 total brokers.)

    ./bin/kafka-remove-brokers --bootstrap-server localhost:9092,localhost:9093,localhost:9094,localhost:9095,localhost:9096 --broker-id 1 --delete 2>&1 | grep -v SLF4J
    
  • The following command provides the status of this rebalance operation. (The --describe flag is substituted for --delete.)

    ./bin/kafka-remove-brokers --bootstrap-server localhost:9092,localhost:9093,localhost:9094,localhost:9095,localhost:9096 --broker-id 1 --describe 2>&1 | grep -v SLF4J
    

Broker Removal Phases

Understanding the broker removal process can be useful for monitoring and troubleshooting Self-Balancing. To accomplish broker removal, Self-Balancing performs the following consecutive tasks:

  1. Validates that broker metrics are available on the controller (lead broker) running Self-Balancing (a type of dry run).
  2. Places replica exclusions on the brokers to be removed. This prevents new replicas from being placed on those brokers during the removal process.
  3. Computes the partition reassignment (rebalancing) plan.
  4. Executes the plan to reassign partitions, and moves topic data.
  5. Shuts down the broker(s).
  6. Removes the exclusions.

The last two phases (broker shutdown and remove exclusions) are optional. These are performed only if shouldShutdown=true, which is the default.

Tip

  • Starting with Confluent Platform 7.0, broker removal has been improved under the hood. As a result, Self-Balancing now supports removing multiple brokers at once, no under-replicated partitions will persist during broker removal, and broker shutdown is only effected after all replicas are removed.
  • If a broker removal fails due to insufficient metrics, the cluster may have under-replicated partitions. This resolves automatically once Self-Balancing initializes.

Broker removal can fail if attempted while Self-Balancing is still initializing, as described in these scenarios:

Scenario 1: Any attempt to remove a broker during initialization will fail if there are insufficient metrics (at phase 1). The solution is to wait for Self-Balancing to initialize (about 30 minutes), and retry the broker removal.

Scenario 2: An attempt to remove the controller during initialization can fail at phase 3. In this scenario, Self-Balancing has been running long enough to collect sufficient metrics to compute a plan on the lead broker, so it passes phases 1 and 2 and shuts down the broker you want to remove. But Self-Balancing metrics collection has not progressed long enough to provide Self-Balancing metrics on all the other brokers, one of which will be the failover controller. This causes the remove operation to stall out and, since the controller was shut down (phase 2), it will not be available on Control Center. The solution here is similar to scenario 1: wait and retry the broker removal, but from the command line.

For full details, see Broker removal attempt fails during Self-Balancing initialization.

Self-Balancing Initialization

When you enable Self-Balancing on a running cluster, or start a cluster with Self-Balancing already enabled, it requires about 30 minutes to initialize. Initialization consists of metrics collection by the lead broker across all brokers in the cluster. Self-Balancing must have sufficient metrics on the cluster in order to compute rebalancing plans.

Therefore, metrics collection is a prerequisite to performing Self-Balancing tasks. If broker removal is attempted during the initialization process, the action will fail. The solution is to retry the broker removal after Self-Balancing is initialized. You may have to do this from the command line if the broker you tried to remove is the controller because it may not be available on Control Center. For full details, see Broker Removal Phases and Broker removal attempt fails during Self-Balancing initialization.

Broker Removal Task Priority

Self-Balancing elevates the relative priority of a broker removal request above other tasks like adding a broker or performing a normal rebalance:

  • Priority 1: Remove broker
  • Priority 2: Add broker
  • Priority 3: Normal rebalance

An ongoing broker removal request takes priority over a follow-on request. Self-Balancing will reject a new “remove broker” request while another broker removal task is in progress.

If a new “add broker” request is received while another “add broker” task is in progress, Self-Balancing will merge the new request with the in-progress task.