Cordon Brokers and Log Directories in Confluent Platform

Cordoning marks an Apache Kafka® broker or an individual log directory as off-limits for new partition placement, while the existing partitions on that broker or log directory keep serving reads and writes. Cordoning is the first step when you decommission a broker or a log directory, because it prevents the controller from assigning new partitions to a resource that you are about to remove.

Cordoning works like kubectl cordon in Kubernetes: the broker stays in the cluster and continues to serve its current partitions, but the controller does not place any new partitions on a cordoned broker or log directory.

If your cluster uses Self-Balancing Clusters or Confluent Auto Data Balancer, do not use cordoning. These features manage replica placement and do not account for cordoned brokers or log directories, so they can place replicas on cordoned resources. For more information, see Limitations and known issues and Auto Data Balancing in Confluent Platform. To remove a broker from a cluster that uses Self-Balancing Clusters, see Remove a broker.

Important

Broker and log directory cordoning requires the Confluent Platform 8.3 metadata version or later. Until the metadata version supports cordoning, the cordoned.log.dirs configuration has no effect, and an attempt to set it dynamically is rejected with an InvalidConfigurationException. To finalize the metadata version, see Steps for upgrading to 8.3.x.

The cordoned.log.dirs broker configuration controls cordoning. Set it to a comma-separated list of the log directory paths to cordon, or set it to * to cordon all log directories on the broker, which cordons the broker itself. Each path that you list must also be configured in the broker’s log.dirs or log.dir setting. For the broker configuration reference, see Kafka Broker and Controller Configuration Reference for Confluent Platform.
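Before you change cordoned.log.dirs, you can check locally that the new value follows the rules above. The following Bash sketch uses a hypothetical helper, validate_cordoned_dirs, which is not part of Confluent Platform. It assumes both values are plain comma-separated lists with no spaces around the commas. The broker still performs its own validation.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not a Confluent Platform tool): checks a proposed
# cordoned.log.dirs value against the broker's log.dirs value.
# Assumes comma-separated lists with no whitespace around the commas.
validate_cordoned_dirs() {
  local log_dirs="$1" cordoned="$2"
  local -a configured requested
  local dir c found

  # "*" cordons every log directory on the broker, so it is always valid.
  if [ "$cordoned" = "*" ]; then
    echo "ok"
    return 0
  fi

  IFS=',' read -r -a configured <<< "$log_dirs"
  IFS=',' read -r -a requested <<< "$cordoned"

  # Every cordoned path must also be listed in log.dirs.
  for dir in "${requested[@]}"; do
    found=0
    for c in "${configured[@]}"; do
      if [ "$dir" = "$c" ]; then
        found=1
        break
      fi
    done
    if [ "$found" -eq 0 ]; then
      echo "not in log.dirs: $dir"
      return 1
    fi
  done
  echo "ok"
}
```

For example, validate_cordoned_dirs /data/dir1,/data/dir2 /data/dir3 reports that /data/dir3 is not in log.dirs and returns a nonzero status.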

Use the kafka-configs tool to set cordoned.log.dirs dynamically through the Admin API, or set it statically in the broker’s server.properties file, where it takes effect on the next broker restart. You can check whether a log directory is cordoned through the LogDirDescription.isCordoned() method of the Admin API.
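For the static approach, the following Bash sketch shows one way to set cordoned.log.dirs in a server.properties file. The helper set_static_cordon is hypothetical, and it assumes entries are written as key=value with no spaces around the equals sign. As described above, a static value takes effect only after the broker restarts.

```shell
#!/usr/bin/env bash
# Hypothetical helper: replaces (or adds) the cordoned.log.dirs entry in a
# properties file. Assumes "key=value" lines without spaces around "=".
# A static value takes effect on the next broker restart.
set_static_cordon() {
  local props="$1" value="$2" tmp
  if [ ! -f "$props" ]; then
    echo "no such file: $props" >&2
    return 1
  fi
  tmp="$(mktemp)" || return 1
  # Copy every line except an existing cordoned.log.dirs entry.
  # grep exits with 1 when it selects no lines, which is not an error here.
  grep -v '^cordoned\.log\.dirs=' "$props" > "$tmp" || true
  echo "cordoned.log.dirs=$value" >> "$tmp"
  # Write back through the original file so its permissions are preserved.
  cat "$tmp" > "$props"
  rm -f "$tmp"
}
```

For example, with an example path, set_static_cordon /etc/kafka/server.properties /data/dir1 leaves a single cordoned.log.dirs=/data/dir1 line in the file. Quote the value as '*' when you cordon the whole broker so that the shell does not expand it. To read back the dynamic broker configuration instead, run bin/kafka-configs --bootstrap-server localhost:9092 --describe --entity-type brokers --entity-name 1.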

When a broker or log directory is cordoned, the controller does not place replicas of new partitions on it. If the brokers and log directories that remain available cannot satisfy the requested replication factor, topic and partition creation fails with an InvalidReplicationFactorException.
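To see why creation can fail, the following Bash sketch compares the number of candidate brokers with the replication factor. The helper placement_check is hypothetical and deliberately simplified: it assumes every listed broker is online, ignores rack awareness, and treats a broker as unavailable only when all of its log directories are cordoned (cordoned.log.dirs=*). The controller's actual placement logic considers more than this.

```shell
#!/usr/bin/env bash
# Hypothetical, simplified check: can a topic with replication factor RF be
# placed when some brokers are fully cordoned?
# Assumptions: all listed brokers are online, racks are ignored, and a broker
# is excluded only when all of its log directories are cordoned.
# Usage: placement_check RF "BROKER_IDS" "FULLY_CORDONED_IDS"
placement_check() {
  local rf="$1" brokers="$2" cordoned="$3"
  local b c skip eligible=0
  for b in $brokers; do
    skip=0
    for c in $cordoned; do
      if [ "$b" = "$c" ]; then
        skip=1
        break
      fi
    done
    if [ "$skip" -eq 0 ]; then
      eligible=$((eligible + 1))
    fi
  done
  if [ "$eligible" -ge "$rf" ]; then
    echo "ok: $eligible eligible brokers for replication factor $rf"
  else
    echo "fails: $eligible eligible brokers for replication factor $rf"
    return 1
  fi
}
```

For example, placement_check 3 '1 2 3' '1' reports that only two brokers remain eligible, so a topic with replication factor 3 cannot be placed until you uncordon a broker or add one.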

Decommission a broker

To decommission a broker, cordon it, move its partitions to other brokers, shut it down, and then unregister it.

  1. Cordon the broker by setting cordoned.log.dirs to * through the Admin API. The following example cordons broker 1:

    bin/kafka-configs --bootstrap-server localhost:9092 --alter \
      --add-config cordoned.log.dirs="*" --entity-type brokers --entity-name 1
    

    The command returns the following output:

    Completed updating config for broker 1.
    
  2. Reassign all partitions from the cordoned broker to the other brokers in the cluster. The partition reassignment tool cannot generate a reassignment plan for decommissioning automatically, so you must create a plan that moves every replica hosted on the broker to the remaining brokers. For the reassignment procedure, see Scaling the cluster (adding a node to a Kafka cluster).

    Move partitions only to brokers and log directories that are not cordoned. A move that targets a cordoned log directory fails with an InvalidReplicaAssignmentException and the message Log directory <dir> is cordoned.

  3. After the reassignment finishes, shut down the broker.

  4. Unregister the broker to remove it from the cluster. The following example unregisters broker 1:

    bin/kafka-cluster unregister --bootstrap-server localhost:9092 --id 1
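Step 2 requires a reassignment plan that you write yourself. The following Bash sketch builds one in the JSON format that the kafka-reassign-partitions tool reads. The helper decommission_plan and its input format (one "topic partition replicas" line per partition, with replicas as a comma-separated broker list) are hypothetical. To keep the example short, it moves every replica from the cordoned broker to a single replacement broker, which can unbalance the cluster, and it assumes topic names need no JSON escaping (Kafka topic names contain only ASCII letters, digits, '.', '_', and '-').

```shell
#!/usr/bin/env bash
# Hypothetical helper: reads "topic partition replicas" lines on stdin
# (for example "orders 0 1,2,3") and prints a reassignment plan that moves
# every replica on broker OLD to broker NEW. Partitions without a replica on
# OLD are left out of the plan.
# Usage: decommission_plan OLD NEW < assignments.txt > plan.json
decommission_plan() {
  local old="$1" new="$2"
  local topic partition replicas r has_old has_new body="" sep=""
  local -a ids mapped
  while read -r topic partition replicas; do
    if [ -z "$topic" ]; then continue; fi
    IFS=',' read -r -a ids <<< "$replicas"
    has_old=0
    has_new=0
    for r in "${ids[@]}"; do
      if [ "$r" = "$old" ]; then has_old=1; fi
      if [ "$r" = "$new" ]; then has_new=1; fi
    done
    # Nothing to move for partitions that have no replica on OLD.
    if [ "$has_old" -eq 0 ]; then continue; fi
    # A replica list must not name the same broker twice.
    if [ "$has_new" -eq 1 ]; then
      echo "broker $new already hosts $topic-$partition" >&2
      return 1
    fi
    mapped=()
    for r in "${ids[@]}"; do
      if [ "$r" = "$old" ]; then mapped+=("$new"); else mapped+=("$r"); fi
    done
    # Join the new replica list with commas inside a subshell.
    body+="${sep}{\"topic\":\"$topic\",\"partition\":$partition,\"replicas\":[$(IFS=,; echo "${mapped[*]}")]}"
    sep=","
  done
  printf '{"version":1,"partitions":[%s]}\n' "$body"
}
```

You can list the current replicas of each partition with bin/kafka-topics --bootstrap-server localhost:9092 --describe. After you save the plan, for example as plan.json, apply it with bin/kafka-reassign-partitions --bootstrap-server localhost:9092 --reassignment-json-file plan.json --execute, and rerun the command with --verify in place of --execute until the reassignment completes. The replacement broker must not be cordoned.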
    

Decommission a log directory

To decommission a single log directory on a broker, cordon the directory, move its partitions elsewhere, shut down the broker, uncordon the directory, remove it from the broker configuration, and then restart the broker.

  1. Cordon the log directory by setting cordoned.log.dirs to the directory path through the Admin API. The following example cordons /data/dir1 on broker 1:

    bin/kafka-configs --bootstrap-server localhost:9092 --alter \
      --add-config cordoned.log.dirs=/data/dir1 --entity-type brokers --entity-name 1
    

    The command returns the following output:

    Completed updating config for broker 1.
    
  2. Reassign all partitions from the cordoned log directory to other log directories or brokers in the cluster. The partition reassignment tool cannot generate a reassignment plan for decommissioning a log directory automatically, so you must create a plan that moves every replica hosted on the log directory. For the reassignment procedure, see Scaling the cluster (adding a node to a Kafka cluster).

    Choose target log directories that are not cordoned. A move that targets a cordoned log directory fails with an InvalidReplicaAssignmentException and the message Log directory <dir> is cordoned.

  3. After the reassignment finishes, shut down the broker.

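  4. Uncordon the log directory. Every path in cordoned.log.dirs must also appear in log.dirs, so remove the directory from cordoned.log.dirs before you remove it from log.dirs in the next step. The broker that hosts the directory is offline, so use the --bootstrap-controller option to reach the controller directly. The following example deletes the cordoned.log.dirs setting for broker 1; if other log directories on that broker must stay cordoned, use --add-config to set cordoned.log.dirs to the remaining paths instead: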

    bin/kafka-configs --bootstrap-controller localhost:9093 --alter \
      --delete-config cordoned.log.dirs --entity-type brokers --entity-name 1
    

    The command returns the following output:

    Completed updating config for broker 1.
    
  5. Remove the decommissioned log directory from the log.dirs broker configuration. For example, if the broker configuration contained the following setting:

    log.dirs=/data/dir1,/data/dir2
    

    Update the setting to remove the decommissioned directory:

    log.dirs=/data/dir2
    
  6. Restart the broker.
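If you manage server.properties with automation, step 5 can be scripted. The following Bash sketch uses a hypothetical helper, remove_log_dir, that prints a log.dirs value with one directory removed. It assumes a comma-separated value with no spaces around the commas, and it refuses to remove the last remaining directory because a broker needs at least one log directory.

```shell
#!/usr/bin/env bash
# Hypothetical helper: prints LOG_DIRS with TARGET removed.
# Fails if TARGET is not listed or if it is the only log directory.
# Usage: remove_log_dir "/data/dir1,/data/dir2" /data/dir1
remove_log_dir() {
  local log_dirs="$1" target="$2" d out="" found=0
  local -a dirs
  IFS=',' read -r -a dirs <<< "$log_dirs"
  for d in "${dirs[@]}"; do
    if [ "$d" = "$target" ]; then
      found=1
      continue
    fi
    # Append with a comma separator, except before the first entry.
    out="${out:+$out,}$d"
  done
  if [ "$found" -eq 0 ]; then
    echo "$target is not in log.dirs" >&2
    return 1
  fi
  if [ -z "$out" ]; then
    echo "refusing to remove the only log directory" >&2
    return 1
  fi
  echo "$out"
}
```

For example, remove_log_dir /data/dir1,/data/dir2 /data/dir1 prints /data/dir2, which matches the updated setting in step 5.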