Confluent CLI: Apache Kafka® のサンプルコマンド¶
このチュートリアルでは、Confluent CLI の confluent local
コマンドを使用して Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費します。
チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。
注釈
Confluent CLI の confluent local
コマンドは、開発目的での使用のみを想定しており、本稼働環境には適していません。
前提条件¶
Kafka クラスター¶
このチュートリアルは、Confluent Cloud を使用して取り組むのが最も簡単です。そうすればローカル Kafka クラスターを実行する必要はありません。Confluent Cloud にサインアップするときは、プロモーションコード C50INTEG
を適用すると、$50 相当を無料で使用できます(詳細)。Console から LEARN をクリックしてクラスターをプロビジョニングし、Clients
をクリックして、クライアントアプリケーションに対して設定するクラスター固有の構成と認証情報を取得します。サポートされている CLI や REST API、またはコミュニティでサポートされている Confluent Cloud 向け ccloud-stack ユーティリティ を使用してもかまいません。
Confluent Cloud の使用を希望しない場合は、ローカルホスト や他のリモートサーバーで実行されている Kafka クラスターでこのチュートリアルに取り組むこともできます。
セットアップ¶
confluentinc/examples GitHub リポジトリのクローンを作成し、
7.1.1-post
ブランチをチェックアウトします。git clone https://github.com/confluentinc/examples cd examples git checkout 7.1.1-post
Confluent CLI のサンプルのディレクトリに変更します。
cd clients/cloud/confluent-cli/
Kafka クラスターに接続するための構成パラメーターを含むローカルファイル(
$HOME/.confluent/java.config
など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}
、{{CLUSTER_API_KEY }}
、および{{ CLUSTER_API_SECRET }}
に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。Confluent Cloud のテンプレート構成ファイル
# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for higher availability in Apache Kafka clients prior to 3.0 session.timeout.ms=45000 # Best practice for Kafka producer to prevent data loss acks=all
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092
基本プロデューサーおよびコンシューマー¶
このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice
など)を表すキーと、json フォーマットのカウント値({"count": 0}
など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。
レコードの生成¶
Kafka トピックを作成します。
kafka-topics --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` --command-config $HOME/.confluent/java.config --topic test1 --create --replication-factor 3 --partitions 6
Confluent CLI プロデューサー を実行して、次の引数を渡し、トピック
test1
にメッセージを書き込みます。--cloud
: Confluent Cloud クラスターにメッセージを書き込みます--config
: Confluent Cloud 接続情報を含むファイル--property parse.key=true --property key.separator=,
: コンマで区切られたキーと値を渡します
confluent local services kafka produce test1 --cloud --config $HOME/.confluent/java.config --property parse.key=true --property key.separator=,
>
プロンプトで、メッセージのキーと値の区切り文字として,
を使用して、メッセージをいくつか作成します。alice,{"count":0} alice,{"count":1} alice,{"count":2}
入力し終わったら、
Ctrl + D
キーを押します("+" はキーを同時に押すことを意味します)。プロデューサーコード を表示します。
レコードの消費¶
Confluent CLI コンシューマー を実行して、追加の引数を渡し、トピック
test1
からメッセージを読み取ります。--cloud
: Confluent Cloud クラスターにメッセージを書き込みます--config
: Confluent Cloud 接続情報を含むファイル--property print.key=true
: キーと値を出力します(デフォルトでは、値のみを出力します)。--from-beginning
: トピックの先頭から、すべてのメッセージを出力します
confluent local services kafka consume test1 --cloud --config $HOME/.confluent/java.config --property print.key=true --from-beginning
コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。
alice {"count":0} alice {"count":1} alice {"count":2}
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。コンシューマーコード を表示します。
Avro と Confluent Cloud Schema Registry¶
このサンプルは、前のサンプルと似ていますが、値は Avro フォーマットです。また、Confluent Cloud Schema Registry と統合されています。Confluent Cloud Schema Registry を使用する前に、その 可用性と制限 を確認してください。
Confluent Cloud Console の「Confluent Cloud におけるスキーマ管理のクイックスタート」にある説明に従って、Confluent Cloud Schema Registry を有効にし、API キーおよびシークレットを作成して接続します。
ご使用の VPC が Confluent Cloud Schema Registry 公衆インターネットエンドポイントに接続できることを確認します。
Schema Registry に接続するための構成パラメーターを含むローカル構成ファイル(
$HOME/.confluent/java.config
など)をアップデートします。Confluent Cloud のテンプレート構成ファイル
# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for higher availability in Apache Kafka clients prior to 3.0 session.timeout.ms=45000 # Best practice for Kafka producer to prevent data loss acks=all # Required connection configs for Confluent Cloud Schema Registry schema.registry.url=https://{{ SR_ENDPOINT }} basic.auth.credentials.source=USER_INFO basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092 # Confluent Schema Registry schema.registry.url=http://localhost:8081
Schema Registry サブジェクトのリストを表示して Confluent Cloud Schema Registry の資格情報を確認します。次の例の
{{ SR_API_KEY }}
、{{ SR_API_SECRET }}
、および{{ SR_ENDPOINT }}
に値を代入します。curl -u {{ SR_API_KEY }}:{{ SR_API_SECRET }} https://{{ SR_ENDPOINT }}/subjects
Avro レコードの生成¶
ホストからの Confluent Cloud Schema Registry 資格情報が有効であることを確認します。次の出力の
<SR API KEY>
、<SR API SECRET>
、および<SR ENDPOINT>
に値を代入します。# View the list of registered subjects curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects # Same as above, as a single bash command to parse the values out of $HOME/.confluent/java.config curl -u $(grep "^schema.registry.basic.auth.user.info" $HOME/.confluent/java.config | cut -d'=' -f2) $(grep "^schema.registry.url" $HOME/.confluent/java.config | cut -d'=' -f2)/subjects
ローカル Confluent Cloud 構成ファイル(
$HOME/.confluent/java.config
)を表示します。cat $HOME/.confluent/java.config
次の例に示すように、この構成ファイルで
<SR API KEY>
、<SR API SECRET>
、および<SR ENDPOINT>
に値を代入します。... basic.auth.credentials.source=USER_INFO schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET> schema.registry.url=https://<SR ENDPOINT> ...
Kafka トピックを作成します。
kafka-topics --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` --command-config $HOME/.confluent/java.config --topic test2 --create --replication-factor 3 --partitions 6
Confluent CLI プロデューサー を実行して、次の引数を渡し、トピック
test2
にメッセージを書き込みます。--value-format avro
: メッセージの値の部分に Avro データフォーマットを使用します--property value.schema
: スキーマを定義します--property schema.registry.url
: Confluent Cloud Schema Registry エンドポイントhttp://<SR ENDPOINT>
に接続します--property basic.auth.credentials.source
:USER_INFO
を指定します--property schema.registry.basic.auth.user.info
:<SR API KEY>:<SR API SECRET>
重要
https://github.com/confluentinc/schema-registry/issues/1052 に記載された理由により、プロパティファイルではなく、追加の Schema Registry パラメーターをプロパティとして渡す必要があります。
confluent local services kafka produce test2 --cloud --config $HOME/.confluent/java.config --value-format avro --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"count","type":"int"}]}' --property schema.registry.url=https://<SR ENDPOINT> --property basic.auth.credentials.source=USER_INFO --property schema.registry.basic.auth.user.info='<SR API KEY>:<SR API SECRET>'
>
プロンプトで、次のメッセージを入力します。{"count":0} {"count":1} {"count":2}
入力し終わったら、
Ctrl + D
キーを押します("+" はキーを同時に押すことを意味します)。プロデューサー Avro コード を表示します。
Avro レコードの消費¶
Confluent CLI コンシューマー を実行して、次の引数を渡し、トピック
test2
からメッセージを読み取ります。--value-format avro
: メッセージの値の部分に Avro データフォーマットを使用します--property schema.registry.url
: Confluent Cloud Schema Registry エンドポイントhttp://<SR ENDPOINT>
に接続します--property basic.auth.credentials.source
:USER_INFO
を指定します--property schema.registry.basic.auth.user.info
:<SR API KEY>:<SR API SECRET>
重要
https://github.com/confluentinc/schema-registry/issues/1052 に記載された理由により、プロパティファイルではなく、追加の Schema Registry パラメーターをプロパティとして渡す必要があります。
confluent local services kafka consume test2 --cloud --config $HOME/.confluent/java.config --value-format avro --property schema.registry.url=https://<SR ENDPOINT> --property basic.auth.credentials.source=USER_INFO --property schema.registry.basic.auth.user.info='<SR API KEY>:<SR API SECRET>' --from-beginning
コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。
{"count":0} {"count":1} {"count":2}
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。コンシューマー Avro コード を表示します。