チュートリアル: トピックデータ共有での Cluster Linking の使用

Looking for Confluent Cloud Cluster Linking docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Cluster Linking on Confluent Cloud.

重要

この機能はプレビュー機能として利用できます。プレビュー機能とは、開発者から早い段階でフィードバックを受けるために提供している Confluent Platform のコンポーネントのことです。この機能は、評価用、本稼働環境以外でのテスト用、あるいは Confluent にフィードバックを提供するために使用できます。

このチュートリアルでは、トピックデータ共有のユースケースで Cluster Linking を使用する方法について具体的に説明します。

チュートリアルの内容

このチュートリアルを完了すると、2 つのクラスターを構成し、Cluster Linking を使用してミラートピックを作成して、クラスター間でトピックデータを共有できるようになります。また、ミラーリングを停止してトピックを書き込み可能にする方法、および 2 つのトピックに不一致が生じているかどうかを検証する方法についても学びます。

../../_images/kafka-basics-multi-cluster.ja.png

前提条件とコマンド例について

  • 以下の手順は、Confluent Platform 6.0.0 以降、および Java 1.8 または 1.11( Confluent Platform で必要です)のローカルインストールがあることを前提としています。Confluent Platform を初めて使用する場合は、最初に「Confluent Platform のクイックスタート(ローカルインストール)」を終えてからこれらのチュートリアルに進んでください。
  • 特に明記しない限り、以下の例は、インストールされている Confluent Platform のデフォルトの場所にプロパティファイルが存在することを前提としています。これにより、ほとんどの場合、サンプルコマンドを簡単にターミナルに直接コピーして貼り付けることができます。
  • Confluent Platform がインストールされている場合、Confluent CLI コマンド自体は任意のディレクトリ( kafka-topicskafka-console-producerkafka-console-consumer )から実行できます。ただし、以下の例では、$CONFLUENT_HOMEkafka-server-start )のプロパティファイルにアクセスするコマンドは、このディレクトリから実行します。
  • Confluent CLI コマンドでは、コマンドの最初または最後にブートストラップサーバーを指定できます。つまり、kafka-topics --list --bootstrap-server localhost:9092kafka-topics --bootstrap-server localhost:9092 --list と同じです。これらのチュートリアルでは、ターゲットブートストラップサーバーはコマンドの最後に指定します。

残りのチュートリアルでは、$CONFLUENT_HOME への参照を使用します。これは Confluent Platform インストールディレクトリ内の etc/kafka を表します。以下の例のように、これを環境変数として設定します。

export CONFLUENT_HOME=$HOME/confluent-6.2.0-0
PATH=$CONFLUENT_HOME/bin:$PATH

ポートと構成のマッピング

このチュートリアルのデプロイ例では、以下のポートと機能の構成を使用し、サーバーが localhost で実行されているものと仮定します。

送信元 送信先
Kafka ブローカー 9092(元の demo トピックの場所) 9093(demo ミラートピックの場所)
ZooKeeper 2181 2182
HTTP リスナー 8090 8091
クラスターリンク   クラスターリンクは送信先で有効にされます。

Kafka および ZooKeeper ファイルの構成

$CONFLUENT_HOME/etc/kafka/ で、以下のファイルを構成して送信元クラスターと送信先クラスターをセットアップします。既存の zookeeper.properties および server.properties ファイルをコピーして変更し、開始点として使用することができます。値を変更する行のコメントを必ず解除してください。

Cluster Linking を有効にするには、新しいプロパティ(confluent.cluster.link.enable=true )を送信先サーバーに追加する必要があります。また、confluent.http.server.listeners を追加し、各ブローカーで一意になるように構成する必要があります。

ファイル 構成
zookeeper-src.properties

dataDir=/tmp/zookeeper-1

clientPort=2181 (これはデフォルトであり、変更不要です)

zookeeper-dst.properties

dataDir=/tmp/zookeeper-2

clientPort=2182

server-src.properties

listeners=PLAINTEXT:// :9092 (これはデフォルトです。コメントアウトのままにします。変更は不要です)

log.dirs=/tmp/kafka-logs-1

zookeeper.connect=localhost:2181 (これはデフォルトです。変更は不要です)

以下のリスナー構成を追加して、このブローカーの一意の REST エンドポイントを指定します。

confluent.http.server.listeners=http://localhost:8090

server-dst.properties

confluent.cluster.link.enable=true (これを追加する必要があります)

listeners=PLAINTEXT:// :9093

log.dirs=/tmp/kafka-logs-2

zookeeper.connect=localhost:2182

以下のリスナー構成を追加して、このブローカーの一意の REST エンドポイントを指定します。

confluent.http.server.listeners=http://localhost:8091

送信元クラスターの起動

$CONFLUENT_HOME から以下のコマンドをそれぞれ個別のコマンドウィンドウで実行します。

  1. 送信元クラスター用の ZooKeeper サーバーを起動します。

    ./bin/zookeeper-server-start etc/kafka/zookeeper-src.properties
    
  2. 送信元クラスター用の Kafka ブローカーを起動します。

    ./bin/kafka-server-start etc/kafka/server-src.properties
    

注釈

送信元クラスターは Confluent Platform 5.4 以降(Apache Kafka® 2.4)である必要があります。

送信先クラスターの起動

$CONFLUENT_HOME から以下のコマンドをそれぞれ個別のコマンドウィンドウで実行します。

  1. 送信先クラスター用の ZooKeeper サーバーを起動します。

    ./bin/zookeeper-server-start etc/kafka/zookeeper-dst.properties
    
  2. 送信先クラスター用の Kafka ブローカーを起動します。

    ./bin/kafka-server-start etc/kafka/server-dst.properties
    

注釈

送信先クラスターは、Confluent Platform 6.0.0 以降である必要があります。

送信元クラスターへの情報の入力

  1. 送信元クラスターでトピックを作成します。

    kafka-topics --create --topic demo --bootstrap-server localhost:9092
    

    トピックが正常に作成されたことを示す確認メッセージが表示されます。

    Created topic demo.
    

    ちなみに

    以下のとおり、既存のトピックのリストを取得できます。

    kafka-topics --list --bootstrap-server localhost:9092
    

    --describe オプションでトピックの詳細情報を表示します。

    kafka-topics --describe --topic demo --bootstrap-server localhost:9092
    
  2. 新しいコマンドウィンドウでプロデューサーを実行して、送信元クラスター上の demo トピックに複数のメッセージを送信し、トピックにデータを入力します。

    kafka-console-producer --topic demo --bootstrap-server localhost:9092
    

    プロデューサーが起動すると、プロンプト > が表示されます。

    プロンプトで複数のメッセージを入力します。各メッセージの後に Enter キーを押します。プロデューサーウィンドウは以下のようになります。

    >first
    >second
    >third
    

    ちなみに

    上の例をコピーしてプロデューサーに貼り付けるのではなく、メッセージを入力し、各メッセージの後に Enter キーを押してください。

  3. 送信元クラスターのトピックから消費します。

    新しいターミナルで、コンシューマーを実行して demo トピックからメッセージを消費します。

    kafka-console-consumer --topic demo --from-beginning --bootstrap-server localhost:9092
    

    トピックでメッセージが正しく消費されると、出力は以下のようになります。

    first
    second
    third
    

トピックミラーの作成と検証

  1. 送信先クラスターでクラスターリンクを作成します。

    kafka-cluster-links --bootstrap-server localhost:9093 \
          --create --link-name demo-link --config bootstrap.servers=localhost:9092
    

    以下のような確認メッセージが表示されます。

    Cluster link 'demo-link' creation successfully completed.
    

    ちなみに

    Cluster Linking が送信先クラスター(/etc/kafka/server-dst.properties )で有効になっている必要があります。有効になっていない場合、この操作は失敗します。

  2. ミラートピックを初期化します。

    以下のコマンドで、クラスターリンク demo-link を使用して元の demo トピックのミラーを確立します。

    kafka-topics --create --topic demo --mirror-topic demo --link-name demo-link \
    --bootstrap-server localhost:9093
    

    以下のような確認メッセージが表示されます。

    Created topic demo.
    

    ちなみに

    • プレビューでは、ミラートピック名は元のトピック名と一致している必要があります。詳細については、「既知の制限」を参照してください。
    • ミラートピックでは、作成時にその送信元トピックへのリンクを指定する必要があります。これにより、ミラートピックはデータまたはメタデータの不一致がないクリーンな状態になります。
  3. 送信先クラスターのミラートピックから消費して、そのミラートピックを検証します。

    新しいターミナルで、コンシューマーを実行してミラートピックからメッセージを消費します。

    kafka-console-consumer --topic demo --from-beginning --bootstrap-server localhost:9093
    

    出力は以下のようになります。

    first
    second
    third
    

送信先のレプリカステータスのチェック

以下のコマンドを実行して、送信先クラスターのレプリカをモニタリングします。

kafka-replica-status --topics demo --include-linked --bootstrap-server localhost:9093

出力は以下の例のようになります。

Topic Partition Replica ClusterLink IsLeader IsObserver IsIsrEligible IsInIsr IsCaughtUp LastCaughtUpLagMs LastFetchLagMs LogStartOffset LogEndOffset
demo  0         0       -           true     false      true          true    true       0                 0              0              3
demo  0         0       demo-link   true     false      true          true    true       2                 2              0              3

ちなみに

出力には、送信元と送信先のレプリカが ClusterLink フィールドで区別されて示されます。ここで - はローカルを意味し、リンク名(この例では demo-link)はこのクラスターリンクによるレプリカを示しています。

ミラートピックが読み取り専用であることの検証

送信先のミラートピックに対するメッセージを生成し、そのミラートピックが読み取り専用であることを検証してみます。

  1. ミラートピックをターゲットとするプロデューサーを起動します。

    kafka-console-producer --topic demo --bootstrap-server localhost:9093
    
  2. > プロンプトでメッセージを入力し、Enter キーを押します。

    ミラートピックは書き込み不可であるため、以下のようなエラーが返されるはずです。

    >hi
    2020-08-13 16:55:53,571] ERROR Error when sending message to topic demo with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    

送信元トピック構成の変更

  1. ミラートピックの詳細を表示して、送信先とミラートピックの構成を検証します。

    kafka-configs --describe --topic demo --bootstrap-server localhost:9093
    

    出力には、厳密にミラーリングされた構成に設定されているパラメーターが表示されます。

    Dynamic configs for topic demo are:
    compression.type=producer sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:compression.type=producer, DEFAULT_CONFIG:compression.type=producer}
    cleanup.policy=delete sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=delete, DEFAULT_CONFIG:log.cleanup.policy=delete}
    segment.bytes=1073741824 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1073741824, STATIC_BROKER_CONFIG:log.segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824}
    max.message.bytes=1048588 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=1048588, DEFAULT_CONFIG:message.max.bytes=1048588}
    
  2. 送信元セット retention.ms を変更します。

    これは、送信元で明示的に設定されている場合にのみミラーリングされる構成の 1 つです(retention.ms は前の手順の出力には含まれていませんでした)。

    kafka-configs --alter --topic demo --add-config retention.ms=123456890 --bootstrap-server localhost:9092
    

    以下のような確認メッセージが表示されます。

    Completed updating config for topic demo.
    
  3. kafka-configs コマンドを再実行して、retention.ms がミラートピックに存在することを検証します。

    kafka-configs --describe --topic demo --bootstrap-server localhost:9093
    

    動的な構成には、retention.ms が含まれています。

    Dynamic configs for topic demo are:
    compression.type=producer sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:compression.type=producer, DEFAULT_CONFIG:compression.type=producer}
    cleanup.policy=delete sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=delete, DEFAULT_CONFIG:log.cleanup.policy=delete}
    segment.bytes=1073741824 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1073741824, STATIC_BROKER_CONFIG:log.segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824}
    retention.ms=123456890 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=123456890}
    max.message.bytes=1048588 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:max.message.bytes=1048588, DEFAULT_CONFIG:message.max.bytes=1048588}
    

    ちなみに

    前の手順で加えた変更が伝播されて、送信元トピックの retention.ms が設定されるまで最大 5 秒かかる場合があります。

送信元トピックのパーティションの変更

  1. 送信元トピックのパーティションの数を変更します。

    kafka-topics --alter --topic demo --partitions 8 --bootstrap-server localhost:9092
    
  2. 送信元トピックの変更を検証します。

    kafka-topics --describe --topic demo --bootstrap-server localhost:9092
    

    出力は以下のようになります。

    Topic: demo       PartitionCount: 8       ReplicationFactor: 1    Configs: segment.bytes=1073741824,retention.ms=123456890
      Topic: demo     Partition: 0    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 1    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 2    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 3    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 4    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 5    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 6    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 7    Leader: 0       Replicas: 0     Isr: 0  Offline:
    
  3. ミラートピックの変更を検証します。

    kafka-topics --describe --topic demo --bootstrap-server localhost:9093
    

    ミラートピックに対する同じコマンドの出力は、元のトピックと正確に一致している必要があります。

    Topic: demo       PartitionCount: 8       ReplicationFactor: 1    Configs: segment.bytes=1073741824,retention.ms=123456890
      Topic: demo     Partition: 0    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 1    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 2    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 3    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 4    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 5    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 6    Leader: 0       Replicas: 0     Isr: 0  Offline:
      Topic: demo     Partition: 7    Leader: 0       Replicas: 0     Isr: 0  Offline:
    

    ちなみに

    デフォルトのメタデータリフレッシュは 5 分であるため、変更が伝播されるまで数分かかる場合があります。これは、 クラスターリンクを作成する ときに構成 metadata.max.age.ms を設定することで改善できます。以下に例を示します。

    ./bin/kafka-cluster-links --bootstrap-server localhost:9093 --create --link-name demo-link \
    --config bootstrap.servers=localhost:9092,metadata.max.age.ms=5000
    

ミラートピックのリストの表示

ミラートピックのリストを表示するには、--include-topics オプションを指定して kafka-cluster-links --list を実行します。

kafka-cluster-links --list --link-name demo-link --include-topics --bootstrap-server localhost:9093

この出力は以下のようになります。

Link name: 'demo-link', link ID: '123-some-link-id', cluster ID: '123-some-cluster-id', topics: [demo]

書き込み可能にするためのミラートピックの切り替え

  1. 送信先の demo トピックのミラーリングを停止します。

    kafka-mirrors --failover --topics demo --link demo-link --bootstrap-server localhost:9093
    

    以下のような確認メッセージが表示されます。

    Topic 'demo's mirror was successfully stopped.
    

両方のトピックに対する生成による不一致の検証

  1. 送信元の元のトピックにメッセージを送信するために、プロデューサーを実行します(このプロデューサーがまだ実行されている場合は、このコマンドウィンドウに戻ります)。

    kafka-console-producer --topic demo --bootstrap-server localhost:9092
    

    プロンプトで old というメッセージを入力します。

    > old
    
  2. プロデューサーを実行して、メッセージを送信先のトピック(もうミラーリングされていません)に送信します。

    kafka-console-producer --topic demo --bootstrap-server localhost:9093
    

    プロンプトで new というメッセージを入力します。

    > new
    
  3. コンシューマーを実行して、両方のトピックから読み取ります。出力には、これらが一致していないことが示されます。

    • コンシューマーを実行して、送信元の元のトピックからメッセージを読み取ります(このコンシューマーがまだ実行されている場合は、このウィンドウの出力を確認します)。

      kafka-console-consumer --topic demo --from-beginning --bootstrap-server localhost:9092
      

      送信元トピックの出力に old メッセージが含まれています。

      first
      second
      third
      old
      
    • コンシューマーを実行して、送信先のトピックからメッセージを読み取ります(このコンシューマーがまだ実行されている場合は、このウィンドウの出力を確認します)。

      kafka-console-consumer --topic demo --from-beginning --bootstrap-server localhost:9093
      

      送信先トピックの出力に new メッセージが含まれています。

      first
      second
      third
      new
      

終了後の手順

シャットダウンとクリーンアップのタスクを実行します。

  • それぞれのコマンドウィンドウで Ctrl キーを押しながら C キーを押して、コンシューマーとプロデューサーを停止します。
  • 他のすべてのコンポーネントを、起動したときと逆の順序で、それぞれのコマンドウィンドウで Ctrl キーを押しながら C キーを押して停止します。たとえば、Kafka ブローカーを停止してから、それぞれの ZooKeeper を停止します。

おすすめの関連情報

Cluster Linking のデモ(Docker)