Confluent Cloud CLI: Apache Kafka® のサンプルコマンド¶
このチュートリアルでは、Confluent Cloud CLI を使用して Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費します。
チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。
前提条件¶
クライアント¶
- ローカルにインストールされた Confluent Cloud CLI (v1.25.0 以降)
timeout
: bash スクリプトにより、一定の期間の後にコンシューマープロセスを終了するために使用されます。timeout
は、ほとんどの Linux ディストリビューションで使用できますが、macOS では使用できません。macOS ユーザーの方は、「macOS の場合のインストール手順」を参照してください。
Kafka クラスター¶
- このチュートリアルは、どの環境の Kafka クラスターにも対応しています。
- Confluent Cloud 内
- ローカルホスト 上
- 任意のリモート Kafka クラスター
- Confluent Cloud で実行している場合は、API キーおよびシークレットで Confluent Cloud クラスターにアクセスできることが必要です。
- Confluent Cloud にサインアップし、プロモーションコード
C50INTEG
を使用した最初の 20 名のユーザーは、$50 相当を無料で使用できます(詳細)。 - Confluent Cloud で Kafka クラスター、資格情報、および ACL を自動的に作成する方法については、「Confluent Cloud 向け ccloud-stack ユーティリティ」を参照してください。
- Confluent Cloud にサインアップし、プロモーションコード
セットアップ¶
confluentinc/examples GitHub リポジトリのクローンを作成し、
6.1.5-post
ブランチをチェックアウトします。git clone https://github.com/confluentinc/examples cd examples git checkout 6.1.5-post
Confluent Cloud CLI のサンプルのディレクトリに変更します。
cd clients/cloud/ccloud/
コマンド
ccloud login
で、Confluent Cloud のユーザー名とパスワードを使用して Confluent Cloud にログインします。--save
引数により、Confluent Cloud ユーザーログイン資格情報が保存されるか、ローカルのnetrc
ファイルに対してトークン(SSO の場合)が更新されます。ccloud login --save
基本プロデューサーおよびコンシューマー¶
このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice
など)を表すキーと、json フォーマットのカウント値({"count": 0}
など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。
レコードの生成¶
Confluent Cloud でトピックを作成します。
ccloud kafka topic create test1
Confluent Cloud CLI プロデューサー を実行して、次の引数を渡し、トピック
test1
にメッセージを書き込みます。--parse-key --delimiter ,
: コンマで区切られたキーと値を渡します
ccloud kafka topic produce test1 --parse-key --delimiter ,
メッセージのキーと値の区切り文字として
,
を使用して、メッセージをいくつか作成します。alice,{"count":0} alice,{"count":1} alice,{"count":2}
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。プロデューサーコード を表示します。
レコードの消費¶
Confluent Cloud CLI コンシューマー を実行して、次の引数を渡し、トピック
test1
からメッセージを読み取ります。-b
: トピックの先頭から、すべてのメッセージを出力します--print-key
: キーと値を出力します(デフォルトでは、値のみを出力します)
ccloud kafka topic consume test1 -b --print-key
コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。
alice {"count":0} alice {"count":1} alice {"count":2}
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。コンシューマーコード を表示します。
Avro と Confluent Cloud Schema Registry¶
このサンプルは、前のサンプルと似ていますが、値は Avro フォーマットです。また、Confluent Cloud Schema Registry と統合されています。Confluent Cloud Schema Registry を使用する前に、その 可用性と制限 を確認してください。
- Confluent Cloud GUI の「Confluent Cloud におけるスキーマ管理のクイックスタート」にある説明に従って、Confluent Cloud Schema Registry を有効にし、API キーおよびシークレットを作成して接続します。
- ご使用の VPC が Confluent Cloud Schema Registry パブリックインターネットエンドポイントに接続できることを確認します。
Avro レコードの生成¶
Confluent Cloud でトピックを作成します。
ccloud kafka topic create test2
メッセージペイロードのスキーマを含むファイル(
schema.json
など)を作成します。echo '{"type":"record","name":"myrecord","fields":[{"name":"count","type":"int"}]}' > schema.json
Confluent Cloud CLI プロデューサー を実行して、次の引数を渡し、トピック
test2
にメッセージを書き込みます。--value-format avro
: メッセージの値の部分に Avro データフォーマットを使用します--schema
: スキーマファイルへのパス--parse-key --delimiter ,
: コンマで区切られたキーと値を渡します
ccloud kafka topic produce test2 --value-format avro --schema schema.json --parse-key --delimiter ,
注釈
このコマンドの初回の実行時には、Confluent Cloud Schema Registry のユーザー資格情報を入力する必要があります。
メッセージのキーと値の区切り文字として
,
を使用して、メッセージをいくつか作成します。alice,{"count":3} alice,{"count":4} alice,{"count":5}
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。プロデューサー Avro コード を表示します。
Avro レコードの消費¶
Confluent Cloud CLI コンシューマー を実行して、次の引数を渡し、トピック
test2
からメッセージを読み取ります。-b
: トピックの先頭から、すべてのメッセージを出力します--value-format avro
: メッセージの値の部分に Avro データフォーマットを使用します--print-key
: キーと値を出力します(デフォルトでは、値のみを出力します)
ccloud kafka topic consume test2 -b --value-format avro --print-key
コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。
alice {"count":3} alice {"count":4} alice {"count":5}
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。コンシューマー Avro コード を表示します。