Confluent CLI: Apache Kafka® のサンプルコマンド¶
このチュートリアルでは、Confluent CLI を使用して Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費します。
チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。
前提条件¶
クライアント¶
- ローカルにインストールされた Confluent CLI (v2.2.0 以降)
timeout
: bash スクリプトにより、一定の期間の後にコンシューマープロセスを終了するために使用されます。timeout
は、ほとんどの Linux ディストリビューションで使用できますが、macOS では使用できません。macOS ユーザーの方は、「macOS の場合のインストール手順」を参照してください。
Kafka クラスター¶
このチュートリアルは、Confluent Cloud を使用して取り組むのが最も簡単です。そうすればローカル Kafka クラスターを実行する必要はありません。Confluent Cloud にサインアップするときは、プロモーションコード C50INTEG
を適用すると、$50 相当を無料で使用できます(詳細)。Console から LEARN をクリックしてクラスターをプロビジョニングし、Clients
をクリックして、クライアントアプリケーションに対して設定するクラスター固有の構成と認証情報を取得します。サポートされている CLI や REST API、またはコミュニティでサポートされている Confluent Cloud 向け ccloud-stack ユーティリティ を使用してもかまいません。
Confluent Cloud の使用を希望しない場合は、ローカルホスト や他のリモートサーバーで実行されている Kafka クラスターでこのチュートリアルに取り組むこともできます。
セットアップ¶
confluentinc/examples GitHub リポジトリのクローンを作成し、
7.1.1-post
ブランチをチェックアウトします。git clone https://github.com/confluentinc/examples cd examples git checkout 7.1.1-post
Confluent CLI(Cloud)のサンプルのディレクトリに変更します。
cd clients/cloud/ccloud/
コマンド
confluent login
で、Confluent Cloud のユーザー名とパスワードを使用して Confluent CLI にログインします。--save
引数により、Confluent Cloud ユーザーログイン資格情報が保存されるか、ローカルのnetrc
ファイルに対してトークン(SSO の場合)が更新されます。confluent login --save
基本プロデューサーおよびコンシューマー¶
このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice
など)を表すキーと、json フォーマットのカウント値({"count": 0}
など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。
レコードの生成¶
Confluent Cloud でトピックを作成します。
confluent kafka topic create test1
Confluent CLI プロデューサー を実行して、次の引数を渡し、トピック
test1
にメッセージを書き込みます。--parse-key --delimiter ,
: コンマで区切られたキーと値を渡します
confluent kafka topic produce test1 --parse-key --delimiter ,
メッセージのキーと値の区切り文字として
,
を使用して、メッセージをいくつか作成します。alice,{"count":0} alice,{"count":1} alice,{"count":2}
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。プロデューサーコード を表示します。
レコードの消費¶
Confluent CLI コンシューマー を実行して、次の引数を渡し、トピック
test1
からメッセージを読み取ります。-b
: トピックの先頭から、すべてのメッセージを出力します--print-key
: キーと値を出力します(デフォルトでは、値のみを出力します)
confluent kafka topic consume test1 -b --print-key
コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。
alice {"count":0} alice {"count":1} alice {"count":2}
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。コンシューマーコード を表示します。
Avro と Confluent Cloud Schema Registry¶
このサンプルは、前のサンプルと似ていますが、値は Avro フォーマットです。また、Confluent Cloud Schema Registry と統合されています。Confluent Cloud Schema Registry を使用する前に、その 可用性と制限 を確認してください。
- Confluent Cloud Console の「Confluent Cloud におけるスキーマ管理のクイックスタート」にある説明に従って、Confluent Cloud Schema Registry を有効にし、API キーおよびシークレットを作成して接続します。
- ご使用の VPC が Confluent Cloud Schema Registry 公衆インターネットエンドポイントに接続できることを確認します。
Avro レコードの生成¶
Confluent Cloud でトピックを作成します。
confluent kafka topic create test2
メッセージペイロードのスキーマを含むファイル(
schema.json
など)を作成します。echo '{"type":"record","name":"myrecord","fields":[{"name":"count","type":"int"}]}' > schema.json
Confluent CLI プロデューサー を実行して、次の引数を渡し、トピック
test2
にメッセージを書き込みます。--value-format avro
: メッセージの値の部分に Avro データフォーマットを使用します--schema
: スキーマファイルへのパス--parse-key --delimiter ,
: コンマで区切られたキーと値を渡します
confluent kafka topic produce test2 --value-format avro --schema schema.json --parse-key --delimiter ,
注釈
このコマンドの初回の実行時には、Confluent Cloud Schema Registry のユーザー資格情報を入力する必要があります。
メッセージのキーと値の区切り文字として
,
を使用して、メッセージをいくつか作成します。alice,{"count":3} alice,{"count":4} alice,{"count":5}
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。プロデューサー Avro コード を表示します。
Avro レコードの消費¶
Confluent CLI コンシューマー を実行して、次の引数を渡し、トピック
test2
からメッセージを読み取ります。-b
: トピックの先頭から、すべてのメッセージを出力します--value-format avro
: メッセージの値の部分に Avro データフォーマットを使用します--print-key
: キーと値を出力します(デフォルトでは、値のみを出力します)
confluent kafka topic consume test2 -b --value-format avro --print-key
コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。
alice {"count":3} alice {"count":4} alice {"count":5}
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。コンシューマー Avro コード を表示します。