kafkacat: Apache Kafka® のサンプルコマンド

このチュートリアルでは、Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費する kcat クライアントアプリケーションを実行します。

チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。

前提条件

クライアント

  • kafkacat をマシンにインストール済み。最新のマスターブランチから kcat を ビルド して -F 機能を入手する必要があります。この機能により、Confluent Cloud 構成ファイルに構成を渡しやすくなるためです。

Kafka クラスター

  • このチュートリアルは、どの環境の Kafka クラスターにも対応しています。
  • Confluent Cloud で実行している場合は、API キーおよびシークレットで Confluent Cloud クラスターにアクセスできることが必要です。
    • Confluent Cloud にサインアップし、プロモーションコード C50INTEG を使用した最初の 20 名のユーザーは、$50 相当を無料で使用できます(詳細)。
    • Confluent Cloud で Kafka クラスター、資格情報、および ACL を自動的に作成する方法については、「Confluent Cloud 向け ccloud-stack ユーティリティ」を参照してください。

セットアップ

  1. confluentinc/examples GitHub リポジトリのクローンを作成し、6.1.5-post ブランチをチェックアウトします。

    git clone https://github.com/confluentinc/examples
    cd examples
    git checkout 6.1.5-post
    
  2. kcat のサンプルのディレクトリに変更します。

    cd clients/cloud/kafkacat/
    
  3. Kafka クラスターに接続するための構成パラメーターを含むローカルファイル($HOME/.confluent/java.config など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}{{CLUSTER_API_KEY }}、および {{ CLUSTER_API_SECRET }} に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。

    • Confluent Cloud のテンプレート構成ファイル

      # Required connection configs for Kafka producer, consumer, and admin
      bootstrap.servers={{ BROKER_ENDPOINT }}
      security.protocol=SASL_SSL
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
      sasl.mechanism=PLAIN
      # Required for correctness in Apache Kafka clients prior to 2.6
      client.dns.lookup=use_all_dns_ips
      
      # Best practice for Kafka producer to prevent data loss 
      acks=all
      
    • ローカルホストのテンプレート構成ファイル

      # Kafka
      bootstrap.servers=localhost:9092
      

基本プロデューサーおよびコンシューマー

このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice など)を表すキーと、json フォーマットのカウント値({"count": 0} など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。

レコードの生成

  1. Kafka トピックを作成します。

    kafka-topics --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` --command-config $HOME/.confluent/java.config --topic test1 --create --replication-factor 3 --partitions 6
    
  2. kcat を実行して、次の引数を渡し、トピック test1 にメッセージを書き込みます。

    • -F $HOME/.confluent/java.config: Confluent Cloud クラスターに接続するための構成ファイル
    • -K ,: コンマで区切られたキーと値を渡します
    kafkacat -F $HOME/.confluent/java.config -K , -P -t test1
    
  3. メッセージのキーと値の区切り文字として , を使用して、メッセージをいくつか作成します。

    alice,{"count":0}
    alice,{"count":1}
    alice,{"count":2}
    
  4. 入力し終わったら、Ctrl + D キーを押します("+" はキーを同時に押すことを意味します)。

  5. プロデューサーコード を表示します。

レコードの消費

  1. kcat を再度実行して、次の引数を渡し、トピック test からメッセージを読み取ります。

    • -F $HOME/.confluent/java.config: Confluent Cloud クラスターに接続するための構成ファイル
    • -K ,: コンマで区切られたキーと値を渡します
    • -e: 最後のメッセージを受信すると、正常に終了します
    kafkacat -F $HOME/.confluent/java.config -K , -C -t test1 -e
    

    前の手順で入力したメッセージが表示されます。

    % Reading configuration from file $HOME/.confluent/java.config
    % Reached end of topic test1 [3] at offset 0
    alice,{"count":0}
    alice,{"count":1}
    alice,{"count":2}
    % Reached end of topic test1 [7] at offset 0
    % Reached end of topic test1 [4] at offset 0
    % Reached end of topic test1 [6] at offset 0
    % Reached end of topic test1 [5] at offset 0
    % Reached end of topic test1 [1] at offset 0
    % Reached end of topic test1 [2] at offset 0
    % Reached end of topic test1 [9] at offset 0
    % Reached end of topic test1 [10] at offset 0
    % Reached end of topic test1 [0] at offset 0
    % Reached end of topic test1 [8] at offset 0
    % Reached end of topic test1 [11] at offset 3: exiting
    
  2. コンシューマーコード を表示します。