Apache Kafka® CLI: サンプルコマンド

このチュートリアルでは、Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費する Apache Kafka® のコマンドを実行します。

チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。

前提条件

クライアント

Kafka クラスター

このチュートリアルは、Confluent Cloud を使用して取り組むのが最も簡単です。そうすればローカル Kafka クラスターを実行する必要はありません。Confluent Cloud にサインアップするときは、プロモーションコード C50INTEG を適用すると、$50 相当を無料で使用できます(詳細)。Console から LEARN をクリックしてクラスターをプロビジョニングし、Clients をクリックして、クライアントアプリケーションに対して設定するクラスター固有の構成と認証情報を取得します。サポートされている CLIREST API、またはコミュニティでサポートされている Confluent Cloud 向け ccloud-stack ユーティリティ を使用してもかまいません。

Confluent Cloud の使用を希望しない場合は、ローカルホスト や他のリモートサーバーで実行されている Kafka クラスターでこのチュートリアルに取り組むこともできます。

セットアップ

  1. confluentinc/examples GitHub リポジトリのクローンを作成し、7.1.1-post ブランチをチェックアウトします。

    git clone https://github.com/confluentinc/examples
    cd examples
    git checkout 7.1.1-post
    
  2. Apache Kafka® のコマンドのサンプルのディレクトリに変更します。

    cd clients/cloud/kafka-commands/
    
  3. Kafka クラスターに接続するための構成パラメーターを含むローカルファイル($HOME/.confluent/java.config など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}{{CLUSTER_API_KEY }}、および {{ CLUSTER_API_SECRET }} に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。

    • Confluent Cloud のテンプレート構成ファイル

      # Required connection configs for Kafka producer, consumer, and admin
      bootstrap.servers={{ BROKER_ENDPOINT }}
      security.protocol=SASL_SSL
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
      sasl.mechanism=PLAIN
      # Required for correctness in Apache Kafka clients prior to 2.6
      client.dns.lookup=use_all_dns_ips
      
      # Best practice for higher availability in Apache Kafka clients prior to 3.0
      session.timeout.ms=45000
      
      # Best practice for Kafka producer to prevent data loss 
      acks=all
      
    • ローカルホストのテンプレート構成ファイル

      # Kafka
      bootstrap.servers=localhost:9092
      

基本プロデューサーおよびコンシューマー

このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice など)を表すキーと、json フォーマットのカウント値({"count": 0} など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。

レコードの生成

  1. Kafka トピックを作成します。

    kafka-topics \
       --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \
       --command-config $HOME/.confluent/java.config \
       --topic test1 \
       --create \
       --replication-factor 3 \
       --partitions 6
    
  2. kafka-console-producer コマンドを実行して、次の引数を渡し、トピック test1 にメッセージを書き込みます。

    • --property parse.key=true --property key.separator=,: コンマで区切られたキーと値を渡します
    kafka-console-producer \
       --topic test1 \
       --broker-list `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \
       --property parse.key=true \
       --property key.separator=, \
       --producer.config $HOME/.confluent/java.config
    
  3. > プロンプトで、メッセージのキーと値の区切り文字として , を使用して、メッセージをいくつか作成します。

    alice,{"count":0}
    alice,{"count":1}
    alice,{"count":2}
    
  4. 入力し終わったら、Ctrl + D キーを押します("+" はキーを同時に押すことを意味します)。

  5. プロデューサーコード を表示します。

レコードの消費

  1. kafka-console-consumer コマンドを実行して、以下の追加の引数を渡し、トピック test1 からメッセージを読み取ります。

    • --property print.key=true: キーと値を出力します(デフォルトでは、値のみを出力します)。
    • --from-beginning: トピックの先頭から、すべてのメッセージを出力します
    kafka-console-consumer \
       --topic test1 \
       --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \
       --property print.key=true \
       --from-beginning \
       --consumer.config $HOME/.confluent/java.config
    

    手順 3 で入力したメッセージが表示されます。

    alice   {"count":0}
    alice   {"count":1}
    alice   {"count":2}
    
  2. 入力し終わったら、Ctrl + C キーを押します("+" はキーを同時に押すことを意味します)。

  3. コンシューマーコード を表示します。

Avro と Confluent Cloud Schema Registry

このサンプルは、前のサンプルと似ていますが、値は Avro フォーマットです。また、Confluent Cloud Schema Registry と統合されています。Confluent Cloud Schema Registry を使用する前に、その 可用性と制限 を確認してください。

  1. Confluent Cloud Console の「Confluent Cloud におけるスキーマ管理のクイックスタート」にある説明に従って、Confluent Cloud Schema Registry を有効にし、API キーおよびシークレットを作成して接続します。

  2. ご使用の VPC が Confluent Cloud Schema Registry 公衆インターネットエンドポイントに接続できることを確認します。

  3. Schema Registry に接続するための構成パラメーターを含むローカル構成ファイル($HOME/.confluent/java.config など)をアップデートします。

    • Confluent Cloud のテンプレート構成ファイル

      # Required connection configs for Kafka producer, consumer, and admin
      bootstrap.servers={{ BROKER_ENDPOINT }}
      security.protocol=SASL_SSL
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
      sasl.mechanism=PLAIN
      # Required for correctness in Apache Kafka clients prior to 2.6
      client.dns.lookup=use_all_dns_ips
      
      # Best practice for higher availability in Apache Kafka clients prior to 3.0
      session.timeout.ms=45000
      
      # Best practice for Kafka producer to prevent data loss 
      acks=all
      
      # Required connection configs for Confluent Cloud Schema Registry
      schema.registry.url=https://{{ SR_ENDPOINT }}
      basic.auth.credentials.source=USER_INFO
      basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
      
    • ローカルホストのテンプレート構成ファイル

      # Kafka
      bootstrap.servers=localhost:9092
      
      # Confluent Schema Registry
      schema.registry.url=http://localhost:8081
      
  4. Schema Registry サブジェクトのリストを表示して Confluent Cloud Schema Registry の資格情報を確認します。次の例の {{ SR_API_KEY }}{{ SR_API_SECRET }}、および {{ SR_ENDPOINT }} に値を代入します。

    curl -u {{ SR_API_KEY }}:{{ SR_API_SECRET }} https://{{ SR_ENDPOINT }}/subjects
    

Avro レコードの生成

  1. Confluent Cloud でトピックを作成します。

    kafka-topics \
       --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \
       --command-config $HOME/.confluent/java.config \
       --topic test2 \
       --create \
       --replication-factor 3 \
       --partitions 6
    
  2. kafka-avro-console-producer コマンドを実行して、次の引数を渡し、トピック test2 にメッセージを書き込みます。

    • --property value.schema: スキーマを定義します
    • --property schema.registry.url: Confluent Cloud Schema Registry エンドポイント https://<SR ENDPOINT> に接続します
    • --property basic.auth.credentials.source: USER_INFO を指定します
    • --property schema.registry.basic.auth.user.info: <SR API KEY>:<SR API SECRET>

    重要

    https://github.com/confluentinc/schema-registry/issues/1052 に記載された理由により、プロパティファイルではなく、追加の Schema Registry パラメーターをプロパティとして渡す必要があります。

    kafka-avro-console-producer \
       --topic test2 \
       --broker-list `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \
       --producer.config $HOME/.confluent/java.config \
       --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"count","type":"int"}]}' \
       --property schema.registry.url=https://<SR ENDPOINT> \
       --property basic.auth.credentials.source=USER_INFO \
       --property schema.registry.basic.auth.user.info='<SR API KEY>:<SR API SECRET>'
    
    # Same as above, as a single bash command to parse the values out of $HOME/.confluent/java.config
    kafka-avro-console-producer \
       --topic test2 \
       --broker-list `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \
       --producer.config $HOME/.confluent/java.config \
       --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"count","type":"int"}]}' \
       --property schema.registry.url=$(grep "^schema.registry.url" $HOME/.confluent/java.config | cut -d'=' -f2) \
       --property basic.auth.credentials.source=USER_INFO \
       --property schema.registry.basic.auth.user.info=$(grep "^schema.registry.basic.auth.user.info" $HOME/.confluent/java.config | cut -d'=' -f2)
    
  3. > プロンプトで複数のメッセージを入力します。

    {"count":0}
    {"count":1}
    {"count":2}
    
  4. 入力し終わったら、Ctrl + D キーを押します("+" はキーを同時に押すことを意味します)。

  5. プロデューサー Avro コード を表示します。

Avro レコードの消費

  1. kafka-avro-console-consumer コマンドを実行して、次の引数を渡し、トピック test からメッセージを読み取ります。https://github.com/confluentinc/schema-registry/issues/1052 に記載された理由により、プロパティファイルではなく、追加の Schema Registry パラメーターをプロパティとして渡す必要があります。

    • --property schema.registry.url: Confluent Cloud Schema Registry エンドポイント https://<SR ENDPOINT> に接続します
    • --property basic.auth.credentials.source: USER_INFO を指定します
    • --property schema.registry.basic.auth.user.info: <SR API KEY>:<SR API SECRET>

    重要

    https://github.com/confluentinc/schema-registry/issues/1052 に記載された理由により、プロパティファイルではなく、追加の Schema Registry パラメーターをプロパティとして渡す必要があります。

    kafka-avro-console-consumer \
       --topic test2 \
       --from-beginning \
       --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \
       --consumer.config $HOME/.confluent/java.config \
       --property schema.registry.url=https://<SR ENDPOINT> \
       --property basic.auth.credentials.source=USER_INFO \
       --property schema.registry.basic.auth.user.info='<SR API KEY>:<SR API SECRET>'
    
    Same as above, as a single bash command to parse the values out of $HOME/.confluent/java.config
    kafka-avro-console-consumer \
       --topic test2 \
       --from-beginning \
       --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` \
       --consumer.config $HOME/.confluent/java.config \
       --property schema.registry.url=$(grep "^schema.registry.url" $HOME/.confluent/java.config | cut -d'=' -f2) \
       --property basic.auth.credentials.source=USER_INFO \
       --property schema.registry.basic.auth.user.info=$(grep "^schema.registry.basic.auth.user.info" $HOME/.confluent/java.config | cut -d'=' -f2)
    

    前の手順で入力したメッセージが表示されます。

    {"count":0}
    {"count":1}
    {"count":2}
    
  2. 入力し終わったら、Ctrl + C キーを押します("+" はキーを同時に押すことを意味します)。

  3. コンシューマー Avro コード を表示します。