マルチノード環境の構成¶
このトピックでは、Docker やクラウドプロバイダーでマルチノード Apache Kafka® 環境を構成する方法を示します。
Kafka は分散システムであり、パーティションリーダーに対してデータの読み書きが実行されます。リーダーは、クラスター内のどのブローカーでも設定できます。クライアント(プロデューサーまたはコンシューマー)が開始するとき、パーティションのリーダーであるブローカーを識別するためのメタデータを要求します。このメタデータの要求はどのブローカーでも処理できます。返されるメタデータには、対象パーティションのリードブローカーで利用可能なエンドポイントが含まれます。クライアントはそれらのエンドポイントを使用して、ブローカーに接続し、必要に応じてデータの読み書きを実行します。

Kafka では、ブローカーが互いに通信する方法、外部クライアント(プロデューサーやコンシューマー)がブローカーに到達する方法を把握する必要があります。必要なホストや IP アドレスは、初期接続でブローカーが返すデータに基づいて決定されます(たとえば、単一ノードの場合、返されるブローカーは接続先と同じです)。
Kafka ブローカーには複数のリスナーを設定できます。リスナーはホストまたは IP、ポートおよびプロトコルの組み合わせで表されます。次に複数リスナーがある Docker 構成の例を示します。
KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
- KAFKA_LISTENERS
Kafka がバインドしリッスンする、リスナー、ホストまたは IP、ポートのコンマ区切りのリスト。複雑なネットワーク構成では、これがマシンのネットワークインターフェイスに関連付けられた IP アドレスになることがあります。デフォルトは
0.0.0.0
で、すべてのインターフェイスをリッスンすることを表します。これは、サーバープロパティファイル(<path-to-confluent>/etc/kafka/server.properties
)のlisteners
構成パラメーターに相当します。マルチノード環境(本稼働環境)では、Dockerfile の
KAFKA_ADVERTISED_LISTENERS
プロパティを外部のホストまたは IP アドレスに設定する必要があります。設定しない場合、デフォルトでは、クライアントは内部のホストアドレスに接続を試みます。"ベアメタル"(VM なし、Docker なし)を実行する単一ノード環境では、すべてがホスト名、つまり単にlocalhost
となります。ただし、複数ノードなど複雑なネットワークになると、追加の構成が必要です。- KAFKA_ADVERTISED_LISTENERS
- リスナーのコンマ区切りリストで、ホストまたは IP およびポートを指定します。メタデータとしてクライアントに送り返されます。これはサーバープロパティファイル(
<path-to-confluent>/etc/kafka/server.properties
)のadvertised.listeners
構成パラメーターに相当します。 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
- 使用するセキュリティプロトコルのキーと値のペアを、リスナー名ごとに定義します。これはサーバープロパティファイル(
<path-to-confluent>/etc/kafka/server.properties
)のlistener.security.protocol.map
構成パラメーターに相当します。 - KAFKA_INTER_BROKER_LISTENER_NAME
- ブローカー間通信に使用する対象のリスナーを定義します。Kafka ブローカーはそれぞれのブローカー間で、一般的には内部ネットワーク(Docker ネットワーク、AWS VPC など)上で通信します。このホストまたは IP でブローカーマシンから他のマシンにアクセスできる必要があります。これはサーバープロパティファイル(
<path-to-confluent>/etc/kafka/server.properties
)のinter.broker.listener.name
構成パラメーターに相当します。
Kafka クライアントがブローカーネットワークのローカルにない場合、追加のリスナーが必要です。各リスナーは、ブローカーに到達できるアドレスをレポートします。このブローカーアドレスは使用するネットワークで決まります。たとえば、内部ネットワークからブローカーに接続する場合、ホストまたは IP は、外部から接続する場合とは異なります。
Docker 上の Kafka への接続¶
Docker の内部ネットワークとホストマシンで Kafka を実行している場合、Docker ネットワーク内の Kafka 通信用のリスナーと、非 Docker ネットワークトラフィック用のリスナーを構成する必要があります。
- Docker ネットワーク内の通信では、Docker コンテナのホスト名を使用します。同じ Docker ネットワークの各 Docker コンテナは、Kafka ブローカーコンテナのホスト名を使用してアクセスします。これには、ブローカー間(つまりブローカー同士の)通信、Kafka Connect など Docker で動作する他のコンポーネント間の通信、またはサードパーティクライアントやプロデューサーが該当します。
- Docker ネットワークの外部の通信には、localhost を使用します。前提としてクライアントは、Docker コンテナから公開されているポートに localhost から接続します。たとえば、Docker ホストマシンでローカルに動作しているクライアントが該当します。
次の Docker 構成のスニペットを例として使用できます。
kafka0:
image: "confluentinc/cp-enterprise-kafka:7.1.1"
ports:
- '9092:9092'
depends_on:
- zookeeper
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
[…]
- Docker ネットワーク内のクライアントは、リスナー
BOB
、ポート29092
、ホスト名kafka0
で接続します。この構成では、クライアントは接続先のホスト名kafka0
を受信します。各 Docker コンテナは Docker の内部ネットワークを使用して、kafka0
を解決し、このブローカーに到達できます。 - Docker ネットワークの外部にあるクライアントは、リスナー
FRED
、ポート9092
、ホスト名localhost
で接続します。ポート 9092 は Docker コンテナで公開され、接続先として利用できます。この構成では、クライアントは、データを読み書きするために接続するホスト名としてlocalhost
を受信します。
重要
この構成は、Docker およびホストマシンにとっての外部クライアントが接続しようとする環境では機能しません。これは、kafka0
(内部 Docker ホスト名)および localhost(Docker ホストマシンのループバックアドレス)は解決できないためです。
クラウドプロバイダーにある Kafka への接続¶
Kafka を、クラウドプロバイダー(AWS など)と合わせてローカルにオンプレミスのマシンで実行している場合、あるいは別のクラウドと実行している場合、クラウドネットワーク内の Kafka 通信用のリスナーとクラウド外ネットワークトラフィック用のリスナーを構成する必要があります。
外部ホスト名を内部で解決できるかどうかで、構成方式を選択します。
外部のホスト名を内部で解決できる場合は、単一のリスナーを使用できます。PLAINTEXT というデフォルトリスナーをアドバタイズホスト名(つまりインバウンドクライアントに渡されるホスト名)に設定します。
advertised.listeners=PLAINTEXT://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
内部接続と外部接続で
ec2-54-191-84-122.us-west-2.compute.amazonaws.com
を使用します。このアドレスはローカルでも、外部でも解決できます。外部アドレスがローカルに解決できない場合、クラウドネットワーク内の Kafka 通信用のリスナーとクラウドネットワーク外の通信用のリスナーを構成する必要があります。
クラウドネットワーク(VPC)内の通信では、仮想マシンの内部 IP (DNS が構成されている場合はホスト名)を使用します。これには、ブローカー間(つまりブローカー同士の)通信、Kafka Connect など VPC で動作する他のコンポーネント間の通信、またはサードパーティクライアントやプロデューサーなどが該当します。
クラウドネットワークの外部との通信では、インスタンスの外部 IP(DNS が構成されている場合はホスト名)を使用します。この接続は、ノート PC やクラウドプロバイダーでホストされていないマシンからテストできます。
トラブルシューティング¶
kcat での調査¶
各リスナーを調査するには、kcat (旧 kafkacat)ユーティリティ を使用できます。メタデータリストモード(-L
)を使用すると、接続先であるリスナーのメタデータを表示できます。前に示した例を使用して(LISTENER_BOB / LISTENER_FRED
)、以下にブローカー 0 のエントリを示します。
LISTENER_FRED
にマップされたポート 9092 に接続すると、ブローカーアドレスにlocalhost
が返されます。kafkacat -b kafka0:9092 \ -L
出力は以下のようになります。
Metadata for all topics (from broker -1: kafka0:9092/bootstrap): 1 brokers: broker 0 at localhost:9092
LISTENER_BOB
にマップされたポート 29092 に接続すると、ブローカーアドレスにkafka0
が返されます。kafkacat -b kafka0:29092 \ -L
出力は以下のようになります。
Metadata for all topics (from broker 0: kafka0:29092/0): 1 brokers: broker 0 at kafka0:29092
さらに tcpdump
を使用して、ブローカーに接続しているクライアントからのトラフィックを調べ、ブローカーから返されたホスト名を表示することもできます。
ブローカーへの接続は確立されているのに、クライアントから接続できない理由¶
ブローカーへの初期接続に成功した場合でも、メタデータで返されたアドレスが、クライアントからアクセスできないホスト名である可能性があります。サンプルのシナリオと解決方法を示します。
AWS にブローカーがあり、ノート PC からそこにメッセージを送信することを考えます。EC2 インスタンスの外部ホスト名(
ec2-54-191-84-122.us-west-2.compute.amazonaws.com
)を把握しています。セキュリティグループに必須エントリを作成し、ブローカーポートをインバウンドトラフィックに対して開きました。次のコマンドで AWS インスタンスのポートにローカルマシンから接続できるかどうかを確認します。nc -vz ec2-54-191-84-122.us-west-2.compute.amazonaws.com 9092
出力は以下のようになります。
found 0 associations found 1 connections: 1: flags=82<CONNECTED,PREFERRED> outif utun5 src 172.27.230.23 port 53352 dst 54.191.84.122 port 9092 rank info not available TCP aux info available Connection to ec2-54-191-84-122.us-west-2.compute.amazonaws.com port 9092 [tcp/XmlIpcRegSvc] succeeded!
次のコマンドを実行します。
echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
ノート PC は
ec2-54-191-84-122.us-west-2.compute.amazonaws.com
を IP アドレス54.191.84.122
に正しく解決し、AWS マシンのポート 9092 に接続します。ブローカーはポート 9092 でインバウンド接続を受信します。ホスト名
ip-172-31-18-160.us-west-2.compute.internal
のメタデータをクライアントに返します。これはブローカーのホスト名で、リスナー向けのデフォルト値であるからです。クライアントは受け取ったメタデータを使用して、ブローカーへデータの送信を試みます。
echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
ip-172-31-18-160.us-west-2.compute.internal
はインターネットで解決できないので、接続に失敗します。>>[2018-07-30 15:08:41,932] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 1547 ms has passed since batch creation plus linger time
ブローカーマシン自体で同じことを試します。
echo "foo"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test >>
kafka-console-consumer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test --from-beginning foo
この場合は、ポート 9092 に接続しているので成功します。このポートは内部リスナーとして構成され、
ip-172-31-18-160.us-west-2.compute.internal
というホスト名を返します。これはブローカーマシン自体のホスト名であるため、ブローカーマシンから解決できます。kafkacat の
-L
フラグを使用して、ブローカーから返されるメタデータを表示します。kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L
出力は以下のようになります。
Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap): 1 brokers: broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092
内部ホスト名が返されます。ローカルマシンから kafkacat をプロデューサーモード(
-C
)で使用して、トピックの読み取りを試します。kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -C -t test
ブローカーから返されたメタデータから得られるのは内部リスナーのホスト名であるため、クライアントは読み書きする対象としてそのホスト名を解決できません。
% ERROR: Local: Host resolution failure: ip-172-31-18-160.us-west-2.compute.internal:9092/0: Failed to resolve 'ip-172-31-18-160.us-west-2.compute.internal:9092': nodename nor servname provided, or not known
おすすめの関連情報¶
ブログ記事: 「Why Can't I Connect to Kafka?| Troubleshoot Connectivity」