重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
ハイブリッドクラウドとクラウドへのブリッジ¶
Looking for Confluent Platform Cluster Linking docs? You are currently viewing Confluent Cloud documentation. If you are looking for Confluent Platform docs, check out Cluster Linking on Confluent Platform.
はじめに¶
このチュートリアルでは、Confluent Platform クラスターと Confluent Cloud クラスターをリンクするハイブリッドのユースケースにおける Cluster Linking の実践的な使用方法について説明します。
次のように、データが両方向に移動するデプロイを作成します。
Confluent Cloud から Confluent Platform へ
Confluent Platform から Confluent Cloud へ
This direction will require a “source initiated” cluster link; a new feature introduced in Confluent Platform 7.1.0.
どちらの場合も、Confluent Platform ブローカーが Confluent Cloud ブローカーへの接続を開始します。このため、Confluent Cloud が Confluent Platform ブローカーに接続できるようにファイアウォールを開く必要はありません。
プロセスの中で、Confluent Platform コマンドと Confluent Cloud コマンドで使用するさまざまなセキュリティ認証情報と構成ファイルを作成します。これらをまとめたリストについては、このチュートリアルの最後にある「構成の概要」を参照してください。
Cluster Linking を使用できるクラスターについては、「サポートされるクラスタータイプ」を参照してください。
チュートリアルの内容¶
このチュートリアルでは、2 つのクラスター(Confluent Platform と Confluent Cloud に 1 つずつ)を構成し、Confluent Cloud に対してファイアウォールを開くことなく、Cluster Linking を使用してクラスター間でトピックデータを双方向に共有します。
Confluent Platform のインストール¶
Download and extract Confluent Platform version 7.1.0.
チュートリアルの手順を続けるために、次の変数を設定する必要があります。
export CONFLUENT_HOME=<CP installation directory>
export CONFLUENT_CONFIG=$CONFLUENT_HOME/etc/kafka
ターミナルウィンドウを開いたときに常に実行されるように、これらの 2 つの行を .bashrc
または .bash-profile
に追加します。
前提条件とコマンド例について¶
注釈
As a general guideline (not just for this tutorial), any customer-owned firewall that allows the cluster link connection from source cluster brokers to destination cluster brokers must allow the TCP connection to persist in order for Cluster Linking to work.
- These instructions assume you have a local installation of Confluent Platform 7.1.0 or later, and Java 1.8 or 1.11 (needed for Confluent Platform). Install instructions for self-managed deployments are available in the documentation. If you are new to Confluent Platform, first work through the Quick Start for Apache Kafka using Confluent Platform (Local), and then return to this tutorial.
- 特に明記しない限り、以下の例は、Confluent Platform のプロパティファイルがデフォルトのインストール場所に存在することを前提としています。これにより、例からコマンドを簡単にコピーしてターミナルに直接貼り付けることができます。
- Confluent Platform のデフォルトのインストールでは、Confluent CLI と Cluster Linking コマンド は
$CONFLUENT_HOME/bin
にあり、プロパティファイルはディレクトリCONFLUENT_CONFIG
($CONFLUENT_HOME/etc/kafka/
)にあります。これらのコマンドにアクセスするには Confluent Platform を実行しておく必要があります。Confluent Platform を 構成 して 実行 すると、どのコマンドも引数を付けずに入力することでヘルプを表示できます(kafka-cluster-links
など)。 - このチュートリアルでは Confluent Cloud へのログインと Confluent Cloud CLI が必要です。詳細については、Confluent Cloud の Cluster Linking のクイックスタート の「Confluent Cloud の最新バージョンの取得」と、複数の CLI を同時実行する 方法に関する説明が記載されている「Confluent CLI v2 への移行」を参照してください(後者は特に Confluent Platform の 7.1 リリースへの対応にも役立ちます)。Confluent Cloud を初めて使用する場合は、最初にクイックスタートを参照してからこのチュートリアルに進んでください。
- このチュートリアルでは、Confluent Cloud で 専用クラスター を実行している必要があります。これについては、Confluent Cloud の料金が発生します。
ポートと構成のマッピング¶
このチュートリアルのデプロイ例では、以下のポートと機能の構成を使用し、サーバーが localhost
で実行されているものと仮定します。
Confluent Platform | |
---|---|
Kafka ブローカー | 9092 |
ZooKeeper | 2181 |
ちなみに
- これらは、このチュートリアルで使用するサンプルポートです。Cluster Linking ではこれらと同じポートを使用する必要はありません。必要に応じて変更することができます。
- これらのポートを他のプロセスで使用している場合は、他のプロセスを終了するか、チュートリアルの手順を変更して別のポートを使用してください。
Kafka および ZooKeeper ファイルの構成¶
$CONFLUENT_CONFIG
で、以下のファイルを構成して Confluent Platform クラスターをセットアップします。
$CONFLUENT_CONFIG/zookeeper.properties
をコピーして zookeeper-clusterlinking.properties
のベースとして使用します。
$CONFLUENT_CONFIG/server.properties
をコピーして server-clusterlinking.properties
のベースとして使用します。
ファイル | 構成 |
---|---|
zookeeper-clusterlinking.properties |
|
server-clusterlinking.properties | 以下を既存のファイルに追加する必要があります。
以下については既存の構成を変更するか、既存の構成を使用します。
|
注釈
- この例では、SSL で保護された 1 つの ZooKeeper と 1 つの Confluent Server ブローカーのみを構成します。ローカルマシンでのテストの場合はこれで問題ありませんが、本稼働環境ではフォールトトレランスと高可用性を目的として複数の Zookeeper とブローカーを複数のマシンに分散させ、すべてを認証と暗号化で保護する必要があります。
- この例では、ブローカーが 1 つのテスト用セットアップであるため、重要な内部トピックのレプリケーション係数は
1
に設定されています。本稼働環境のデプロイでは、これらのトピックのレプリケーション係数を1
に設定しないでください。一般に、レプリケーション係数はブローカーの数に応じて 3 以上にする必要があります。 - パラメーター
password.encoder.secret
は、クラスターリンクに保管される認証情報を暗号化するために必要です。
Confluent Platform クラスターの起動¶
別々のコマンドウィンドウで以下のコマンドを実行します。
ZooKeeper コマンドと Confluent Server コマンドはユーザーが停止するまで "完了" しないため、アプリケーションの実行中はこれらのウィンドウを開いておく必要があります。
別のコマンドウィンドウをメインのターミナルとして使用して、完了させるコマンドを実行します(これらのコマンドの例としては、kafka-configs、kafka-topics、kafka-cluster-links などがあります。状況によっては kafka-console-producer と kafka-console-consumer も含まれますが、これら 2 つのコマンドは継続して実行しておく場合があります)。
新しいコマンドウィンドウで Confluent Platform クラスターの ZooKeeper サーバーを起動します。
zookeeper-server-start $CONFLUENT_CONFIG/zookeeper-clusterlinking.properties
2 種類のユーザーについてクラスターに SASL SCRAM 認証情報を作成するコマンドを実行します。一方は Kafka クラスターが使用し、もう一方はクラスターに対してコマンドを実行するためのものです。
次のコマンドを実行して、Kafka クラスター自体が使用する "kafka" というユーザーについてクラスターに認証情報を作成します。
kafka-configs --zookeeper localhost:2181 --alter --add-config \ 'SCRAM-SHA-512=[iterations=8192,password=kafka-secret]' \ --entity-type users --entity-name kafka
次のコマンドを実行して、クラスターに対してコマンドを実行するために使用する "admin" というユーザーについてクラスターに認証情報を作成します。
kafka-configs --zookeeper localhost:2181 --alter --add-config \ 'SCRAM-SHA-512=[iterations=8192,password=admin-secret]' \ --entity-type users --entity-name admin
Confluent Platform クラスターに対してコマンドを実行するときに認証する admin の認証情報を含むファイルを作成します。
テキストエディターを開き、
$CONFLUENT_CONFIG/CP-command.config
という名前のファイルを作成して以下の内容をコピーアンドペーストします。sasl.mechanism=SCRAM-SHA-512 security.protocol=SASL_PLAINTEXT sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="admin" \ password="admin-secret";
新しいコマンドウィンドウで送信元クラスターの Confluent Server ブローカーを起動し、コマンドの一部として認証情報を渡します。
kafka-server-start $CONFLUENT_CONFIG/server-clusterlinking.properties
Confluent Platform のクラスター ID を取得します。
kafka-cluster cluster-id --bootstrap-server localhost:9092 --config $CONFLUENT_CONFIG/CP-command.config
出力は以下のようになります。
Cluster ID: G1pnOMOxSjWYIX8xuR2cfQ
この場合、
G1pnOMOxSjWYIX8xuR2cfQ
が Confluent Platform のクラスター ID であり、ここで紹介する例では$CP_CLUSTER_ID
と示します。必要に応じて、ローカルシェルあるいは zsh または bash プロファイルでこのための環境変数を設定しておくと、後の手順でコマンドを直接カットアンドペーストすることもできます。
export CP_CLUSTER_ID=<CP-CLUSTER-ID>
Confluent Cloud クラスターの起動¶
残りのコマンドを実行するためには、パブリックインターネットに接続された 専用 Confluent Cloud クラスターが必要です。今回のデモ用に作成し、チュートリアルが終了したら削除することもできます。このクラスターには料金が発生します。
統合 CLI または Confluent Cloud CLI を使用して Confluent Cloud にログオンします(「前提条件とコマンド例について」を参照)。
この例では統合 CLI コマンドを使用します。
confluent login
環境を表示し、使用する環境の ID を選択します。
confluent environment list
アスタリスクは、リスト内で現在選択されている環境を示します。以下のようにして異なる環境を選択できます。
confluent environment use <environment-ID>
Confluent Cloud の既存の専用クラスターを使用するか、Confluent Cloud Console から、または Confluent CLI から直接、次のように新しいクラスターを作成します。
confluent kafka cluster create CLOUD-DEMO --type dedicated --cloud aws --region us-east-1 --cku 1 --availability single-zone
出力は以下のようになります。
It may take up to 5 minutes for the Kafka cluster to be ready. +--------------+---------------+ | Id | lkc-59oyn | | Name | MY-CLOUD-DEMO | | Type | DEDICATED | | Ingress | 50 | | Egress | 150 | | Storage | Infinite | | Provider | aws | | Availability | single-zone | | Region | us-east-1 | | Status | PROVISIONING | | Endpoint | | | ApiEndpoint | | | RestEndpoint | | | ClusterSize | 1 | +--------------+---------------+
新しい Confluent Cloud クラスターを作成した場合は、そのクラスターがプロビジョニングされるまで待機する必要があります。これには通常数分かかりますが、長引く場合もあります。クラスターを使用できる状態になると、メールで通知が届きます。
クラスターを表示します。
confluent kafka cluster list
アスタリスクは、現在選択されているクラスターを示します。以下のようにして別のクラスターを選択できます。
confluent kafka cluster use <CC-CLUSTER-ID>
ちなみに
クラスター ID を指定することで、現在選択されていないクラスターについて情報を取得したり、複数のタイプのアクションを実行したりできます。たとえば、
confluent kafka cluster describe <cluster-ID>
のように指定します。このチュートリアルでは、専用クラスターのクラスター ID を
$CC-CLUSTER-ID
と示しています。必要に応じて、ローカルシェルあるいは zsh または bash プロファイルでこのための環境変数を設定しておくと、後の手順でコマンドを直接カットアンドペーストすることもできます。
export CC_CLUSTER_ID=<CC-CLUSTER-ID>
Confluent Platform クラスターへの情報の入力¶
これらのコマンドでは Confluent Platform CLI を使用します。
順序が確認しやすいように、1 つのパーティションのみを含む Confluent Platform クラスターにトピックを作成します。
kafka-topics --create --topic from-on-prem --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
トピックが正常に作成されたことを示す確認メッセージが表示されます。
Created topic from-on-prem.
以下のとおり、既存のトピックのリストを取得できます。
kafka-topics --list --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
--describe
オプションでトピックの詳細情報を表示します。kafka-topics --describe --topic from-on-prem --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
送信元クラスターの from-on-prem トピックに複数のメッセージを送信し、トピックにデータを入力します。
seq 1 5 | kafka-console-producer --topic from-on-prem --bootstrap-server localhost:9092 --producer.config $CONFLUENT_CONFIG/CP-command.config
このコマンドは出力なしで終了します。
送信元クラスターのトピックから消費します。
コンシューマーを実行して
from-on-prem
トピックからメッセージを消費します。kafka-console-consumer --topic from-on-prem --from-beginning --bootstrap-server localhost:9092 --consumer.config $CONFLUENT_CONFIG/CP-command.config
トピックでメッセージが正しく消費されると、出力は以下のようになります。
1 2 3 4 5
Ctrl キーを押しながら C キーを押してプロンプトに戻ります。
Confluent Cloud クラスターの権限のセットアップ¶
Confluent Cloud で以下の手順を実行します。
Confluent Platform から Confluent Cloud へのトピックデータのミラーリングの送信先の役割を果たす Confluent Cloud クラスターのユーザー API キーを作成します。
confluent api-key create --resource $CC_CLUSTER_ID
作成した API キーとシークレットを安全な場所に保存します。このチュートリアルではこれらを
<CC-link-api-key>
および<CC-link-api-secret>
と示します。この API キーとシークレットは、Confluent Platform から Confluent Cloud へのリンクを作成するために使用する Confluent Cloud クラスターと関連付けられています。次の手順で、これらを構成ファイルに追加します。重要
これを本稼働環境で設定する場合、ユーザーに関連付けられたキーではなくサービスアカウントの API キーを使用する必要があります。サービスアカウントで Confluent Cloud クラスターにアクセスするための権限のセットアップ方法 については、トピックデータの共有に関するチュートリアルを参照してください。ソース開始リンクの場合、サービスアカウントで必要な唯一の ACL は送信先クラスターの ALTER です(Cluster: Alter ACL)。クラスターリンクの ACL についての詳細は、Confluent Platform の「Cluster Linking のセキュリティ」および「Confluent Cloud における Cluster Linking のセキュリティ」を参照してください。
オンプレミスから Confluent Cloud へのデータのミラーリング¶
以下のセクションでは、Confluent Platform から Confluent Cloud へのリンクのセットアップとテストの方法について説明しています。
Confluent Platform から Confluent Cloud へのリンクの作成¶
Confluent Platform から Confluent Cloud へデータをミラーリングするクラスターリンクをセットアップします。
ちなみに
This tutorial shows how to create a cluster link from Confluent Platform to Confluent Cloud. That said, you can use the same general configuration if the destination is Confluent Platform 7.0 or later; you would create cluster link in the same way.
This is a source initiated link, meaning that its connection will come from Confluent Platform and go to Confluent Cloud. As such, you won't have to open your on-premise firewall.
To create this source initiated link, you must create both halves of the cluster link: the first half on Confluent Cloud, the second half on Confluent Platform.
Confluent Cloud クラスターにクラスターリンクを作成します。
リンク構成ファイル
$CONFLUENT_CONFIG/clusterlink-hybrid-dst.config
を次のエントリで作成します。link.mode=DESTINATION connection.mode=INBOUND
構成
link.mode=DESTINATION
と構成connection.mode=INBOUND
の組み合わせから、このクラスターリンクがソース開始クラスターリンクの送信先側の半分であることがわかります。これらの 2 つの構成は一緒に使用する必要があります。注釈
If you want to add any configurations to your cluster link (such as consumer offset sync or auto-create mirror topics)
clusterlink-hybrid-dst.config
is the file where you would add them. Cluster link configurations are always set on the Destination cluster link (not the Source cluster link).Confluent Cloud に送信先クラスターリンクを作成します。
confluent kafka link create from-on-prem-link --cluster $CC_CLUSTER_ID \ --source-cluster-id $CP_CLUSTER_ID \ --config-file $CONFLUENT_CONFIG/clusterlink-hybrid-dst.config
このコマンド出力から、リンクが作成されたことがわかります。
Created cluster link "from-on-prem-link".
ちなみに
次のコマンドで、Confluent Cloud のクラスターリンクのリストと詳細を表示できます。
confluent kafka link list --cluster $CC_CLUSTER_ID
confluent kafka link describe <link-name> --cluster $CC_CLUSTER_ID
Confluent Platform のクラスターリンクのセキュリティ認証情報を作成します。このセキュリティ認証情報は、送信元クラスターからトピックデータとメタデータを読み取るために使用されます。
kafka-configs --bootstrap-server localhost:9092 --alter --add-config \ 'SCRAM-SHA-512=[iterations=8192,password=1LINK2RUL3TH3MALL]' \ --entity-type users --entity-name cp-to-cloud-link \ --command-config $CONFLUENT_CONFIG/CP-command.config
出力は以下のようになります。
Completed updating config for user cp-to-cloud-link.
Confluent Platform の送信元クラスターリンクのリンク構成ファイル
$CONFLUENT_CONFIG/clusterlink-CP-src.config
を次のエントリで作成します。link.mode=SOURCE connection.mode=OUTBOUND bootstrap.servers=<CC-BOOTSTRAP-SERVER> ssl.endpoint.identification.algorithm=https security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<CC-link-api-key>' password='<CC-link-api-secret>'; local.listener.name=SASL_PLAINTEXT local.security.protocol=SASL_PLAINTEXT local.sasl.mechanism=SCRAM-SHA-512 local.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="cp-to-cloud-link" password="1LINK2RUL3TH3MALL";
- 構成
link.mode=SOURCE
と構成connection.mode=OUTBOUND
の組み合わせから、このクラスターリンクがソース開始クラスターリンクの送信元側の半分であることがわかります。これらの構成は一緒に使用する必要があります。 - 中央のセクションには、クラスターリンクの到達先である Confluent Cloud の送信先クラスターの
bootstrap.servers
と、クラスターリンクで使用する認証情報が記載されています。Cluster Linking から Confluent Cloud へのリンクでは TLS と SASL_PLAIN を使用します。これは、Confluent Cloud クラスターが着信リクエストを受け入れるために必要です。Confluent Cloud ブートストラップサーバーは、confluent kafka cluster describe $CC_CLUSTER_ID
の出力または Confluent Cloud コンソールのクラスター設定でエンドポイントとして示されます。CLI 出力のエンドポイントを使用する場合は、プロトコルのプレフィックスを削除してください。たとえば、エンドポイントがSASL_SSL://pkc-r2ymk.us-east-1.aws.confluent.cloud:9092
と表示されている場合、$CONFLUENT_CONFIG/clusterlink-CP-src.config
のエントリはbootstrap.servers=pkc-r2ymk.us-east-1.aws.confluent.cloud:9092
とする必要があります。 - 最後のセクションでは、各行に
local
というプレフィックスが付いています。ここには、データを読み取るために送信元クラスター( Confluent Platform )で使用するセキュリティ認証情報が記載されています。 - Confluent Platform の認証メカニズムとセキュリティプロトコルは、ブローカー で定義したものになる点に注意してください。Confluent Cloud については、後の手順で
clusterlink-cloud-to-CP.config
というファイルに定義するものになります。使用されている認証とセキュリティプロトコルの詳細については、「JAAS を使用した SASL による認証」を参照してください。
注意
Do not add any cluster link configurations (such as consumer offset sync or auto-create mirror topics) to
clusterlink-CP-src.config
. These configurations must be set on the Destination’s cluster link (not the Source cluster’s cluster link).- 構成
次のコマンドを使用して Confluent Platform に送信元クラスターリンクを作成します。前の手順の構成ファイルを指定してください。
kafka-cluster-links --bootstrap-server localhost:9092 \ --create --link from-on-prem-link \ --config-file $CONFLUENT_CONFIG/clusterlink-CP-src.config \ --cluster-id $CC_CLUSTER_ID --command-config $CONFLUENT_CONFIG/CP-command.config
出力は以下のようになります。
Cluster link 'from-on-prem-link' creation successfully completed.
ちなみに
次のコマンドで、Confluent Platform のクラスターリンクのリストを表示できます。
kafka-cluster-links --list --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
クラスターリンクを作成するコマンドでは、Confluent Platform クラスターと通信するために Confluent Platform の kafka-cluster-links を使用します。これは、Confluent Cloud クラスターと通信するために使用される統合 Confluent CLI とは異なります。
トピックの作成と Confluent Cloud へのデータのミラーリング¶
注釈
- When using Schema Linking: To use a mirror topic that has a schema with Confluent Cloud Connect, ksqlDB, broker-side schema validation, or the topic viewer, make sure that make sure that Schema Linking puts the schema in the default context of the Confluent Cloud スキーマレジストリ. To learn more, see How Schemas work with Mirror Topics.
- Before running the first command in the steps below, make sure that you are still logged in to Confluent Cloud and have the appropriate environment and cluster selected.
To list and select these resources, use the commands
confluent kafka environment list
,confluent kafka environment use
,confluent kafka cluster list
, andconfluent kafka cluster use
. A selected environment or cluster is indicated by an asterisk next to it in the output of list commands. The commands won't work properly if no resources are selected (or if the wrong ones are selected).
Confluent Cloud にログインした状態で以下のタスクを実行します。
ミラートピックを作成します。
以下のコマンドで、クラスターリンク
from-on-prem-link
を使用して元のfrom-on-prem
トピックのミラーを確立します。confluent kafka mirror create from-on-prem --link from-on-prem-link
コマンドの出力は以下のようになります。
Created mirror topic "from-on-prem".
- ミラートピック名は元のトピック名と一致している必要があります。詳細については、「既知の制限」を参照してください。
- ミラートピックでは、作成時にその送信元トピックへのリンクを指定する必要があります。これにより、ミラートピックはデータまたはメタデータの不一致がないクリーンな状態になります。
リンクのミラートピックのリストを表示します。
confluent kafka mirror list --cluster $CC_CLUSTER_ID
出力は以下のようになります。
Link Name | Mirror Topic Name | Num Partition | Max Per Partition Mirror Lag | Source Topic Name | Mirror Status | Status Time Ms +-------------------+-------------------+---------------+------------------------------+-------------------+---------------+----------------+ from-on-prem-link | from-on-prem | 1 | 0 | from-on-prem | ACTIVE | 1633640214250
送信先クラスターのミラートピックから消費して、そのミラートピックを検証します。
引き続き Confluent Cloud で、コンシューマーを実行してミラートピックからのメッセージを消費し、前の手順で Confluent Platform トピックに最初に生成したメッセージを消費します。
confluent kafka topic consume from-on-prem --from-beginning
出力は以下のようになります。
1 2 3 4 5
注釈
コンシューマーを実行しようとしたときに "リソースの API キーが選択されていない" ことを示すエラーが発生した場合、このコマンドを実行して Confluent Cloud の送信先クラスターの
<CC-API-KEY>
を指定してから、コンシューマーコマンドconfluent api-key use <CC-API-KEY> --resource $CC_CLUSTER_ID
を再実行します。または、エラーメッセージで提供される CLI の指示に従います。
Confluent Cloud からオンプレミスへのデータのミラーリング¶
以下のセクションでは、Confluent Cloud から Confluent Platform へのリンクのセットアップとテストの方法について説明しています。
Confluent Cloud から Confluent Platform へのリンクの作成¶
Confluent Cloud クラスターで、このクラスターリンク用に別のユーザー API キーを作成します。
confluent api-key create --resource $CC_CLUSTER_ID
以降の手順では、前の手順で送信先として使用したものと同じクラスターを送信元クラスターとして使用できます。このため、同じクラスター用に異なる API キーとシークレットを作成して、この新しい役割で使用します。
作成した API キーとシークレットを安全な場所に保存します。このチュートリアルではこれらを
<CC-src-api-key>
および<CC-src-api-secret>
と示します。次の手順で、これらを構成ファイルに追加します。重要
これを本稼働環境で設定する場合、ユーザーに関連付けられたキーではなくサービスアカウントの API キーを使用する必要があります。そのためには、クラスターリンクのサービスアカウントを作成し、そのサービスアカウントに必要な ACL を付与してから、そのサービスアカウントの API キーを作成します。クラスターリンクごとに API キーとサービスアカウントを作成することをお勧めします。サービスアカウントで Confluent Cloud クラスターにアクセスするための権限のセットアップ方法 については、トピックデータの共有に関するチュートリアルに記載されています。
confluent kafka cluster describe
を使用して Confluent Cloud クラスターのエンドポイント URL を取得します。confluent kafka cluster describe $CC_CLUSTER_ID
以降の手順では、このエンドポイント URL を
<CC-BOOTSTRAP-SERVER>
と示します。API キーとシークレットを、次の構成エントリと一緒にファイル
$CONFLUENT_CONFIG/clusterlink-cloud-to-CP.config
に保存します。Confluent Platform コマンドはこのファイルを使用して Confluent Cloud への認証を行います。<vi | emacs> $CONFLUENT_CONFIG/clusterlink-cloud-to-CP.config
このファイルに必要な構成エントリは次のとおりです。
bootstrap.servers=<CC-BOOTSTRAP-SERVER> security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<CC-src-api-key>' password='<CC-src-api-secret>';
ちなみに
- このファイルを vi または Emacs エディターにコピーアンドペーストすると、各ステートメントが 1 つの行になります。行が途中で切れていないことを確認してください。
sasl.jaas.config=
で始まる最後の行もファイル内で 1 行で表示される必要があります(他の行と同様です)。Confluent Cloud ブートストラップサーバーの値、API キー、シークレットを指定してからファイルを保存します。 security.protocol
とsasl.mechanism
の値は、clusterlink-CP-src.config
で Confluent Cloud に定義した値になります。
- このファイルを vi または Emacs エディターにコピーアンドペーストすると、各ステートメントが 1 つの行になります。行が途中で切れていないことを確認してください。
Confluent Platform へのクラスターリンクを作成します。
この例の手順にそのまま従う場合はクラスターリンクの名前を
from-cloud-link
としますが、任意の名前を付けても構いません。指定したクラスターリンク名を使用してミラートピックを作成し、操作してください。クラスターリンクを作成した後に名前を変更することはできません。以下のコマンドでは、セキュアではない Confluent Platform クラスターにクラスターリンクを作成します。Confluent Platform クラスターにセキュリティをセットアップしてある場合は、
--command-config
を付けてセキュリティ認証情報をこのコマンドに渡す必要があります。詳細については、「クラスターリンクに対するプロパティの設定」を参照してください。kafka-cluster-links --bootstrap-server localhost:9092 \ --create --link from-cloud-link \ --config-file $CONFLUENT_CONFIG/clusterlink-cloud-to-CP.config \ --cluster-id $CC_CLUSTER_ID --command-config $CONFLUENT_CONFIG/CP-command.config
出力は次のようになります。
Cluster link 'from-cloud-link' creation successfully completed.
次のように
kafka-cluster-links --list
コマンドを使用してリンクが存在することを確認します。kafka-cluster-links --list --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
出力は次のようになります。前に作成した
from-on-prem-link
が新しいfrom-cloud-link
と一緒に表示されます。Link name: 'from-on-prem-link', link ID: '7eb4304e-b513-41d2-903e-147dea62a01c', remote cluster ID: 'lkc-1vgo6', local cluster ID: 'G1pnOMOxSjWYIX8xuR2cfQ' Link name: 'from-cloud-link', link ID: 'b1a56076-4d6f-45e0-9013-ff305abd0e54', remote cluster ID: 'lkc-1vgo6', local cluster ID: 'G1pnOMOxSjWYIX8xuR2cfQ'
トピックの作成とオンプレミスへのデータのミラーリング¶
Confluent Cloud で統合 Confluent CLI を使用して、パーティションが 1 つの
cloud-topic
というトピックを作成します。confluent kafka topic create cloud-topic --partitions 1
Confluent Cloud の別のコマンドウィンドウでプロデューサーを起動し、
cloud-topic
にデータを送信します。confluent kafka topic produce cloud-topic --cluster $CC_CLUSTER_ID
プロデューサーが起動していることを確認します。このコマンドの出力は次のようになり、プロデューサーの準備が完了していることが示されます。
$ confluent kafka topic produce cloud-topic --cluster lkc-1vgo6 Starting Kafka Producer. Use Ctrl-C or Ctrl-D to exit.
任意の複数のエントリをプロデューサーのウィンドウに入力します。エントリを入力するたびに Enter キーを押して送信してください。
Riesling Pinot Blanc Verdejo
コマンド
kafka-mirrors --create --mirror-topic <topic-name>
を使用してcloud-topic
を Confluent Platform にミラーリングします。以下のコマンドで、クラスターリンク
from-cloud-link
を使用して元のcloud-topic
のミラーを確立します。kafka-mirrors --create --mirror-topic cloud-topic --link from-cloud-link --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
ミラートピックが作成されたことを示す次の確認メッセージが表示されます。
Created topic cloud-topic.
Confluent Platform で
from-cloud-link
に対してkafka-mirrors --describe
を実行し、ミラートピックのステータスを確認します。kafka-mirrors --describe --link from-cloud-link --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
指定したリンクのミラートピックすべてのステータスが出力に表示されます。
Topic: cloud-topic LinkName: from-cloud-link LinkId: b1a56076-4d6f-45e0-9013-ff305abd0e54 MirrorTopic: cloud-topic State: ACTIVE StateTime: 2021-10-07 16:36:20 Partition: 0 State: ACTIVE DestLogEndOffset: 2 LastFetchSourceHighWatermark: 2 Lag: 0 TimeSinceLastFetchMs: 384566
on-prem のミラートピックからデータを消費します。
kafka-console-consumer --topic cloud-topic --from-beginning --bootstrap-server localhost:9092 --consumer.config $CONFLUENT_CONFIG/CP-command.config
出力は、ステップ 8 で Confluent Cloud のプロデューサーに入力したエントリと一致するはずです。
クラスターリンクの構成を表示します。
kafka-configs --describe --cluster-link from-cloud-link --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
このコマンドの出力は構成のリストであり、一部を以下の例に示しています。
Dynamic configs for cluster-link from-cloud-link are: metadata.max.age.ms=300000 sensitive=false synonyms={} reconnect.backoff.max.ms=1000 sensitive=false synonyms={} auto.create.mirror.topics.filters= sensitive=false synonyms={} ssl.engine.factory.class=null sensitive=false synonyms={} sasl.kerberos.ticket.renew.window.factor=0.8 sensitive=false synonyms={} reconnect.backoff.ms=50 sensitive=false synonyms={} consumer.offset.sync.ms=30000 sensitive=false synonyms={} ... link.mode=DESTINATION sensitive=false synonyms={} security.protocol=SASL_SSL sensitive=false synonyms={} acl.sync.ms=5000 sensitive=false synonyms={} ssl.keymanager.algorithm=SunX509 sensitive=false synonyms={} sasl.login.callback.handler.class=null sensitive=false synonyms={} replica.fetch.max.bytes=5242880 sensitive=false synonyms={} availability.check.consecutive.failure.threshold=5 sensitive=false synonyms={} sasl.login.refresh.window.jitter=0.05 sensitive=false synonyms={}
強制停止¶
コンシューマーとプロデューサーの停止¶
それぞれのコマンドウィンドウで Ctl+C
キーを押して、コンシューマーとプロデューサーを停止します。
ミラートピックのプロモート¶
ミラートピックを通常のトピックにプロモートします。
Confluent Cloud で、
from-on-prem
というミラートピックをプロモートします。confluent kafka mirror promote from-on-prem --link from-on-prem-link --cluster $CC_CLUSTER_ID
出力は以下のようになります。
Mirror Topic Name | Partition | Partition Mirror Lag | Error Message | Error Code | Last Source Fetch Offset +-------------------+-----------+----------------------+---------------+------------+--------------------------+ from-on-prem | 0 | 0 | | | 9
ミラーリングが停止したことを確認する場合は、前述のコマンドを再実行します。エラーメッセージの列に
Topic 'from-on-prem' has already stopped its mirror from 'from-on-prem-link'
というメッセージが表示されます。Confluent Platform で、
cloud-topic
というミラートピックをプロモートします。kafka-mirrors --promote --topics cloud-topic --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
出力は以下のようになります。
Calculating max offset and ms lag for mirror topics: [cloud-topic] Finished calculating max offset lag and max lag ms for mirror topics: [cloud-topic] Request for stopping topic cloud-topics mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
このコマンドを再試行すると、
Topic 'cloud-topic' has already stopped its mirror 'from-cloud-link'
というエラーを受け取ります。
送信元トピックとミラートピックの削除¶
ちなみに
- Confluent Cloud のトピックのリストを表示する場合:
confluent kafka topic list
- Confluent Platform のトピックのリストを表示する場合:
kafka-topics --list --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
Confluent Cloud のトピックを削除します。
confluent kafka topic delete cloud-topic
confluent kafka topic delete from-on-prem
Confluent Platform からトピックを削除します。
kafka-topics --delete --topic cloud-topic --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
kafka-topics --delete --topic from-on-prem --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
クラスターリンクの削除¶
Confluent Platform のクラスターリンクを削除します。
Confluent Platform のクラスターリンクのリストを表示します。
kafka-cluster-links --list --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
2 つのクラスターリンクが表示されます。1 つはソース開始リンクに必要だったもので、もう 1 つは Confluent Cloud のデータの送信先としての役割を果たすものです。
Link name: 'from-on-prem-link', link ID: '7eb4304e-b513-41d2-903e-147dea62a01c', remote cluster ID: 'lkc-1vgo6' local cluster ID: ', local cluster ID: 'G1pnOMOxSjWYIX8xuR2cfQ'' remote cluster available: 'true' Link name: 'from-cloud-link', link ID: 'b1a56076-4d6f-45e0-9013-ff305abd0e54', remote cluster ID: 'lkc-1vgo6' local cluster ID: ', local cluster ID: 'G1pnOMOxSjWYIX8xuR2cfQ'' remote cluster available: 'true'
kafka-cluster-links --delete <link-name>
を使用して Confluent Platform のクラスターリンクを削除します。kafka-cluster-links --delete --link from-on-prem-link --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
kafka-cluster-links --delete --link from-cloud-link --bootstrap-server localhost:9092 --command-config $CONFLUENT_CONFIG/CP-command.config
各コマンドの出力で、リンクが削除されたことを示す確認メッセージが表示されます。
Confluent Cloud のクラスターリンクを削除します。
Confluent Cloud のクラスターリンクのリストを表示します。
confluent kafka link list
出力は以下のようになります。
Link Name | Source Cluster Id +-------------------+------------------------+ from-on-prem-link | G1pnOMOxSjWYIX8xuR2cfQ
confluent kafka link delete <link-name>
を使用して Confluent Cloud のクラスターリンクを削除します。confluent kafka link delete from-on-prem-link
リンクが削除されたことを示す確認メッセージが表示されます。
Confluent Platform と ZooKeeper の停止¶
他のすべてのコンポーネントを起動したときと逆の順序で、それぞれのコマンドウィンドウで Ctl+C
キーを押して停止します。
- 最初に Kafka ブローカーを停止します。
- Kafka ブローカーが完全にシャットダウンしてプロンプトに戻ったら、別のウィンドウに移動して関連する ZooKeeper を停止します。
構成の概要¶
ファイル | 目的 |
---|---|
zookeeper-clusterlinking.properties |
ZooKeeper の起動に使用される構成ファイル(「Kafka および ZooKeeper ファイルの構成」の説明を参照) |
server-clusterlinking.properties |
Confluent Platform クラスターの起動に使用される構成ファイル(「Kafka および ZooKeeper ファイルの構成」の説明を参照) |
CP-command.config |
|
clusterlink-hybrid-dst.config |
|
clusterlink-CP-src.config |
|
clusterlink-cloud-to-CP.config |
|
おすすめのリソース¶
- ブログ記事: The Link To Cloud: How to Build a Seamless and Secure Hybrid Data Bridge with Cluster Linking
- Cluster Linking のクイックスタート(Confluent Cloud)
- クラスターリンクのセキュリティに関する考慮事項(Confluent Cloud)
- Cluster Linking セキュリティ(Confluent Platform)
- クラスター、リージョン、クラウド間でのデータの共有(Confluent Cloud) (クラスターリンクのサービスアカウントのセットアップ方法や本稼働環境でのデプロイに関するベストプラクティスなどについて詳しく説明するチュートリアル)
- Cluster Linking の構成、コマンド、管理(Confluent Cloud)
- チュートリアル: トピックデータ共有での Cluster Linking の使用(Confluent Platform)
- Cluster Linking のコマンドリファレンス(Confluent Platform)