Apache Kafka® CLI: サンプルコマンド¶
このチュートリアルでは、Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費する Apache Kafka® のコマンドを実行します。
チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。
前提条件¶
Kafka クラスター¶
- このチュートリアルは、どの環境の Kafka クラスターにも対応しています。
- Confluent Cloud 内
- ローカルホスト 上
- 任意のリモート Kafka クラスター
- Confluent Cloud で実行している場合は、API キーおよびシークレットで Confluent Cloud クラスターにアクセスできることが必要です。
- Confluent Cloud にサインアップし、プロモーションコード
C50INTEG
を使用した最初の 20 名のユーザーは、$50 相当を無料で使用できます(詳細)。 - Confluent Cloud で Kafka クラスター、資格情報、および ACL を自動的に作成する方法については、「Confluent Cloud 向け ccloud-stack ユーティリティ」を参照してください。
- Confluent Cloud にサインアップし、プロモーションコード
セットアップ¶
confluentinc/examples GitHub リポジトリのクローンを作成し、
6.1.5-post
ブランチをチェックアウトします。git clone https://github.com/confluentinc/examples cd examples git checkout 6.1.5-post
Apache Kafka® のコマンドのサンプルのディレクトリに変更します。
cd clients/cloud/kafka-commands/
Kafka クラスターに接続するための構成パラメーターを含むローカルファイル(
$HOME/.confluent/java.config
など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}
、{{CLUSTER_API_KEY }}
、および{{ CLUSTER_API_SECRET }}
に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。Confluent Cloud のテンプレート構成ファイル
# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for Kafka producer to prevent data loss acks=all
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092
基本プロデューサーおよびコンシューマー¶
このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice
など)を表すキーと、json フォーマットのカウント値({"count": 0}
など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。
レコードの生成¶
Kafka トピックを作成します。
kafka-topics \ --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \ --command-config $HOME/.confluent/java.config \ --topic test1 \ --create \ --replication-factor 3 \ --partitions 6
kafka-console-producer
コマンドを実行して、次の引数を渡し、トピックtest1
にメッセージを書き込みます。--property parse.key=true --property key.separator=,
: コンマで区切られたキーと値を渡します
kafka-console-producer \ --topic test1 \ --broker-list `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \ --property parse.key=true \ --property key.separator=, \ --producer.config $HOME/.confluent/java.config
>
プロンプトで、メッセージのキーと値の区切り文字として,
を使用して、メッセージをいくつか作成します。alice,{"count":0} alice,{"count":1} alice,{"count":2}
入力し終わったら、
Ctrl + D
キーを押します("+" はキーを同時に押すことを意味します)。プロデューサーコード を表示します。
レコードの消費¶
kafka-console-consumer
コマンドを実行して、以下の追加の引数を渡し、トピックtest1
からメッセージを読み取ります。--property print.key=true
: キーと値を出力します(デフォルトでは、値のみを出力します)。--from-beginning
: トピックの先頭から、すべてのメッセージを出力します
kafka-console-consumer \ --topic test1 \ --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \ --property print.key=true \ --from-beginning \ --consumer.config $HOME/.confluent/java.config
手順 3 で入力したメッセージが表示されます。
alice {"count":0} alice {"count":1} alice {"count":2}
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。コンシューマーコード を表示します。
Avro と Confluent Cloud Schema Registry¶
このサンプルは、前のサンプルと似ていますが、値は Avro フォーマットです。また、Confluent Cloud Schema Registry と統合されています。Confluent Cloud Schema Registry を使用する前に、その 可用性と制限 を確認してください。
Confluent Cloud GUI の「Confluent Cloud におけるスキーマ管理のクイックスタート」にある説明に従って、Confluent Cloud Schema Registry を有効にし、API キーおよびシークレットを作成して接続します。
ご使用の VPC が Confluent Cloud Schema Registry 公衆インターネットエンドポイントに接続できることを確認します。
Schema Registry に接続するための構成パラメーターを含むローカル構成ファイル(
$HOME/.confluent/java.config
など)をアップデートします。Confluent Cloud のテンプレート構成ファイル
# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for Kafka producer to prevent data loss acks=all # Required connection configs for Confluent Cloud Schema Registry schema.registry.url=https://{{ SR_ENDPOINT }} basic.auth.credentials.source=USER_INFO basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092 # Confluent Schema Registry schema.registry.url=http://localhost:8081
Schema Registry サブジェクトのリストを表示して Confluent Cloud Schema Registry の資格情報を確認します。次の例の
{{ SR_API_KEY }}
、{{ SR_API_SECRET }}
、および{{ SR_ENDPOINT }}
に値を代入します。curl -u {{ SR_API_KEY }}:{{ SR_API_SECRET }} https://{{ SR_ENDPOINT }}/subjects
Avro レコードの生成¶
Confluent Cloud でトピックを作成します。
kafka-topics \ --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \ --command-config $HOME/.confluent/java.config \ --topic test2 \ --create \ --replication-factor 3 \ --partitions 6
kafka-avro-console-producer
コマンドを実行して、次の引数を渡し、トピックtest2
にメッセージを書き込みます。--property value.schema
: スキーマを定義します--property schema.registry.url
: Confluent Cloud Schema Registry エンドポイントhttps://<SR ENDPOINT>
に接続します--property basic.auth.credentials.source
:USER_INFO
を指定します--property schema.registry.basic.auth.user.info
:<SR API KEY>:<SR API SECRET>
重要
https://github.com/confluentinc/schema-registry/issues/1052 に記載された理由により、プロパティファイルではなく、追加の Schema Registry パラメーターをプロパティとして渡す必要があります。
kafka-avro-console-producer \ --topic test2 \ --broker-list `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \ --producer.config $HOME/.confluent/java.config \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"count","type":"int"}]}' \ --property schema.registry.url=https://<SR ENDPOINT> \ --property basic.auth.credentials.source=USER_INFO \ --property schema.registry.basic.auth.user.info='<SR API KEY>:<SR API SECRET>' # Same as above, as a single bash command to parse the values out of $HOME/.confluent/java.config kafka-avro-console-producer \ --topic test2 \ --broker-list `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \ --producer.config $HOME/.confluent/java.config \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"count","type":"int"}]}' \ --property schema.registry.url=$(grep "^schema.registry.url" $HOME/.confluent/java.config | cut -d'=' -f2) \ --property basic.auth.credentials.source=USER_INFO \ --property schema.registry.basic.auth.user.info=$(grep "^schema.registry.basic.auth.user.info" $HOME/.confluent/java.config | cut -d'=' -f2)
>
プロンプトで複数のメッセージを入力します。{"count":0} {"count":1} {"count":2}
入力し終わったら、
Ctrl + D
キーを押します("+" はキーを同時に押すことを意味します)。プロデューサー Avro コード を表示します。
Avro レコードの消費¶
kafka-avro-console-consumer
コマンドを実行して、次の引数を渡し、トピックtest
からメッセージを読み取ります。https://github.com/confluentinc/schema-registry/issues/1052 に記載された理由により、プロパティファイルではなく、追加の Schema Registry パラメーターをプロパティとして渡す必要があります。--property schema.registry.url
: Confluent Cloud Schema Registry エンドポイントhttps://<SR ENDPOINT>
に接続します--property basic.auth.credentials.source
:USER_INFO
を指定します--property schema.registry.basic.auth.user.info
:<SR API KEY>:<SR API SECRET>
重要
https://github.com/confluentinc/schema-registry/issues/1052 に記載された理由により、プロパティファイルではなく、追加の Schema Registry パラメーターをプロパティとして渡す必要があります。
kafka-avro-console-consumer \ --topic test2 \ --from-beginning \ --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \ --consumer.config $HOME/.confluent/java.config \ --property schema.registry.url=https://<SR ENDPOINT> \ --property basic.auth.credentials.source=USER_INFO \ --property schema.registry.basic.auth.user.info='<SR API KEY>:<SR API SECRET>' Same as above, as a single bash command to parse the values out of $HOME/.confluent/java.config kafka-avro-console-consumer \ --topic test2 \ --from-beginning \ --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \ --consumer.config $HOME/.confluent/java.config \ --property schema.registry.url=$(grep "^schema.registry.url" $HOME/.confluent/java.config | cut -d'=' -f2) \ --property basic.auth.credentials.source=USER_INFO \ --property schema.registry.basic.auth.user.info=$(grep "^schema.registry.basic.auth.user.info" $HOME/.confluent/java.config | cut -d'=' -f2)
前の手順で入力したメッセージが表示されます。
{"count":0} {"count":1} {"count":2}
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。コンシューマー Avro コード を表示します。