Confluent Cloud CLI: Apache Kafka® のサンプルコマンド

このチュートリアルでは、Confluent Cloud CLI を使用して Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費します。

チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。

前提条件

クライアント

  • ローカルにインストールされた Confluent Cloud CLI (v1.25.0 以降)
  • timeout: bash スクリプトにより、一定の期間の後にコンシューマープロセスを終了するために使用されます。timeout は、ほとんどの Linux ディストリビューションで使用できますが、macOS では使用できません。macOS ユーザーの方は、「macOS の場合のインストール手順」を参照してください。

Kafka クラスター

このチュートリアルは、Confluent Cloud を使用して取り組むのが最も簡単です。そうすればローカル Kafka クラスターを実行する必要はありません。Confluent Cloud にサインアップするときは、プロモーションコード C50INTEG を適用すると、$50 相当を無料で使用できます(詳細)。Console から LEARN をクリックしてクラスターをプロビジョニングし、Clients をクリックして、クライアントアプリケーションに対して設定するクラスター固有の構成と認証情報を取得します。サポートされている CLIREST API、またはコミュニティでサポートされている Confluent Cloud 向け ccloud-stack ユーティリティ を使用してもかまいません。

Confluent Cloud の使用を希望しない場合は、ローカルホスト や他のリモートサーバーで実行されている Kafka クラスターでこのチュートリアルに取り組むこともできます。

セットアップ

  1. confluentinc/examples GitHub リポジトリのクローンを作成し、6.2.4-post ブランチをチェックアウトします。

    git clone https://github.com/confluentinc/examples
    cd examples
    git checkout 6.2.4-post
    
  2. Confluent Cloud CLI のサンプルのディレクトリに変更します。

    cd clients/cloud/ccloud/
    
  3. コマンド ccloud login で、Confluent Cloud のユーザー名とパスワードを使用して Confluent Cloud にログインします。--save 引数により、Confluent Cloud ユーザーログイン資格情報が保存されるか、ローカルの netrc ファイルに対してトークン(SSO の場合)が更新されます。

    ccloud login --save
    

基本プロデューサーおよびコンシューマー

このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice など)を表すキーと、json フォーマットのカウント値({"count": 0} など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。

レコードの生成

  1. Confluent Cloud でトピックを作成します。

    ccloud kafka topic create test1
    
  2. Confluent Cloud CLI プロデューサー を実行して、次の引数を渡し、トピック test1 にメッセージを書き込みます。

    • --parse-key --delimiter ,: コンマで区切られたキーと値を渡します
    ccloud kafka topic produce test1 --parse-key --delimiter ,
    
  3. メッセージのキーと値の区切り文字として , を使用して、メッセージをいくつか作成します。

    alice,{"count":0}
    alice,{"count":1}
    alice,{"count":2}
    
  4. 入力し終わったら、Ctrl + C キーを押します("+" はキーを同時に押すことを意味します)。

  5. プロデューサーコード を表示します。

レコードの消費

  1. Confluent Cloud CLI コンシューマー を実行して、次の引数を渡し、トピック test1 からメッセージを読み取ります。

    • -b: トピックの先頭から、すべてのメッセージを出力します
    • --print-key: キーと値を出力します(デフォルトでは、値のみを出力します)
    ccloud kafka topic consume test1 -b --print-key
    
  2. コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。

    alice   {"count":0}
    alice   {"count":1}
    alice   {"count":2}
    
  3. 入力し終わったら、Ctrl + C キーを押します("+" はキーを同時に押すことを意味します)。

  4. コンシューマーコード を表示します。

Avro と Confluent Cloud Schema Registry

このサンプルは、前のサンプルと似ていますが、値は Avro フォーマットです。また、Confluent Cloud Schema Registry と統合されています。Confluent Cloud Schema Registry を使用する前に、その 可用性と制限 を確認してください。

  1. Confluent Cloud Console の「Confluent Cloud におけるスキーマ管理のクイックスタート」にある説明に従って、Confluent Cloud Schema Registry を有効にし、API キーおよびシークレットを作成して接続します。
  2. ご使用の VPC が Confluent Cloud Schema Registry 公衆インターネットエンドポイントに接続できることを確認します。

Avro レコードの生成

  1. Confluent Cloud でトピックを作成します。

    ccloud kafka topic create test2
    
  2. メッセージペイロードのスキーマを含むファイル(schema.json など)を作成します。

    echo '{"type":"record","name":"myrecord","fields":[{"name":"count","type":"int"}]}' > schema.json
    
  3. Confluent Cloud CLI プロデューサー を実行して、次の引数を渡し、トピック test2 にメッセージを書き込みます。

    • --value-format avro: メッセージの値の部分に Avro データフォーマットを使用します
    • --schema: スキーマファイルへのパス
    • --parse-key --delimiter ,: コンマで区切られたキーと値を渡します
    ccloud kafka topic produce test2 --value-format avro --schema schema.json --parse-key --delimiter ,
    

    注釈

    このコマンドの初回の実行時には、Confluent Cloud Schema Registry のユーザー資格情報を入力する必要があります。

  4. メッセージのキーと値の区切り文字として , を使用して、メッセージをいくつか作成します。

    alice,{"count":3}
    alice,{"count":4}
    alice,{"count":5}
    
  5. 入力し終わったら、Ctrl + C キーを押します("+" はキーを同時に押すことを意味します)。

  6. プロデューサー Avro コード を表示します。

Avro レコードの消費

  1. Confluent Cloud CLI コンシューマー を実行して、次の引数を渡し、トピック test2 からメッセージを読み取ります。

    • -b: トピックの先頭から、すべてのメッセージを出力します
    • --value-format avro: メッセージの値の部分に Avro データフォーマットを使用します
    • --print-key: キーと値を出力します(デフォルトでは、値のみを出力します)
    ccloud kafka topic consume test2 -b --value-format avro --print-key
    
  2. コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。

    alice   {"count":3}
    alice   {"count":4}
    alice   {"count":5}
    
  3. 入力し終わったら、Ctrl + C キーを押します("+" はキーを同時に押すことを意味します)。

  4. コンシューマー Avro コード を表示します。