オンプレミス Kafka からクラウドへの移行¶
この Confluent Cloud サンプルでは、ハイブリッド Kafka クラスターを紹介します。一方のクラスターは、ローカルで動作する自己管理型 Kafka クラスターで、他方は Confluent Cloud クラスターです。このユースケースは、オンプレミスからクラウドへの移行における "クラウドへのブリッジ" となります。

概要¶
このサンプルの主なコンポーネントは、次のとおりです。
- 2 つの Kafka クラスター。一方のクラスターは、ローカルで動作する自己管理型 Kafka クラスターで、他方は Confluent Cloud クラスターです。
- Confluent Control Center: デプロイを管理し、モニタリングします。これを使用して、トピックの検査、スキーマの表示、ksqlDB クエリの表示と作成、ストリームのモニタリングなどを行います。
- ksqlDB: Confluent Cloud で、入力トピックの "users" および "pageviews" に対するクエリを実行する Confluent Cloud ksqlDB。
- 2 つの Kafka Connect クラスター: 一方のクラスターは、ローカルの自己管理型クラスターに接続し、他方は Confluent Cloud クラスターに接続します。両方の Connect ワーカープロセス自体はローカルで実行されます。
- "kafka-connect-datagen" インスタンス 1 つ: 模擬データを生成してトピック "pageview"s をローカルで事前に取り込むソースコネクター
- "kafka-connect-datagen" インスタンス 1 つ: 模擬データを生成して Confluent Cloud クラスターでトピック "users" を事前に取り込むソースコネクター
- Confluent Replicator: ローカルクラスターから Confluent Cloud クラスターにトピック "pageviews" をコピーします。
- Confluent Schema Registry: このサンプルは、Confluent Cloud Schema Registry を使って実行され、Kafka データは Avro フォーマットで書き込まれます。
注釈
これはサンプル環境であり、多くのサービスが 1 台のホストで実行されています。このサンプルは本稼働環境で実行しないでください。また、Confluent CLI は本稼働環境で使用しないでください。このサンプルは、Confluent Platform と Confluent Cloud のデモを容易に行うためにのみ用意されています。
前提条件¶
- 初期化された Confluent Cloud クラスター
- ローカルでインストールされた Confluent Cloud CLI (v1.7.0 以降)
- ローカルインストールを使用している場合は、Confluent Platform を ダウンロード (Docker には必要ではありません)
- Confluent Platform は、さまざまなオペレーティングシステムおよびソフトウェアバージョンでサポートされています(詳細については「サポートされているバージョンおよび相互運用性」を参照)。このサンプルは、以下に説明する特定の構成で検証されています。この例の Windows での実行は正式にサポートされていませんが、GitHub のサンプルコードを変更して symlink
.env
を config.env の内容で置き換えると、Windows でも動作する可能性があります。- macOS 10.15.3
- Confluent Platform 6.0.6
- Java 11.0.6 2020-01-14 LTS
- bash バージョン 3.2.57
- jq 1.6
- (Docker ベースのサンプル)Docker バージョン 19.03.8
- (Docker ベースのサンプル)Docker Compose docker-compose バージョン 1.25.4
チュートリアルの実行コスト¶
注意¶
Confluent Cloud のすべてのサンプルでは、課金される可能性のある実際の Confluent Cloud リソースを使用しています。サンプルで、新しい Confluent Cloud 環境、Kafka クラスター、トピック、ACL、サービスアカウントに加えて、コネクターや ksqlDB アプリケーションのように時間で課金されるリソースを作成する場合があります。想定外の課金を避けるために、慎重に リソースのコストを確認 してから開始してください。Confluent Cloud のサンプルの実行を終了したら、サービスへの時間単位の課金を回避するためにすべての Confluent Cloud リソースを破棄し、リソースが削除されたことを確認します。
サンプルの起動¶
セットアップ¶
このサンプルでは、サンプルの実行に必要なリソースを備えた、新しい Confluent Cloud 環境を作成します。既に説明したように、このサンプルでは実際の Confluent Cloud リソースを使用するため、課金が発生する可能性があります。
confluentinc/examples GitHub リポジトリのクローンを作成し、
6.0.6-post
ブランチをチェックアウトします。git clone https://github.com/confluentinc/examples cd examples git checkout 6.0.6-post
Confluent Cloud サンプルのディレクトリに変更します。
cd ccloud
Run¶
コマンド
ccloud login
で、Confluent Cloud のユーザー名とパスワードを使用して Confluent Cloud にログインします。--save
引数により、Confluent Cloud ユーザーログイン資格情報が保存されるか、ローカルのnetrc
ファイルに対してトークン(SSO の場合)が更新されます。ccloud login --save
1 つのコマンドを実行してサンプル全体を起動します。Docker Compose と Confluent Platform ローカルインストールのどちらを使用するかを選択できます。Confluent Cloud に新しいリソースが作成されます。完了するまでに数分間かかります。
# For Docker Compose ./start-docker.sh
# For Confluent Platform local ./start.sh
このスクリプトを実行すると、フルマネージド型リソースの新規 Confluent Cloud スタックが作成され、すべての接続情報、クラスター ID、資格情報を含むローカル構成ファイルが生成されます。これは、他のデモや自動化の際に便利です。このローカル構成ファイルを表示すると、
SERVICE ACCOUNT ID
がスクリプトによって自動生成されています。cat stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
出力は以下のようになります。
# ------------------------------ # Confluent Cloud connection information for demo purposes only # Do not use in production # ------------------------------ # ENVIRONMENT ID: <ENVIRONMENT ID> # SERVICE ACCOUNT ID: <SERVICE ACCOUNT ID> # KAFKA CLUSTER ID: <KAFKA CLUSTER ID> # SCHEMA REGISTRY CLUSTER ID: <SCHEMA REGISTRY CLUSTER ID> # KSQLDB APP ID: <KSQLDB APP ID> # ------------------------------ security.protocol=SASL_SSL sasl.mechanism=PLAIN bootstrap.servers=<BROKER ENDPOINT> sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API KEY>' password='<API SECRET>'; basic.auth.credentials.source=USER_INFO schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET> schema.registry.url=https://<SR ENDPOINT> ksql.endpoint=<KSQLDB ENDPOINT> ksql.basic.auth.user.info=<KSQLDB API KEY>:<KSQLDB API SECRET>
http://confluent.cloud で Confluent Cloud UI にログインします。
Google Chrome を使用して、http://localhost:9021 の Confluent Control Center GUI に移動します。
プレイブック¶
Confluent Cloud CLI¶
クラスターでトピックのリストを表示できることを確認します。
ccloud kafka topic list
最初にこのサンプル用に作成したサービスアカウント
<SERVICE ACCOUNT ID>
に関連付けられた ACL を表示します。リソース名は、それぞれのクラスター、Kafka のトピック名、またはコンシューマーグループ名に対応しています。注意: 本稼働環境では、ワイルドカード*
は使用しません。これは、デモを目的として使用されています。ccloud kafka acl list --service-account <SERVICE ACCOUNT ID>
たとえば、サービスアカウント ID が 69995 の場合の出力は、次のようになります。
ServiceAccountId | Permission | Operation | Resource | Name | Type +------------------+------------+------------------+----------+-----------------------+----------+ User:69995 | ALLOW | WRITE | TOPIC | _confluent-monitoring | PREFIXED User:69995 | ALLOW | READ | TOPIC | _confluent-monitoring | PREFIXED User:69995 | ALLOW | READ | TOPIC | _confluent-command | PREFIXED User:69995 | ALLOW | WRITE | TOPIC | _confluent-command | PREFIXED User:69995 | ALLOW | READ | TOPIC | _confluent | PREFIXED User:69995 | ALLOW | CREATE | TOPIC | _confluent | PREFIXED User:69995 | ALLOW | WRITE | TOPIC | _confluent | PREFIXED User:69995 | ALLOW | CREATE | GROUP | * | LITERAL User:69995 | ALLOW | WRITE | GROUP | * | LITERAL User:69995 | ALLOW | READ | GROUP | * | LITERAL User:69995 | ALLOW | WRITE | TOPIC | connect-demo-statuses | PREFIXED User:69995 | ALLOW | READ | TOPIC | connect-demo-statuses | PREFIXED User:69995 | ALLOW | READ | TOPIC | connect-demo-offsets | PREFIXED User:69995 | ALLOW | WRITE | TOPIC | connect-demo-offsets | PREFIXED User:69995 | ALLOW | DESCRIBE | TOPIC | pageviews | LITERAL User:69995 | ALLOW | DESCRIBE_CONFIGS | TOPIC | pageviews | LITERAL User:69995 | ALLOW | CREATE | TOPIC | pageviews | LITERAL User:69995 | ALLOW | ALTER_CONFIGS | TOPIC | pageviews | LITERAL User:69995 | ALLOW | READ | TOPIC | pageviews | LITERAL User:69995 | ALLOW | WRITE | TOPIC | pageviews | LITERAL User:69995 | ALLOW | WRITE | TOPIC | users | LITERAL User:69995 | ALLOW | WRITE | TOPIC | * | LITERAL User:69995 | ALLOW | CREATE | TOPIC | * | LITERAL User:69995 | ALLOW | READ | TOPIC | * | LITERAL User:69995 | ALLOW | DESCRIBE | TOPIC | * | LITERAL User:69995 | ALLOW | DESCRIBE_CONFIGS | TOPIC | * | LITERAL User:69995 | ALLOW | READ | GROUP | connect-cloud | LITERAL User:69995 | ALLOW | DESCRIBE | CLUSTER | kafka-cluster | LITERAL User:69995 | ALLOW | CREATE | CLUSTER | kafka-cluster | LITERAL User:69995 | ALLOW | READ | GROUP | connect-replicator | LITERAL User:69995 | ALLOW | WRITE | TOPIC | connect-demo-configs | PREFIXED User:69995 | ALLOW | READ | TOPIC | connect-demo-configs | PREFIXED User:69995 | ALLOW | WRITE | GROUP | _confluent | PREFIXED User:69995 | ALLOW | READ | GROUP | _confluent | PREFIXED User:69995 | ALLOW | CREATE | GROUP | _confluent | PREFIXED
kafka-connect-datagen¶
サンプルで、このコード を確認してください。このコードにより、Kafka のトピック
pageviews
用のkafka-connect-datagen
コネクターがconnect-local
クラスターに自動的に読み込まれます。このクラスターは、後で Replicator によって Confluent Cloud にレプリケートされます(Replicator については後で詳しく説明します)。{ "name": "datagen-pageviews", "config": { "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", "kafka.topic": "pageviews", "quickstart": "pageviews", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.basic.auth.credentials.source": "$BASIC_AUTH_CREDENTIALS_SOURCE", "value.converter.schema.registry.basic.auth.user.info": "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO", "value.converter.schema.registry.url": "$SCHEMA_REGISTRY_URL", "producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor", "max.interval": 100, "iterations": 1000000000, "tasks.max": "1" } }
Confluent Control Center で、ローカルクラスターの
pageviews
トピック内のデータを表示します。サンプルで、このコード を確認してください。このコードにより、Kafka のトピック
users
のkafka-connect-datagen
コネクターがconnect-cloud
クラスターに自動的に読み込まれます。{ "name": "datagen-users", "config": { "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", "kafka.topic": "users", "quickstart": "users", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.basic.auth.credentials.source": "$BASIC_AUTH_CREDENTIALS_SOURCE", "value.converter.schema.registry.basic.auth.user.info": "$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO", "value.converter.schema.registry.url": "$SCHEMA_REGISTRY_URL", "producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor", "max.interval": 1000, "iterations": 1000000000, "tasks.max": "1" } }
Confluent Control Center で、Confluent Cloud の
users
トピック内のデータを表示します。
ksqlDB¶
サンプルでは、適切な認証情報を使用して、このコード で REST API を使用して、:devx-examples:`statements.sql|ccloud/statements.sql`(ksqlDB バージョン 0.10.0 の場合)から Confluent Cloud ksqlDB クエリが作成されます。
done # Submit KSQL queries echo -e "\nSubmit KSQL queries\n" properties='"ksql.streams.auto.offset.reset":"earliest","ksql.streams.cache.max.bytes.buffering":"0"' while read ksqlCmd; do echo -e "\n$ksqlCmd\n" response=$(curl -X POST $KSQLDB_ENDPOINT/ksql \ -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \ -u $KSQLDB_BASIC_AUTH_USER_INFO \ --silent \ -d @<(cat <<EOF { "ksql": "$ksqlCmd", "streamsProperties": {$properties} } EOF )) echo $response if [[ ! "$response" =~ "SUCCESS" ]]; then echo -e "\nERROR: KSQL command '$ksqlCmd' did not include \"SUCCESS\" in the response. Please troubleshoot." exit 1
Confluent Cloud UI から ksqlDB アプリケーションフローを表示します。
いずれかのストリームをクリックして、そのメッセージとスキーマを表示します。
Confluent Replicator¶
Confluent Replicator は、ソース Kafka クラスターからデスティネーション Kafka クラスターにデータをコピーします。このサンプルでは、送信元クラスターは、ローカルインストールの自己管理型クラスターで、送信先クラスターは Confluent Cloud です。Replicator は、Kafka のトピック pageviews
をローカルインストールから Confluent Cloud にレプリケートしています。また、Confluent Monitoring インターセプターとともに動作して Confluent Control Center ストリームをモニタリングしています。
サンプルで、このコード を確認してください。このコードにより、Replicator コネクターが
connect-cloud
クラスターに自動的に読み込まれます。Replicator 構成ではconfluent.topic.replication.factor=3
が設定されていることに注意してください。これは、送信元クラスターではreplication.factor=1
であり、Confluent Cloud ではreplication.factor=3
が求めれらるため、必須の設定です。{ "name": "replicator", "config": { "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector", "topic.whitelist": "pageviews", "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter", "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter", "dest.topic.replication.factor": 3, "dest.kafka.bootstrap.servers": "$BOOTSTRAP_SERVERS", "dest.kafka.security.protocol": "SASL_SSL", "dest.kafka.sasl.mechanism": "PLAIN", "dest.kafka.sasl.jaas.config": "$REPLICATOR_SASL_JAAS_CONFIG", "confluent.topic.replication.factor": 3, "src.kafka.bootstrap.servers": "kafka:29092", "src.consumer.group.id": "connect-replicator", "src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor", "src.consumer.confluent.monitoring.interceptor.bootstrap.servers": "$BOOTSTRAP_SERVERS", "src.consumer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL", "src.consumer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN", "src.consumer.confluent.monitoring.interceptor.sasl.jaas.config": "$REPLICATOR_SASL_JAAS_CONFIG", "src.kafka.timestamps.topic.replication.factor": 1, "src.kafka.timestamps.producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor", "src.kafka.timestamps.producer.confluent.monitoring.interceptor.bootstrap.servers": "$BOOTSTRAP_SERVERS", "src.kafka.timestamps.producer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL", "src.kafka.timestamps.producer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN", "src.kafka.timestamps.producer.confluent.monitoring.interceptor.sasl.jaas.config": "$REPLICATOR_SASL_JAAS_CONFIG", "tasks.max": "1" } }
Confluent Control Center は、ローカルで実行され、ポート 8087 で動作している、
connect-cloud
という Connect クラスターを管理するよう構成されています。このクラスターは、kafka-connect-datagen
(Kafka のトピックusers
の)コネクターおよび Replicator コネクターを実行しています。Confluent Control Center UI で、Connect クラスターを表示します。replicator をクリックして Replicator 構成を表示します。ローカル Kafka クラスターのトピック
pageviews
が Confluent Cloud にレプリケートされていることがわかります。ローカルの
pageviews
トピックから Confluent Cloud のpageviews
トピックにメッセージがレプリケートされることを確認します。Confluent Cloud UI で、このトピックのメッセージを表示します。Confluent Cloud UI で、Replicator のコンシューマーラグを表示します。
Consumers
ビューでconnect-replicator
をクリックします。出力は以下のようになります。
Confluent Schema Registry¶
この例で使用するコネクターは、Confluent Cloud Schema Registry を利用して、Avro フォーマットのデータを自動的に書き込むように構成されています。
すべての Schema Registry サブジェクトを表示します。
# Confluent Cloud Schema Registry curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects
Confluent Cloud UI で、
pageviews
トピックのスキーマを表示します。トピック値は、Schema Registry で登録されたスキーマを使用しています(トピックキーは単なる文字列です)。オンプレミスの Schema Registry から Confluent Cloud Schema Registry にスキーマを移行する必要がある場合は、この 手順を追ったガイド に従ってください。スキーマの移行に実際に使用できる Replicator 構成の例については、ファイル submit_replicator_schema_migration_config.sh を参照してください。
Confluent Cloud の構成¶
Confluent Cloud に接続するために、Confluent Platform コンポーネントおよびクライアント用のテンプレートデルタ構成を表示します。
ls template_delta_configs/
Confluent Cloud 構成ファイルから自動的に派生する、コンポーネントごとのデルタ構成パラメーターを生成します。
./ccloud-generate-cp-configs.sh
このサンプルを start-docker.sh として実行した場合は、docker-compose.yml ファイル にあるすべての Confluent Platform コンポーネントの構成を使用できます。
# For Docker Compose cat docker-compose.yml
このサンプルを、Confluent CLI を使用する "start.s"h として実行した場合は、現在の Confluent CLI 一時ディレクトリにある、それぞれのコンポーネントサブフォルダーにすべての構成ファイルとログファイルが保存されます(アクティブに実行されているサンプルが必要です)。
# For Confluent Platform local install using Confluent CLI ls `confluent local current | tail -1`
サンプルのトラブルシューティング¶
Docker を使用して実行した場合は、docker-compose logs | grep ERROR を実行します。
ログファイルを表示するには、現在の Confluent CLI 一時ディレクトリを調べます(アクティブに動作しているサンプルが必要)。
# View all files ls `confluent local current | tail -1` # View log file per service, e.g. for the Kafka broker confluent local services kafka log
リソースの破棄¶
Confluent Cloud のすべてのサンプルでは、実際の Confluent Cloud リソースを使用しています。Confluent Cloud のサンプルの実行を終了したら、予定外の課金を回避するために、すべての Confluent Cloud リソースが破棄されていることを直接確認してください。
サンプルを停止し、Confluent Cloud 内のすべてのリソースとローカルコンポーネントを破棄します。スクリプトの引数として、ローカル構成ファイルにパスを渡し、
<SERVICE ACCOUNT ID>
に、デモの開始時に自動生成された ID を代入します。# For Docker Compose ./stop-docker.sh stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
# For Confluent Platform local install using Confluent CLI ./stop.sh stack-configs/java-service-account-<SERVICE ACCOUNT ID>.config
Confluent Cloud 内のリソースが破棄されたことを必ず確認してください。
その他のリソース¶
- その他の Confluent Cloud サンプルをお探しの場合は、Confluent Cloud のサンプル を参照してください。