KRaft Configuration Reference for Confluent Platform

This document covers hardware recommendations, configuration, debugging tools, and monitoring options for running Apache Kafka® in KRaft (pronounced craft) mode.

Note that as of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments.

Hardware and JVM requirements

A production KRaft server can cover a wide variety of use cases. In general, you should run KRaft on a server with similar specifications to a server running ZooKeeper. In summary, for production, this is:

  • Minimum of 4 GB of RAM
  • Dedicated CPU core should be considered when the server is shared
  • An SSD disk at least 64 GB in size is highly recommended
  • JVM heap size of at least 1 GB is recommended

For more details, see Hardware.

Configuration options

The contents of configuration files for KRaft vary depending on whether the server is a broker, controller or both. You can find the example KRaft configuration files in /etc/kafka/kraft/.

You will see three different example files in this folder after you install Confluent Platform:

  • broker.properties - An example of the settings to use when the server is a broker only.
  • controller.properties - An example of the settings to use when the server is a controller only.
  • server.properties - An example of the settings to use when the server is both a broker and a controller. This configuration is not supported for production use.

For all files, the settings in Server basics section must be included. Additional settings depend on whether the server is a broker, controller or both.

Additional settings for KRaft mode are listed in the following sections with links to the configuration reference for those properties.

Server basics

These entries must be included for a server running in KRaft mode. Following is an example of the Server Basics section of the controller.properties file for a system with three controllers. Descriptions of each entry follow.

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode.
process.roles=controller

# The node id associated with this instance's roles.
node.id=1

# The connect string for the controller quorum.
controller.quorum.voters=1@controller1.example.com:9093,2@controller2.example.com:9093,3@controller3.example.com:9093
process.roles

When you operate Apache Kafka® in KRaft mode, you must set the process.roles property. This property specifies whether the server acts as a controller, broker, or both, although currently both is not supported for production workloads. In KRaft mode, specific Kafka servers are selected to be controllers, storing metadata for the cluster in the metadata log, and other servers are selected to be brokers. The servers selected to be controllers will participate in the metadata quorum. Each controller is either an active or a hot standby for the current active controller.

In a production environment, the controller quorum will be deployed on multiple nodes. This is called an ensemble. An ensemble is a set of 2n + 1 controllers where n is any number greater than 0. The odd number of controllers allows the controller quorum to perform majority elections for leadership. At any given time, there can be up to n failed servers in an ensemble and cluster will keep quorum. For example, with 3 controllers, the cluster can tolerate one controller failure. If at any time, quorum is lost, the cluster will go down. Currently, more than 3 controllers is not recommended in critical environments. If a (rare) partial network failure occurs, the cluster metadata quorum can become unavailable. This limitation will be addressed in a future release of Kafka.

  • Type: string
  • Default:
  • Importance: required for KRaft mode

process.roles can have the following values:

Value Result
Not set The server is assumed to be in ZooKeeper mode
broker The server operates only as a broker.
controller The server operates in isolated mode as a controller only.
broker,controller The server operates in combined mode, where it is both a broker and a controller. Combined mode is considered an early access feature and Confluent does not currently support combined mode for production workloads. However, combined mode can be used for local testing. For an example of combined mode, see the confluent-local Docker image.
node.id

The unique identifier for this server. Each node ID must be unique across all the servers in a particular cluster. No two servers can have the same node ID regardless of their process.roles value. This identifier replaces broker.id, which is used when operating in ZooKeeper mode.

  • Type: int
  • Default:
  • Importance: required for KRaft mode
controller.quorum.voters

A comma-separated list of quorum voters. All of the servers (controllers and brokers) in a Kafka cluster discover the quorum voters using this property, and you must identify all of the controllers by including them in the list you provide for the property.

Each controller is identified with their ID, host and port information in the format of {id}@{host}:{port}. Multiple entries are separated by commas and might look like the following:

controller.quorum.voters=1@host1:port1,2@host2:port2,3@host3:port3

The node ID supplied in the controller.quorum.voters property must match the corresponding ID on the controller servers. For example, on controller1, node.id must be set to 1. If a server is a broker only, its node ID should not appear in the controller.quorum.voters list.

  • Type: string
  • Default:
  • Importance: required for KRaft mode

Socket server settings

listeners

A comma-separated list of addresses where the socket server listens.

For controllers in isolated mode: Only controller listeners are allowed in this list when process.roles=controller, and this listener should be consistent with controller.quorum.voters value. If not configured, the host name will be equal to the value of java.net.InetAddress.getCanonicalHostName() with the PLAINTEXT listener name, and port 9092.

For controllers in combined mode, you should list the controller listeners as well as the broker listeners. For brokers: see listeners.

  • Type: string with the format listener_name://host_name:port
  • Default: If not configured, the host name will be equal to the value of java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092. Example: listeners=PLAINTEXT://your.host.name:9092
  • Importance: high
controller.listener.names

A comma-separated list of listener_name entries for listeners used by the controller. On a node with process.roles=broker, only the first listener in the list will be used by the broker. ZooKeeper-based brokers should not set this value. For KRaft controllers in isolated or combined mode, the node will listen as a KRaft controller on all listeners that are listed for this property, and each must appear in the listeners property. They shouldn’t appear in the advertised.listeners property, which is used in ZooKeeper mode.

  • Type: string
  • Default: null
  • Importance: required for KRaft mode.

Log settings

log.dirs
A comma separated list of directories under which to store log files. To override this property for metatdata logs when using a KRaft controller, see Metadata retention settings. For KRaft mode, you should currently list only one log directory as JBOD is not supported. For more information, see KIP-858: Handle JBOD broker disk failure in KRaft.
  • Type: string
  • Default: null
  • Importance: high
num.partitions

The default number of log partitions per topic. More partitions allow greater parallelism for consumption, but this will also result in more files across the brokers. Set this configuration for brokers only. This will be ignored by KRaft controllers.

  • Type: int
  • Default: 1
  • Importance: medium

Metadata retention settings

metadata.log.dir

Use to specify where the metadata log for clusters in KRaft mode is placed . If not set, the metadata log is placed in the first log directory specified in the log.dirs property.

  • Type: string
  • Default: null
  • Importance: high
metadata.max.idle.interval.ms

Sets how often the active controller writes no-op records to the metadata partition. The default is 500 milliseconds. If the value is 0, no no-op records are appended to the metadata partition.

  • Type: int
  • Default: 500
  • importance: low

Settings for other components

When you use KRaft instead of ZooKeeper, you must use current, non-deprecated configurations settings. The settings to use are described in the following table.

Feature Allowed with ZooKeeper Required with KRaft
Clients and services zookeeper.connect=zookeeper:2181 bootstrap.servers=broker:9092
Schema Registry kafkastore.connection.url=zookeeper:2181 kafkastore.bootstrap.servers=broker:9092
Administrative tools kafka-topics --zookeeper zookeeper:2181 (deprecated)
kafka-topics --bootstrap-server broker:9092
--command-config properties to connect to brokers
Retrieve Kafka cluster ID zookeeper-shell zookeeper:2181 get/cluster/id From the command line, use kafka-metadata-quorum (See kafka-metadata-quorum) or confluent cluster describe --url, or view metadata.properties. or http://broker:8090 --output json

Generate and format IDs

Before you start Kafka, you must use the kafka-storage tool with the random-uuid command to generate a cluster ID for each new cluster. You only need one cluster ID, which you will use to format each node in the cluster.

bin/kafka-storage random-uuid

This results in output like the following:

q1Sh-9_ISia_zwGINzRvyQ

Then use the cluster ID to format each node in the cluster with the kafka-storage tool that is provided with Confluent Platform, and the format command like the following example.

bin/kafka-storage format -t q1Sh-9_ISia_zwGINzRvyQ -c etc/kafka/kraft/server.properties

Previously, Kafka would format blank storage directories automatically and generate a new cluster ID automatically. One reason for the change is that auto-formatting can sometimes obscure an error condition. This is particularly important for the metadata log maintained by the controller and broker servers. If a majority of the controllers were able to start with an empty log directory, a leader might be able to be elected with missing committed data.

Tools for debugging KRaft mode

Kafka provides tools to help you debug a cluster running in KRaft-mode.

Describe runtime status

You can describe the runtime state of the cluster metadata partition using the kafka-metadata-quorum tool . For example, the following command displays a summary of the metadata quorum:

bin/kafka-metadata-quorum --bootstrap-server  host1:9092 describe --status
Output might look like the following:

   ClusterId:              fMCL8kv1SWm87L_Md-I2hg
   LeaderId:               3002
   LeaderEpoch:            2
   HighWatermark:          10
   MaxFollowerLag:         0
   MaxFollowerLagTimeMs:   -1
   CurrentVoters:          [3000,3001,3002]
   CurrentObservers:       [0,1,2]

Debug log segments

The kafka-dump-log tool tool can be used to debug the log segments and snapshots for the cluster metadata directory. The tool will scan the provided files and decode the metadata records. For example, the following command decodes and prints the records in the first log segment:

bin/kafka-dump-log --cluster-metadata-decoder --files tmp/kraft-combined-logs/_cluster_metadata-0/00000000000000023946.log

Inspect the metadata partition

The kafka-metadata-shell tool tool can be used to interactively inspect the metadata cluster. The following example shows how to open the shell.

kafka-metadata-shell --directory tmp/kraft-combined-logs/_cluster-metadata-0/

The shell will load, and after you are in the shell, you can explore the contents of the metadata log and then exit.

 Loading...
 [ Kafka Metadata Shell ]
 >> ls
 brokers  configs  features  linkIds  links  shell  topicIds  topics
 >> ls /topics
 test
 >> cat /topics/test/0/data
 {
   "partitionId" : 0,
   "topicId" : "5zoAlv-xEh9xRANKXt1Lbg",
   "replicas" : [ 1 ],
   "isr" : [ 1 ],
   "removingReplicas" : null,
   "addingReplicas" : null,
   "leader" : 1,
   "leaderEpoch" : 0,
   "partitionEpoch" : 0
  }
>> exit

Monitor KRaft

Following are some JMX metrics to monitor on the controller and broker when operating in KRaft mode. Some of the metrics depend on the setting for process.roles.

For more broker metrics, see Broker metrics. For more information, see KRaft monitoring.

KRaft quorum monitoring metrics

kafka.server:type=raft-metrics MBean name Description
append-records-rate The average number of records appended per second by the leader of the raft quorum.
commit-latency-avg The average time in milliseconds to commit an entry in the raft log.
commit-latency-max The maximum time in milliseconds to commit an entry in the raft log.
current-epoch The current quorum epoch.
current-leader The current quorum leader’s id; -1 indicates unknown.
current-state The current state of this member; possible values are leader, candidate, voted, follower, unattached, observer.
current-vote The current voted leader’s id; -1 indicates not voted for anyone.
election-latency-avg The average time in milliseconds spent on electing a new leader.
election-latency-max The maximum time in milliseconds spent on electing a new leader.
fetch-records-rate The average number of records fetched from the leader of the raft quorum.
high-watermark The high watermark maintained on this member; -1 if it is unknown.
log-end-offset The current raft log end offset.
number-unknown-voter-connections Number of unknown voters whose connection information is not cached. This value of this metric is always 0.
poll-idle-ratio-avg The average fraction of time the client’s poll() is idle as opposed to waiting for the user code to process records.

Other quorum metrics:

MBean Description
kafka.server:type=MetadataLoader,name=CurrentMetadataVersion Outputs the feature level of the current metadata version.
kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount The total number of times that a KRaft snapshot has been loaded since the process was started.
kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes The total size in bytes of the latest snapshot that the node has generated. If a snapshot has not been generated yet, this is the size of the latest snapshot that was loaded. If no snapshots have been generated or loaded, this is 0.
kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs The interval in milliseconds since the latest snapshot was generated. If no snapshot has been generated yet, this is the approximate time delta since the process was started.

Controller metrics

With KRaft, Kafka added a new controller quorum to the cluster instead of the cluster being controlled by ZooKeeper. These controllers must be able to commit records for Kafka to be available so you need to monitor their health.

For the full list of KRaft metrics, see KRaft broker metrics and KRaft Quorum metrics.

kafka.controller:type=KafkaController MBean name Description
ActiveBrokerCount When using KRaft, the number of registered and unfenced brokers as observed by this controller. When using ZooKeeper, this value is the number of brokers known to the controller.
ActiveControllerCount The number of active controllers on this node. Valid values are ‘0’ or ‘1’. Alert if the aggregated sum across all brokers in the cluster is anything other than 1 because there should be exactly one controller per cluster.
FencedBrokerCount When using KRaft, the number of registered but fenced brokers as observed by this controller. When using ZooKeeper, this value is always 0.
GlobalPartitionCount The number of all partitions in the cluster as observed by this controller.
GlobalTopicCount The number of all topics in the cluster as observed by this controller.
LastAppliedRecordLagMs Reports the difference between the local time and the append time of the last applied record batch. For active controllers the value of this lag is always zero.
LastAppliedRecordOffset The offset of the last record that was applied by the controller to the cluster metadata partition. For the active controller this may include uncommitted records. For the inactive controller this always includes committed records only.
LastAppliedRecordTimestamp The timestamp of the last record that was applied by the controller to the cluster metadata partition.
LastCommittedRecordOffset The active controller reports the offset of the last committed offset it consumed. Inactive controllers will always report the same value as LastAppliedRecordOffset. You can monitor the last committed offsets to see that they are advancing. You can also use these metrics to check that all of the brokers and controllers are at a similar offset.
LastAppliedRecordTimestamp The timestamp of the last record that was applied by the controller to the cluster metadata partition.
MetadataErrorCount The number of times this controller node has encountered an error during metadata log processing.
NewActiveControllerCount Counts the number of times this node has seen a new controller elected. A transition to the “no leader” state is not counted here. If the same controller as before becomes active, that still counts.
EventQueueOperationsStartedCount The total number of controller event queue operations that were started. This count includes deferred operations.
EventQueueOperationsTimedOutCount The total number of controller event queue operations that timed out before they could be performed.
OfflinePartitionCount The number of offline topic partitions (non-internal) as observed by this controller.
PreferredReplicaImbalanceCount The count of topic partitions for which the leader is not the preferred leader.
TimedOutBrokerHeartbeatCount The number of broker heartbeats that timed out on this controller since the process was started. Note that only active controllers handle heartbeats, so only they will see increases in this metric.
ZkWriteDeltaTimeMs The number of milliseconds the KRaft controller took writing a delta into ZooKeeper.
ZkWriteSnapshotTimeMs The number of milliseconds the KRaft controller took reconciling a snapshot into ZooKeeper.
ZkWriteBehindLag The amount of lag in records that ZooKeeper is behind relative to the highest committed record in the metadata log. This metric will only be reported by the active KRaft controller.

ControllerEventManager metrics:

kafka.controller:type=ControllerEventManager MBean name Description
EventQueueProcessingTimeMs A histogram of the time in milliseconds that requests spent being processed in the controller event queue.
EventQueueTimeMs A histogram of the time in milliseconds that requests spent waiting in the controller event queue.

KRaft Broker metrics

kafka.server:type=broker-metadata-metrics MBean name Description
last-applied-record-offset The offset of the last record from the cluster metadata partition that was applied by the broker.
last-applied-record-timestamp The timestamp of the last record from the cluster metadata partition that was applied by the broker.
last-applied-record-lag-ms The difference between now and the timestamp of the last record from the cluster metadata partition that was applied by the broker.
metadata-load-error-count The number of errors encountered by the BrokerMetadataListener while loading the metadata log and generating a new metadata delta based on it.
metadata-apply-error-count The number of errors encountered by the BrokerMetadataPublisher while applying a new metadata imaged based on the latest metadata delta.