Running Kafka in Production with Confluent Platform¶
This section describes the key considerations before going to production with Confluent Platform.
Hardware¶
If you have followed the normal development path, you have tried Apache Kafka® on your laptop or on a small cluster of machines. But when it comes time to deploy Kafka to production, there are a few recommendations that you should consider.
The following table lists hardware recommendations. Nothing is a hard-and-fast rule; Kafka is used for a wide range of use cases and on a lot of different machines. These recommendations provide a good starting point based on the experiences of Confluent with production clusters.
Component | Nodes | Storage | Memory | CPU |
---|---|---|---|---|
Control Center-Normal mode | 1 | 300 GB, preferably SSDs | 32 GB RAM (JVM default 6 GB) | 12 cores or more |
Control Center-Reduced infrastructure mode | 1 | 128 GB, preferably SSDs | 8 GB RAM (JVM default 4 GB) | 4 cores or more |
Broker | 3 | | 64 GB RAM | Dual 12-core sockets |
KRaft controller | 3-5 | 64 GB SSD | 4 GB RAM | 4 cores |
Confluent Manager for Apache Flink | 2 Kubernetes pods | 10 GB (Kubernetes persistent volume) | 4 GB RAM (For managing 150 Flink applications) | 3 cores (For managing 150 Flink applications) |
Connect | 2 | Storage is only required at installation time. | 0.5 - 4 GB heap size depending on connectors | Typically not CPU-bound. More cores is better than faster cores. |
Replicator- Same as Connect for nodes, storage, memory, and CPU. (See note that follows about AWS.) | 2 | Storage is only required at installation time. | 0.5 - 4 GB heap size | More cores is better |
ksqlDB - See Capacity planning | 2 | Use SSD. Sizing depends on the number of concurrent queries and the aggregation performed. Minimum 100 GB for a basic server. | 20 GB RAM | 4 cores |
REST Proxy | 2 | Storage is only required at installation time. | 1 GB overhead plus 64 MB per producer and 16 MB per consumer | 16 cores to handle HTTP requests in parallel and background threads for consumers and producers. |
Schema Registry | 2 | Storage is only required at installation time. | 1 GB heap size | Typically not CPU-bound. More cores is better than faster cores. |
ZooKeeper | 3-5 | Each write to ZooKeeper must be persisted in the transaction log before the client gets an ack. Using SSD reduces the ZooKeeper write latency. | 4 GB RAM | 2-4 cores |
- If you want to use RAID disks, the recommendation is:
  - RAID 1 and RAID 10: Preferred
  - RAID 0: Second choice
  - RAID 5: Not recommended
Note
If you deploy Confluent Platform on AWS VMs and run Replicator as a connector, be aware that VMs with burstable CPU types (T2, T3, T3a, and T4g) do not support high-throughput streaming workloads. Replicator worker nodes running on these VMs experience throughput degradation when their CPU credits expire. This makes these VMs unsuitable for Confluent Platform nodes that are expected to run at elevated CPU levels for a sustained period of time, or to support workloads above their baseline resource rates.
More information follows in the next sections.
Memory¶
Kafka relies heavily on the filesystem for storing and caching messages. All data is immediately written to a persistent log on the filesystem without necessarily flushing to disk. In effect this just means that it is transferred into the kernel’s pagecache. A modern OS will happily divert all free memory to disk caching with little performance penalty when the memory is reclaimed. Furthermore, Kafka uses heap space very carefully and does not require setting heap sizes more than 6 GB. This will result in a file system cache of up to 28-30 GB on a 32 GB machine.
You need sufficient memory to buffer active readers and writers. You can do a back-of-the-envelope estimate of memory needs by assuming you want to be able to buffer for 30 seconds and compute your memory need as write_throughput * 30.
A machine with 64 GB of RAM is a decent choice, but 32 GB machines are not uncommon. Less than 32 GB tends to be counterproductive (you end up needing many, many small machines).
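The back-of-the-envelope estimate above can be sketched as a small shell function. The function name, the MB/s unit, and the 100 MB/s figure are assumptions for illustration only; substitute the write throughput you measure on your own brokers.

```shell
# Rough page cache estimate: write throughput multiplied by the buffer window.
# $1: write throughput in MB/s (hypothetical input, measure your own)
# $2: buffer window in seconds (defaults to the 30 seconds suggested above)
estimate_buffer_mb() {
  throughput_mb_per_s="$1"
  window_s="${2:-30}"
  echo $(( throughput_mb_per_s * window_s ))
}

# A broker ingesting an assumed 100 MB/s needs about 3000 MB of buffer memory.
estimate_buffer_mb 100
```

The result is only the memory needed for buffering. The JVM heap and the operating system need their own share on top of it.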
CPUs¶
Most Kafka deployments tend to be rather light on CPU requirements. As such, the exact processor setup matters less than the other resources. Note that if TLS is enabled, the CPU requirements can be significantly higher (the exact details depend on the CPU type and JVM implementation).
You should choose a modern processor with multiple cores. Clusters commonly use 24-core machines.
If you need to choose between faster CPUs or more cores, choose more cores. The extra concurrency that multiple cores offer will far outweigh a slightly faster clock speed.
Disks¶
Note
- Tiered Storage in Confluent Platform requires a single mount point and therefore does not support JBOD (just a bunch of disks), as described under Known limitations. If you want to use Tiered Storage, do not use JBOD.
- Manage Self-Balancing Kafka Clusters in Confluent Platform requires a single mount point and therefore does not support JBOD, as described under Limitations. If you want to use Self-Balancing Clusters, do not use JBOD.
You should use multiple drives to maximize throughput. To ensure good latency, do not share the drives used for Kafka data with application logs or other OS filesystem activity. You can either combine these drives into a single volume as a Redundant Array of Independent Disks (RAID), or format and mount each drive as its own directory. Because Kafka has replication, the redundancy provided by RAID can also be provided at the application level. This choice has several tradeoffs.
If you configure multiple data directories, the broker places a new partition in the path with the least number of partitions currently stored. Each partition will be entirely in one of the data directories. If data is not well balanced among partitions, this can lead to load imbalance among disks.
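To make the placement rule above concrete, here is a minimal sketch that picks the data directory holding the fewest partition directories. It only illustrates the rule and is not the broker's implementation: the function name is hypothetical, each partition is assumed to be one subdirectory, and ties are assumed to go to the first directory listed.

```shell
# Print the data directory that currently holds the fewest partition
# directories. Assumes each partition is one subdirectory of a data directory.
pick_log_dir() {
  best_dir=""
  best_count=""
  for dir in "$@"; do
    # Count immediate subdirectories; $(( )) strips padding from wc output.
    count=$(( $(find "$dir" -mindepth 1 -maxdepth 1 -type d | wc -l) ))
    if [ -z "$best_count" ] || [ "$count" -lt "$best_count" ]; then
      best_dir="$dir"
      best_count="$count"
    fi
  done
  echo "$best_dir"
}

# Example call (the paths are hypothetical):
#   pick_log_dir /mnt/disk1/kafka /mnt/disk2/kafka
```

Because the rule counts partitions and not bytes, a directory with a few very large partitions can still be chosen, which is how the load imbalance described above arises.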
RAID can potentially do better at balancing load between disks (although it doesn’t always seem to) because it balances load at a lower level.
RAID 10 is recommended as the best “sleep at night” option for most use cases. It provides improved read and write performance, data protection (ability to tolerate disk failures), and fast rebuild times.
The primary downside of RAID is that it reduces the available disk space. Another downside is the I/O cost of rebuilding the array when a disk fails. The rebuild cost applies to RAID in general, with nuances between the different RAID levels.
Finally, you should avoid network-attached storage (NAS). NAS is often slower, displays larger latencies with a wider deviation in average latency, and is a single point of failure.
Network¶
A fast and reliable network is an essential performance component in a distributed system. Low latency ensures that nodes can communicate easily, while high bandwidth helps partition movement and recovery. Modern data-center networking (1 GbE, 10 GbE) is sufficient for the vast majority of clusters, and latency of less than 30 milliseconds is generally recommended for Kafka.
If the network between brokers is dissimilar in terms of latency, reliability or bandwidth, see Configure Multi-Region Clusters in Confluent Platform.
General Considerations¶
In general, medium-to-large machines are preferred for Kafka:
- Avoid small machines because you don’t want to manage a cluster with a thousand nodes, and the overhead of running Kafka is more apparent on such small boxes.
- Avoid very large machines because they often lead to imbalanced resource usage. For example, all the memory is being used, but none of the CPU. They can also add logistical complexity if you have to run multiple nodes per machine.
Filesystem¶
You should run Kafka on XFS or ext4.
VMware optimization¶
Confluent and VMware recommend enabling compression to help mitigate the performance impacts while maintaining business continuity.
Warning
Disable vMotion and disk snapshotting for Confluent Platform because these features can cause a full cluster outage when used with Kafka.
JVM¶
You need to separately install the correct version of Java before you start the Confluent Platform installation process.
The following table lists Java support in Confluent Platform by version. Note the following:
- Java 8 was deprecated in Confluent Platform version 7.4.x and will be removed in a future version.
- Ubuntu 22.04 supports Java versions 17 and 11, and does not support Java 8 in Confluent Platform 7.8.x.
- Docker images support Java versions 17 or 11. For more information, see Docker.
- Eclipse Temurin (formerly known as AdoptOpenJDK), OpenJDK, Zulu OpenJDK, and Oracle are supported with Confluent Platform.
- You should use the full JDK to help Confluent Support with troubleshooting and to provide better support if you experience issues with Confluent Platform.
Confluent Platform | Recommended | Supported |
---|---|---|
7.8.x | 17 | 17, 11, 8 * |
7.7.x | 17 | 17, 11, 8 * |
7.6.x | 17 | 17, 11, 8 * |
7.5.x | 17 | 17, 11, 8 * |
7.4.x | 17 | 17, 11, 8 * |
7.3.x | 17 | 17, 11, 8 |
7.2.x | 11 | 11, 8 |
7.1 | 11 or 8 | 11, 8 |
* Java 8 is deprecated, and will be removed in a future version.
From a security perspective, you should always use the latest released patch version because older versions may have security vulnerabilities.
Java 9 and 10 are not supported in Confluent Platform as those versions are short-term rapid release versions.
For more information about Java versions, see Java Version History.
The recommended GC tuning (tested on a large deployment with JDK 1.8 u5) looks like this:
-Xms6g -Xmx6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
For reference, here are the stats on one of LinkedIn’s busiest clusters (at peak):
- 60 brokers
- 50k partitions (replication factor 2)
- 800k messages/sec in
- 300 MBps inbound, 1 GBps + outbound
The tuning looks fairly aggressive, but all of the brokers in that cluster have a 90th-percentile GC pause time of about 21 ms, and they’re doing less than 1 young GC per second.
Production Configuration Options¶
The Kafka default settings should work in most cases, especially the performance-related settings and options, but there are some logistical configurations that should be changed for production depending on your cluster layout.
Refer to the following resources for additional information:
- For recommendations for maximizing Kafka in production, listen to the podcast, Running Apache Kafka in Production.
- For a course on running Kafka in production, see Mastering Production Data Streaming Systems with Apache Kafka.
- To learn more about running Kafka in KRaft mode, see KRaft Configuration for Confluent Platform.
- To learn about benchmark testing and results for Kafka performance on the latest hardware in the cloud, see Apache Kafka Performance, Latency, Throughput, and Test.
Kafka in KRaft mode¶
If you are using Kafka in KRaft mode, you must configure a node to be a broker or a controller. In addition, you must create a unique cluster ID and format the log directories with that ID.
Typically in a production environment, you should have a minimum of three brokers and three controllers.
Navigate to the Kafka properties file for KRaft (find example KRaft configuration files under /etc/kafka/kraft/) and customize the following:

Configure the process.roles, node.id and controller.quorum.voters for each node.
- For process.roles, set whether the node will be a broker or a controller. combined mode, meaning process.roles is set to broker,controller, is currently not supported for production workloads.
- Set a system-wide unique ID for the node.id for each broker/controller.
- controller.quorum.voters should be a comma-separated list of controllers in the format nodeID@hostname:port

      ############################# Server Basics #############################
      # The role of this server. Setting this puts us in KRaft mode
      process.roles=broker
      # The node id associated with this instance's roles
      node.id=2
      # The connect string for the controller quorum
      controller.quorum.voters=1@controller1:9093,3@controller3:9093,5@controller5:9093

Configure how brokers and clients communicate with the broker using listeners, and where controllers listen with controller.listener.names.
- listeners: Comma-separated list of URIs and listener names to listen on in the format listener_name://host_name:port
- controller.listener.names: Comma-separated list of listener_name entries for listeners used by the controller.
For more information, see KRaft Configuration for Confluent Platform.
Before you start Kafka, you must use the kafka-storage tool with the random-uuid command to generate a cluster ID for each new cluster. You only need one cluster ID, which you will use to format each node in the cluster.

    bin/kafka-storage random-uuid

This results in output like the following:

    q1Sh-9_ISia_zwGINzRvyQ

Then use the cluster ID to format storage for each node in the cluster with the kafka-storage tool that is provided with Confluent Platform, and the format command like the following example, specifying the properties file for a controller.

    bin/kafka-storage format -t q1Sh-9_ISia_zwGINzRvyQ -c etc/kafka/kraft/controller.properties

Previously, Kafka would format blank storage directories automatically and generate a new cluster ID automatically. One reason for the change is that auto-formatting can sometimes obscure an error condition. This is particularly important for the metadata log maintained by the controller and broker servers. If a majority of the controllers were able to start with an empty log directory, a leader might be able to be elected with missing committed data. To configure the log directory, either set metadata.log.dir or log.dirs. For more information, see Inter-broker listeners.
Configure security for your environment.
- For general security guidance, see Manage Security in Confluent Platform.
- For role-based access control (RBAC), see Configure Metadata Service (MDS) in Confluent Platform.
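Returning to the controller.quorum.voters setting from the first configuration step, the following sketch assembles the value in the nodeID@hostname:port format. The function name is hypothetical, and the sketch assumes that every controller listens on the same port. The node IDs and hostnames are the ones from the example properties file.

```shell
# Build a controller.quorum.voters line from "nodeID@hostname" entries.
# $1: controller port shared by all controllers (an assumption of this sketch)
# remaining arguments: one nodeID@hostname entry per controller
build_quorum_voters() {
  port="$1"
  shift
  voters=""
  for entry in "$@"; do
    # Prepend a comma only when the list already has an entry.
    voters="${voters:+$voters,}${entry}:${port}"
  done
  echo "controller.quorum.voters=${voters}"
}

build_quorum_voters 9093 1@controller1 3@controller3 5@controller5
# prints controller.quorum.voters=1@controller1:9093,3@controller3:9093,5@controller5:9093
```

Generating the line once and copying it to every node helps keep the voter list identical across brokers and controllers.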
Kafka in ZooKeeper mode¶
If you are using ZooKeeper for cluster metadata management, use the following guidelines.
During startup in ZooKeeper mode, Kafka brokers register themselves in ZooKeeper to become a member of the cluster.
In a production environment, multiple brokers are required.
To configure brokers, navigate to the Apache Kafka® properties file (/etc/kafka/server.properties) and customize the following:
Connect to the same ZooKeeper ensemble by setting zookeeper.connect in all nodes to the same value. Replace all instances of localhost with the hostname or FQDN (fully qualified domain name) of your node. For example, if your hostname is zookeeper:

    zookeeper.connect=zookeeper:2181

Configure the broker IDs for each node in your cluster using one of these methods.
- Dynamically generate the broker IDs: add broker.id.generation.enable=true and comment out broker.id. For example:

      ############################# Server Basics #############################
      # The ID of the broker. This must be set to a unique integer for each broker.
      #broker.id=0
      broker.id.generation.enable=true

- Manually set the broker IDs: set a unique value for broker.id on each node.
Configure how other brokers and clients communicate with the broker using listeners, and optionally advertised.listeners.
- listeners: Comma-separated list of URIs and listener names to listen on.
- advertised.listeners: Comma-separated list of URIs and listener names for other brokers and clients to use. The advertised.listeners parameter ensures that the broker advertises an address that is accessible from both local and external hosts.
For more information, see Production Configuration Options.
Configure security for your environment.
- For general security guidance, see Manage Security in Confluent Platform.
- For role-based access control (RBAC), see Configure Metadata Service (MDS) in Confluent Platform.
- For TLS encryption, SASL authentication, and authorization, see Enable Security for a ZooKeeper-Based Cluster in Confluent Platform.
Replication configurations¶
Replication factor is a topic setting that determines how many copies of a topic are created. The replication factor counts all replicas, including the leader, which means that a topic with a replication factor of one (1) is not replicated.
default.replication.factor
The default replication factor that applies to auto-created topics. You should set this to at least 2.
- Type: int
- Default: 1
- Importance: medium
min.insync.replicas
The minimum number of in-sync replicas (ISRs) needed to commit a produce request with acks=-1 (or all).
- Type: int
- Default: 1
- Importance: medium
unclean.leader.election.enable
Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.
- Type: boolean
- Default: false
- Importance: medium
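The replication factor and min.insync.replicas interact: a producer using acks=all can keep writing to a partition only while at least min.insync.replicas replicas remain in sync. The sketch below computes how many replicas a partition can lose before such writes are rejected. The function name and the values 3 and 2 are illustrative assumptions, not recommendations from this section.

```shell
# Number of replicas a partition can lose while acks=all produce requests
# still succeed: replication factor minus min.insync.replicas.
tolerable_replica_failures() {
  replication_factor="$1"
  min_insync_replicas="$2"
  echo $(( replication_factor - min_insync_replicas ))
}

# Assumed example: replication factor 3 with min.insync.replicas=2.
tolerable_replica_failures 3 2   # prints 1
```

With the defaults listed above (replication factor 1, min.insync.replicas 1) the result is 0, so a single broker failure makes the affected partitions unavailable for writes.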
Logs and partitions¶
Each Kafka partition is a log file stored in the directory specified by the log.dirs setting.
These settings determine the number of partitions for a topic and the log storage location.
log.dirs
The directories in which the Kafka log data is located.
- Type: string
- Default: “/tmp/kafka-logs”
- Importance: high
num.partitions
The default number of log partitions for auto-created topics. You should increase this since it is better to over-partition a topic. Over-partitioning a topic leads to better data balancing and aids consumer parallelism. For keyed data, you should avoid changing the number of partitions in a topic.
- Type: int
- Default: 1
- Valid Values: [1,…]
- Importance: medium
- Dynamic Update Mode: read-only
File Descriptors and mmap¶
Kafka uses a very large number of files and a large number of sockets to communicate with the clients. All of this requires a relatively high number of available file descriptors.
Many modern Linux distributions ship with only 1,024 file descriptors allowed per process. This is too low for Kafka.
You should increase your file descriptor count to at least 100,000, and possibly much more. This process can be difficult and is highly dependent on your particular OS and distribution. Consult the documentation for your OS to determine how best to change the allowed file descriptor count.
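As a quick check before changing anything, the following sketch compares the open-files limit of the current shell with the 100,000 minimum suggested above. The function name is hypothetical. The limit that matters is the one applied to the broker process, which can differ from your login shell if the broker is started by a service manager.

```shell
# Succeed when a file descriptor limit meets the suggested minimum.
# $1: limit to check, $2: minimum (defaults to the 100000 suggested above)
fd_limit_ok() {
  limit="$1"
  minimum="${2:-100000}"
  # "unlimited" always satisfies the minimum.
  if [ "$limit" = "unlimited" ]; then
    return 0
  fi
  [ "$limit" -ge "$minimum" ]
}

# Check the soft limit for open files in the current shell.
current_limit=$(ulimit -n)
if fd_limit_ok "$current_limit"; then
  echo "open files limit $current_limit meets the suggested minimum"
else
  echo "open files limit $current_limit is below 100000"
fi
```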
Here are some recommendations:
To calculate the current mmap number, you can count the .index files in the Kafka data directory. The .index files represent the majority of the memory mapped files. Here is the procedure:

Count the .index files using this command. The result approximates the current number of memory mapped files.

    find . -name '*index' | wc -l

Set the vm.max_map_count for the session. The minimum value for the mmap limit (vm.max_map_count) is the number of open files ulimit.

Important
You should set vm.max_map_count sufficiently higher than the number of .index files to account for broker segment growth.

    sysctl -w vm.max_map_count=262144

To make the vm.max_map_count setting survive a reboot, use this command:

    echo 'vm.max_map_count=262144' >> /etc/sysctl.conf
    sysctl -p

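The procedure above can be wrapped into a small check that counts the index files and compares the count with a map count limit. This is a sketch under stated assumptions: the function names are hypothetical, and the 2x headroom factor is an illustrative choice, not a documented recommendation.

```shell
# Count files whose names end in "index" under a Kafka data directory,
# using the same pattern as the find command in the procedure above.
count_index_files() {
  echo $(( $(find "$1" -type f -name '*index' | wc -l) ))
}

# Succeed when the map count limit is at least the index file count
# multiplied by a headroom factor (assumed default: 2).
map_count_ok() {
  index_files="$1"
  max_map_count="$2"
  headroom="${3:-2}"
  [ "$max_map_count" -ge $(( index_files * headroom )) ]
}

# Example call (the data directory path is hypothetical):
#   map_count_ok "$(count_index_files /var/lib/kafka/data)" "$(sysctl -n vm.max_map_count)"
```

If the check fails, raise vm.max_map_count as described above and re-run it after the next significant increase in partitions or retained segments.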
For more information, see the OS section of the Kafka documentation and How to Address Out of Memory Due to Maximum MMap Count Exceeded (requires sign in) in the Confluent Support portal.