Docker での Replicator デモ

このデモでは、データセンター間でデータの双方向コピーを行う Confluent Replicator の 2 つのインスタンスを含む、アクティブ/アクティブのマルチデータセンター設計をデプロイします。

この Docker 環境は、本稼働環境ではなくデモのみを目的としたものです。

デモのセットアップ

前提条件:
  • Docker
    • Docker バージョン 1.11 またはそれ以降が インストールされ動作している
    • Docker Compose が インストール済みである 。Docker Compose は、Docker for Mac ではデフォルトでインストールされます。
    • Docker メモリーに最小でも 6 GB が割り当てられている。Docker Desktop for Mac を使用しているとき、Docker メモリーの割り当てはデフォルトで 2 GB です。Docker のデフォルトの割り当てを 6 GB に変更できます。PreferencesResourcesAdvanced の順に移動します。
  • インターネット接続
  • Confluent Platform で現在サポートされる オペレーティングシステム
  • Docker でのネットワークと Kafka
    • 内部コンポーネントと外部コンポーネントが Docker ネットワークに通信できるように、ホストとポートを構成します。詳細については、こちらの記事 を参照してください。

設計

これは、データセンター dc1dc2 間でデータの双方向コピーを行う Confluent Replicator の 2 つのインスタンスを含む、アクティブ/アクティブのマルチデータセンター設計です。

Confluent Control Center は、クラスターを管理およびモニタリングするために実行されています。

画像

Confluent Platform コンポーネントのポート

dc1 dc2
ブローカー 9091 9092
ZooKeeper 2181 2182
Schema Registry 8081(プライマリ) 8082(セカンダリ)
Connect 8381(dc2→dc1 にコピー) 8382(dc1→dc2 にコピー)
Control Center   9021

データ生成とトピック名

データ生成 Docker コンテナー 送信元 DC 送信元トピック Replicator インスタンス 送信先 DC 送信先トピック
datagen-dc1- topic1 dc1 topic1、_schemas replicator-dc1 -to-dc2-topic1 dc2 topic1、_schemas
datagen-dc1- topic2 dc1 topic2 replicator-dc1 -to-dc2-topic2 dc2 topic2.replica
datagen-dc2- topic1 dc2 topic1 replicator-dc2 -to-dc1-topic1 dc1 topic1

Confluent Replicator (バージョン 5.0.1 以降)は、メッセージヘッダー内の来歴情報を使用して dc1 topic1 と dc2 topic1 の間のデータ循環反復を回避します。

デモの実行

環境

このデモは、以下のソフトウェアで動作が検証されています。

  • Docker バージョン 17.06.1-ce
  • Docker Compose ファイル形式 2.1 の Docker Compose バージョン 1.14.0

サービスの起動

  1. confluentinc/examples GitHub リポジトリのクローンを作成し、6.2.4-post ブランチをチェックアウトします。

    git clone https://github.com/confluentinc/examples
    cd examples
    git checkout 6.2.4-post
    
  2. multi-datacenter サンプルディレクトリに移動します。

    cd multi-datacenter/
    
  3. 各データセンターのすべてのサービスとトピックのサンプルメッセージを開始します。

    ./start.sh
    
  4. docker-compose.yml ファイル内の Kafka ブローカー、Schema Registry、および Connect ワーカーの完全な構成を表示します。

    cat docker-compose.yml
    

マルチデータセンターセットアップの検証

Replicator

このマルチデータセンター環境には、2 つの Apache Kafka® クラスター、dc1dc2 があります。これらのクラスターは Confluent Control Center によって管理されています。Confluent Replicator は、dc1dc2 間の双方向でデータをコピーしますが、その仕組みをわかりやすく説明するために、以下のセクションでは dc1 から dc2 へのレプリケーションのみについて検討します。

  1. デモを起動(前のセクション を参照)したら、Chrome ブラウザーで Control Center UI(http://localhost:9021)に移動して、2 つの Kafka クラスター、dc1dc2 が存在することを確認します。

    画像
  2. このデモでは、2 つの Kafka Connect クラスター、connect-dc1connect-dc2 を使用します。Replicator は、通常、送信先 Connect クラスターで実行されるソースコネクターです。そのため、connect-dc1dc2 から dc1 にコピーする Replicator を実行し、connect-dc2dc1 から dc2 にコピーする Replicator を実行します。バージョン 5.2 以降、Control Center は複数の Kafka Connect クラスターを管理できますが、このデモでは、connect-dc2 で実行され、dc1 から dc2 にコピーする Replicator インスタンスのみを取り上げます。

  3. dc1 から dc2 にコピーする Replicator の場合: Connect メニューに移動して、Kafka Connect(connect-dc2 に構成されています)が Replicator の 2 つのインスタンス、replicator-dc1-to-dc2-topic1replicator-dc1-to-dc2-topic2 を実行していることを確認します。

    画像

トピックの検査

  1. 各データセンターについて、さまざまなトピックのデータ、来歴情報、タイムスタンプ情報、およびクラスター ID を検査します。

    ./read-topics.sh
    
  2. 以下のように出力されることを確認します。

    -----dc1-----
    
    list topics:
    __consumer_offsets
    __consumer_timestamps
    _confluent-command
    _confluent-license
    _confluent-telemetry-metrics
    _confluent_balancer_api_state
    _confluent_balancer_broker_samples
    _confluent_balancer_partition_samples
    _schemas
    connect-configs-dc1
    connect-offsets-dc1
    connect-status-dc1
    topic1
    topic2
    
    topic1:
    {"userid":{"string":"User_7"},"dc":{"string":"dc1"}}
    {"userid":{"string":"User_7"},"dc":{"string":"dc2"}}
    {"userid":{"string":"User_9"},"dc":{"string":"dc2"}}
    {"userid":{"string":"User_2"},"dc":{"string":"dc1"}}
    {"userid":{"string":"User_5"},"dc":{"string":"dc2"}}
    {"userid":{"string":"User_1"},"dc":{"string":"dc1"}}
    {"userid":{"string":"User_3"},"dc":{"string":"dc2"}}
    {"userid":{"string":"User_7"},"dc":{"string":"dc1"}}
    {"userid":{"string":"User_1"},"dc":{"string":"dc2"}}
    {"userid":{"string":"User_8"},"dc":{"string":"dc1"}}
    Processed a total of 10 messages
    
    topic2:
    {"registertime":{"long":1513471082347},"userid":{"string":"User_2"},"regionid":{"string":"Region_7"},"gender":{"string":"OTHER"}}
    {"registertime":{"long":1496006007512},"userid":{"string":"User_5"},"regionid":{"string":"Region_6"},"gender":{"string":"OTHER"}}
    {"registertime":{"long":1494319368203},"userid":{"string":"User_7"},"regionid":{"string":"Region_2"},"gender":{"string":"FEMALE"}}
    {"registertime":{"long":1493150028737},"userid":{"string":"User_1"},"regionid":{"string":"Region_5"},"gender":{"string":"FEMALE"}}
    {"registertime":{"long":1517151907191},"userid":{"string":"User_5"},"regionid":{"string":"Region_3"},"gender":{"string":"OTHER"}}
    {"registertime":{"long":1489672305692},"userid":{"string":"User_2"},"regionid":{"string":"Region_6"},"gender":{"string":"OTHER"}}
    {"registertime":{"long":1511471447951},"userid":{"string":"User_2"},"regionid":{"string":"Region_5"},"gender":{"string":"MALE"}}
    {"registertime":{"long":1488018372941},"userid":{"string":"User_7"},"regionid":{"string":"Region_2"},"gender":{"string":"OTHER"}}
    {"registertime":{"long":1500952152251},"userid":{"string":"User_2"},"regionid":{"string":"Region_1"},"gender":{"string":"MALE"}}
    {"registertime":{"long":1493556444692},"userid":{"string":"User_1"},"regionid":{"string":"Region_8"},"gender":{"string":"FEMALE"}}
    Processed a total of 10 messages
    
    _schemas:
    null
    null
    null
    {"subject":"topic1-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"userid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dc\",\"type\":[\"null\",\"string\"],\"default\":null}]}","deleted":false}
    {"subject":"topic2-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"registertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"userid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"regionid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"gender\",\"type\":[\"null\",\"string\"],\"default\":null}]}","deleted":false}
    {"subject":"topic2.replica-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"registertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"userid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"regionid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"gender\",\"type\":[\"null\",\"string\"],\"default\":null}]}","deleted":false}
    [2021-01-04 19:16:09,579] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
    org.apache.kafka.common.errors.TimeoutException
    Processed a total of 6 messages
    
    provenance info (cluster, topic, timestamp):
    2qHo2TsdTIaTjvkCyf3qdw,topic1,1609787778125
    2qHo2TsdTIaTjvkCyf3qdw,topic1,1609787779123
    2qHo2TsdTIaTjvkCyf3qdw,topic1,1609787780125
    2qHo2TsdTIaTjvkCyf3qdw,topic1,1609787781246
    2qHo2TsdTIaTjvkCyf3qdw,topic1,1609787782125
    Processed a total of 10 messages
    
    timestamp info (group: topic-partition):
    replicator-dc1-to-dc2-topic2: topic2-0    1609787797164
    replicator-dc1-to-dc2-topic1: topic1-0    1609787797117
    Processed a total of 2 messages
    
    cluster id:
    ZagAAEfORQG-lxwq6OsV5Q
    
    -----dc2-----
    
    list topics:
    __consumer_offsets
    __consumer_timestamps
    _confluent-command
    _confluent-controlcenter-6-1-0-1-AlertHistoryStore-changelog
    _confluent-controlcenter-6-1-0-1-AlertHistoryStore-repartition
    _confluent-controlcenter-6-1-0-1-Group-ONE_MINUTE-changelog
    _confluent-controlcenter-6-1-0-1-Group-ONE_MINUTE-repartition
    _confluent-controlcenter-6-1-0-1-Group-THREE_HOURS-changelog
    _confluent-controlcenter-6-1-0-1-Group-THREE_HOURS-repartition
    _confluent-controlcenter-6-1-0-1-KSTREAM-OUTEROTHER-0000000106-store-changelog
    _confluent-controlcenter-6-1-0-1-KSTREAM-OUTEROTHER-0000000106-store-repartition
    _confluent-controlcenter-6-1-0-1-KSTREAM-OUTERTHIS-0000000105-store-changelog
    _confluent-controlcenter-6-1-0-1-KSTREAM-OUTERTHIS-0000000105-store-repartition
    _confluent-controlcenter-6-1-0-1-MetricsAggregateStore-changelog
    _confluent-controlcenter-6-1-0-1-MetricsAggregateStore-repartition
    _confluent-controlcenter-6-1-0-1-MonitoringMessageAggregatorWindows-ONE_MINUTE-changelog
    _confluent-controlcenter-6-1-0-1-MonitoringMessageAggregatorWindows-ONE_MINUTE-repartition
    _confluent-controlcenter-6-1-0-1-MonitoringMessageAggregatorWindows-THREE_HOURS-changelog
    _confluent-controlcenter-6-1-0-1-MonitoringMessageAggregatorWindows-THREE_HOURS-repartition
    _confluent-controlcenter-6-1-0-1-MonitoringStream-ONE_MINUTE-changelog
    _confluent-controlcenter-6-1-0-1-MonitoringStream-ONE_MINUTE-repartition
    _confluent-controlcenter-6-1-0-1-MonitoringStream-THREE_HOURS-changelog
    _confluent-controlcenter-6-1-0-1-MonitoringStream-THREE_HOURS-repartition
    _confluent-controlcenter-6-1-0-1-MonitoringTriggerStore-changelog
    _confluent-controlcenter-6-1-0-1-MonitoringTriggerStore-repartition
    _confluent-controlcenter-6-1-0-1-MonitoringVerifierStore-changelog
    _confluent-controlcenter-6-1-0-1-MonitoringVerifierStore-repartition
    _confluent-controlcenter-6-1-0-1-TriggerActionsStore-changelog
    _confluent-controlcenter-6-1-0-1-TriggerActionsStore-repartition
    _confluent-controlcenter-6-1-0-1-TriggerEventsStore-changelog
    _confluent-controlcenter-6-1-0-1-TriggerEventsStore-repartition
    _confluent-controlcenter-6-1-0-1-actual-group-consumption-rekey
    _confluent-controlcenter-6-1-0-1-aggregate-topic-partition-store-changelog
    _confluent-controlcenter-6-1-0-1-aggregate-topic-partition-store-repartition
    _confluent-controlcenter-6-1-0-1-aggregatedTopicPartitionTableWindows-ONE_MINUTE-changelog
    _confluent-controlcenter-6-1-0-1-aggregatedTopicPartitionTableWindows-ONE_MINUTE-repartition
    _confluent-controlcenter-6-1-0-1-aggregatedTopicPartitionTableWindows-THREE_HOURS-changelog
    _confluent-controlcenter-6-1-0-1-aggregatedTopicPartitionTableWindows-THREE_HOURS-repartition
    _confluent-controlcenter-6-1-0-1-cluster-rekey
    _confluent-controlcenter-6-1-0-1-expected-group-consumption-rekey
    _confluent-controlcenter-6-1-0-1-group-aggregate-store-ONE_MINUTE-changelog
    _confluent-controlcenter-6-1-0-1-group-aggregate-store-ONE_MINUTE-repartition
    _confluent-controlcenter-6-1-0-1-group-aggregate-store-THREE_HOURS-changelog
    _confluent-controlcenter-6-1-0-1-group-aggregate-store-THREE_HOURS-repartition
    _confluent-controlcenter-6-1-0-1-group-stream-extension-rekey
    _confluent-controlcenter-6-1-0-1-metrics-trigger-measurement-rekey
    _confluent-controlcenter-6-1-0-1-monitoring-aggregate-rekey-store-changelog
    _confluent-controlcenter-6-1-0-1-monitoring-aggregate-rekey-store-repartition
    _confluent-controlcenter-6-1-0-1-monitoring-message-rekey-store
    _confluent-controlcenter-6-1-0-1-monitoring-trigger-event-rekey
    _confluent-license
    _confluent-metrics
    _confluent-monitoring
    _confluent-telemetry-metrics
    _confluent_balancer_api_state
    _confluent_balancer_broker_samples
    _confluent_balancer_partition_samples
    _schemas
    connect-configs-dc2
    connect-offsets-dc2
    connect-status-dc2
    topic1
    topic2.replica
    
    topic1:
    {"userid":{"string":"User_2"},"dc":{"string":"dc2"}}
    {"userid":{"string":"User_1"},"dc":{"string":"dc1"}}
    {"userid":{"string":"User_6"},"dc":{"string":"dc2"}}
    {"userid":{"string":"User_9"},"dc":{"string":"dc1"}}
    {"userid":{"string":"User_9"},"dc":{"string":"dc2"}}
    {"userid":{"string":"User_9"},"dc":{"string":"dc1"}}
    {"userid":{"string":"User_9"},"dc":{"string":"dc2"}}
    {"userid":{"string":"User_9"},"dc":{"string":"dc1"}}
    {"userid":{"string":"User_9"},"dc":{"string":"dc2"}}
    {"userid":{"string":"User_9"},"dc":{"string":"dc1"}}
    Processed a total of 10 messages
    
    topic2.replica:
    {"registertime":{"long":1488571887136},"userid":{"string":"User_2"},"regionid":{"string":"Region_4"},"gender":{"string":"FEMALE"}}
    {"registertime":{"long":1496554479008},"userid":{"string":"User_3"},"regionid":{"string":"Region_9"},"gender":{"string":"OTHER"}}
    {"registertime":{"long":1515819037639},"userid":{"string":"User_1"},"regionid":{"string":"Region_7"},"gender":{"string":"FEMALE"}}
    {"registertime":{"long":1498630829454},"userid":{"string":"User_9"},"regionid":{"string":"Region_5"},"gender":{"string":"FEMALE"}}
    {"registertime":{"long":1491954362758},"userid":{"string":"User_6"},"regionid":{"string":"Region_6"},"gender":{"string":"FEMALE"}}
    {"registertime":{"long":1498308706008},"userid":{"string":"User_2"},"regionid":{"string":"Region_2"},"gender":{"string":"OTHER"}}
    {"registertime":{"long":1509409463384},"userid":{"string":"User_5"},"regionid":{"string":"Region_8"},"gender":{"string":"OTHER"}}
    {"registertime":{"long":1494736574275},"userid":{"string":"User_4"},"regionid":{"string":"Region_4"},"gender":{"string":"OTHER"}}
    {"registertime":{"long":1513254638109},"userid":{"string":"User_3"},"regionid":{"string":"Region_5"},"gender":{"string":"FEMALE"}}
    {"registertime":{"long":1499607488391},"userid":{"string":"User_4"},"regionid":{"string":"Region_2"},"gender":{"string":"OTHER"}}
    Processed a total of 10 messages
    
    _schemas:
    null
    null
    null
    {"subject":"topic1-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"userid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dc\",\"type\":[\"null\",\"string\"],\"default\":null}]}","deleted":false}
    {"subject":"topic2-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"registertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"userid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"regionid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"gender\",\"type\":[\"null\",\"string\"],\"default\":null}]}","deleted":false}
    {"subject":"topic2.replica-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"registertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"userid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"regionid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"gender\",\"type\":[\"null\",\"string\"],\"default\":null}]}","deleted":false}
    [2021-01-04 19:17:26,336] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
    org.apache.kafka.common.errors.TimeoutException
    Processed a total of 6 messages
    
    provenance info (cluster, topic, timestamp):
    ZagAAEfORQG-lxwq6OsV5Q,topic1,1609787854055
    ZagAAEfORQG-lxwq6OsV5Q,topic1,1609787854057
    ZagAAEfORQG-lxwq6OsV5Q,topic1,1609787856052
    ZagAAEfORQG-lxwq6OsV5Q,topic1,1609787857052
    ZagAAEfORQG-lxwq6OsV5Q,topic1,1609787857054
    Processed a total of 10 messages
    
    timestamp info (group: topic-partition):
    replicator-dc2-to-dc1-topic1: topic1-0    1609787867007
    replicator-dc2-to-dc1-topic1: topic1-0    1609787877008
    Processed a total of 2 messages
    
    cluster id:
    2qHo2TsdTIaTjvkCyf3qdw
    

注釈

Confluent Platform 6.2.1 以降では、_confluent-command 内部トピックが提供され、_confluent-license トピックの代わりに使用することが推奨されており、新しいクラスターでもこれがデフォルトになります。今後は両方がサポートされます。詳細については「コンポーネントライセンスの構成」を参照してください。

Replicator のパフォーマンスのモニタリング

Replicator のモニタリングは、以下のために重要です。

  1. 複数のデータセンター間のデータ同期状態の測定
  2. ネットワークと Confluent Platform のパフォーマンス最適化

Replicator モニタリング

Control Center では、Replicators メニューから Replicator の詳細なモニタリングを確認できます。これを選択すると、現在クラスターで実行されている Replicator インスタンスの概要が表示されます。Replicator ごとに、以下の重要なメトリクスが表示されます。

  • Source Cluster - Replicator のレプリケート元になるクラスター
  • Source Topics - レプリケートされているトピックの数
  • Messages Replicated in/s - 1 秒あたりにレプリケートされたメッセージの数
  • Message Lag - 送信元クラスターに存在し、送信先にまだレプリケートされていないメッセージの数
  • Latency - メッセージが送信先に存在する時刻と送信元に存在する時刻の差異
画像

ここに表示される Replicator インスタンスを選択すると、特定のインスタンスに関する詳細が表示されます。Status タブでは前述のメトリクスが Connect タスクごとに表示され、Source Topics タブではそれらがトピックごとに表示されます。

画像

さらに Throughput カードを選択すると、詳しいメッセージラグが Connect タスクごとに表示されます。表示されているいずれかのタスクを選択すると、そのタスクがレプリケートしているパーティションのメトリクスが表示されます。

画像

ストリームモニタリング

Control Center のモニタリング機能には、ストリームパフォーマンスのモニタリングが含まれています。これにより、すべてのデータが消費されていること、さらにスループットとレイテンシを検証できます。モニタリングは、コンシューマーグループまたはトピック単位で表示されます。Confluent Replicator は、送信元クラスターからデータを読み取る組み込みのコンシューマーを備えているため、Control Center でそのパフォーマンスをモニタリングできます。Replicator のストリームモニタリングを有効にするには、モニタリングコンシューマーインターセプター でこれを構成します。 を参照してください。

画像
  1. 左側のメニューから dc1 Kafka クラスターを選択し、次に Data Streams を選択します。2 つのコンシューマーグループが存在し、それぞれ dc1 から dc2 へと実行されている Replicator インスタンス、replicator-dc1-to-dc2-topic1replicator-dc1-to-dc2-topic2 のものであることを確認します。

    画像
  2. 左側のメニューから dc2 Kafka クラスターを選択し、次に Data Streams を選択します。dc2 から dc1 へと実行されている 1 つのコンシューマーグループ、replicator-dc2-to-dc1-topic1 が存在することを確認します。

    画像

コンシューマーラグ

Replicator には送信元クラスターからデータを読み取る組み込みのコンシューマーがあり、Connect ワーカーのプロデューサーが送信先クラスターにデータをコミットした後にのみ、そのオフセットをコミットします(コミットの頻度は offset.flush.interval.ms パラメーターで構成します)。送信元クラスターで Replicator の組み込みコンシューマーのコンシューマーラグをモニタリングできます(dc1 から dc2 にデータをコピーする Replicator インスタンスの場合、送信元クラスターは dc1 です)。Replicator のコンシューマーラグをモニタリングする機能は、offset.topic.commit=true が設定されている場合に有効になります(デフォルトは true)。これにより、Replicator は、送信先クラスターにメッセージが書き込まれた後に自身のコンシューマーオフセットを送信元クラスター dc1 にコミットします。

  1. dc1 から dc2 にコピーする Replicator の場合 : 左側のメニューから dc1 (送信元クラスター)を選択し、次に Consumers を選択します。2 つのコンシューマーグループが存在し、それぞれ dc1 から dc2 へと実行されている Replicator インスタンス、replicator-dc1-to-dc2-topic1replicator-dc1-to-dc2-topic2 のものであることを確認します。Replicator のコンシューマーラグ情報は Control Center および kafka-consumer-groups で確認できますが、JMX を通じて取得することはできません。

    1. replicator-dc1-to-dc2-topic1 をクリックして、topic1 および _schemas の読み取り時の Replicator のコンシューマーラグを表示します。この表示は、以下と同等です。

      docker-compose exec broker-dc1 kafka-consumer-groups --bootstrap-server broker-dc1:29091 --describe --group replicator-dc1-to-dc2-topic1
      
      画像
    2. replicator-dc1-to-dc2-topic2 をクリックして、topic2 の読み取り時の Replicator のコンシューマーラグを表示します(これは docker-compose exec broker-dc1 kafka-consumer-groups --bootstrap-server broker-dc1:29091 --describe --group replicator-dc1-to-dc2-topic2 に相当します)。

      画像
  2. dc1 から dc2 にコピーする Replicator の場合 : 送信先クラスター dc2 で Replicator コンシューマーラグのモニタリングを試行しないでください。Control Center では dc2 内のトピック(つまり topic1_schemastopic2.replica)のコンシューマーラグも表示されますが、これは Replicator がそれらから消費していることを意味しません。dc2 でこのコンシューマーラグが表示されるのは、デフォルトで Replicator に offset.timestamps.commit=true が構成されているためです。これにより、Replicator は送信先クラスター dc2__consumer_offsets トピックにそのコンシューマーグループの自身のオフセットタイムスタンプをコミットします。ディザスターリカバリが発生した場合、これにより Replicator はセカンダリクラスターに切り替えたときでも中止したポイントから再開できるようになります。

  3. コンシューマーラグと、Replicator の組み込みコンシューマーに関連付けられている MBean 属性 records-lag を混同しないでください。この属性は、Replicator の組み込みコンシューマーが元のデータ生成レートを維持できているかどうかを反映しますが、送信先クラスターにメッセージを生成するときのレプリケーションラグは考慮していません。records-lag はリアルタイムであり、通常は値が 0.0 です。

    docker-compose exec connect-dc2 \
        kafka-run-class kafka.tools.JmxTool \
        --object-name "kafka.consumer:type=consumer-fetch-manager-metrics,partition=0,topic=topic1,client-id=replicator-dc1-to-dc2-topic1-0" \
        --attributes "records-lag" \
        --jmx-url service:jmx:rmi:///jndi/rmi://connect-dc2:9892/jmxrmi
    

フェイルオーバー後のアプリケーションの再開

障害が発生したら、Java コンシューマーアプリケーションを別のデータセンターに切り替えます。送信先クラスターでは、アプリケーションは送信元クラスターで中止したポイントからデータの消費を自動的に再開できます。

この機能を使用するには、コンシューマータイムスタンプインターセプター で Java コンシューマーアプリケーションを構成します。このサンプルコードは こちら で公開されています。

  1. デモの開始(前のセクション を参照)後に、コンシューマーを実行して dc1 Kafka クラスターに接続します。コンシューマーグループ ID が自動的に java-consumer-topic1 として構成され、コンシューマータイムスタンプインターセプターとモニタリングインターセプターが使用されます。

    mvn clean package
    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.ConsumerMultiDatacenterExample -Dexec.args="topic1 localhost:9091 http://localhost:8081 localhost:9092"
    
  2. コンシューマー出力で、dc1 と dc2 の両方からデータを読み取っていることを確認します。

    ...
    key = User_1, value = {"userid": "User_1", "dc": "dc1"}
    key = User_9, value = {"userid": "User_9", "dc": "dc2"}
    key = User_6, value = {"userid": "User_6", "dc": "dc2"}
    ...
    
  3. コンシューマーが dc1 から消費している場合でも、コンシューマーグループ java-consumer-topic1 に対してコミットされた dc2 コンシューマーオフセットが存在します。以下のコマンドを実行して、dc2 の __consumer_offsets トピックから読み取ります。

    docker-compose exec broker-dc2 \
        kafka-console-consumer \
        --topic __consumer_offsets \
        --bootstrap-server localhost:9092 \
        --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" | grep java-consumer
    
  4. コミットされたオフセットが存在することを確認します。

    ...
    [java-consumer-topic1,topic1,0]::OffsetAndMetadata(offset=1142, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1547146285084, expireTimestamp=None)
    [java-consumer-topic1,topic1,0]::OffsetAndMetadata(offset=1146, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1547146286082, expireTimestamp=None)
    [java-consumer-topic1,topic1,0]::OffsetAndMetadata(offset=1150, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1547146287084, expireTimestamp=None)
    ...
    
  5. Kafka クライアントで Confluent Monitoring Interceptor が構成されている場合、これらはメタデータを _confluent-monitoring というトピックに書き込みます。Kafka クライアントには、Apache Kafka クライアント API を使用して Kafka ブローカーに接続するアプリケーション(カスタムクライアントコードなど)、または組み込みのプロデューサーまたはコンシューマーを備えたサービス(Kafka Connect、ksqlDB、Kafka Streams アプリケーションなど)が含まれています。Control Center はそのトピックを使用して、すべてのメッセージが配信されていることを確認し、スループットとレイテンシのパフォーマンスに関する統計を提供します。その同じトピックから、どのプロデューサーがどのトピックに書き込んでいるか、どのコンシューマーがどのトピックから読み取っているかも確認できます。また、リポジトリにはサンプルの スクリプト が含まれています。

    ./map_topics_clients.py
    

    注釈

    このスクリプトはデモ専用です。本稼働環境には適していません。

  6. Java コンシューマーが実行されている安定した状態では、以下のように表示されます。

    Reading topic _confluent-monitoring for 60 seconds...please wait
    
    __consumer_timestamps
      producers
        consumer-1
        producer-10
        producer-11
        producer-6
        producer-8
      consumers
        replicator-dc1-to-dc2-topic1
        replicator-dc1-to-dc2-topic2
        replicator-dc2-to-dc1-topic1
    
    _schemas
      producers
        connect-worker-producer-dc2
      consumers
        replicator-dc1-to-dc2-topic1
    
    topic1
      producers
        connect-worker-producer-dc1
        connect-worker-producer-dc2
        datagen-dc1-topic1
        datagen-dc2-topic1
      consumers
        java-consumer-topic1
        replicator-dc1-to-dc2-topic1
        replicator-dc2-to-dc1-topic1
    
    topic2
      producers
        datagen-dc1-topic2
      consumers
        replicator-dc1-to-dc2-topic2
    
    topic2.replica
      producers
        connect-worker-producer-dc2
    
  7. dc1 をシャットダウンします。

    docker-compose stop connect-dc1 schema-registry-dc1 broker-dc1 zookeeper-dc1
    
  8. コンシューマーを停止してから再起動して、dc2 Kafka クラスターに接続します。同じコンシューマーグループ ID、java-consumer-topic1 を引き続き使用するため、中止したポイントから再開できます。

    mvn exec:java -Dexec.mainClass=io.confluent.examples.clients.ConsumerMultiDatacenterExample -Dexec.args="topic1 localhost:9092 http://localhost:8082 localhost:9092"
    
  9. dc2 からのデータのみが表示されることを確認します。

    ...
    key = User_8, value = {"userid": "User_8", "dc": "dc2"}
    key = User_9, value = {"userid": "User_9", "dc": "dc2"}
    key = User_5, value = {"userid": "User_5", "dc": "dc2"}
    ...
    

デモの停止

デモを終了する場合は、デモを停止して Docker コンテナーとイメージを削除します。

  1. 以下のスクリプトを実行してすべてのサービスを停止し、Docker コンテナーとイメージを削除します。

    ./stop.sh
    
  2. 以下のコマンドを実行して、実行されているコンテナーが存在しないことを確認します。

    docker container ls
    

次のステップ

  • 障害シナリオの発生時の回復性を高めるように複数の Apache Kafka クラスターを設計および構成するための実用的なガイドについては、ディザスターリカバリに関するホワイトペーパー を参照してください。このホワイトペーパーでは、フェイルオーバーとフェイルバックを行い、最終的にリカバリを成功させるための計画について説明しています。
  • データレプリケーションに Confluent Platform を使用する方法の概要については、「概要」を参照してください。
  • Replicator を構成して独自のマルチクラスターデプロイをセットアップする方法のクイックスタートについては、「チュートリアル: クラスターの境界を越えたデータのレプリケーション」を参照してください。
  • Replicator の概要については、「Replicator の概要」を参照してください。
  • Confluent Platform を使用してフォロワー、オブザーバー、レプリカ配置を含むストレッチクラスターを作成する方法については、「Multi-Region Clusters」を参照してください。