You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

Production Deployment

This section is not meant to be an exhaustive guide to running your Kafka cluster in production, but it covers the key things to consider before putting your cluster live

Three main areas are covered:

  • Logistical considerations, such as hardware recommendations and deployment strategies
  • Configuration changes that are more suited to a production environment
  • Post-deployment considerations, such as data rebalancing, multi-data center setup


If you’ve been following the normal development path, you’ve probably been playing with Kafka on your laptop or on a small cluster of machines laying around. But when it comes time to deploying Kafka to production, there are a few recommendations that you should consider. Nothing is a hard-and-fast rule; Kafka is used for a wide range of use cases and on a bewildering array of machines. But these recommendations provide good starting points based on our experience with production clusters.


Kafka relies heavily on the filesystem for storing and caching messages. All data is immediately written to a persistent log on the filesystem without necessarily flushing to disk. In effect this just means that it is transferred into the kernel’s pagecache. A modern OS will happily divert all free memory to disk caching with little performance penalty when the memory is reclaimed. Furthermore, Kafka uses heap space very carefully and does not require setting heap sizes more than 5GB. This will result in a file system cache of up to 28-30GB on a 32GB machine.

You need sufficient memory to buffer active readers and writers. You can do a back-of-the-envelope estimate of memory needs by assuming you want to be able to buffer for 30 seconds and compute your memory need as write_throughput*30.

A machine with 64 GB of RAM is a decent choice, but 32 GB machines are not uncommon. Less than 32 GB tends to be counterproductive (you end up needing many, many small machines).


Most Kafka deployments tend to be rather light on CPU requirements. As such, the exact processor setup matters less than the other resources. You should choose a modern processor with multiple cores. Common clusters utilize 24 core machines.

If you need to choose between faster CPUs or more cores, choose more cores. The extra concurrency that multiple cores offers will far outweigh a slightly faster clock speed.


We recommend using multiple drives to get good throughput and not sharing the same drives used for Kafka data with application logs or other OS filesystem activity to ensure good latency. As of 0.8 you can either RAID these drives together into a single volume or format and mount each drive as its own directory. Since Kafka has replication the redundancy provided by RAID can also be provided at the application level. This choice has several tradeoffs. If you configure multiple data directories partitions will be assigned round-robin to data directories. Each partition will be entirely in one of the data directories. If data is not well balanced among partitions this can lead to load imbalance between disks.

RAID can potentially do better at balancing load between disks (although it doesn’t always seem to) because it balances load at a lower level. The primary downside of RAID is that it is usually a big performance hit for write throughput and reduces the available disk space.

Another potential benefit of RAID is the ability to tolerate disk failures. However our experience has been that rebuilding the RAID array is so I/O intensive that it effectively disables the server, so this does not provide much real availability improvement.

Our recommendation is to configure your Kafka server with multiple log directories, each directory mounted on a separate drive.

Finally, avoid network-attached storage (NAS). People routinely claim their NAS solution is faster and more reliable than local drives. Despite these claims, we have never seen NAS live up to its hype. NAS is often slower, displays larger latencies with a wider deviation in average latency, and is a single point of failure.


A fast and reliable network is obviously important to performance in a distributed system. Low latency helps ensure that nodes can communicate easily, while high bandwidth helps shard movement and recovery. Modern data-center networking (1 GbE, 10 GbE) is sufficient for the vast majority of clusters.

Avoid clusters that span multiple data centers, even if the data centers are colocated in close proximity. Definitely avoid clusters that span large geographic distances.

Kafka clusters assume that all nodes are equal—not that half the nodes are actually 150ms distant in another data center. Larger latencies tend to exacerbate problems in distributed systems and make debugging and resolution more difficult.

Similar to the NAS argument, everyone claims that their pipe between data centers is robust and low latency. This is true—until it isn’t (a network failure will happen eventually; you can count on it). From our experience, the hassle and cost of managing cross–data center clusters is simply not worth the benefits.


We recommend running Kafka on ext4.

General considerations

It is possible nowadays to obtain truly enormous machines: hundreds of gigabytes of RAM with dozens of CPU cores. Conversely, it is also possible to spin up thousands of small virtual machines in cloud platforms such as EC2. Which approach is best?

In general, it is better to prefer medium-to-large boxes. Avoid small machines, because you don’t want to manage a cluster with a thousand nodes, and the overhead of simply running Kafka is more apparent on such small boxes.

At the same time, avoid the truly enormous machines. They often lead to imbalanced resource usage (for example, all the memory is being used, but none of the CPU) and can add logistical complexity if you have to run multiple nodes per machine.


We recommend running the latest version of JDK 1.8 with the G1 collector (older freely available versions have disclosed security vulnerabilities).

If you are still on JDK 1.7 (which is also supported) and you are planning to use G1 (the current default), make sure you’re on u51. We tried out u21 in testing, but we had a number of problems with the GC implementation in that version.

Our recommended GC tuning (tested on a large deployment with JDK 1.8 u5) looks like this:

-Xms6g -Xmx6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20
       -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
       -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80

For reference, here are the stats on one of LinkedIn’s busiest clusters (at peak):

  • 60 brokers
  • 50k partitions (replication factor 2)
  • 800k messages/sec in
  • 300 MB/sec inbound, 1 GB/sec+ outbound

The tuning looks fairly aggressive, but all of the brokers in that cluster have a 90% GC pause time of about 21ms, and they’re doing less than 1 young GC per second.

Important Configuration Options

Kafka ships with very good defaults, especially when it comes to performance- related settings and options. When in doubt, just leave the settings alone.

With that said, there are some logistical configurations that should be changed for production. These changes are necessary either to make your life easier, or because there is no way to set a good default (because it depends on your cluster layout).


The list of zookeeper hosts that the broker registers at. It is recommended that you configure this to all the hosts in your zookeeper cluster

  • Type: string
  • Importance: high
Integer id that identifies a broker. No two brokers in the same Kafka cluster can have the same id.
  • Type: int
  • Importance: high

The directories in which the Kafka log data is located.

  • Type: string
  • Default: “/tmp/kafka-logs”
  • Importance: high

Hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, it will use the value for “” if configured. Otherwise it will use the value returned from

  • Type: string
  • Default:
  • Importance: high

The port to publish to ZooKeeper for clients to use. In IaaS environments, this may need to be different from the port to which the broker binds. If this is not set, it will publish the same port that the broker binds to

  • Type: int
  • Default: port
  • Importance: high

The default number of log partitions for auto-created topics. We recommend increasing this as it is better to over partition a topic. Over partitioning a topic leads to better data balancing as well as aids consumer parallelism. For keyed data, in particular, you want to avoid changing the number of partitions in a topic.

  • Type: int
  • Default: 1
  • Importance: medium

Replication configs


The default replication factor that applies to auto-created topics. We recommend setting this to at least 2.

  • Type: int
  • Default: 1
  • Importance: medium

The minimum number of replicas in ISR needed to commit a produce request with required.acks=-1 (or all).

  • Type: int
  • Default: 1
  • Importance: medium

Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.

  • Type: int
  • Default: 1
  • Importance: medium

File descriptors and mmap

Kafka uses a very large number of files. At the same time, Kafka uses a large number of sockets to communicate with the clients. All of this requires a relatively high number of available file descriptors.

Sadly, many modern Linux distributions ship with a paltry 1,024 file descriptors allowed per process. This is far too low for even a small Kafka node, let alone one that hosts hundreds of partitions.

You should increase your file descriptor count to something very large, such as 100,000. This process is irritatingly difficult and highly dependent on your particular OS and distribution. Consult the documentation for your OS to determine how best to change the allowed file descriptor count.


Stable version

The current stable branch is 3.4 and the latest release of that branch is 3.4.6, which we include in our bundle and is the one ZkClient 0.7 uses. ZkClient is the client layer Kafka uses to interact with ZooKeeper.

Note that the ZooKeeper start script that comes with the platform will start a 3.4.6 ZooKeeper server. If you already have an ensemble running with an older version, you will want to upgrade it at least to a version in the 3.4 branch to avoid compatibility issues, and ideally 3.4.6 or later.

Operationalizing ZooKeeper

Operationally, we do the following for a healthy ZooKeeper installation:

  • Redundancy in the physical/hardware/network layout: try not to put them all in the same rack, decent (but don’t go nuts) hardware, try to keep redundant power and network paths, etc. A typical ZooKeeper ensemble has 5-7 servers, which tolerates 2 and 3 servers down, respectively. If you have a small deployment, then using 3 servers is acceptable, but keep in mind that you’ll only be able to tolerate 1 server down in this case.
  • I/O segregation: if you do a lot of write type traffic you’ll almost definitely want the transaction logs on a dedicated disk group. Writes to the transaction log are synchronous (but batched for performance), and consequently, concurrent writes can significantly affect performance. ZooKeeper snapshots can be one such a source of concurrent writes, and ideally should be written on a disk group separate from the transaction log. Snapshots are writtent to disk asynchronously, so it is typically ok to share with the operating system and message log files. You can configure a server to use a separate disk group with the dataLogDir parameter.
  • Application segregation: Unless you really understand the application patterns of other apps that you want to install on the same box, it can be a good idea to run ZooKeeper in isolation (though this can be a balancing act with the capabilities of the hardware). If you do end up sharing the ensemble, you might want to use the chroot feature. With chroot, you give each application its own namespace.
  • Use care with virtualization: In general, there is no problem with using ZooKeeper in a virtualized environment. It is best to have servers are placed in different availability zones to avoid correlated crashes and to make sure that the storage system available can accomodate the requirements of the transaction logging and the snapshotting of ZooKeeper. Also, keep in mind that there has been some issues resolved in ZooKeeper related to name resolution in the cloud, so depending on the version you are using, you may end up hitting such an issue. If you suspect you are there, consider reading the ZooKeeper documentation or checking directly with the ZooKeeper community
  • ZooKeeper configuration: It’s java, make sure you give it ‘enough’ heap space (We usually run them with 3-5G, but that’s mostly due to the data set size we have here). Unfortunately we don’t have a good formula for it, but keep in mind that allowing for more ZooKeeper state means that snapshots can become large, and large snapshots affect recovery time. In fact, if the snapshot becomes too large (a few gigabytes), then you may need to increase the initLimit parameter to give enough time for servers to recover and join the ensemble.
  • Monitoring: Both JMX and the 4 letter words (4lw) commands are very useful, they do overlap in some cases (and in those cases we prefer the 4 letter words, they seem more predictable, or at the very least, they work better with the LI monitoring infrastructure).
  • Don’t overbuild the ensemble: large ensembles, especially under a write-heavy workload, means a lot of cross-server communication, since updates are propagated to all server replicas. Also, don’t underbuild it and risk swamping the ensemble with read requests. Keep in mind that the deal is that more servers give you more read capacity but less write capacity, while smaller ensembles are the other way around. One way to have lower impact on write capacity while adding read capacity is to add observers.
  • Additional admin documentation: If you need some additional detail about the administration of ZooKeeper, the admin guide contains more material.

Overall, we try to keep the ZooKeeper system as small as will handle the load (plus standard growth capacity planning) and as simple as possible. We try not to do anything fancy with the configuration or application layout as compared to the official release as well as keep it as self contained as possible. For these reasons, we tend to skip the OS packaged versions, since it has a tendency to try to put things in the OS standard hierarchy, which can be ‘messy’, for want of a better way to word it.

Mirroring data between clusters

We refer to the process of replicating data between Kafka clusters “mirroring” to avoid confusion with the replication that happens amongst the nodes in a single cluster. Kafka comes with a tool for mirroring data between Kafka clusters. The tool reads from one or more source clusters and writes to a destination cluster, like this:


A common use case for this kind of mirroring is to provide a replica in another datacenter. This scenario will be discussed in more detail in the next section.

You can run many such mirroring processes to increase throughput and for fault-tolerance (if one process dies, the others will take overs the additional load).

Data will be read from topics in the source cluster and written to a topic with the same name in the destination cluster. In fact the mirror maker is little more than a Kafka consumer and producer hooked together.

The source and destination clusters are completely independent entities: they can have different numbers of partitions and the offsets will not be the same. For this reason the mirror cluster is not really intended as a fault-tolerance mechanism (as the consumer position will be different); for that we recommend using normal in-cluster replication. The mirror maker process will, however, retain and use the message key for partitioning so order is preserved on a per-key basis.

Here is an example showing how to mirror a single topic (named my-topic) from two input clusters:

bin/ --consumer.config \
   --consumer.config --producer.config \
   --whitelist my-topic

Note that we specify the list of topics with the --whitelist option. This option allows any regular expression using Java-style regular expressions. So you could mirror two topics named A and B using --whitelist 'A|B'. Or you could mirror all topics using --whitelist '*'. Make sure to quote any regular expression to ensure the shell doesn’t try to expand it as a file path. For convenience we allow the use of ‘,’ instead of ‘|’ to specify a list of topics.

Sometimes it is easier to say what it is that you don’t want. Instead of using --whitelist to say what you want to mirror you can use --blacklist to say what to exclude. This also takes a regular expression argument.

Combining mirroring with the configuration auto.create.topics.enable=true makes it possible to have a replica cluster that will automatically create and replicate all data in a source cluster even as new topics are added.