チュートリアル: クラスターの境界を越えたデータのレプリケーション¶
このガイドでは、2 つの Apache Kafka® クラスターを起動し、続いて Replicator プロセスを開始してそれらのクラスター間でデータをレプリケートする方法について説明します。チュートリアルの目的上、同じマシンで両方のクラスターを実行します。これを行うために、各クラスターが固有のポートとデータディレクトリを持っていることを、あらゆる手段を使って確認します。各クラスターが固有のサーバーを持っている通常の環境で実行している場合、ZooKeeper とブローカーの構成にこれらの変更を加える必要はありません。

Replicator クイックスタート構成¶
ちなみに
インストールの前提条件とコマンド例について
以下の手順では、Confluent Platform、Confluent CLI、Java 1.8 または 1.11(Confluent Platform に必要)がローカルにインストールされていることを前提としています。Confluent Platform を初めて使用する場合は、「Confluent Platform のクイックスタート(ローカルインストール)」を終えてからこのチュートリアルに進んでください。
サーバーと Replicator を起動するためのコマンドでは、Confluent Platform のホームディレクトリから実行するか、Confluent Platform が CLASSPATH に存在することを前提としています。また、特に明記しない限り、以下の例は、インストールされている Confluent Platform のデフォルトの場所にプロパティファイルが存在することを前提としています。これにより、ほとんどの場合、サンプルコマンドを簡単にターミナルに直接コピーして貼り付けることができます。たとえば、以下のコマンドを実行するとします。
./bin/zookeeper-server-start etc/kafka/zookeeper.properties
これは以下と同じになります。
$CONFLUENT_HOME/bin/zookeeper-server-start $CONFLUENT_HOME/etc/kafka/zookeeper.properties
Kafka ブローカーと Confluent Platform コンポーネントのポート¶
このチュートリアルの例では、以下のポート構成が定義されています。
送信元 | 送信先 | |
---|---|---|
Kafka ブローカー | 9082 | 9092 |
ZooKeeper | 2171 | 2181 |
Connect Replicator ワーカー | 8083(送信元から送信先にトピックをコピー) | |
Control Center | 9021 |
送信先クラスターの起動¶
最初に、ZooKeeper サーバーを起動します。このガイドでは、サービスが
localhost
で実行されることを前提としています。以下のコマンドを固有のターミナルで実行して、ZooKeeper を起動します。
./bin/zookeeper-server-start etc/kafka/zookeeper.properties
次に、単一ノード Kafka クラスターとして動作する Kafka ブローカーを起動します。
以下のコマンドを固有のターミナルで実行して Kafka を起動します。
./bin/kafka-server-start etc/kafka/server.properties
これらのサービスを稼働させる方法の詳細については、Confluent Platform の クイックスタート の説明を参照してください。
注釈
送信先クラスターは、送信元クラスターと同じ(またはそれ以降の)バージョンの Confluent Platform を実行している必要があります。Replicator は、送信先クラスターにリンクされている Connect クラスター内で実行され、送信元クラスターからメッセージを読み取ります。そのため、送信先が古いバージョンを実行している場合、Replicator はメッセージフォーマットを解釈できなくなります。
送信元クラスターの起動¶
送信先クラスターはデフォルトポートで実行するように構成しましたが、競合を回避するために、送信元クラスターは別のポートで実行する必要があります。ポートマッピング で示したように、送信元クラスター内の Kafka ブローカーはポート 9082 上で構成されており、ZooKeeper は 2171 上で構成されています。構成ファイルを一時的な場所にコピーし、送信先クラスターと競合しないようにそれらのポートを変更します。
$CONFLUENT_HOME
の下にチュートリアルのプロパティファイル用の "クイックスタート" ディレクトリ(例:my-examples/
)を作成します。ちなみに
$CONFLUENT_HOME
は<path-to-confluent>
を表します。Linux システムでは、この表記をシェル環境変数の設定に使用できます。- このセクションの例では、
$CONFLUENT_HOME
からコマンドを実行し、my-examples/
を使用してプロパティファイルを保存することを前提としています。このモデルに従うと、コードブロックに対してコピーアンドペーストを使用できます。
構成ファイルを
my-examples
(または選択したディレクトリ)にコピーします。cp etc/kafka/zookeeper.properties my-examples/zookeeper_origin.properties
cp etc/kafka/server.properties my-examples/server_origin.properties
ポート番号をアップデートします。
sed -i '' -e "s/2181/2171/g" my-examples/zookeeper_origin.properties sed -i '' -e "s/9092/9082/g" my-examples/server_origin.properties sed -i '' -e "s/2181/2171/g" my-examples/server_origin.properties sed -i '' -e "s/#listeners/listeners/g" my-examples/server_origin.properties sed -i '' -e "s/8090/8091/g" my-examples/server_origin.properties sed -i '' -e "s/#confluent.metadata.server.listeners/confluent.metadata.server.listeners/g" my-examples/server_origin.properties
送信元のブローカー ID をアップデートします。
sed -i '' -e "s/broker.id=0/broker.id=1/g" my-examples/server_origin.properties
ちなみに
ブローカーは 2 つの異なるクラスターに存在するため、これは常に必要というわけではありませんが、コマンドラインから Kafka でこれらを管理する場合、一意のブローカー ID を設定しておくと便利です。
データディレクトリをアップデートします。
sed -i '' -e "s/zookeeper/zookeeper_origin/g" my-examples/zookeeper_origin.properties sed -i '' -e "s/kafka-logs/kafka-logs-origin/g" my-examples/server_origin.properties
ここで、送信元クラスターを起動できます。
以下のコマンドを固有のターミナルで実行して ZooKeeper を起動します。
./bin/zookeeper-server-start my-examples/zookeeper_origin.properties
以下のコマンドを固有のターミナルで実行して Kafka を起動します。
./bin/kafka-server-start my-examples/server_origin.properties
トピックの作成¶
新しいコマンドウィンドウを開き、Kafka コマンドを実行します。
以下のコマンドを使用して、送信元クラスターで "test-topic" という名前のトピックを作成します。
kafka-topics --create --topic test-topic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9082
ちなみに
環境によっては、Kafka コマンドで .sh
拡張子を使用する必要が生じる場合があります(例: kafka-topics.sh
)。
トピックが正常に作成されたことを以下のように確認します。
kafka-topics --list --bootstrap-server localhost:9082
出力は以下のようになります(_confluent
トピックは内部トピックです)。
__confluent.support.metrics
_confluent-command
test-topic
Replicator を構成して実行する場合、前述したように定義した構成を使用して、この "test-topic" が送信先クラスター(ポート 2181
)にレプリケートされます。この例では便宜上、テストトピックは 1 つのパーティションで作成しています。Replicator では、任意の数のトピックとパーティションに設定できます。
Replicator の構成および実行¶
Confluent Replicator は、実行可能ファイルとして実行するか、Kafka Connect フレームワーク内のコネクターとして実行できます。このクイックスタートでは、Replicator を実行可能ファイルとして起動します。
コンシューマー、プロデューサー、および Replicator 構成ファイルの作成¶
Replicator 実行可能スクリプトでは、以下の 3 つの構成ファイルを使用します。
- 送信元クラスター用の構成
- 送信先クラスター用の構成
- Replicator の構成
ちなみに
これらの構成ファイルは、選択した任意のディレクトリに配置できますが、Confluent Platform に付属のデフォルトプロパティファイルとの競合を回避するために、以下の手順では $CONFLUENT_HOME/my-examples/
のパスに配置することを前提とします。(producer.properties
と consumer.properties
のデフォルト構成は etc/kafka/
に存在し、replicator.properties
のデフォルト構成は etc/kafka-connect-replicator/
に存在します)。
以下のファイルを $CONFLUENT_HOME/my-examples/
に作成します。
consumer.properties
という名前の新しいファイルで送信元クラスターを構成します。このファイルを編集し、送信元 クラスターのブローカーのアドレスが含まれていることを確認します。デフォルトのブローカーリストは、先ほど起動した送信元クラスターと一致します。# Origin cluster connection configuration bootstrap.servers=localhost:9082
producer.properties
という名前の新しいファイルで送信先クラスターを構成します。このファイルを編集し、送信先 クラスターのブローカーのアドレスが含まれていることを確認します。デフォルトのブローカーリストは、先ほど起動した送信先クラスターと一致します。# Destination cluster connection configuration bootstrap.servers=localhost:9092
Connect ワーカー用の
replication.properties
という名前の新しいファイルで Replicator 構成を定義します。このクイックスタートではtopic.rename.format
の構成を示しますが、接続に関連しない任意の Confluent Replicator 構成プロパティ をこのファイルに指定できます。# Replication configuration topic.rename.format=${topic}.replica replication.factor=1 config.storage.replication.factor=1 offset.storage.replication.factor=1 status.storage.replication.factor=1 confluent.topic.replication.factor=1
ちなみに
- ポートが
replication.properties
に定義されていない場合、このワーカーはそのデフォルトポート8083
で実行されますが、このデプロイではこれは望ましい構成です。 - レプリケーション係数プロパティ(すべて
1
に設定)が使用されているのは、これらのテストクラスターが小規模であるためです。本稼働環境で推奨される最小クラスターサイズは3
で、これらのプロパティではこれがデフォルトです。
Replicator の起動¶
必要な構成ファイルを作成したら、以下のコマンドを使用して Replicator 実行可能ファイルをその固有のターミナルで起動します(プロパティファイルは my-examples
に存在するものとします)。
./bin/replicator --cluster.id replicator --consumer.config my-examples/consumer.properties --producer.config my-examples/producer.properties --replication.config my-examples/replication.properties --whitelist 'test-topic'
Replicator 実行可能ファイルの一部のパラメーターはコマンドラインで渡すことができます。
--cluster.id
: この実行可能ファイルが参加する Replicator クラスターを特定するために使用する識別子です。同じcluster.id
を持つ複数の Replicator 実行可能ファイルのインスタンスは一緒に動作します。--consumer.config
: 送信元クラスター構成のパスです。--producer.config
: 送信先クラスター構成のパスです。--replication.config
: 接続に固有のものではない構成が含まれるファイルのパスです。これらの構成はコマンドライン引数でオーバーライドされます。--whitelist
: 送信元から送信先にレプリケートするトピックのリスト
コマンドラインオプションの完全なリストについては、「Replicator 実行可能ファイルのコマンドラインパラメーター」を参照してください。
送信元タスクの作成と Replicator が稼働中であることを示すレプリケートされるトピックの作成に関連する成功メッセージを確認し、トピックをコピーします。
クラスター間でのトピックのレプリケーションの検証¶
Replicator は、初期化を完了すると、送信元クラスターをチェックしてレプリケートする必要があるトピックがあるかどうかを確認します。
この例では、test-topic
を発見し、対応するトピックを送信先クラスターに作成します。これは、以下のコマンドで確認できます。
./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092
ここで test-topic.replica
の存在をチェックしているのは、送信先クラスターにレプリケートされたときに、構成に従って test-topic
の名前が変更されたためです。
出力は以下のようになります。
./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092
Topic: test-topic.replica PartitionCount: 1 ReplicationFactor: 1 Configs: message.timestamp.type=CreateTime,segment.bytes=1073741824
Topic: test-topic.replica Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Offline: 0
また、送信先クラスターのトピックのリストと詳細を表示することもできます。レプリケートされたトピック(test-topic.replica
など)が表示されます。
./bin/kafka-topics --list --bootstrap-server localhost:9092
ちなみに
- 送信元クラスターのトピックのリストを表示するには、
kafka-topics --list
をlocalhost:9082
に対して実行します。 - 元のトピックの説明を表示するには、
kafka-topics --describe
を実行しますが、test-topic
を検索してターゲットlocalhost:9082
に対して実行します。
送信元クラスターでトピックを作成したら、Kafka プロデューサーを使用してそのトピックにデータを送信し、送信元クラスターの test-topic
に書き込むことができます。その後、送信先クラスターの test-topic.replica
から消費することでデータがレプリケートされたことを確認できます。たとえば、Kafka のコンソールプロデューサーを使用して一連の数字を送信するために、新しいターミナルウィンドウで以下のコマンドを実行します。
seq 10000 | ./bin/kafka-console-producer --topic test-topic --broker-list localhost:9082
コンソールコンシューマーを、それ自体のウィンドウで使用して、送信先クラスターへのデリバリーを確認できます。
./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092
コンシューマー出力に 1 ~ 10,000 の数字が表示された場合、これはマルチクラスターレプリケーションが正しく作成されたことを示します。
Ctrl + C キー
を押して("+" はキーを同時に押すことを意味します)、コンシューマー読み取りを終了し、コマンドプロンプトに戻ります。
Control Center を使用した Replicator のモニタリング¶
Control Center を使用して現在のデプロイで Replicator をモニタリングできます。
送信元クラスターと送信先クラスターの両方にある Replicator とブローカーを停止し、次に ZooKeeper インスタンスを停止します(この順番に従ってください)。
各コマンドウィンドウで
Ctrl + C キー
を押して("+" はキーを同時に押すことを意味します)プロセスを停止します。簡単に再起動できるようにウィンドウは開いたままにします。以下の手順に従って Replicator のモニタリング拡張機能をアクティブ化します。詳細については、「Replicator モニタリング拡張機能」を参照してください。
replicator-rest-extension-<version>.jar
の完全なパスを CLASSPATH に追加します。rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension
をmy-examples/replication.properties
に追加します。
送信先と送信元の両方の Kafka 構成ファイル(それぞれ
etc/kafka/server.properties
とmy-examples/server_origin.properties
)に以下の行を追加するか、これらの行のコメントを解除します。両方のファイルのconfluent.metrics.reporter.bootstrap.servers
の構成ではlocalhost
のポート9092
が指定されている必要があるため、これらのポート番号の一方または両方の変更が必要になる場合があります(confluent.metrics
でファイル内を検索するとこれらの行に移動します)。confluent.metrics.reporter.topic.replicas=1 metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter confluent.metrics.reporter.bootstrap.servers=localhost:9092
- 最初の行は、レプリケーション係数
1
を使用して、デプロイが開発モードであることを Control Center に示しています。 - 残りの 2 行により、Control Center でのメトリクスのレポートが有効になり、モニタリングデータを収集および格納する Confluent の内部トピックへのアクセスが提供されます。
ちなみに
- この例では、Metrics Reporter は、Confluent Control Center のブートストラップ先のクラスター、つまり送信先クラスターを参照している必要があります。これが適切に設定されていないと、送信元トピックに対するメトリクスは Control Center に表示されません。
etc/kafka/server.properties
とmy-examples/server_origin.properties
に同じ構成値confluent.metrics.reporter.bootstrap.servers=localhost:9092
が必要なのはこのためです。 - これらの手順をより複雑な実際の環境に適用する際は、別のアプローチが必要になる場合があります。たとえば、送信元と送信先用に複数の Control Center インスタンスがあり、それぞれが固有のクラスターをモニタリングするデプロイの場合、
confluent.metrics.reporter.bootstrap.servers
は必要に応じて送信元または送信先を参照している必要があります。詳細については、Control Center を使用する「マルチクラスター構成」、「Replicator のモニタリング」、および「Multi-Region Clusters」を参照してください。
- 最初の行は、レプリケーション係数
my-examples/producer.properties
を編集してプロデューサー用のモニタリングインターセプターを追加します。# Monitoring interceptor for producer interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
my-examples/consumer.properties
を編集してコンシューマー用のモニタリングインターセプターを追加します。# Monitoring interceptor for consumer interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
etc/confluent-control-center/control-center-dev.properties
を編集して、Control Center の送信元と送信先のブートストラップサーバーを指定する以下の 2 つの行を追加します。これは、複数のクラスターをモニタリングするために必要です(これらの行を追加する便利な場所は、ファイルの先頭付近、confluent.controlcenter.id
を指定する行の直後にある "Control Center Settings" です)。# multi-cluster monitoring confluent.controlcenter.kafka.origin.bootstrap.servers=localhost:9082 confluent.controlcenter.kafka.destination.bootstrap.servers=localhost:9092
ちなみに
Control Center では、Replicator モニタリングメトリクスを検索する場所を知るために、Connect REST エンドポイントのホストとポートが必要です。この例で使用する構成ファイル(
control-center-dev.properties
)では、これがデフォルトポートに構成されているため、そのままで正常に動作します。# A comma separated list of Connect host names confluent.controlcenter.connect.cluster=http://localhost:8083
本稼働環境対応の構成ファイル(
control-center-production.properties
)には、デフォルトのコメントアウトがあります。このファイルを代わりに使用する場合や、複数のコネクターがある場合、Connect クラスターを異なる構成にする場合は、Connect エンドポイントを指定する必要があります。そのためには、デフォルトをコメント解除するか、独自の Connect クラスターのホストを指定します。詳細については、 Control Center 構成リファレンス でconfluent.controlcenter.connect.<connect-cluster-name>.cluster
とconfluent.controlcenter.connect.cluster
(非推奨)の説明を参照してください。上記と同じコマンドを使用して送信先クラスターと送信元クラスターの ZooKeeper インスタンスを再起動します。以下に例を示します。
./bin/zookeeper-server-start etc/kafka/zookeeper.properties
./bin/zookeeper-server-start my-examples/zookeeper_origin.properties
上記と同じコマンドを使用して送信先クラスターと送信元クラスターブローカーを再起動します。以下に例を示します。
./bin/kafka-server-start etc/kafka/server.properties
./bin/kafka-server-start my-examples/server_origin.properties
上記と同じコマンドを使用して Replicator と Connect ワーカーを再起動します。以下に例を示します。
./bin/replicator --cluster.id replicator --consumer.config my-examples/consumer.properties --producer.config my-examples/producer.properties --replication.config my-examples/replication.properties --whitelist 'test-topic'
以下のコマンドを使用して Control Center を起動します。
./bin/control-center-start etc/confluent-control-center/control-center-dev.properties
control-center-dev.properties
にポートが定義されていない場合、Control Center はデフォルトでポート9021
で実行されます(「Control Center ユーザーガイド」を参照してください)。これは、このデプロイの望ましい構成です。ウェブブラウザーで http://localhost:9021/ にアクセスして Control Center を開きます。
クラスターは、構成に基づいて自動生成された名前を使って Control Center 上にレンダリングされます。
(省略可) Control Center で、ユースケースに合わせて、Control Center ユーザーガイドの「Replicator」の「送信元および送信先のクラスター」で説明されているようにクラスター名を編集します。
Control Center で、送信先クラスターを選択し、ナビゲーションパネルで Replicators をクリックします。次に、Control Center を使用してレプリケーションのパフォーマンスをモニタリングし、送信元とレプリケートされたトピックの詳細を表示します。
Control Center で、元のトピックとレプリケートされたトピックの両方にメッセージが作成されたことを確認するために、
kafka-consumer-perf-test
をそれ自体のコマンドウィンドウで実行してtest-topic
にテストデータを自動生成します。kafka-producer-perf-test \ --producer-props bootstrap.servers=localhost:9082 \ --topic test-topic \ --record-size 1000 \ --throughput 1000 \ --num-records 3600000
このコマンドでは、送信したメッセージに関するステータスが次のように出力されます。
4999 records sent, 999.8 records/sec (0.95 MB/sec), 1.1 ms avg latency, 240.0 ms max latency. 5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.5 ms avg latency, 4.0 ms max latency. 5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 5.0 ms max latency. 5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.3 ms avg latency, 3.0 ms max latency. 5001 records sent, 1000.0 records/sec (0.95 MB/sec), 0.3 ms avg latency, 4.0 ms max latency. 5000 records sent, 1000.0 records/sec (0.95 MB/sec), 0.8 ms avg latency, 24.0 ms max latency. 5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 3.0 ms max latency. ...
以前のように、kafka-console-consumer を使用して、レプリカトピックがメッセージを受信していることを確認し、これらのメッセージをコマンドラインから消費することができます。
./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092
これは、Control Center でも確認できます。送信元クラスターの
test-topic
に移動して、元のトピックにあるメッセージを表示し、送信先クラスターのtest-topic.replica
に移動して、レプリケートされたトピックにあるメッセージを表示します。Control Center での Replicator のモニタリングの詳細については、 Control Center ユーザーガイドの「Replicator」 を参照してください。
チュートリアルでの実験を終了したら、必ず、次のようにクリーンアップを実行してください。
- それぞれのコマンドウィンドウで
Ctrl + C キー
("+" はキーを同時に押すことを意味します)を押して、プロデューサーとコンシューマーをすべて停止します。 - 各コマンドウィンドウで
Ctrl + C キー
を押して("+" はキーを同時に押すことを意味します)、各サービスをその起動時と反対の順序で停止します(Control Center、Replicator、Kafka ブローカー、ZooKeeper の順に停止します)。
- それぞれのコマンドウィンドウで
トラブルシューティング¶
各要素の起動と実行について、または Control Center での Replicator のモニタリングについて問題が発生した場合は、以下をチェックしてください。
- すべてのプロパティファイルの構成が正しいことと、「Kafka ブローカーと Confluent Platform コンポーネントのポート」に示されているとおり、送信元と送信先のポート番号が一致していることを確認します。
- Control Center でのモニタリングの場合、「Control Center を使用した Replicator のモニタリング」の手順のとおり、構成がモニタリング要件と一致していることを確認します。この例で示した "開発" バージョンの代わりに本稼働環境対応の Control Center 構成ファイルを使用している場合、「Control Center を使用した Replicator のモニタリング」のヒントに従って Connect エンドポイントを指定したことを確認します。
- モニタリング拡張機能が「Replicator モニタリング拡張機能」に従ってインストールされており、CLASSPATH(特に Kafka ブローカーの起動元のシェル)に存在することを確認します。これをチェックするには、オープンコマンドウィンドウで
echo $CLASSPATH
を実行します。 systemctl
コマンドを使用してモニタリングサービスを開始する場合、「systemctl コマンド構成」の手順に従う必要があります。環境変数を適切に構成しないと、Connect は開始できません。- チュートリアルを再試行するには、以下のようにします。
- 各コマンドウィンドウで
Ctrl + C キー
を押して("+" はキーを同時に押すことを意味します)、各サービスをその起動時と反対の順序で停止します(Control Center、Replicator、Kafka ブローカー、ZooKeeper の順に停止します)。 - 新しく実行されるクラスターおよびトピックと競合する可能性があるため、
/tmp
内の古いログとデータファイルを削除します。たとえば、/tmp/confluent/control-center/
、/tmp/zookeeper
、/tmp/zookeeper_origin
、/tmp/kafka-logs
、/tmp/kafka-logs-origin
、/tmp/control-center-logs
ファイルを削除します。 - これで、Confluent Platform の現在のインストールでもう一度開始するか、Confluent Platform を 再インストール してプロセス全体を最初から試行できます。
- 各コマンドウィンドウで