Scale Confluent Platform Clusters and Balance Data using Confluent for Kubernetes

For meeting changing workload demands and ensuring high availability and performance, you can use Confluent for Kubernetes (CFK) to easily scale your Apache Kafka® clusters and other Confluent Platform components up or down. CFK leverages the Self-Balancing Clusters (Self-Balancing) features to automate data rebalancing and optimize resource utilization. This guide provides the requirements and workflows for scaling cluster size and data replication in CFK.

Scale Kafka cluster

At a high level, adding brokers to a cluster involves a few key steps:

  • Define the configuration for each of the new brokers.

  • Provision storage, networking, and compute resources to the brokers.

  • Start the brokers with the defined configurations and provisioned resources.

  • Reassign partitions across the cluster so that the new brokers share the load and the cluster’s overall performance improves.

To automate the above process, Confluent for Kubernetes (CFK) leverages Self-Balancing, which is enabled by default with CFK.

If you need to manually disable or re-enable Self-Balancing, see Disable or re-configure Self-Balancing for the steps.

Requirements and considerations

Review the following requirements, considerations, and limitations for scaling Kafka clusters and managing data replication in CFK.

When scaling down (reducing brokers)
  • Important: When scaling down a Kafka cluster using CFK, do not perform any other operations on the cluster until the shrink process has completed successfully.

  • Scaling down removes brokers starting from the highest numbered broker.

  • You cannot remove brokers from the middle of a cluster. For example, in a 6-broker setup, you cannot remove broker 2.

  • Brokers experiencing faults can be stripped of replicas using the broker removal command with the --no-shutdown flag, but they remain in the cluster until further mitigation is applied.

For detailed broker removal procedures, see Self-Balancing Clusters Configuration Options.

When scaling up (adding brokers)

Expansion always adds brokers starting from the next highest number after the current highest numbered broker. For example, scaling from 6 to 7 brokers will create broker 6 (if brokers are numbered 0-5).

Data replication throttling configuration

When Kafka clusters require data recovery through replication, you can fine-tune throttling based on your cluster configuration:

Property

Description

Default Value

Recommended Value

follower.replication.throttled.rate

Upper bound (bytes/sec) on replication traffic for the followers enumerated in the property, follower.replication.throttled.replicas (for each topic).

unlimited

This property can only be set dynamically. Keep the limit above 1MB/sec for accurate behavior.

leader.replication.throttled.rate

Upper bound (bytes/sec) on replication traffic for leaders enumerated in the property leader.replication.throttled.replicas (for each topic).

unlimited

This property can only be set dynamically. Keep the limit above 1MB/sec for accurate behavior.

confluent.broker.limit.producer.bytes.per.second

Broker-wide produce bandwidth limit (bytes/sec). The broker will throttle produce requests to ensure that the cumulative bandwidth does not exceed this limit.

unlimited

Set to 20MB/sec for producer bandwidth.

confluent.broker.limit.consumer.bytes.per.second

Broker-wide consumer bandwidth limit (bytes/sec). The broker will throttle fetch requests to ensure that the cumulative bandwidth does not exceed this limit.

unlimited

Set to 60MB/sec for consumer bandwidth.

confluent.quota.tenant.broker.max.producer.rate

Maximum producer quota (bytes/sec) per tenant per broker.

This maximum is universal and applies to both followers and leaders. However, only brokers hosting at least one leader of the tenant’s topic partitions ever reach the maximum quota.

12.5 MiB

Default cap on tenant quota that can be assigned to a single broker for produce and consume quotas: 12.5MB/sec each. With 100MB/sec cluster-wide tenant quota, a tenant needs to send load to at least 8 partitions (on 8 brokers) to get the full produce and consume quota.

confluent.quota.tenant.broker.max.consumer.rate

Maximum consumer quota (bytes/sec) per tenant per broker.

This maximum is universal and applies to both followers and leaders. However, only brokers hosting at least one leader of the tenant’s topic partitions ever reach the maximum quota.

12.5 MiB

Default cap on tenant quota that can be assigned to a single broker for produce and consume quotas: 12.5MB/sec each. With 100MB/sec cluster-wide tenant quota, a tenant needs to send load to at least 8 partitions (on 8 brokers) to get the full produce and consume quota.

Self-Balancing clusters throttling configuration

Important

Do not increase the disk size only for a particular broker. Disks should be the same size across brokers as a precondition for Self-Balancing. A discrepancy of disk size among the brokers causes the DiskUsageDistributionGoal violation.

For Self-Balancing Clusters, configure the following throttling properties. All the property names are prefixed with confluent.balancer, such as confluent.balancer.producer.in.max.bytes.per.second.

Property

Description

Default Value

Recommended Value

producer.in.max.bytes.per.second

Upper limit for producer incoming bytes/sec/broker. Self-Balancing will attempt to keep incoming data throughput below this limit.

9223372036854775807

If the property is set higher than confluent.broker.limit.producer.bytes.per.second, throttling will be set earlier, and it may not reach this limit.

If it is set lower than the confluent.broker.limit.producer.bytes.per.second limit, it will make Self-Balancing attempt to keep the producer incoming traffic under the proposed value, and you will have a certain headroom reserved, which can be used for cluster maintenance tasks without affecting their primary produce traffic. However, setting a value that is just 10-15% above the produce traffic seen by the cluster will lead to replica movements.

Setting it at least 20% under the confluent.broker.limit.producer.bytes.per.second limit is advised to avoid unnecessary movements while still providing a safe upper limit without hitting a throttle.

consumer.out.max.bytes.per.second

Upper limit for consumer outgoing bytes/sec/leader broker.

Self-Balancing will attempt to keep outgoing data throughput below this limit.

Note that Fetch From Follower traffic is not accounted for.

9223372036854775807

It has the same semantics and behavior as confluent.balancer.producer.in.max.bytes.per.second, but it is associated with confluent.broker.limit.consumer.bytes.per.second.

replication.in.max.bytes.per.second

Upper limit for replication incoming bytes/sec/broker.

Self-Balancing will attempt to keep incoming replication traffic below this limit.

9223372036854775807

The replication inbound traffic has the same caveats as confluent.balancer.producer.in.max.bytes.per.second. However, you must be conscious of the replication factor of your cluster and ensure the value is not less than (replication factor - 1) * confluent.balancer.producer.in.max.bytes.per.second because ISR traffic is accounted for here.

Scale up Kafka cluster

To scale up a Kafka cluster:

  1. Increase the number of Kafka replicas using one of the following options:

    • Use the kubectl scale command:

      kubectl scale kafka <Kafka-CR-name> --replicas=N
      
    • Increase the number of Kafka replicas in the Kafka custom resource (CR) and apply the new setting with the kubectl apply command:

      spec:
        replicas:
      
  2. Ensure that proper DNS records are configured for the new brokers, and ensure that the CFK can resolve the new broker hostname, using a command such as nslookup.

    If you are using hosts file instead of a DNS service, update hosts file with the new brokers information. For example:

    1. Get the new broker IP addresses:

      kubectl get services -n <namespace>
      
    2. Refer to the existing broker host names with the broker prefix, and derive the hostnames of the new brokers.

    3. Add the new broker hosts to the /etc/hosts file, and inject the updated file to the CFK pod as described in Adding entries to Pod /etc/hosts.

Scale down Kafka cluster

With Confluent Platform 7.x and later, you can have CFK scale down Kafka clusters.

CFK leverages the Self-Balancing feature to automate the shrinking process. Self-Balancing is enabled by default with CFK.

Note

Due to a known limitation in Kubernetes scheduler, in multi-rack / availability zone (AZ) clusters, Kafka cluster shrinking is not supported.

To have CFK automatically scale down your cluster, the following requirements must be satisfied:

  • Set up the Admin REST Class as described in Manage Confluent Admin REST Class for Confluent Platform Using Confluent for Kubernetes. CFK uses the KafkaRestClass resource in the namespace where the Kafka cluster is running.

  • If the Admin REST Class is set up with the basic authentication for the REST client, the first user listed in basic.txt will be used to shrink the cluster. See Basic authentication for details on basic.txt.

    This first user must have a role that is listed under spec.services.kafkaRest.authentication.basic.roles in the Kafka custom resource (CR).

  • If the Kafka brokers use the DirectoryPathInContainer property to specify the credentials to authenticate to Confluent Admin REST Class, you need to set up Vault and add the required Vault annotations to the CFK Helm values before you deploy CFK.

    If updating an existing CFK pod, you need to roll the CFK pod after updating the CFK Helm values. See Provide secrets for Confluent Platform operations without CRs for details.

  • When performing the scale down/shrink operation with CFK, keep the webhooks disabled. To disable the webhooks, there are two options:

    Webhooks would cause an issue with the cluster shrink workflow. Keeping webhooks enabled and performing the cluster shrink at the same time causes a mismatch between the expected replica and the actual replica, which results in a stuck reconcile loop.

To automatically scale down a Kafka cluster:

  1. Make sure the Kafka cluster is stable.

  2. Decrease the number of brokers in the Kafka CR and apply the change using the kubectl apply command:

    spec:
      replicas:
    

    replicas: should not be set to less than 3. CFK sets a default replication factor of 3 for all Kafka topics.

    CFK triggers the workflow to shrink the Kafka cluster according to the value of replicas updated in the Kafka custom resource (CR).

Disable scaling down of Kafka cluster

The Kafka cluster shrinking feature is enabled by default.

To disable the cluster shrinking feature, apply the annotation, platform.confluent.io/enable-shrink=false, to the Kafka cluster:

kubectl annotate Kafka <Kafka CR name> -n <namespace> platform.confluent.io/enable-shrink=false

Disable or re-configure Self-Balancing

The Self-Balancing feature is enabled by default in Confluent for Kubernetes.

To balance the load across the cluster whenever an imbalance is detected, set confluent.balancer.heal.uneven.load.trigger to ANY_UNEVEN_LOAD. The default is EMPTY_BROKER.

kafka:
  configOverrides:
    server:
      - confluent.balancer.heal.uneven.load.trigger=ANY_UNEVEN_LOAD

For a complete list of available settings you can use to control Self-Balancing, see Configuration Options and Commands for Self-Balancing Clusters.

Scale other Confluent Platform components

Other Confluent Platform component clusters, namely ksqlDB, Connect, and REST Proxy clusters, can be scaled up or down depending on the load. Schema Registry does not have use cases for scaling.

When scaling REST Proxy, it is important that you use either a sticky load balancer or session affinity. For details, see Session affinity for REST Proxy consumers.

Scale up or down other Confluent Platform components using one of the following options:

  • Use the kubectl scale command:

    kubectl scale <CP-component-CR-kind> <component-CR-name> --replicas=N
    
  • Update the number of replicas in the component custom resource (CR) and apply the new setting with the kubectl apply command:

    kind: <CP component>
    spec:
      replicas: <new number of replicas>