チュートリアル: クラスターの境界を越えたデータのレプリケーション

このガイドでは、2 つの Apache Kafka® クラスターを起動し、続いて Replicator プロセスを開始してそれらのクラスター間でデータをレプリケートする方法について説明します。チュートリアルの目的上、同じマシンで両方のクラスターを実行します。これを行うために、各クラスターが固有のポートとデータディレクトリを持っていることを、あらゆる手段を使って確認します。各クラスターが固有のサーバーを持っている通常の環境で実行している場合、ZooKeeper とブローカーの構成にこれらの変更を加える必要はありません。

../../_images/replicator-quickstart-configuration.ja.png

Replicator クイックスタート構成

ちなみに

インストールの前提条件とコマンド例について

  • 以下の手順では、Confluent Platform、Confluent CLI、Java 1.8 または 1.11(Confluent Platform に必要)がローカルにインストールされていることを前提としています。Confluent Platform を初めて使用する場合は、「Confluent Platform のクイックスタート(ローカルインストール)」を終えてからこのチュートリアルに進んでください。

  • サーバーと Replicator を起動するためのコマンドでは、Confluent Platform のホームディレクトリから実行するか、Confluent Platform が CLASSPATH に存在することを前提としています。また、特に明記しない限り、以下の例は、インストールされている Confluent Platform のデフォルトの場所にプロパティファイルが存在することを前提としています。これにより、ほとんどの場合、サンプルコマンドを簡単にターミナルに直接コピーして貼り付けることができます。たとえば、以下のコマンドを実行するとします。

    ./bin/zookeeper-server-start etc/kafka/zookeeper.properties
    

    これは以下と同じになります。

    $CONFLUENT_HOME/bin/zookeeper-server-start $CONFLUENT_HOME/etc/kafka/zookeeper.properties
    

Kafka ブローカーと Confluent Platform コンポーネントのポート

このチュートリアルの例では、以下のポート構成が定義されています。

送信元 送信先
Kafka ブローカー 9082 9092
ZooKeeper 2171 2181
Connect Replicator ワーカー   8083(送信元から送信先にトピックをコピー)
Control Center   9021

送信先クラスターの起動

  1. 最初に、ZooKeeper サーバーを起動します。このガイドでは、サービスが localhost で実行されることを前提としています。

    • 以下のコマンドを固有のターミナルで実行して、ZooKeeper を起動します。

      ./bin/zookeeper-server-start etc/kafka/zookeeper.properties
      
  2. 次に、単一ノード Kafka クラスターとして動作する Kafka ブローカーを起動します。

    • 以下のコマンドを固有のターミナルで実行して Kafka を起動します。

      ./bin/kafka-server-start etc/kafka/server.properties
      

これらのサービスを稼働させる方法の詳細については、Confluent Platform の クイックスタート の説明を参照してください。

注釈

送信先クラスターは、送信元クラスターと同じ(またはそれ以降の)バージョンの Confluent Platform を実行している必要があります。Replicator は、送信先クラスターにリンクされている Connect クラスター内で実行され、送信元クラスターからメッセージを読み取ります。そのため、送信先が古いバージョンを実行している場合、Replicator はメッセージフォーマットを解釈できなくなります。

送信元クラスターの起動

送信先クラスターはデフォルトポートで実行するように構成しましたが、競合を回避するために、送信元クラスターは別のポートで実行する必要があります。ポートマッピング で示したように、送信元クラスター内の Kafka ブローカーはポート 9082 上で構成されており、ZooKeeper は 2171 上で構成されています。構成ファイルを一時的な場所にコピーし、送信先クラスターと競合しないようにそれらのポートを変更します。

  1. $CONFLUENT_HOME の下にチュートリアルのプロパティファイル用の "クイックスタート" ディレクトリ(例: my-examples/)を作成します。

    ちなみに

    • $CONFLUENT_HOME<path-to-confluent> を表します。Linux システムでは、この表記をシェル環境変数の設定に使用できます。
    • このセクションの例では、$CONFLUENT_HOME からコマンドを実行し、my-examples/ を使用してプロパティファイルを保存することを前提としています。このモデルに従うと、コードブロックに対してコピーアンドペーストを使用できます。
  2. 構成ファイルを my-examples (または選択したディレクトリ)にコピーします。

    cp etc/kafka/zookeeper.properties my-examples/zookeeper_origin.properties
    
    cp etc/kafka/server.properties my-examples/server_origin.properties
    
  3. ポート番号をアップデートします。

    sed -i '' -e "s/2181/2171/g" my-examples/zookeeper_origin.properties
    sed -i '' -e "s/9092/9082/g" my-examples/server_origin.properties
    sed -i '' -e "s/2181/2171/g" my-examples/server_origin.properties
    sed -i '' -e "s/#listeners/listeners/g" my-examples/server_origin.properties
    sed -i '' -e "s/8090/8091/g" my-examples/server_origin.properties
    sed -i '' -e "s/#confluent.metadata.server.listeners/confluent.metadata.server.listeners/g" my-examples/server_origin.properties
    
  4. 送信元のブローカー ID をアップデートします。

    sed -i '' -e "s/broker.id=0/broker.id=1/g" my-examples/server_origin.properties
    

    ちなみに

    ブローカーは 2 つの異なるクラスターに存在するため、これは常に必要というわけではありませんが、コマンドラインから Kafka でこれらを管理する場合、一意のブローカー ID を設定しておくと便利です。

  5. データディレクトリをアップデートします。

    sed -i '' -e "s/zookeeper/zookeeper_origin/g" my-examples/zookeeper_origin.properties
    sed -i '' -e "s/kafka-logs/kafka-logs-origin/g" my-examples/server_origin.properties
    
  6. ここで、送信元クラスターを起動できます。

    • 以下のコマンドを固有のターミナルで実行して ZooKeeper を起動します。

      ./bin/zookeeper-server-start my-examples/zookeeper_origin.properties
      
    • 以下のコマンドを固有のターミナルで実行して Kafka を起動します。

      ./bin/kafka-server-start my-examples/server_origin.properties
      

トピックの作成

新しいコマンドウィンドウを開き、Kafka コマンドを実行します。

以下のコマンドを使用して、送信元クラスターで "test-topic" という名前のトピックを作成します。

kafka-topics --create --topic test-topic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9082

ちなみに

環境によっては、Kafka コマンドで .sh 拡張子を使用する必要が生じる場合があります(例: kafka-topics.sh)。

トピックが正常に作成されたことを以下のように確認します。

kafka-topics --list --bootstrap-server localhost:9082

出力は以下のようになります(_confluent トピックは内部トピックです)。

__confluent.support.metrics
_confluent-command
test-topic

Replicator を構成して実行する場合、前述したように定義した構成を使用して、この "test-topic" が送信先クラスター(ポート 2181)にレプリケートされます。この例では便宜上、テストトピックは 1 つのパーティションで作成しています。Replicator では、任意の数のトピックとパーティションに設定できます。

Replicator の構成および実行

Confluent Replicator は、実行可能ファイルとして実行するか、Kafka Connect フレームワーク内のコネクターとして実行できます。このクイックスタートでは、Replicator を実行可能ファイルとして起動します。

コンシューマー、プロデューサー、および Replicator 構成ファイルの作成

Replicator 実行可能スクリプトでは、以下の 3 つの構成ファイルを使用します。

  • 送信元クラスター用の構成
  • 送信先クラスター用の構成
  • Replicator の構成

ちなみに

これらの構成ファイルは、選択した任意のディレクトリに配置できますが、Confluent Platform に付属のデフォルトプロパティファイルとの競合を回避するために、以下の手順では $CONFLUENT_HOME/my-examples/ のパスに配置することを前提とします。(producer.propertiesconsumer.properties のデフォルト構成は etc/kafka/ に存在し、replicator.properties のデフォルト構成は etc/kafka-connect-replicator/ に存在します)。

以下のファイルを $CONFLUENT_HOME/my-examples/ に作成します。

  1. consumer.properties という名前の新しいファイルで送信元クラスターを構成します。このファイルを編集し、送信元 クラスターのブローカーのアドレスが含まれていることを確認します。デフォルトのブローカーリストは、先ほど起動した送信元クラスターと一致します。

    # Origin cluster connection configuration
    bootstrap.servers=localhost:9082
    
  2. producer.properties という名前の新しいファイルで送信先クラスターを構成します。このファイルを編集し、送信先 クラスターのブローカーのアドレスが含まれていることを確認します。デフォルトのブローカーリストは、先ほど起動した送信先クラスターと一致します。

    # Destination cluster connection configuration
    bootstrap.servers=localhost:9092
    
  3. Connect ワーカー用の replication.properties という名前の新しいファイルで Replicator 構成を定義します。このクイックスタートでは topic.rename.format の構成を示しますが、接続に関連しない任意の Confluent Replicator 構成プロパティ をこのファイルに指定できます。

    # Replication configuration
    topic.rename.format=${topic}.replica
    replication.factor=1
    config.storage.replication.factor=1
    offset.storage.replication.factor=1
    status.storage.replication.factor=1
    confluent.topic.replication.factor=1
    

ちなみに

  • ポートが replication.properties に定義されていない場合、このワーカーはそのデフォルトポート 8083 で実行されますが、このデプロイではこれは望ましい構成です。
  • レプリケーション係数プロパティ(すべて 1 に設定)が使用されているのは、これらのテストクラスターが小規模であるためです。本稼働環境で推奨される最小クラスターサイズは 3 で、これらのプロパティではこれがデフォルトです。

Replicator の起動

必要な構成ファイルを作成したら、以下のコマンドを使用して Replicator 実行可能ファイルをその固有のターミナルで起動します(プロパティファイルは my-examples に存在するものとします)。

./bin/replicator --cluster.id replicator --consumer.config my-examples/consumer.properties --producer.config my-examples/producer.properties --replication.config my-examples/replication.properties --whitelist 'test-topic'

Replicator 実行可能ファイルの一部のパラメーターはコマンドラインで渡すことができます。

  • --cluster.id: この実行可能ファイルが参加する Replicator クラスターを特定するために使用する識別子です。同じ cluster.id を持つ複数の Replicator 実行可能ファイルのインスタンスは一緒に動作します。
  • --consumer.config: 送信元クラスター構成のパスです。
  • --producer.config: 送信先クラスター構成のパスです。
  • --replication.config: 接続に固有のものではない構成が含まれるファイルのパスです。これらの構成はコマンドライン引数でオーバーライドされます。
  • --whitelist: 送信元から送信先にレプリケートするトピックのリスト

コマンドラインオプションの完全なリストについては、「Replicator 実行可能ファイルのコマンドラインパラメーター」を参照してください。

送信元タスクの作成と Replicator が稼働中であることを示すレプリケートされるトピックの作成に関連する成功メッセージを確認し、トピックをコピーします。

クラスター間でのトピックのレプリケーションの検証

Replicator は、初期化を完了すると、送信元クラスターをチェックしてレプリケートする必要があるトピックがあるかどうかを確認します。

この例では、test-topic を発見し、対応するトピックを送信先クラスターに作成します。これは、以下のコマンドで確認できます。

./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092

ここで test-topic.replica の存在をチェックしているのは、送信先クラスターにレプリケートされたときに、構成に従って test-topic の名前が変更されたためです。

出力は以下のようになります。

./bin/kafka-topics --describe --topic test-topic.replica --bootstrap-server localhost:9092
Topic: test-topic.replica    PartitionCount: 1       ReplicationFactor: 1    Configs: message.timestamp.type=CreateTime,segment.bytes=1073741824
      Topic: test-topic.replica      Partition: 0    Leader: 0       Replicas: 0     Isr: 0  Offline: 0

また、送信先クラスターのトピックのリストと詳細を表示することもできます。レプリケートされたトピック(test-topic.replica など)が表示されます。

./bin/kafka-topics --list --bootstrap-server localhost:9092

ちなみに

  • 送信元クラスターのトピックのリストを表示するには、kafka-topics --listlocalhost:9082 に対して実行します。
  • 元のトピックの説明を表示するには、kafka-topics --describe を実行しますが、test-topic を検索してターゲット localhost:9082 に対して実行します。

送信元クラスターでトピックを作成したら、Kafka プロデューサーを使用してそのトピックにデータを送信し、送信元クラスターの test-topic に書き込むことができます。その後、送信先クラスターの test-topic.replica から消費することでデータがレプリケートされたことを確認できます。たとえば、Kafka のコンソールプロデューサーを使用して一連の数字を送信するために、新しいターミナルウィンドウで以下のコマンドを実行します。

seq 10000 | ./bin/kafka-console-producer --topic test-topic --broker-list localhost:9082

コンソールコンシューマーを、それ自体のウィンドウで使用して、送信先クラスターへのデリバリーを確認できます。

./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092

コンシューマー出力に 1 ~ 10,000 の数字が表示された場合、これはマルチクラスターレプリケーションが正しく作成されたことを示します。

Ctrl + C キー を押して("+" はキーを同時に押すことを意味します)、コンシューマー読み取りを終了し、コマンドプロンプトに戻ります。

Control Center を使用した Replicator のモニタリング

Control Center を使用して現在のデプロイで Replicator をモニタリングできます。

  1. 送信元クラスターと送信先クラスターの両方にある Replicator とブローカーを停止し、次に ZooKeeper インスタンスを停止します(この順番に従ってください)。

    各コマンドウィンドウで Ctrl + C キー を押して("+" はキーを同時に押すことを意味します)プロセスを停止します。簡単に再起動できるようにウィンドウは開いたままにします。

  2. 以下の手順に従って Replicator のモニタリング拡張機能をアクティブ化します。詳細については、「Replicator モニタリング拡張機能」を参照してください。

    • replicator-rest-extension-<version>.jar の完全なパスを CLASSPATH に追加します。
    • rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtensionmy-examples/replication.properties に追加します。
  3. 送信先と送信元の両方の Kafka 構成ファイル(それぞれ etc/kafka/server.propertiesmy-examples/server_origin.properties)に以下の行を追加するか、これらの行のコメントを解除します。両方のファイルの confluent.metrics.reporter.bootstrap.servers の構成では localhost のポート 9092 が指定されている必要があるため、これらのポート番号の一方または両方の変更が必要になる場合があります(confluent.metrics でファイル内を検索するとこれらの行に移動します)。

    confluent.metrics.reporter.topic.replicas=1
    metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
    confluent.metrics.reporter.bootstrap.servers=localhost:9092
    
    • 最初の行は、レプリケーション係数 1 を使用して、デプロイが開発モードであることを Control Center に示しています。
    • 残りの 2 行により、Control Center でのメトリクスのレポートが有効になり、モニタリングデータを収集および格納する Confluent の内部トピックへのアクセスが提供されます。

    ちなみに

    • この例では、Metrics Reporter は、Confluent Control Center のブートストラップ先のクラスター、つまり送信先クラスターを参照している必要があります。これが適切に設定されていないと、送信元トピックに対するメトリクスは Control Center に表示されません。etc/kafka/server.propertiesmy-examples/server_origin.properties に同じ構成値 confluent.metrics.reporter.bootstrap.servers=localhost:9092 が必要なのはこのためです。
    • これらの手順をより複雑な実際の環境に適用する際は、別のアプローチが必要になる場合があります。たとえば、送信元と送信先用に複数の Control Center インスタンスがあり、それぞれが固有のクラスターをモニタリングするデプロイの場合、confluent.metrics.reporter.bootstrap.servers は必要に応じて送信元または送信先を参照している必要があります。詳細については、Control Center を使用する「マルチクラスター構成」、「Replicator のモニタリング」、および「Multi-Region Clusters」を参照してください。
  4. my-examples/producer.properties を編集してプロデューサー用のモニタリングインターセプターを追加します。

    # Monitoring interceptor for producer
    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    
  5. my-examples/consumer.properties を編集してコンシューマー用のモニタリングインターセプターを追加します。

    # Monitoring interceptor for consumer
    interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    
  6. etc/confluent-control-center/control-center-dev.properties を編集して、Control Center の送信元と送信先のブートストラップサーバーを指定する以下の 2 つの行を追加します。これは、複数のクラスターをモニタリングするために必要です(これらの行を追加する便利な場所は、ファイルの先頭付近、confluent.controlcenter.id を指定する行の直後にある "Control Center Settings" です)。

    # multi-cluster monitoring
    confluent.controlcenter.kafka.origin.bootstrap.servers=localhost:9082
    confluent.controlcenter.kafka.destination.bootstrap.servers=localhost:9092
    

    ちなみに

    Control Center では、Replicator モニタリングメトリクスを検索する場所を知るために、Connect REST エンドポイントのホストとポートが必要です。この例で使用する構成ファイル(control-center-dev.properties )では、これがデフォルトポートに構成されているため、そのままで正常に動作します。

    # A comma separated list of Connect host names
    confluent.controlcenter.connect.cluster=http://localhost:8083
    

    本稼働環境対応の構成ファイル(control-center-production.properties )には、デフォルトのコメントアウトがあります。このファイルを代わりに使用する場合や、複数のコネクターがある場合、Connect クラスターを異なる構成にする場合は、Connect エンドポイントを指定する必要があります。そのためには、デフォルトをコメント解除するか、独自の Connect クラスターのホストを指定します。詳細については、 Control Center 構成リファレンスconfluent.controlcenter.connect.<connect-cluster-name>.clusterconfluent.controlcenter.connect.cluster (非推奨)の説明を参照してください。

  7. 上記と同じコマンドを使用して送信先クラスターと送信元クラスターの ZooKeeper インスタンスを再起動します。以下に例を示します。

    ./bin/zookeeper-server-start etc/kafka/zookeeper.properties
    
    ./bin/zookeeper-server-start my-examples/zookeeper_origin.properties
    
  8. 上記と同じコマンドを使用して送信先クラスターと送信元クラスターブローカーを再起動します。以下に例を示します。

    ./bin/kafka-server-start etc/kafka/server.properties
    
    ./bin/kafka-server-start my-examples/server_origin.properties
    
  9. 上記と同じコマンドを使用して Replicator と Connect ワーカーを再起動します。以下に例を示します。

    ./bin/replicator --cluster.id replicator --consumer.config my-examples/consumer.properties --producer.config my-examples/producer.properties --replication.config my-examples/replication.properties --whitelist 'test-topic'
    
  10. 以下のコマンドを使用して Control Center を起動します。

    ./bin/control-center-start etc/confluent-control-center/control-center-dev.properties
    

    control-center-dev.properties にポートが定義されていない場合、Control Center はデフォルトでポート 9021 で実行されます(「Control Center ユーザーガイド」を参照してください)。これは、このデプロイの望ましい構成です。

  11. ウェブブラウザーで http://localhost:9021/ にアクセスして Control Center を開きます。

    クラスターは、構成に基づいて自動生成された名前を使って Control Center 上にレンダリングされます。

    ../../_images/c3-replicators-multi-cluster.png
  12. (省略可) Control Center で、ユースケースに合わせて、Control Center ユーザーガイドの「Replicator」の「送信元および送信先のクラスター」で説明されているようにクラスター名を編集します。

  13. Control Center で、送信先クラスターを選択し、ナビゲーションパネルで Replicators をクリックします。次に、Control Center を使用してレプリケーションのパフォーマンスをモニタリングし、送信元とレプリケートされたトピックの詳細を表示します。

    ../../_images/c3-replicators-all.png

    Control Center で、元のトピックとレプリケートされたトピックの両方にメッセージが作成されたことを確認するために、kafka-consumer-perf-test をそれ自体のコマンドウィンドウで実行して test-topic にテストデータを自動生成します。

    kafka-producer-perf-test \
       --producer-props bootstrap.servers=localhost:9082 \
       --topic test-topic \
       --record-size 1000 \
       --throughput 1000 \
       --num-records 3600000
    

    このコマンドでは、送信したメッセージに関するステータスが次のように出力されます。

    4999 records sent, 999.8 records/sec (0.95 MB/sec), 1.1 ms avg latency, 240.0 ms max latency.
    5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.5 ms avg latency, 4.0 ms max latency.
    5003 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 5.0 ms max latency.
    5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.3 ms avg latency, 3.0 ms max latency.
    5001 records sent, 1000.0 records/sec (0.95 MB/sec), 0.3 ms avg latency, 4.0 ms max latency.
    5000 records sent, 1000.0 records/sec (0.95 MB/sec), 0.8 ms avg latency, 24.0 ms max latency.
    5001 records sent, 1000.2 records/sec (0.95 MB/sec), 0.6 ms avg latency, 3.0 ms max latency.
    ...
    

    以前のように、kafka-console-consumer を使用して、レプリカトピックがメッセージを受信していることを確認し、これらのメッセージをコマンドラインから消費することができます。

    ./bin/kafka-console-consumer --from-beginning --topic test-topic.replica --bootstrap-server localhost:9092
    

    これは、Control Center でも確認できます。送信元クラスターの test-topic に移動して、元のトピックにあるメッセージを表示し、送信先クラスターの test-topic.replica に移動して、レプリケートされたトピックにあるメッセージを表示します。

    ../../_images/c3-replicator-topic-drilldown-messages.png
  14. Control Center での Replicator のモニタリングの詳細については、 Control Center ユーザーガイドの「Replicator」 を参照してください。

  15. チュートリアルでの実験を終了したら、必ず、次のようにクリーンアップを実行してください。

    • それぞれのコマンドウィンドウで Ctrl + C キー ("+" はキーを同時に押すことを意味します)を押して、プロデューサーとコンシューマーをすべて停止します。
    • 各コマンドウィンドウで Ctrl + C キー を押して("+" はキーを同時に押すことを意味します)、各サービスをその起動時と反対の順序で停止します(Control Center、Replicator、Kafka ブローカー、ZooKeeper の順に停止します)。

トラブルシューティング

各要素の起動と実行について、または Control Center での Replicator のモニタリングについて問題が発生した場合は、以下をチェックしてください。

  • すべてのプロパティファイルの構成が正しいことと、「Kafka ブローカーと Confluent Platform コンポーネントのポート」に示されているとおり、送信元と送信先のポート番号が一致していることを確認します。
  • Control Center でのモニタリングの場合、「Control Center を使用した Replicator のモニタリング」の手順のとおり、構成がモニタリング要件と一致していることを確認します。この例で示した "開発" バージョンの代わりに本稼働環境対応の Control Center 構成ファイルを使用している場合、「Control Center を使用した Replicator のモニタリング」のヒントに従って Connect エンドポイントを指定したことを確認します。
  • モニタリング拡張機能が「Replicator モニタリング拡張機能」に従ってインストールされており、CLASSPATH(特に Kafka ブローカーの起動元のシェル)に存在することを確認します。これをチェックするには、オープンコマンドウィンドウで echo $CLASSPATH を実行します。
  • systemctl コマンドを使用してモニタリングサービスを開始する場合、「systemctl コマンド構成」の手順に従う必要があります。環境変数を適切に構成しないと、Connect は開始できません。
  • チュートリアルを再試行するには、以下のようにします。
    1. 各コマンドウィンドウで Ctrl + C キー を押して("+" はキーを同時に押すことを意味します)、各サービスをその起動時と反対の順序で停止します(Control Center、Replicator、Kafka ブローカー、ZooKeeper の順に停止します)。
    2. 新しく実行されるクラスターおよびトピックと競合する可能性があるため、/tmp 内の古いログとデータファイルを削除します。たとえば、/tmp/confluent/control-center//tmp/zookeeper/tmp/zookeeper_origin/tmp/kafka-logs/tmp/kafka-logs-origin/tmp/control-center-logs ファイルを削除します。
    3. これで、Confluent Platform の現在のインストールでもう一度開始するか、Confluent Platform を 再インストール してプロセス全体を最初から試行できます。