.. _cp-multi-node:

Configure a Multi-Node Environment with Docker
##############################################

This topic demonstrates how to configure a multi-node |ak-tm| environment with Docker and cloud providers.

|ak| is a distributed system and data is read from and written to the partition leader. The leader can be on any broker in a cluster. When a client (producer or consumer) starts, it requests metadata about which broker is the leader for a partition. The client can send this metadata request to any broker. The metadata that is returned includes the available endpoints for the lead broker of that partition. The client uses those endpoints to connect to the broker to read or write data as required.

.. figure:: ../images/multi-node-1.png

|ak| needs to know how the brokers can communicate with each other, and how external clients (producers and consumers) can reach the broker. The host and IP address that a client uses are determined by the data that the broker passes back in the initial connection (e.g. if it’s a single node, the broker returned is the same as the one connected to).

|ak| brokers can have multiple listeners. A listener is a combination of Host/IP, Port, and Protocol.

Following is an example Docker configuration of multiple listeners for |kraft| mode:

::

    KAFKA_LISTENERS: CONTROLLER://kafka0:29093,LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
    KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
    KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER

KAFKA_LISTENERS
    A comma-separated list of listeners, host/IP, and port that |ak| binds and listens to. For more complex networking, this can be an IP address that is associated with a network interface on a machine. The default is ``0.0.0.0``, which means listening on all interfaces.

    This is equivalent to the ``listeners`` configuration parameter in the server properties file (For |zk| mode: ``CONFLUENT_HOME/etc/kafka/server.properties`` or |kraft| mode: ``CONFLUENT_HOME/etc/kafka/kraft/server.properties``).

    In a multi-node (production) environment running in |zk| mode, you must set the ``KAFKA_ADVERTISED_LISTENERS`` property in your Dockerfile to the external host/IP address. Otherwise, by default, clients will attempt to connect to the internal host address.

    In a single-node environment, running "bare metal" (no VMs, no Docker) everything might be the hostname or simply ``localhost``. However, more complex networking setups, such as multiple nodes, require additional configuration.

KAFKA_ADVERTISED_LISTENERS
    A comma-separated list of listeners with their host/IP and port. This is the metadata that is passed back to clients.

    This is equivalent to the ``advertised.listeners`` configuration parameter in the server properties file.

KAFKA_CONTROLLER_LISTENER_NAMES
    In |kraft| mode, a comma-separated list of the names of the listeners used by the controller. On a node with ``process.roles=broker``, only the first listener in the list will be used by the broker. |zk|-mode brokers should not set this value. For |kraft| controllers in isolated or combined mode, the node will listen as a |kraft| controller on all listeners that are listed for this property, and each must appear in the ``listeners`` property.

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
    Defines key/value pairs for the security protocol to use, per listener name.

    This is equivalent to the ``listener.security.protocol.map`` configuration parameter in the server properties file.

KAFKA_INTER_BROKER_LISTENER_NAME
    Defines which listener to use for inter-broker communication. |ak| brokers communicate between themselves, usually on the internal network (e.g. Docker network, AWS VPC, etc). The host/IP must be accessible from the other broker machines.

    This is equivalent to the ``inter.broker.listener.name`` configuration parameter in the server properties file.

If |ak| clients are not local to the broker’s network, additional listeners are required. Each listener will report the address where it can be reached. The broker address depends on the network used. For example, if you’re connecting to the broker from an internal network, the host/IP is different than when connecting externally.

Connecting to |ak| on Docker
*****************************

If you are running |ak| on Docker internal networks plus a host machine, you must configure a listener for |ak| communication within the Docker network and a listener for non-Docker network traffic.

- For communication within the Docker network, use the hostname of the Docker containers. Each Docker container on the same Docker network will use the hostname of the |ak| broker container to reach it. This could be inter-broker communication (i.e. between brokers), between other components running in Docker such as |kconnect-long|, or third-party clients or producers.
- For communication outside of the Docker network, use localhost. The assumption is that the clients will connect on localhost, to a port exposed from the Docker container. For example, clients running locally on the Docker host machine.

You can use the following Docker Compose snippets as examples for |ak| in |kraft| mode or |zk| mode.

.. tabs::

   .. tab:: |kraft| mode

      Note that for brevity, this example shows |kraft| combined mode, which is not supported for production workloads. For more about configuring |kraft|, see :ref:`configure-kraft`.

      .. codewithvars::

         kafka0:
           image: confluentinc/cp-kafka:|release|
           hostname: kafka0
           ports:
             - "9092:9092"
           environment:
             KAFKA_NODE_ID: 1
             KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT'
             KAFKA_ADVERTISED_LISTENERS: 'LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092'
             KAFKA_PROCESS_ROLES: 'broker,controller'
             KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093'
             KAFKA_LISTENERS: 'CONTROLLER://kafka0:29093,LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092'
             KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
             KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_BOB'
             # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
             # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
             CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
           [...]

   .. tab:: |zk| mode

      .. include:: ../includes/zk-deprecation.rst

      .. codewithvars:: docker

         kafka0:
           image: confluentinc/cp-kafka:|release|
           ports:
             - "9092:9092"
           depends_on:
             - zookeeper
           environment:
             KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
             KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_BOB'
             KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
           [...]

- Clients within the Docker network connect using listener ``LISTENER_BOB``, with port ``29092`` and hostname ``kafka0``. With this configuration, the clients will receive the hostname ``kafka0`` to connect to. Each Docker container will resolve ``kafka0`` using Docker's internal network, and be able to reach the broker.
- Clients external to the Docker network connect using listener ``LISTENER_FRED``, with port ``9092`` and hostname ``localhost``. Port 9092 is exposed by the Docker container and is available to connect to. With this configuration, the clients receive the hostname ``localhost`` to connect to for reading and writing data.
- For |kraft| mode, controllers use listener ``CONTROLLER``.

.. important:: This configuration will not work for environments where a client external to Docker and external to the host machine wants to connect. This is because ``kafka0`` (the internal Docker hostname) would not be resolvable from that client, and ``localhost`` (the loopback address) would resolve to the client's own machine rather than to the Docker host machine.

Connecting to |ak| on a cloud provider
***************************************

If you are running |ak| on a cloud provider (e.g. |aws|) and on-premises machines locally or in another cloud, you must configure a listener for |ak| communication within the cloud network and a listener for non-cloud network traffic.

Choose your configuration method, depending on whether external hostnames are internally resolvable.

- If external hostnames are internally resolvable, you can use a single listener. Set the default listener, called PLAINTEXT, to the advertised hostname (i.e. the hostname passed to inbound clients):

  ::

      advertised.listeners=PLAINTEXT://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092

  Internal and external connections will use ``ec2-54-191-84-122.us-west-2.compute.amazonaws.com``. This address can be resolved locally and externally.

  .. figure:: ../images/multi-node-2.png

- If external addresses are not locally resolvable, you must configure a listener for |ak| communication within the cloud network and a listener for communication outside of the cloud network.

  - For communication within the cloud network (VPC), use the internal IP of the virtual machine (or hostname, if DNS is configured). This can be inter-broker communication (i.e. between brokers), and between other components running in the VPC such as |kconnect-long|, or third-party clients or producers.
  - For communication outside of the cloud network, use the external IP of the instance (or hostname, if DNS is configured). This can be testing connectivity from a laptop, or simply from machines not hosted in the cloud provider.

  .. figure:: ../images/multi-node-3.png

Troubleshooting
***************

*******************
Explore with |kcat|
*******************

You can use :ref:`kafkacat-usage` to explore the listeners. You can use the metadata list mode (``-L``) to view the metadata for the listener that you are connected to. Using the example above (``LISTENER_BOB / LISTENER_FRED``), here are the entries for broker 0:

- Connecting on port 9092 mapped as ``LISTENER_FRED``, the broker address is returned as ``localhost``.

  ::

      kafkacat -b kafka0:9092 \
               -L

  Your output should look like:

  ::

      Metadata for all topics (from broker -1: kafka0:9092/bootstrap):
      1 brokers:
        broker 0 at localhost:9092

- Connecting on port 29092 mapped as ``LISTENER_BOB``, the broker address is returned as ``kafka0``.

  ::

      kafkacat -b kafka0:29092 \
               -L

  Your output should look like:

  ::

      Metadata for all topics (from broker 0: kafka0:29092/0):
      1 brokers:
        broker 0 at kafka0:29092

You can also use ``tcpdump`` to explore the traffic from a client connecting to the broker, and view the hostname that’s returned from the broker.

************************************************************
Why can I connect to the broker, but the client still fails?
************************************************************

Even if you can make the initial connection to the broker, the address returned in the metadata might still be for a hostname that is not accessible from your client.

Here is an example scenario and how to fix this.

#. You have a broker on |aws| and you want to send a message to it from your laptop. You know the external hostname for the EC2 instance (``ec2-54-191-84-122.us-west-2.compute.amazonaws.com``). You have created the necessary entry in the security group to open the broker port to your inbound traffic.

   Verify that your local machine can connect to the port on the |aws| instance with this command:

   ::

       nc -vz ec2-54-191-84-122.us-west-2.compute.amazonaws.com 9092

   Your output resembles:

   ::

       found 0 associations
       found 1 connections:
            1: flags=82
               outif utun5
               src 172.27.230.23 port 53352
               dst 54.191.84.122 port 9092
               rank info not available
               TCP aux info available

       Connection to ec2-54-191-84-122.us-west-2.compute.amazonaws.com port 9092 [tcp/XmlIpcRegSvc] succeeded!

#. Run this command:

   ::

       echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test

#. Your laptop resolves ``ec2-54-191-84-122.us-west-2.compute.amazonaws.com`` successfully to the IP address ``54.191.84.122``, and connects to the |aws| machine on port 9092.

#. The broker receives the inbound connection on port 9092. It returns the metadata to the client, with the hostname ``ip-172-31-18-160.us-west-2.compute.internal`` because this is the hostname of the broker and the default value for listeners.

#. The client then tries to send data to the broker using the metadata it was given.

   ::

       echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test

   Since ``ip-172-31-18-160.us-west-2.compute.internal`` is not resolvable from the internet, it fails.

   ::

       >>[2018-07-30 15:08:41,932] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
       org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 1547 ms has passed since batch creation plus linger time

#. Try the same thing from the broker machine itself:

   ::

       echo "foo"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
       >>

   ::

       kafka-console-consumer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test --from-beginning
       foo

   This is successful because you are connecting to port 9092. This port is configured as the internal listener and reports back its hostname as ``ip-172-31-18-160.us-west-2.compute.internal``, which is resolvable from the broker machine because it is the machine's own hostname.

#. Use the kafkacat ``-L`` flag to see the metadata returned by the broker:

   ::

       kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L

   Your output resembles:

   ::

       Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
       1 brokers:
         broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092

   The internal hostname is returned. Using kafkacat in consumer mode (``-C``) from your local machine, try to read from the topic.

   ::

       kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -C -t test

   Because you're getting the internal listener hostname back from the broker in the metadata, the client cannot resolve that hostname to read from or write to.

   ::

       % ERROR: Local: Host resolution failure: ip-172-31-18-160.us-west-2.compute.internal:9092/0: Failed to resolve 'ip-172-31-18-160.us-west-2.compute.internal:9092': nodename nor servname provided, or not known

Related content
***************

- Blog post: `Why Can't I Connect to Kafka? | Troubleshoot Connectivity `_
- `Deploy with Ansible Playbooks `_
- `Deploy with Confluent for Kubernetes `_
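
As a supplement to the troubleshooting steps in this topic, the following is a minimal Python sketch of the same check: given an advertised listeners string, it reports which advertised hostnames the current machine cannot resolve. It is not part of any |ak| tooling. The function names (``parse_listeners``, ``can_resolve``, ``unresolvable_listeners``) are hypothetical, and the sketch assumes each entry has the form ``NAME://host:port``. It tests only name resolution, not whether the port is reachable.

```python
import socket


def parse_listeners(value):
    """Parse 'NAME://host:port,NAME2://host2:port2' into {name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        name, _, address = entry.partition("://")
        # Split on the last colon so bracketed IPv6 hosts keep their colons.
        host, _, port = address.rpartition(":")
        listeners[name] = (host.strip("[]"), int(port))
    return listeners


def can_resolve(host):
    """Return True if this machine can resolve the hostname."""
    try:
        socket.getaddrinfo(host, None)
        return True
    except socket.gaierror:
        return False


def unresolvable_listeners(value, resolver=can_resolve):
    """Return {listener name: host} for advertised hosts that do not resolve here."""
    return {
        name: host
        for name, (host, _port) in parse_listeners(value).items()
        if not resolver(host)
    }
```

Run from the client machine, ``unresolvable_listeners("LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092")`` returns the listeners whose advertised hostname that client cannot resolve. The ``resolver`` parameter is injectable so that the logic can be exercised without network access.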