Confluent CLI: Apache Kafka® のサンプルコマンド¶
このチュートリアルでは、Confluent CLI を使用して Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費します。
チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。
注釈
Confluent CLI は、開発目的での使用のみを想定しており、本稼働環境には適していません。
前提条件¶
Kafka クラスター¶
- このチュートリアルは、どの環境の Kafka クラスターにも対応しています。
- Confluent Cloud 内
- ローカルホスト 上
- 任意のリモート Kafka クラスター
- Confluent Cloud で実行している場合は、API キーおよびシークレットで Confluent Cloud クラスターにアクセスできることが必要です。
- Confluent Cloud にサインアップし、プロモーションコード
C50INTEG
を使用した最初の 20 名のユーザーは、$50 相当を無料で使用できます(詳細)。 - Confluent Cloud で Kafka クラスター、資格情報、および ACL を自動的に作成する方法については、「Confluent Cloud 向け ccloud-stack ユーティリティ」を参照してください。
- Confluent Cloud にサインアップし、プロモーションコード
セットアップ¶
confluentinc/examples GitHub リポジトリのクローンを作成し、
6.0.6-post
ブランチをチェックアウトします。git clone https://github.com/confluentinc/examples cd examples git checkout 6.0.6-post
Confluent CLI のサンプルのディレクトリに変更します。
cd clients/cloud/confluent-cli/
Kafka クラスターに接続するための構成パラメーターを含むローカルファイル(
$HOME/.confluent/java.config
など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}
、{{CLUSTER_API_KEY }}
、および{{ CLUSTER_API_SECRET }}
に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。Confluent Cloud のテンプレート構成ファイル
# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for Kafka producer to prevent data loss acks=all
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092
基本プロデューサーおよびコンシューマー¶
このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice
など)を表すキーと、json フォーマットのカウント値({"count": 0}
など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。
レコードの生成¶
Kafka トピックを作成します。
kafka-topics --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` --command-config $HOME/.confluent/java.config --topic test1 --create --replication-factor 3 --partitions 6
Confluent CLI プロデューサー を実行して、次の引数を渡し、トピック
test1
にメッセージを書き込みます。--cloud
: Confluent Cloud クラスターにメッセージを書き込みます--config
: Confluent Cloud 接続情報を含むファイル--property parse.key=true --property key.separator=,
: コンマで区切られたキーと値を渡します
confluent local services kafka produce test1 --cloud --config $HOME/.confluent/java.config --property parse.key=true --property key.separator=,
>
プロンプトで、メッセージのキーと値の区切り文字として,
を使用して、メッセージをいくつか作成します。alice,{"count":0} alice,{"count":1} alice,{"count":2}
入力し終わったら、
Ctrl + D
キーを押します("+" はキーを同時に押すことを意味します)。プロデューサーコード を表示します。
レコードの消費¶
Confluent CLI コンシューマー を実行して、追加の引数を渡し、トピック
test1
からメッセージを読み取ります。--cloud
: Confluent Cloud クラスターにメッセージを書き込みます--config
: Confluent Cloud 接続情報を含むファイル--property print.key=true
: キーと値を出力します(デフォルトでは、値のみを出力します)。--from-beginning
: トピックの先頭から、すべてのメッセージを出力します
confluent local services kafka consume test1 --cloud --config $HOME/.confluent/java.config --property print.key=true --from-beginning
コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。
alice {"count":0} alice {"count":1} alice {"count":2}
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。コンシューマーコード を表示します。
Avro と Confluent Cloud Schema Registry¶
このサンプルは、前のサンプルと似ていますが、値は Avro フォーマットです。また、Confluent Cloud Schema Registry と統合されています。Confluent Cloud Schema Registry を使用する前に、その 可用性と制限 を確認してください。
Confluent Cloud GUI の「Quick Start for Schema Management on Confluent Cloud」にある説明に従って、Confluent Cloud Schema Registry を有効にし、API キーおよびシークレットを作成して接続します。
ご使用の VPC が Confluent Cloud Schema Registry 公衆インターネットエンドポイントに接続できることを確認します。
Schema Registry に接続するための構成パラメーターを含むローカル構成ファイル(
$HOME/.confluent/java.config
など)をアップデートします。Confluent Cloud のテンプレート構成ファイル
# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for Kafka producer to prevent data loss acks=all # Required connection configs for Confluent Cloud Schema Registry schema.registry.url=https://{{ SR_ENDPOINT }} basic.auth.credentials.source=USER_INFO basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092 # Confluent Schema Registry schema.registry.url=http://localhost:8081
Schema Registry サブジェクトのリストを表示して Confluent Cloud Schema Registry の資格情報を確認します。次の例の
{{ SR_API_KEY }}
、{{ SR_API_SECRET }}
、および{{ SR_ENDPOINT }}
に値を代入します。curl -u {{ SR_API_KEY }}:{{ SR_API_SECRET }} https://{{ SR_ENDPOINT }}/subjects
Avro レコードの生成¶
ホストからの Confluent Cloud Schema Registry 資格情報が有効であることを確認します。次の出力の
<SR API KEY>
、<SR API SECRET>
、および<SR ENDPOINT>
に値を代入します。# View the list of registered subjects curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects # Same as above, as a single bash command to parse the values out of $HOME/.confluent/java.config curl -u $(grep "^schema.registry.basic.auth.user.info" $HOME/.confluent/java.config | cut -d'=' -f2) $(grep "^schema.registry.url" $HOME/.confluent/java.config | cut -d'=' -f2)/subjects
ローカル Confluent Cloud 構成ファイル(
$HOME/.confluent/java.config
)を表示します。cat $HOME/.confluent/java.config
次の例に示すように、この構成ファイルで
<SR API KEY>
、<SR API SECRET>
、および<SR ENDPOINT>
に値を代入します。... basic.auth.credentials.source=USER_INFO schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET> schema.registry.url=https://<SR ENDPOINT> ...
Kafka トピックを作成します。
kafka-topics --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` --command-config $HOME/.confluent/java.config --topic test2 --create --replication-factor 3 --partitions 6
Confluent CLI プロデューサー を実行して、次の引数を渡し、トピック
test2
にメッセージを書き込みます。--value-format avro
: メッセージの値の部分に Avro データフォーマットを使用します--property value.schema
: スキーマを定義します--property schema.registry.url
: Confluent Cloud Schema Registry エンドポイントhttp://<SR ENDPOINT>
に接続します--property basic.auth.credentials.source
:USER_INFO
を指定します--property schema.registry.basic.auth.user.info
:<SR API KEY>:<SR API SECRET>
重要
https://github.com/confluentinc/schema-registry/issues/1052 に記載された理由により、プロパティファイルではなく、追加の Schema Registry パラメーターをプロパティとして渡す必要があります。
confluent local services kafka produce test2 --cloud --config $HOME/.confluent/java.config --value-format avro --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"count","type":"int"}]}' --property schema.registry.url=https://<SR ENDPOINT> --property basic.auth.credentials.source=USER_INFO --property schema.registry.basic.auth.user.info='<SR API KEY>:<SR API SECRET>'
>
プロンプトで、次のメッセージを入力します。{"count":0} {"count":1} {"count":2}
入力し終わったら、
Ctrl + D
キーを押します("+" はキーを同時に押すことを意味します)。プロデューサー Avro コード を表示します。
Avro レコードの消費¶
Confluent CLI コンシューマー を実行して、次の引数を渡し、トピック
test2
からメッセージを読み取ります。--value-format avro
: メッセージの値の部分に Avro データフォーマットを使用します--property schema.registry.url
: Confluent Cloud Schema Registry エンドポイントhttp://<SR ENDPOINT>
に接続します--property basic.auth.credentials.source
:USER_INFO
を指定します--property schema.registry.basic.auth.user.info
:<SR API KEY>:<SR API SECRET>
重要
https://github.com/confluentinc/schema-registry/issues/1052 に記載された理由により、プロパティファイルではなく、追加の Schema Registry パラメーターをプロパティとして渡す必要があります。
confluent local services kafka consume test2 --cloud --config $HOME/.confluent/java.config --value-format avro --property schema.registry.url=https://<SR ENDPOINT> --property basic.auth.credentials.source=USER_INFO --property schema.registry.basic.auth.user.info='<SR API KEY>:<SR API SECRET>' --from-beginning
コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。
{"count":0} {"count":1} {"count":2}
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。コンシューマー Avro コード を表示します。