Configure a Multi-Node Confluent Platform Environment with Docker

This topic demonstrates how to configure a multi-node Apache Kafka® environment with Docker and cloud providers.

Kafka is a distributed system and data is read from and written to the partition leader. The leader can be on any broker in a cluster. When a client (producer or consumer) starts, it will request metadata about which broker is the leader for a partition. This request for metadata can come from any broker. The metadata that is returned will include the available endpoints for the lead broker of that partition. The client will use those endpoints to connect to the broker to read or write data as required.

../_images/multi-node-1.png

Kafka needs to know how the brokers can communicate with each other, and how external clients (producers and consumers) can reach the broker. The required host and IP address is determined based on the data that the broker passes back in the initial connection (e.g. if it’s a single node, the broker returned is the same as the one connected to).

Kafka brokers can have multiple listeners. A listener is a combination of Host/IP, Port, and Protocol. Following is an example Docker configuration of multiple listeners for KRaft mode:

KAFKA_LISTENERS: CONTROLLER://kafka0:29093,LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENERS

A comma-separated list of listeners, host/IP, and port that Kafka binds and listens to. For more complex networking, this can be an IP address that is associated with a network interface on a machine. The default is 0.0.0.0, which means listening on all interfaces. This is equivalent to the listeners configuration parameter in the server properties file (For ZooKeeper mode: CONFLUENT_HOME/etc/kafka/server.properties or KRaft mode: CONFLUENT_HOME/etc/kafka/kraft/server.properties).

In a multi-node (production) environment running in ZooKeeper mode, you must set the KAFKA_ADVERTISED_LISTENERS property in your Dockerfile to the external host/IP address. Otherwise, by default, clients will attempt to connect to the internal host address. In a single-node environment, running “bare metal” (no VMs, no Docker) everything might be the hostname or simply localhost. However, more complex networking setups, such as multiple nodes, require additional configuration.

KAFKA_ADVERTISED_LISTENERS
A comma-separated list of listeners with their the host/IP and port. This is the metadata that is passed back to clients. This is equivalent to the advertised.listeners configuration parameter in the server properties file.
KAFKA_CONTROLLER_LISTENER_NAMES
In KRaft mode, A comma-separated list of the names of the listeners used by the controller. On a node with process.roles=broker, only the first listener in the list will be used by the broker. ZooKeeper-mode brokers should not set this value. For KRaft controllers in isolated or combined mode, the node will listen as a KRaft controller on all listeners that are listed for this property, and each must appear in the listeners property.
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
Defines key/value pairs for the security protocol to use, per listener name. This is equivalent to the listener.security.protocol.map configuration parameter in the server properties file.
KAFKA_INTER_BROKER_LISTENER_NAME
Defines which listener to use for inter-broker communication. Kafka brokers communicate between themselves, usually on the internal network (e.g. Docker network, AWS VPC, etc). The host/IP must be accessible from the broker machine to others. This is equivalent to the inter.broker.listener.name configuration parameter in the server properties file.

If Kafka clients are not local to the broker’s network, additional listeners are required. Each listener will report the address where it can be reached. The broker address depends on the network used. For example, if you’re connecting to the broker from an internal network, the host/IP is different than when connecting externally.

Connecting to Kafka on Docker

If you are running Kafka on Docker internal networks plus a host machine, you must configure a listener for Kafka communication within the Docker network and a listener for non-Docker network traffic.

  • For communication within the Docker network, use the hostname of the Docker containers. Each Docker container on the same Docker network will use the hostname of the Kafka broker container to reach it. This could be inter-broker communication (i.e. between brokers), between other components running in Docker such as Kafka Connect, or third-party clients or producers.
  • For communication outside of the Docker network, use localhost. The assumption is that the clients will connect on localhost, to a port exposed from the Docker container. For example, clients running local on the Docker host machine.

You can use the following Docker compose snippet as an example for Kafka in ZooKeeper mode.

Note that for brevity, this example shows KRaft combined mode, which is not supported for production workloads. For more about configuring KRaft, see KRaft Configuration for Confluent Platform.

kafka0:
  image: confluentinc/cp-kafka:7.7.1
  hostname: kafka0
  ports:
    - "9092:9092"
  environment:
    KAFKA_NODE_ID: 1
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT'
    KAFKA_ADVERTISED_LISTENERS: 'LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092'
    KAFKA_PROCESS_ROLES: 'broker,controller'
    KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093'
    KAFKA_LISTENERS: 'CONTROLLER://kafka0:29093,LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092'
    KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
    KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_BOB'
    # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
    # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
    CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
   [...]
  • Clients within the Docker network connect using listener LISTENER_BOB, with port 29092 and hostname kafka0. With this configuration, the clients will receive the hostname kafka0 to connect to. Each Docker container will resolve kafka0 using Docker’s internal network, and be able to reach the broker.
  • Clients external to the Docker network connect using listener LISTENER_FRED, with port 9092 and hostname localhost. Port 9092 is exposed by the Docker container and is available to connect to. With this configuration, the clients receive the hostname localhost to connect to for reading and writing data.
  • For KRaft mode, controllers use listener CONTROLLER.

Important

This configuration will not work for environments where a client external to Docker and external to the host machine wants to connect. This is because neither kafka0 (the internal Docker hostname) or localhost (the loopback address for the Docker host machine) would be resolvable.

Connecting to Kafka on a cloud provider

If you are running Kafka on a cloud provider (e.g. AWS) and on-premises machines locally or in another cloud, you must configure a listener for Kafka communication within the cloud network and a listener for non-cloud network traffic.

Choose your configuration method, depending on whether external hostnames are internally resolvable.

  • If external hostnames are internally resolvable, you can use a single listener. Set the default listener, called PLAINTEXT, to the advertised hostname (i.e. the hostname passed to inbound clients):

    advertised.listeners=PLAINTEXT://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
    

    Internal and external connections will use ec2-54-191-84-122.us-west-2.compute.amazonaws.com. This address can be resolved locally and externally.

    ../_images/multi-node-2.png
  • If external addresses are not locally resolvable, you must configure a listener for Kafka communication within the cloud network and a listener for communication outside of the cloud network.

    • For communication within the cloud network (VPC), use the internal IP of the virtual machine (or hostname, if DNS is configured). This can be inter-broker communication (i.e. between brokers), and between other components running in the VPC such as Kafka Connect, or third-party clients or producers.

    • For communication outside of the cloud network, use the external IP of the instance (or hostname, if DNS is configured). This can be testing connectivity from a laptop, or simply from machines not hosted in the cloud provider.

      ../_images/multi-node-3.png

Troubleshooting

Explore with kcat

You can use kcat (formerly kafkacat) Utility for Confluent Platform to explore the listeners. You can use the metadata list mode (-L) to view the metadata for the listener that you are connected to. Using the example above (LISTENER_BOB / LISTENER_FRED), here are the entries for broker 0:

  • Connecting on port 9092 mapped as LISTENER_FRED, the broker address is returned as localhost.

    kafkacat -b kafka0:9092 \
            -L
    

    Your output should look like:

    Metadata for all topics (from broker -1: kafka0:9092/bootstrap):
    1 brokers:
      broker 0 at localhost:9092
    
  • Connecting on port 29092 mapped as LISTENER_BOB, the broker address is returned as kafka0.

    kafkacat -b kafka0:29092 \
           -L
    

    Your output should look like:

    Metadata for all topics (from broker 0: kafka0:29092/0):
    1 brokers:
      broker 0 at kafka0:29092
    

You can also use tcpdump to explore the traffic from a client connecting to the broker, and view the hostname that’s returned from the broker.

Why can I connect to the broker, but the client still fails?

Even if you can make the initial connection to the broker, the address returned in the metadata might still be for a hostname that is not accessible from your client. Here is an example scenario and how to fix this.

  1. You have a broker on AWS and you want to send a message to it from your laptop. You know the external hostname for the EC2 instance (ec2-54-191-84-122.us-west-2.compute.amazonaws.com). You have created the necessary entry in the security group to open the broker port to your inbound traffic. Verify that your local machine can connect to the port on the AWS instance with this command:

    nc -vz ec2-54-191-84-122.us-west-2.compute.amazonaws.com 9092
    

    Your output resembles:

    found 0 associations
    found 1 connections:
        1:  flags=82<CONNECTED,PREFERRED>
      outif utun5
      src 172.27.230.23 port 53352
      dst 54.191.84.122 port 9092
      rank info not available
      TCP aux info available
    
    Connection to ec2-54-191-84-122.us-west-2.compute.amazonaws.com port 9092 [tcp/XmlIpcRegSvc] succeeded!
    
  2. Run this command:

    echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
    
    1. Your laptop resolves ec2-54-191-84-122.us-west-2.compute.amazonaws.com successfully to the IP address 54.191.84.122, and connects to the AWS machine on port 9092.

    2. The broker receives the inbound connection on port 9092. It returns the metadata to the client, with the hostname ip-172-31-18-160.us-west-2.compute.internal because this is the hostname of the broker and the default value for listeners.

    3. The client the tries to send data to the broker using the metadata it was given.

      echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
      

      Since ip-172-31-18-160.us-west-2.compute.internal is not resolvable from the internet, it fails.

      >>[2018-07-30 15:08:41,932] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
      org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 1547 ms has passed since batch creation plus linger time
      
  3. Try the same thing from the broker machine itself:

    echo "foo"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
    >>
    
    kafka-console-consumer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test --from-beginning
    foo
    

    This is successful because you are connecting to port 9092. This port is configured as the internal listener and reports back its hostname as ip-172-31-18-160.us-west-2.compute.internal which is resolvable from the broker machine because it’s its own hostname.

  4. Use the kafkacat -L flag to see the metadata returned by the broker:

    kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L
    

    Your output resembles:

    Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
    1 brokers:
      broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092
    

    The internal hostname is returned. Using kafkacat in producer mode (-C) from your local machine, try and read from the topic.

    kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -C -t test
    

    Because your’re getting the internal listener hostname back from the broker in the metadata, the client cannot resolve that hostname to read/write from.

    % ERROR: Local: Host resolution failure: ip-172-31-18-160.us-west-2.compute.internal:9092/0: Failed to resolve 'ip-172-31-18-160.us-west-2.compute.internal:9092': nodename nor servname provided, or not known