Configure a Multi-Node Confluent Platform Environment with Docker¶
This topic demonstrates how to configure a multi-node Apache Kafka® environment with Docker and cloud providers.
Kafka is a distributed system, and data is read from and written to the partition leader. The leader can be on any broker in a cluster. When a client (producer or consumer) starts, it requests metadata about which broker is the leader for a partition. The client can send this metadata request to any broker. The metadata that is returned includes the available endpoints for the leader broker of that partition. The client uses those endpoints to connect to the broker to read or write data as required.
Kafka needs to know how the brokers can communicate with each other, and how external clients (producers and consumers) can reach the broker. The host and IP address that a client uses are determined by the data that the broker passes back on the initial connection (for example, in a single-node cluster, the broker returned is the same as the one the client connected to).
Kafka brokers can have multiple listeners. A listener is a combination of Host/IP, Port, and Protocol. Following is an example Docker configuration of multiple listeners for KRaft mode:
KAFKA_LISTENERS: CONTROLLER://kafka0:29093,LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
- KAFKA_LISTENERS
A comma-separated list of listeners, each with the host/IP and port that Kafka binds to and listens on. For more complex networking, this can be an IP address that is associated with a network interface on a machine. The default is 0.0.0.0, which means listening on all interfaces. This is equivalent to the listeners configuration parameter in the server properties file (for ZooKeeper mode: CONFLUENT_HOME/etc/kafka/server.properties, or for KRaft mode: CONFLUENT_HOME/etc/kafka/kraft/server.properties).
In a multi-node (production) environment running in ZooKeeper mode, you must set the KAFKA_ADVERTISED_LISTENERS property in your Dockerfile to the external host/IP address. Otherwise, by default, clients will attempt to connect to the internal host address. In a single-node environment running “bare metal” (no VMs, no Docker), everything might be the hostname or simply localhost. However, more complex networking setups, such as multiple nodes, require additional configuration.
- KAFKA_ADVERTISED_LISTENERS
A comma-separated list of listeners with their host/IP and port. This is the metadata that is passed back to clients. This is equivalent to the advertised.listeners configuration parameter in the server properties file.
- KAFKA_CONTROLLER_LISTENER_NAMES
In KRaft mode, a comma-separated list of the names of the listeners used by the controller. On a node with process.roles=broker, only the first listener in the list is used by the broker. ZooKeeper-mode brokers should not set this value. For KRaft controllers in isolated or combined mode, the node listens as a KRaft controller on all listeners that are listed for this property, and each must appear in the listeners property.
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
Defines key/value pairs for the security protocol to use, per listener name. This is equivalent to the listener.security.protocol.map configuration parameter in the server properties file.
- KAFKA_INTER_BROKER_LISTENER_NAME
Defines which listener to use for inter-broker communication. Kafka brokers communicate between themselves, usually on the internal network (e.g. Docker network, AWS VPC, etc). The host/IP must be reachable from the other broker machines. This is equivalent to the inter.broker.listener.name configuration parameter in the server properties file.
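The relationships between these settings can be checked mechanically. The following is a minimal Python sketch, not part of Confluent Platform, that parses the example values above and applies three consistency checks: every bound listener has a security protocol, every advertised listener is also bound, and the inter-broker listener is advertised. The names `Listener`, `parse_listeners`, `parse_protocol_map`, and `check_config` are hypothetical helpers introduced for illustration.

```python
from typing import Dict, List, NamedTuple


class Listener(NamedTuple):
    name: str
    host: str
    port: int


def parse_listeners(value: str) -> Dict[str, Listener]:
    """Parse 'NAME://host:port,NAME2://host:port' into a dict keyed by listener name."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = Listener(name, host, int(port))
    return listeners


def parse_protocol_map(value: str) -> Dict[str, str]:
    """Parse 'NAME:PROTOCOL,NAME2:PROTOCOL' into a dict of listener name -> protocol."""
    return dict(pair.strip().split(":", 1) for pair in value.split(","))


def check_config(listeners: str, advertised: str,
                 protocol_map: str, inter_broker: str) -> List[str]:
    """Return a list of problems found; an empty list means these checks passed."""
    bound = parse_listeners(listeners)
    adv = parse_listeners(advertised)
    protocols = parse_protocol_map(protocol_map)
    problems = []
    for name in bound:
        if name not in protocols:
            problems.append(f"{name} has no entry in the security protocol map")
    for name in adv:
        if name not in bound:
            problems.append(f"advertised listener {name} is not in listeners")
    if inter_broker not in adv:
        problems.append(f"inter-broker listener {inter_broker} is not advertised")
    return problems


if __name__ == "__main__":
    # Values copied from the KRaft example in this topic.
    print(check_config(
        "CONTROLLER://kafka0:29093,LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092",
        "LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092",
        "CONTROLLER:PLAINTEXT,LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT",
        "LISTENER_BOB",
    ))
```

These checks cover only the relationships described in this topic; the broker performs its own, more complete validation at startup.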
If Kafka clients are not local to the broker’s network, additional listeners are required. Each listener will report the address where it can be reached. The broker address depends on the network used. For example, if you’re connecting to the broker from an internal network, the host/IP is different than when connecting externally.
Connecting to Kafka on Docker¶
If you are running Kafka on Docker internal networks plus a host machine, you must configure a listener for Kafka communication within the Docker network and a listener for non-Docker network traffic.
- For communication within the Docker network, use the hostname of the Docker containers. Each Docker container on the same Docker network will use the hostname of the Kafka broker container to reach it. This could be inter-broker communication (i.e. between brokers), between other components running in Docker such as Kafka Connect, or third-party clients or producers.
- For communication outside of the Docker network, use localhost. The assumption is that the clients will connect on localhost, to a port exposed from the Docker container. For example, clients running locally on the Docker host machine.
You can use the following Docker Compose snippets as examples. The first snippet shows KRaft mode.
Note that for brevity, this example shows KRaft combined mode, which is not supported for production workloads. For more about configuring KRaft, see KRaft Configuration for Confluent Platform.
kafka0:
  image: confluentinc/cp-kafka:7.7.1
  hostname: kafka0
  ports:
    - "9092:9092"
  environment:
    KAFKA_NODE_ID: 1
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT'
    KAFKA_ADVERTISED_LISTENERS: 'LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092'
    KAFKA_PROCESS_ROLES: 'broker,controller'
    KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093'
    KAFKA_LISTENERS: 'CONTROLLER://kafka0:29093,LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092'
    KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
    KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_BOB'
    # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
    # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
    CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
[...]
The following snippet shows the corresponding configuration for ZooKeeper mode. As of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments. For more information, see KRaft Overview for Confluent Platform.
kafka0:
  image: confluentinc/cp-kafka:7.7.1
  ports:
    - "9092:9092"
  depends_on:
    - zookeeper
  environment:
    KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
    KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_BOB'
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
[...]
- Clients within the Docker network connect using listener LISTENER_BOB, with port 29092 and hostname kafka0. With this configuration, the clients receive the hostname kafka0 to connect to. Each Docker container resolves kafka0 using Docker’s internal network and can reach the broker.
- Clients external to the Docker network connect using listener LISTENER_FRED, with port 9092 and hostname localhost. Port 9092 is exposed by the Docker container and is available to connect to. With this configuration, the clients receive the hostname localhost to connect to for reading and writing data.
- For KRaft mode, controllers use listener CONTROLLER.
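The behavior in the list above can be modeled with a small lookup. The following Python sketch is a toy model, not Kafka code: it assumes that the port a client bootstraps on identifies the listener, and that the broker answers with the advertised address for that same listener. The dictionary and function names are hypothetical.

```python
from typing import Dict, Tuple

# Advertised listeners from the example: listener name -> (host, port).
ADVERTISED: Dict[str, Tuple[str, int]] = {
    "LISTENER_BOB": ("kafka0", 29092),
    "LISTENER_FRED": ("localhost", 9092),
}

# Port the broker listens on -> listener name (taken from KAFKA_LISTENERS).
PORT_TO_LISTENER: Dict[int, str] = {
    29092: "LISTENER_BOB",
    9092: "LISTENER_FRED",
}


def advertised_address(connected_port: int) -> str:
    """Return the host:port a client is told to use, given the port it bootstrapped on."""
    listener = PORT_TO_LISTENER[connected_port]
    host, port = ADVERTISED[listener]
    return f"{host}:{port}"


if __name__ == "__main__":
    for port in (29092, 9092):
        print(f"bootstrap on port {port} -> metadata says {advertised_address(port)}")
```

The point of the model is that the address in the metadata depends on which listener received the connection, not on the hostname the client originally typed.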
Important
This configuration will not work for environments where a client external to Docker and external to the host machine wants to connect. This is because neither kafka0 (the internal Docker hostname) nor localhost (the loopback address for the Docker host machine) leads that client to the broker: kafka0 is not resolvable outside the Docker network, and localhost points to the client’s own machine.
Connecting to Kafka on a cloud provider¶
If you are running Kafka on a cloud provider (e.g. AWS) and your clients run on on-premises machines or in another cloud, you must configure a listener for Kafka communication within the cloud network and a listener for non-cloud network traffic.
Choose your configuration method, depending on whether external hostnames are internally resolvable.
If external hostnames are internally resolvable, you can use a single listener. Set the default listener, called PLAINTEXT, to the advertised hostname (i.e. the hostname passed to inbound clients):
advertised.listeners=PLAINTEXT://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
Internal and external connections will use ec2-54-191-84-122.us-west-2.compute.amazonaws.com. This address can be resolved locally and externally.
If external addresses are not locally resolvable, you must configure a listener for Kafka communication within the cloud network and a listener for communication outside of the cloud network.
- For communication within the cloud network (VPC), use the internal IP of the virtual machine (or hostname, if DNS is configured). This can be inter-broker communication (i.e. between brokers), and between other components running in the VPC such as Kafka Connect, or third-party clients or producers.
- For communication outside of the cloud network, use the external IP of the instance (or hostname, if DNS is configured). This can be testing connectivity from a laptop, or simply from machines not hosted in the cloud provider.
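As a rough sketch of the two-listener case, the following Python helper assembles the four broker properties described earlier in this topic. The listener names INTERNAL and EXTERNAL, the internal port 19092, binding to 0.0.0.0, and the helper name `two_listener_properties` are assumptions made for illustration. The hostnames are the example EC2 names used in the troubleshooting section of this topic. PLAINTEXT mirrors the other examples here and is not a security recommendation.

```python
def two_listener_properties(internal_host: str, external_host: str,
                            internal_port: int = 19092,
                            external_port: int = 9092) -> str:
    """Build server.properties lines for one internal and one external listener."""
    lines = [
        # Bind both listeners on all interfaces (assumption for this sketch).
        f"listeners=INTERNAL://0.0.0.0:{internal_port},EXTERNAL://0.0.0.0:{external_port}",
        "listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT",
        # Each listener advertises the address that its own clients can resolve.
        f"advertised.listeners=INTERNAL://{internal_host}:{internal_port},"
        f"EXTERNAL://{external_host}:{external_port}",
        "inter.broker.listener.name=INTERNAL",
    ]
    return "\n".join(lines)


if __name__ == "__main__":
    print(two_listener_properties(
        "ip-172-31-18-160.us-west-2.compute.internal",
        "ec2-54-191-84-122.us-west-2.compute.amazonaws.com",
    ))
```

With a layout like this, clients inside the VPC would bootstrap on the internal port and receive the internal hostname, while clients outside would bootstrap on the external port and receive the external hostname.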
Troubleshooting¶
Explore with kcat¶
You can use kcat (formerly kafkacat) to explore the listeners. For details, see kcat (formerly kafkacat) Utility for Confluent Platform. You can use the metadata list mode (-L) to view the metadata for the listener that you are connected to. Using the example above (LISTENER_BOB / LISTENER_FRED), here are the entries for broker 0:
Connecting on port 9092, mapped as LISTENER_FRED, the broker address is returned as localhost.
kafkacat -b kafka0:9092 -L
Your output should look like:
Metadata for all topics (from broker -1: kafka0:9092/bootstrap):
 1 brokers:
  broker 0 at localhost:9092
Connecting on port 29092, mapped as LISTENER_BOB, the broker address is returned as kafka0.
kafkacat -b kafka0:29092 -L
Your output should look like:
Metadata for all topics (from broker 0: kafka0:29092/0):
 1 brokers:
  broker 0 at kafka0:29092
You can also use tcpdump to explore the traffic from a client connecting to the broker, and view the hostname that’s returned from the broker.
Why can I connect to the broker, but the client still fails?¶
Even if you can make the initial connection to the broker, the address returned in the metadata might still be for a hostname that is not accessible from your client. Here is an example scenario and how to fix this.
You have a broker on AWS and you want to send a message to it from your laptop. You know the external hostname for the EC2 instance (ec2-54-191-84-122.us-west-2.compute.amazonaws.com). You have created the necessary entry in the security group to open the broker port to your inbound traffic. Verify that your local machine can connect to the port on the AWS instance with this command:
nc -vz ec2-54-191-84-122.us-west-2.compute.amazonaws.com 9092
Your output resembles:
found 0 associations
found 1 connections:
     1: flags=82<CONNECTED,PREFERRED>
    outif utun5
    src 172.27.230.23 port 53352
    dst 54.191.84.122 port 9092
    rank info not available
    TCP aux info available
Connection to ec2-54-191-84-122.us-west-2.compute.amazonaws.com port 9092 [tcp/XmlIpcRegSvc] succeeded!
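If nc is not available, a similar reachability check can be sketched with the Python standard library. This is a minimal sketch: `port_is_open` is a hypothetical helper name, and the 3-second timeout is an arbitrary choice. Like nc -vz, it confirms only that a TCP connection can be opened, not that the metadata returned by the broker is usable.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    host = "ec2-54-191-84-122.us-west-2.compute.amazonaws.com"  # example host from this topic
    print(f"{host}:9092 reachable: {port_is_open(host, 9092)}")
```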
Run this command:
echo "test"|kafka-console-producer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
Your laptop resolves ec2-54-191-84-122.us-west-2.compute.amazonaws.com successfully to the IP address 54.191.84.122, and connects to the AWS machine on port 9092.
The broker receives the inbound connection on port 9092. It returns the metadata to the client, with the hostname ip-172-31-18-160.us-west-2.compute.internal, because this is the hostname of the broker and is what the broker advertises by default when no advertised listener is configured.
The client then tries to send data to the broker using the metadata it was given. Since ip-172-31-18-160.us-west-2.compute.internal is not resolvable from the internet, it fails.
echo "test"|kafka-console-producer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
>>[2018-07-30 15:08:41,932] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 1547 ms has passed since batch creation plus linger time
Try the same thing from the broker machine itself:
echo "foo"|kafka-console-producer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
>>
kafka-console-consumer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test --from-beginning
foo
This is successful because you are connecting to port 9092. This port is configured as the internal listener and reports back its hostname as ip-172-31-18-160.us-west-2.compute.internal, which is resolvable from the broker machine because it is the machine’s own hostname.
Use the kafkacat -L flag to see the metadata returned by the broker:
kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L
Your output resembles:
Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
 1 brokers:
  broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092
The internal hostname is returned. Using kafkacat in consumer mode (-C) from your local machine, try to read from the topic.
kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -C -t test
Because you’re getting the internal listener hostname back from the broker in the metadata, the client cannot resolve that hostname to read from or write to.
% ERROR: Local: Host resolution failure: ip-172-31-18-160.us-west-2.compute.internal:9092/0: Failed to resolve 'ip-172-31-18-160.us-west-2.compute.internal:9092': nodename nor servname provided, or not known
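The manual diagnosis above can be partly automated. The following Python sketch assumes that the metadata listing has the "broker N at host:port" shape shown in this topic. It extracts the advertised broker addresses from that text and checks whether each hostname resolves from the machine where the script runs. The names `advertised_brokers` and `resolves` are hypothetical helpers, and the regular expression is an assumption based only on the sample output here.

```python
import re
import socket
from typing import List, Tuple

# Matches lines such as "broker 0 at kafka0:29092" in a metadata listing.
BROKER_LINE = re.compile(r"broker\s+(\d+)\s+at\s+(\S+):(\d+)")


def advertised_brokers(metadata_text: str) -> List[Tuple[int, str, int]]:
    """Extract (broker id, host, port) tuples from metadata listing text."""
    return [(int(broker_id), host, int(port))
            for broker_id, host, port in BROKER_LINE.findall(metadata_text)]


def resolves(host: str) -> bool:
    """Return True if this machine can resolve the given hostname or IP address."""
    try:
        socket.getaddrinfo(host, None)
        return True
    except socket.gaierror:
        return False


if __name__ == "__main__":
    # Sample text copied from the output shown in this topic.
    sample = (
        "Metadata for all topics (from broker -1: "
        "ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):\n"
        " 1 brokers:\n"
        "  broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092\n"
    )
    for broker_id, host, port in advertised_brokers(sample):
        status = "resolvable" if resolves(host) else "NOT resolvable from this machine"
        print(f"broker {broker_id}: {host}:{port} is {status}")
```

A broker address reported as not resolvable suggests that the client is being handed a listener address meant for a different network.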