Self-Balancing Clusters Demo (Docker)

This quick start demonstrates the capabilities of Self-Balancing Clusters on Confluent Server using Docker containers. Self-Balancing Clusters shift data across brokers to create even workloads according to a variety of distribution and performance goals. By the end of this demo, you will have successfully run Self-Balancing Clusters to rebalance data after adding and removing brokers.

About the demo

Self-balancing simplifies the management of Kafka clusters in the following ways:

  • When the load on the cluster is unevenly distributed, Self-Balancing automatically rebalances partitions to optimize performance
  • When a new broker is added to the cluster, Self-Balancing automatically fills it with partitions
  • When an operator wants to remove a broker, she can call a Self-Balancing API to shut down the broker and drain the partitions from it
  • When a broker has been down for a certain amount of time, Self-Balancing will automatically reassign the partitions to other brokers

This quick start provides hands-on examples for each of the above scenarios.

Setup and what’s included

This demo pulls the following Docker images:

  • ZooKeeper
  • Kafka
  • Confluent Control Center (Optional to also try out the UI in addition to command line.)
Prerequisites:
  • Docker
    • Docker version 1.11 or later is installed and running.
    • Docker Compose is installed. Docker Compose is installed by default with Docker for Mac.
    • Docker memory is allocated minimally at 6 GB. When using Docker Desktop for Mac, the default Docker memory allocation is 2 GB. You can change the default allocation to 6 GB in Docker. Navigate to Preferences > Resources > Advanced.
  • Internet connectivity
  • Operating System currently supported by Confluent Platform
  • Networking and Kafka on Docker
    • Configure your hosts and ports to allow both internal and external components to the Docker network to communicate.
    • Configure your hostnames and ports to allow the Docker network’s internal and external components to communicate.
  • (Optional) curl.
    • In the steps below, you will download a Docker Compose file. You can download this file any way you like, but the instructions below provide the explicit curl command you can use to download the file.
  • For curl commands that call APIs (used in the Shrinkage scenario), you may want to use jq along with the --silent flag to get nicely formatted output.

Start the services

Clone the Confluent demo-scene repository from GitHub and work in the demo-scene/self-balancing subdirectory, which provides the sample code you will compile and run in this tutorial.

To clone with SSH:

git clone git@github.com:confluentinc/demo-scene.git

To clone with HTTPS:

git clone https://github.com/confluentinc/demo-scene.git

Now, navigate to the newly-created self-balancing directory.

cd self-balancing/

Run the demo scenarios

The following exercises describe how to create various types of cluster imbalances modeled on typical, real-world scenarios. For each case, you will use Self-Balancing to monitor and auto balance the cluster.

Uneven Load

Create a topic with topic data distributed unevenly across the brokers.

  1. Run docker-compose in the foreground to start ZooKeeper, 3 Confluent Platform brokers, and Control Center. Running this in the foreground allows you to monitor the logs for interesting information about the state of the cluster.

    docker-compose -f kafka-0-1-2.yml up
    
  2. Open a new command window and then create a topic with explicit replica assignments that force a data distribution with no replicas on broker 2 to create an uneven load.

    kafka-topics \
    --bootstrap-server localhost:9092 \
    --create \
    --topic sbc \
    --replica-assignment 0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1
    
  3. Produce data at a rate of about 1MB/second per hour.

    kafka-producer-perf-test \
    --producer-props bootstrap.servers=localhost:9092 \
    --topic sbc \
    --record-size 1000 \
    --throughput 1000 \
    --num-records 3600000
    
  4. Start watching changes in the topic.

    watch kafka-topics \
        --bootstrap-server localhost:9092 \
        --describe \
        --topic sbc
    

    This watch will show changes in replica assignments, in-sync replicas, and relevant information such as replication throttling details.

    Optionally, use Control Center to watch the same changes. Control Center is available at http://localhost:9021/ in your web browser.

  5. Wait for Self-Balancing to start the rebalance.

    Self-Balancing samples data about disk use, leader count, partition count, network throughput and other variables. It then aggregates this data to make informed decisions about how to reassign partitions.

    Important

    • Self-Balancing needs between 10 and 20 minutes from startup to collect enough samples to generate a rebalancing plan (if one is needed).
    • Self-Balancing invalidates previous samples when the number of partitions in the cluster changes materially since they may not accurately reflect the current state of the cluster.

    While Self-Balancing is still sampling, the following message appears on the logs periodically:

    INFO Skipping proposal precomputing because load monitor does not have enough snapshots.
    

    After the sampling is completed, the rebalancing process begins.

    Monitor the logs and the watch kafka-topics command to observe the changes as they happen.

Expansion

Add two more brokers to the cluster and watch Self-Balancing fill them with partitions.

  1. Run docker-compose in a new command window with the following files to add two brokers to the previous setup (do not stop any of the processes from the first part of the tutorial).

    docker-compose -f kafka-0-1-2.yml -f kafka-3-4.yml up --no-recreate
    
  2. Watch Self-Balancing rebalance the cluster.

    Self-Balancing should be able to use the data it has already sampled, and the rebalance should kick off almost immediately.

    The logs should reflect how Self-Balancing is also detecting the fact that you are rebalancing to a new broker, and not just the fact that the cluster is out of balance. This is important because Self-Balancing offers a configuration parameter called confluent.balancer.heal.uneven.load.trigger which you can set to either ANY_UNEVEN_LOAD or EMPTY_BROKER.

    • EMPTY_BROKER will only rebalance when Self-Balancing finds an empty broker (this expansion scenario)
    • ANY_UNEVEN_LOAD will rebalance when the load is uneven, regardless of the cause (the previous uneven load scenario).

Shrinkage

Remove a broker from the cluster and watch Self-Balancing shut it down and drain the partitions.

  1. Trigger a broker removal.

    For this example, use REST API for Apache Kafka®.

    First, the following curl command will return a collection with all the clusters the REST API can address.

    curl localhost:8090/kafka/v3/clusters
    

    In this case, there is only one cluster. Note the value of data[0].cluster_id as the ID of your cluster.

    You have several options for triggering a broker removal.

    • The following command will trigger the removal of broker 3. (Replace <cluster_id> in the example with the cluster ID from the previous step.)

      curl -X DELETE localhost:8090/kafka/v3/clusters/:<cluster_id>/brokers/3
      

      This should return 202 Accepted.

    • Alternatively, you can perform this step using kafka-remove-brokers, available as a part of Self-Balancing starting with Confluent Platform 6.0.0.

      kafka-remove-brokers \
        --bootstrap-server localhost:9092 \
        --delete \
        --broker-id 3
      
    • Finally, you can also remove a broker through the Confluent Control Center from the Brokers Overview page.

  2. Watch Self-Balancing remove the broker.

    Self-Balancing should be able to use the data it has already sampled, and the removal should kick off almost immediately. The removal, however, is a long-running process (because it involves data movement).

    Run the following command to get a partition_reassignment_status and a broker_shutdown_status while the removal operation is in progress. (Replace <cluster_id> with your cluster ID.)

    curl localhost:8090/kafka/v3/clusters/:<cluster_id>/remove-broker-tasks/3
    

    As an alternative monitoring option, you can plug in the --describe flag to kafka-remove-brokers command. For example:

    kafka-remove-brokers \
      --bootstrap-server localhost:9092 \
      --describe \
      --broker-id 3
    

Broker failure

Simulate a broker failure and watch Self-Balancing address this condition.

  1. Stop a container. For example, to stop broker 4:

    docker container stop $(docker container ls -q --filter name=kafka4)
    
  2. Watch Self-Balancing create new replicas.

    Self-Balancing should be able to use the data it has already sampled, and the creation of new replicas should kick off almost immediately.

    Note

    For this demo, confluent.balancer.heal.broker.failure.threshold.ms is set to 5000, meaning that Self-Balancing will consider the broker dead after only 5 seconds. This is not suitable for production environments, where typically timeouts should be set to between 30 minutes and to 1 hour (or whatever is most reasonable for your environment), but this is helpful here so that the demo runs faster.

Teardown

When you are done working with Docker, you can stop and remove Docker containers and images as needed.

  1. Use docker container ls to view a list of all Docker running containers.
  2. Use docker container stop to stop running containers.
  3. Run docker images ls to list images.
  4. Run docker image rm to remove images you do not want to keep on your system.

You can rebuild and restart the containers at any time using the docker-compose up -d command. (If you have deleted the images, this command will pull them down again.)

For more information, refer to the official Docker documentation.