ハイブリッドクラウドとクラウドへのブリッジ

Looking for Confluent Cloud Cluster Linking docs? You are currently viewing Confluent Platform documentation. If you are looking for Confluent Cloud docs, check out Cluster Linking on Confluent Cloud.

はじめに

このチュートリアルでは、Confluent Platform クラスターと Confluent Cloud クラスターをリンクするハイブリッドのユースケースでの Cluster Linking の実践的な使用方法について説明します。

次のように両方向に移動するデータのデプロイを作成します。

  • Confluent Cloud から Confluent Platform へ

  • Confluent Platform から Confluent Cloud へ

    • この方向では、Confluent Platform 7.0.0 の新機能である “ソース開始” クラスターリンクが必要です。

      ../../_images/source-initiated-cluster-link.png

どちらの場合も、Confluent Platform ブローカーが Confluent Cloud ブローカーへの接続を開始します。このため、Confluent Cloud が Confluent Platform ブローカーに接続できるようにファイアウォールを開く必要はありません。

プロセスの中で、Confluent Platform コマンドと Confluent Cloud コマンドで使用するさまざまなセキュリティ認証情報と構成ファイルを作成します。これらをまとめたリストについては、このチュートリアルの最後にある「構成の概要」を参照してください。

Cluster Linking を使用できるクラスターについては、「サポートされるクラスタータイプ」を参照してください。

../../_images/cluster-link-hybrid.ja.png

チュートリアルの内容

このチュートリアルでは、2 つのクラスター(Confluent Platform と Confluent Cloud に 1 つずつ)を構成し、Confluent Cloud に対してファイアウォールを開くことなく Cluster Linking を使用してクラスター間でトピックデータを双方向に共有します。

Confluent Platform のインストール

Confluent Platform バージョン 7.0.0 をダウンロードして展開します。

チュートリアルの続きの手順のために、次の変数を設定する必要があります。

export CONFLUENT_HOME=<CP installation directory>
export CONFLUENT_CONFIG=$CONFLUENT_HOME/etc/kafka

ターミナルウィンドウを開いたときに常に実行されるように、これらの 2 つの行を .bashrc または .bash-profile に追加します。

前提条件とコマンド例について

注釈

As a general guideline (not just for this tutorial), any customer-owned firewall that allows the cluster link connection from source cluster brokers to destination cluster brokers must allow the TCP connection to persist in order for Cluster Linking to work.

  • ここで紹介する手順では、Confluent Platform 7.0.0 以降 と、Java 1.8 または 1.11(Confluent Platform に必要)がローカルにインストールされていることを前提としています。セルフマネージド型デプロイのインストール手順 はドキュメントに記載されています。Confluent Platform を初めて使用する場合は、最初に「Confluent Platform(ローカル)を使用した Apache Kafka のクイックスタート」を参照してからこのチュートリアルに進んでください。
  • 特に明記しない限り、以下の例は、Confluent Platform のプロパティファイルがデフォルトのインストール場所に存在することを前提としています。これにより、例からコマンドを簡単にコピーしてターミナルに直接貼り付けることができます。
  • Confluent Platform のデフォルトのインストールでは、Confluent CLICluster Linking コマンド$CONFLUENT_HOME/bin にあり、プロパティファイルはディレクトリ CONFLUENT_CONFIG$CONFLUENT_HOME/etc/kafka/)にあります。これらのコマンドにアクセスするには Confluent Platform を実行しておく必要があります。Confluent Platform を 構成 して 実行 すると、どのコマンドも引数を付けずに入力することでヘルプを表示できるようになります(kafka-cluster-links など)。
  • このチュートリアルでは Confluent Cloud へのログインと Confluent Cloud CLI が必要です。詳細については、Confluent Cloud の Cluster Linking のクイックスタート の「Confluent Cloud の最新バージョンの取得」と、複数の CLI を同時実行する 方法に関する説明が記載されている「Confluent CLI v2 への移行」を参照してください(後者は特に Confluent Platform の 7.1 リリースへの対応にも役立ちます)。Confluent Cloud を初めて使用する場合は、最初にクイックスタートを参照してからこのチュートリアルに進んでください。
  • このチュートリアルでは、Confluent Cloud で 専用クラスター を実行している必要があります。この場合、Confluent Cloud の料金が発生します。

ポートと構成のマッピング

このチュートリアルのデプロイ例では、以下のポートと機能の構成を使用し、サーバーが localhost で実行されているものと仮定します。

Confluent Platform
Kafka ブローカー 9092
ZooKeeper 2181

ちなみに

  • これらは、このチュートリアルで使用するサンプルポートです。Cluster Linking ではこれらと同じポートを使用する必要はありません。必要に応じて変更することができます。
  • これらのポートを使用する他のプロセスがある場合は、他のプロセスを終了するか、チュートリアルの手順を変更して別のポートを使用してください。

Kafka および ZooKeeper ファイルの構成

$CONFLUENT_CONFIG で、以下のファイルを構成して Confluent Platform クラスターをセットアップします。

$CONFLUENT_CONFIG/zookeeper.properties をコピーして zookeeper-clusterlinking.properties のベースとして使用します。

$CONFLUENT_CONFIG/server.properties をコピーして server-clusterlinking.properties のベースとして使用します。

ファイル 構成
zookeeper-clusterlinking.properties

dataDir=/tmp/zookeeper-clusterlinking (変更が必要です)

clientPort=2181 (これがデフォルトです)

server-clusterlinking.properties

以下を既存のファイルに追加する必要があります。

inter.broker.listener.name=SASL_PLAINTEXT

sasl.enabled.mechanisms=SCRAM-SHA-512

sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512

listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-secret";

confluent.reporters.telemetry.auto.enable=false

confluent.cluster.link.enable=true

password.encoder.secret=encoder-secret

以下については既存の構成を変更するか、既存の構成を使用します。

listeners=SASL_PLAINTEXT://:9092

advertised.listeners=SASL_PLAINTEXT://:9092

log.dirs=/tmp/kafka-logs-1

zookeeper.connect=localhost:2181 (既にこのように設定されているはずです)

offsets.topic.replication.factor=1 (既にこのように設定されているはずです)

confluent.license.topic.replication.factor=1 (既にこのように設定されているはずです)

注釈

  • この例では、SSL で保護された 1 つの ZooKeeper と 1 つの Confluent Server ブローカーのみを構成します。ローカルマシンでのテストの場合はこれで問題ありませんが、本稼働環境ではフォールトトレランスと高可用性を目的として複数の Zookeeper とブローカーを複数のマシンに分散させ、すべてを認証と暗号化で保護する必要があります。
  • この例では、ブローカーが 1 つのテストセットアップであるため、重要な内部トピックのレプリケーション係数は 1 に設定されています。本稼働環境のデプロイでは、これらのトピックのレプリケーション係数を 1 に設定しないでください。一般に、レプリケーション係数はブローカーの数に応じて 3 以上にする必要があります。
  • パラメーター password.encoder.secret は、クラスターリンクに保管される認証情報を暗号化するために必要です。

Confluent Platform クラスターの起動

別々のコマンドウィンドウで以下のコマンドを実行します。

ZooKeeper コマンドと Confluent Server コマンドはユーザーが停止するまで ”完了” しないため、アプリケーションの実行中はこれらのウィンドウを開いておく必要があります。

別のコマンドウィンドウを、完了させるコマンドを実行するメインのターミナルとして使用します(これらのコマンドの例としては、kafka-configskafka-topicskafka-cluster-links などがあります。場合によっては kafka-console-producerkafka-console-consumer も含まれますが、これら 2 つのコマンドは実行し続ける必要があることがあります)。

../../_images/cluster-link-hybrid-command-windows.ja.png
  1. 新しいコマンドウィンドウで Confluent Platform クラスターの ZooKeeper サーバーを起動します。

    zookeeper-server-start $CONFLUENT_CONFIG/zookeeper-clusterlinking.properties
    
  2. 2 つのユーザーのクラスターに SASL SCRAM 認証情報を作成するコマンドを実行します。一方のユーザーは Kafka クラスターが使用し、もう一方のユーザーはクラスターに対してコマンドを実行するためのものです。

    • 次のコマンドを実行して、Kafka クラスター自体が使用する “kafka” と呼ばれるユーザーのクラスターに認証情報を作成します。

      kafka-configs --zookeeper localhost:2181 --alter --add-config \
        'SCRAM-SHA-512=[iterations=8192,password=kafka-secret]' \
        --entity-type users --entity-name kafka
      
    • 次のコマンドを実行して、クラスターに対してコマンドを実行するために使用する “admin” と呼ばれるユーザーのクラスターに認証情報を作成します。

      kafka-configs --zookeeper localhost:2181 --alter --add-config \
        'SCRAM-SHA-512=[iterations=8192,password=admin-secret]' \
        --entity-type users --entity-name admin
      
  3. Confluent Platform クラスターに対してコマンドを実行するときに認証する admin の認証情報を含むファイルを作成します。

    テキストエディタを開き、$CONFLUENT_CONFIG/CP-command.config という名前のファイルを作成して以下のコンテンツをコピーアンドペーストします。

    sasl.mechanism=SCRAM-SHA-512
    security.protocol=SASL_PLAINTEXT
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="admin" \
      password="admin-secret";
    
  4. 新しいコマンドウィンドウで送信元クラスターの Confluent Server ブローカーを起動し、コマンドの一部として認証情報を渡します。

    kafka-server-start $CONFLUENT_CONFIG/server-clusterlinking.properties
    
  5. Confluent Platform のクラスター ID を取得します。

    kafka-cluster cluster-id --bootstrap-server localhost:9092 --config $CONFLUENT_CONFIG/CP-command.config
    

    出力は以下のようになります。

    Cluster ID: G1pnOMOxSjWYIX8xuR2cfQ
    

    この場合、G1pnOMOxSjWYIX8xuR2cfQ が Confluent Platform のクラスター ID であり、ここで紹介する例では $CP_CLUSTER_ID と示します。

    必要に応じて、ローカルシェルあるいは zsh または bash プロファイルでこのための環境変数を設定し、後の手順でコマンドを直接カットアンドペーストできるようにすることもできます。

    export CP_CLUSTER_ID=<CP-CLUSTER-ID>
    

Confluent Cloud クラスターの起動

残りのコマンドを実行するために、パブリックインターネットに接続された 専用 Confluent Cloud クラスターが必要です。今回のデモ用に作成し、チュートリアルが終了したら削除することができます。このクラスターには料金が発生します。

  1. 統合 CLI または Confluent Cloud CLI を使用して Confluent Cloud にログオンします(「前提条件とコマンド例について」を参照)。

    この例では統合 CLI コマンドを使用します。

    confluent login
    
  2. 環境を表示し、使用する環境の ID を選択します。

    confluent environment list
    

    アスタリスクは、リスト内で現在選択されている環境を示します。以下のようにして異なる環境を選択できます。

    confluent environment use <environment-ID>
    
  3. Confluent Cloud の既存の専用クラスターを使用するか、Confluent Cloud Console から、または Confluent CLI から直接、次のように新しいクラスターを作成します。

    confluent kafka cluster create CLOUD-DEMO --type dedicated --cloud aws --region us-east-1 --cku 1 --availability single-zone
    

    出力は以下のようになります。

    It may take up to 5 minutes for the Kafka cluster to be ready.
    +--------------+---------------+
    | Id           | lkc-59oyn     |
    | Name         | MY-CLOUD-DEMO |
    | Type         | DEDICATED     |
    | Ingress      |            50 |
    | Egress       |           150 |
    | Storage      | Infinite      |
    | Provider     | aws           |
    | Availability | single-zone   |
    | Region       | us-east-1     |
    | Status       | PROVISIONING  |
    | Endpoint     |               |
    | ApiEndpoint  |               |
    | RestEndpoint |               |
    | ClusterSize  |             1 |
    +--------------+---------------+
    

    新しい Confluent Cloud クラスターを作成した場合は、そのクラスターがプロビジョニングされるまで待機する必要があります。これには通常数分かかりますが、長引く場合もあります。クラスターを使用できる状態になると、メールで通知が届きます。

  4. クラスターを表示します。

    confluent kafka cluster list
    

    アスタリスクは、現在選択されているクラスターを示します。以下のようにして異なるクラスターを選択できます。

    confluent kafka cluster use <CC-CLUSTER-ID>
    

    ちなみに

    クラスター ID を指定することで、現在選択されていないクラスターの情報を取得したり、複数のタイプのアクションを実行したりできます。たとえば、confluent kafka cluster describe <cluster-ID> のように指定します。

  5. このチュートリアルでは、専用クラスターのクラスター ID を $CC-CLUSTER-ID と示しています。

    必要に応じて、ローカルシェルあるいは zsh または bash プロファイルでこのための環境変数を設定し、後の手順でコマンドを直接カットアンドペーストできるようにすることもできます。

    export CC_CLUSTER_ID=<CC-CLUSTER-ID>
    

Confluent Platform クラスターへの情報の指定

これらのコマンドでは Confluent Platform CLI を使用します。

  1. 順序が見やすくなるように、パーティションが 1 つの Confluent Platform クラスターにトピックを作成します。

    kafka-topics --create --topic from-on-prem --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    

    トピックが正常に作成されたことを示す確認メッセージが表示されます。

    Created topic from-on-prem.
    

    以下のとおり、既存のトピックのリストを取得できます。

    kafka-topics --list --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    

    --describe オプションでトピックの詳細情報を表示します。

    kafka-topics --describe --topic from-on-prem --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    
  2. 送信元クラスターの from-on-prem トピックに複数のメッセージを送信し、トピックにデータを入力します。

    seq 1 5 | kafka-console-producer --topic from-on-prem --bootstrap-server localhost:9092 --producer.config $CONFLUENT_CONFIG/CP-command.config
    

    このコマンドは出力なしで終了するはずです。

  3. 送信元クラスターのトピックから消費します。

    コンシューマーを実行して from-on-prem トピックからメッセージを消費します。

    kafka-console-consumer --topic from-on-prem --from-beginning --bootstrap-server localhost:9092 --consumer.config $CONFLUENT_CONFIG/CP-command.config
    

    トピックでメッセージが正しく消費されると、出力は以下のようになります。

    1
    2
    3
    4
    5
    

    Ctrl キーを押しながら C キーを押してプロンプトに戻ります。

Confluent Cloud クラスターの権限のセットアップ

Confluent Cloud で以下の操作を行います。

  1. Confluent Platform から Confluent Cloud へのトピックデータのミラーリングの送信先の役割を果たす Confluent Cloud クラスターのユーザー API キーを作成します。

    confluent api-key create --resource $CC_CLUSTER_ID
    
  2. 作成した API キーとシークレットを安全な場所に保存します。このチュートリアルではこれらを <CC-link-api-key> および <CC-link-api-secret> と示します。この API キーとシークレットは、Confluent Platform から Confluent Cloud へのリンクを作成するために使用する Confluent Cloud クラスターと関連付けられています。次の手順で、これらを構成ファイルに追加します。

    重要

    これを本稼働環境で設定する場合、ユーザーに関連付けられたキーではなくサービスアカウントの API キーを使用する必要があります。サービスアカウントで Confluent Cloud クラスターにアクセスするための権限のセットアップ方法 については、トピックデータの共有に関するチュートリアルを参照してください。送信元開始リンクの場合、サービスアカウントで必要な唯一の ACL は送信先クラスターの ALTER です(Cluster: Alter ACL)。クラスターリンクの ACL についての詳細は、「Security for Cluster Linking on Confluent Platform」および「Security for Cluster Linking on Confluent Cloud」を参照してください。

オンプレミスから Confluent Cloud へのデータのミラーリング

以下のセクションでは、Confluent Platform から Confluent Cloud へのリンクのセットアップとテストの方法について説明しています。

トピックの作成と Confluent Cloud へのデータのミラーリング

注釈

  • When using Schema Linking: To use a mirror topic that has a schema with Confluent Cloud Connect, ksqlDB, broker-side schema validation, or the topic viewer, make sure that make sure that Schema Linking puts the schema in the default context of the Confluent Cloud Schema Registry. To learn more, see How Schemas work with Mirror Topics.
  • 最初のコマンドを実行する前に、Confluent Cloud にログインした状態であること、また適切な環境とクラスターが選択されていることを確認します。これらのリソースをリスト表示して選択するには、コマンド confluent kafka environment listconfluent kafka environment useconfluent kafka cluster listconfluent kafka cluster use を使用します。list コマンドの出力で、選択されている環境またはクラスターの横にはアスタリスクが付きます。リソースがまったく選択されていない場合(または不適切なリソースが選択されている場合)、コマンドは適切に機能しません。

Confluent Cloud にログインした状態で以下のタスクを実行します。

  1. ミラートピックを作成します。

    以下のコマンドで、クラスターリンク from-on-prem-link を使用して元の from-on-prem トピックのミラーを確立します。

    confluent kafka mirror create from-on-prem --link from-on-prem-link
    

    コマンドの出力は以下のようになります。

    Created mirror topic "from-on-prem".
    
    • ミラートピック名は元のトピック名と一致している必要があります。詳細については、「既知の制限」を参照してください。
    • ミラートピックでは、作成時にその送信元トピックへのリンクを指定する必要があります。これにより、ミラートピックはデータまたはメタデータの不一致がないクリーンな状態になります。
  2. リンクのミラートピックのリストを表示します。

    confluent kafka mirror list --cluster $CC_CLUSTER_ID
    

    出力は以下のようになります。

          Link Name     | Mirror Topic Name | Num Partition | Max Per Partition Mirror Lag | Source Topic Name | Mirror Status | Status Time Ms
    +-------------------+-------------------+---------------+------------------------------+-------------------+---------------+----------------+
      from-on-prem-link | from-on-prem      |             1 |                            0 | from-on-prem      | ACTIVE        |  1633640214250
    
  3. 送信先クラスターのミラートピックから消費して、そのミラートピックを検証します。

    引き続き Confluent Cloud で、コンシューマーを実行してミラートピックからのメッセージを消費し、前の手順で Confluent Platform トピックに生成したメッセージを消費します。

    confluent kafka topic consume from-on-prem --from-beginning
    

    出力は以下のようになります。

    1
    2
    3
    4
    5
    

    注釈

    コンシューマーを実行しようとしたときに "リソースの API キーが選択されていない" ことを示すエラーが発生した場合、コマンド confluent api-key use <CC-API-KEY> --resource $CC_CLUSTER_ID を実行して Confluent Cloud の送信先クラスターの <CC-API-KEY> を指定してからコンシューマーコマンドを再実行します。または、エラーメッセージで提供される CLI の指示に従います。

Confluent Cloud からオンプレミスへのデータのミラーリング

以下のセクションでは、Confluent Cloud から Confluent Platform へのリンクのセットアップとテストの方法について説明しています。

トピックの作成とオンプレミスへのデータのミラーリング

  1. Confluent Cloud の統合 Confluent CLI を使用して、cloud-topic というパーティションが 1 つのトピックを作成します。

    confluent kafka topic create cloud-topic --partitions 1
    
  2. Confluent Cloud の別のコマンドウィンドウでプロデューサーを起動し、cloud-topic にデータを送信します。

    confluent kafka topic produce cloud-topic --cluster $CC_CLUSTER_ID
    
    • プロデューサーが起動していることを確認します。このコマンドの出力は次のようになり、プロデューサーの準備が完了していることが示されます。

      $ confluent kafka topic produce cloud-topic --cluster lkc-1vgo6
      Starting Kafka Producer. Use Ctrl-C or Ctrl-D to exit.
      
    • 任意の複数のエントリをプロデューサーのウィンドウに入力します。エントリを入力するたびに Enter キーを押して送信してください。

      Riesling
      Pinot Blanc
      Verdejo
      
  3. コマンド kafka-mirrors --create --mirror-topic <topic-name> を使用して cloud-topic を Confluent Platform にミラーリングします。

    以下のコマンドで、クラスターリンク from-cloud-link を使用して元の cloud-topic のミラーを確立します。

    kafka-mirrors --create --mirror-topic cloud-topic --link from-cloud-link --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    

    ミラートピックが作成されたことを示す次の確認メッセージが表示されます。

    Created topic cloud-topic.
    
  4. Confluent Platform で from-cloud-link に対して kafka-mirrors --describe を実行し、ミラートピックのステータスをチェックします。

    kafka-mirrors --describe --link from-cloud-link --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    

    指定したリンクのミラートピックすべてのステータスが出力に表示されます。

    Topic: cloud-topic        LinkName: from-cloud-link       LinkId: b1a56076-4d6f-45e0-9013-ff305abd0e54    MirrorTopic: cloud-topic        State: ACTIVE   StateTime: 2021-10-07 16:36:20
              Partition: 0    State: ACTIVE   DestLogEndOffset: 2     LastFetchSourceHighWatermark: 2 Lag: 0  TimeSinceLastFetchMs: 384566
    
  5. on-prem のミラートピックからデータを消費します。

    kafka-console-consumer --topic cloud-topic --from-beginning --bootstrap-server localhost:9092 --consumer.config $CONFLUENT_CONFIG/CP-command.config
    

    出力は、手順 8 で Confluent Cloud のプロデューサーに入力したエントリと一致するはずです。

    ../../_images/cluster-link-hybrid-produce-consume.png
  6. クラスターリンクの構成を表示します。

    kafka-configs --describe --cluster-link from-cloud-link --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    

    このコマンドの出力は構成のリストであり、一部が以下の例に示されています。

    Dynamic configs for cluster-link from-cloud-link are:
    metadata.max.age.ms=300000 sensitive=false synonyms={}
    reconnect.backoff.max.ms=1000 sensitive=false synonyms={}
    auto.create.mirror.topics.filters= sensitive=false synonyms={}
    ssl.engine.factory.class=null sensitive=false synonyms={}
    sasl.kerberos.ticket.renew.window.factor=0.8 sensitive=false synonyms={}
    reconnect.backoff.ms=50 sensitive=false synonyms={}
    consumer.offset.sync.ms=30000 sensitive=false synonyms={}
    
    ...
    
    link.mode=DESTINATION sensitive=false synonyms={}
    security.protocol=SASL_SSL sensitive=false synonyms={}
    acl.sync.ms=5000 sensitive=false synonyms={}
    ssl.keymanager.algorithm=SunX509 sensitive=false synonyms={}
    sasl.login.callback.handler.class=null sensitive=false synonyms={}
    replica.fetch.max.bytes=5242880 sensitive=false synonyms={}
    availability.check.consecutive.failure.threshold=5 sensitive=false synonyms={}
    sasl.login.refresh.window.jitter=0.05 sensitive=false synonyms={}
    

強制停止

コンシューマーとプロデューサーの停止

それぞれのコマンドウィンドウで Ctl+C キーを押して、コンシューマーとプロデューサーを停止します("+" はキーを同時に押すことを意味します)。

ミラートピックのプロモート

ミラートピックを通常のトピックにプロモートします。

  1. Confluent Cloud で、from-on-prem というミラートピックをプロモートします。

    confluent kafka mirror promote from-on-prem --link from-on-prem-link --cluster $CC_CLUSTER_ID
    

    出力は以下のようになります。

     Mirror Topic Name | Partition | Partition Mirror Lag | Error Message | Error Code | Last Source Fetch Offset
    +-------------------+-----------+----------------------+---------------+------------+--------------------------+
     from-on-prem      |         0 |                    0 |               |            |                        9
    

    ミラーリングが停止したことを確認する場合は、前述のコマンドを再実行します。エラーメッセージの列に Topic 'from-on-prem' has already stopped its mirror from 'from-on-prem-link' というメッセージが表示されるはずです。

  2. Confluent Platform で、cloud-topic というミラートピックをプロモートします。

    kafka-mirrors --promote --topics cloud-topic --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    

    出力は以下のようになります。

    Calculating max offset and ms lag for mirror topics: [cloud-topic]
    Finished calculating max offset lag and max lag ms for mirror topics: [cloud-topic]
    Request for stopping topic cloud-topics mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
    

    このコマンドを再試行すると、Topic 'cloud-topic' has already stopped its mirror 'from-cloud-link' というエラーを受け取ります。

送信元トピックとミラートピックの削除

ちなみに

  • Confluent Cloud のトピックのリストを表示する場合: confluent kafka topic list
  • Confluent Platform のトピックのリストを表示する場合: kafka-topics --list --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
  1. Confluent Cloud のトピックを削除します。

    confluent kafka topic delete cloud-topic
    
    confluent kafka topic delete from-on-prem
    
  2. Confluent Platform からトピックを削除します。

    kafka-topics --delete --topic cloud-topic --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    
    kafka-topics --delete --topic from-on-prem --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
    

Confluent Platform と ZooKeeper の停止

他のすべてのコンポーネントを、起動したときと逆の順序で、それぞれのコマンドウィンドウで Ctl+C キーを押して停止します。

  1. 最初に Kafka ブローカーを停止します。
  2. Kafka ブローカーが完全にシャットダウンされ、プロンプトが戻ったら、別のウィンドウに移動して関連する ZooKeeper を停止します。

構成の概要

ファイル 目的
zookeeper-clusterlinking.properties ZooKeeper の起動に使用される構成ファイル、「Kafka および ZooKeeper ファイルの構成」の説明を参照
server-clusterlinking.properties Confluent Platform クラスターの起動に使用される構成ファイル、「Kafka および ZooKeeper ファイルの構成」の説明を参照
CP-command.config
  • Confluent Platform クラスターの起動」の手順 3 で作成
  • Confluent Platform クラスターに対してコマンドを実行するときに認証する admin の認証情報を含む
  • Confluent Platform のコマンドで flag --command-config を付けて使用
clusterlink-hybrid-dst.config
clusterlink-CP-src.config
  • Confluent Platform から Confluent Cloud へのリンクの作成」の手順 3 で作成
  • 送信元の役割を果たす Confluent Platform クラスターのリンク構成を指定、Confluent Platform が Confluent Cloud に対して認証を行うための認証情報と接続情報を含む
  • Confluent Platform 側にクラスターリンク from-on-prem-link を作成するために使用
clusterlink-cloud-to-CP.config