kafkacat: Apache Kafka® のサンプルコマンド¶
このチュートリアルでは、Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費する kcat クライアントアプリケーションを実行します。
チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。
前提条件¶
クライアント¶
Kafka クラスター¶
- このチュートリアルは、どの環境の Kafka クラスターにも対応しています。
- Confluent Cloud 内
- ローカルホスト 上
- 任意のリモート Kafka クラスター
- Confluent Cloud で実行している場合は、API キーおよびシークレットで Confluent Cloud クラスターにアクセスできることが必要です。
- Confluent Cloud にサインアップし、プロモーションコード
C50INTEG
を使用した最初の 20 名のユーザーは、$50 相当を無料で使用できます(詳細)。 - Confluent Cloud で Kafka クラスター、資格情報、および ACL を自動的に作成する方法については、「Confluent Cloud 向け ccloud-stack ユーティリティ」を参照してください。
- Confluent Cloud にサインアップし、プロモーションコード
セットアップ¶
confluentinc/examples GitHub リポジトリのクローンを作成し、
6.1.5-post
ブランチをチェックアウトします。git clone https://github.com/confluentinc/examples cd examples git checkout 6.1.5-post
kcat のサンプルのディレクトリに変更します。
cd clients/cloud/kafkacat/
Kafka クラスターに接続するための構成パラメーターを含むローカルファイル(
$HOME/.confluent/java.config
など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}
、{{CLUSTER_API_KEY }}
、および{{ CLUSTER_API_SECRET }}
に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。Confluent Cloud のテンプレート構成ファイル
# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for Kafka producer to prevent data loss acks=all
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092
基本プロデューサーおよびコンシューマー¶
このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice
など)を表すキーと、json フォーマットのカウント値({"count": 0}
など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。
レコードの生成¶
Kafka トピックを作成します。
kafka-topics --bootstrap-server `grep "^\s*bootstrap.server" $HOME/.confluent/java.config | tail -1` --command-config $HOME/.confluent/java.config --topic test1 --create --replication-factor 3 --partitions 6
kcat を実行して、次の引数を渡し、トピック
test1
にメッセージを書き込みます。-F $HOME/.confluent/java.config
: Confluent Cloud クラスターに接続するための構成ファイル-K ,
: コンマで区切られたキーと値を渡します
kafkacat -F $HOME/.confluent/java.config -K , -P -t test1
メッセージのキーと値の区切り文字として
,
を使用して、メッセージをいくつか作成します。alice,{"count":0} alice,{"count":1} alice,{"count":2}
入力し終わったら、
Ctrl + D
キーを押します("+" はキーを同時に押すことを意味します)。プロデューサーコード を表示します。
レコードの消費¶
kcat を再度実行して、次の引数を渡し、トピック
test
からメッセージを読み取ります。-F $HOME/.confluent/java.config
: Confluent Cloud クラスターに接続するための構成ファイル-K ,
: コンマで区切られたキーと値を渡します-e
: 最後のメッセージを受信すると、正常に終了します
kafkacat -F $HOME/.confluent/java.config -K , -C -t test1 -e
前の手順で入力したメッセージが表示されます。
% Reading configuration from file $HOME/.confluent/java.config % Reached end of topic test1 [3] at offset 0 alice,{"count":0} alice,{"count":1} alice,{"count":2} % Reached end of topic test1 [7] at offset 0 % Reached end of topic test1 [4] at offset 0 % Reached end of topic test1 [6] at offset 0 % Reached end of topic test1 [5] at offset 0 % Reached end of topic test1 [1] at offset 0 % Reached end of topic test1 [2] at offset 0 % Reached end of topic test1 [9] at offset 0 % Reached end of topic test1 [10] at offset 0 % Reached end of topic test1 [0] at offset 0 % Reached end of topic test1 [8] at offset 0 % Reached end of topic test1 [11] at offset 3: exiting
コンシューマーコード を表示します。