kcat: Apache Kafka® のサンプルコマンド¶
このチュートリアルでは、Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費する kcat
(旧称 kafkacat
)クライアントアプリケーションを実行します。
チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。
前提条件¶
クライアント¶
- kcat をマシンにインストール済み。最新のマスターブランチから kcat を ビルド して
-F
機能を入手する必要があります。この機能により、Confluent Cloud 構成ファイルに構成を渡しやすくなるためです。 - ローカルにインストールされた Confluent CLI v2.5.0 以降。
Kafka クラスター¶
このチュートリアルは、Confluent Cloud を使用して取り組むのが最も簡単です。そうすればローカル Kafka クラスターを実行する必要はありません。Confluent Cloud にサインアップするときは、プロモーションコード C50INTEG
を適用すると、$50 相当を無料で使用できます(詳細)。Console から LEARN をクリックしてクラスターをプロビジョニングし、Clients
をクリックして、クライアントアプリケーションに対して設定するクラスター固有の構成と認証情報を取得します。サポートされている CLI や REST API、またはコミュニティでサポートされている Confluent Cloud 向け ccloud-stack ユーティリティ を使用してもかまいません。
Confluent Cloud の使用を希望しない場合は、ローカルホスト や他のリモートサーバーで実行されている Kafka クラスターでこのチュートリアルに取り組むこともできます。
セットアップ¶
confluentinc/examples GitHub リポジトリのクローンを作成し、
7.1.1-post
ブランチをチェックアウトします。git clone https://github.com/confluentinc/examples cd examples git checkout 7.1.1-post
kcat のサンプルのディレクトリに変更します。
cd clients/cloud/kcat/
Kafka クラスターに接続するための構成パラメーターを含むローカルファイル(
$HOME/.confluent/librdkafka.config
など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}
、{{CLUSTER_API_KEY }}
、および{{ CLUSTER_API_SECRET }}
に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。Confluent Cloud のテンプレート構成ファイル
# Kafka bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.mechanisms=PLAIN sasl.username={{ CLUSTER_API_KEY }} sasl.password={{ CLUSTER_API_SECRET }} # Best practice for higher availability in librdkafka clients prior to 1.7 session.timeout.ms=45000
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092
基本プロデューサーおよびコンシューマー¶
このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice
など)を表すキーと、json フォーマットのカウント値({"count": 0}
など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。
レコードの生成¶
Kafka トピックを作成します。Confluent Cloud を使用している場合は、Confluent CLI を使用します。
confluent kafka topic create --if-not-exists test1
ローカルの Kafka クラスターを使用している場合は、Kafka CLI を使用します。
kafka-topics --bootstrap-server localhost:9092 --topic $topic_name --create --if-not-exists
kcat を実行して、次の引数を渡し、トピック
test1
にメッセージを書き込みます。-F $HOME/.confluent/librdkafka.config
: Confluent Cloud クラスターに接続するための構成ファイル-K ,
: コンマで区切られたキーと値を渡します
kcat -F $HOME/.confluent/librdkafka.config -K , -P -t test1
メッセージのキーと値の区切り文字として
,
を使用して、メッセージをいくつか作成します。alice,{"count":0} alice,{"count":1} alice,{"count":2}
入力し終わったら、
Ctrl + D
キーを押します("+" はキーを同時に押すことを意味します)。プロデューサーコード を表示します。
レコードの消費¶
kcat を再度実行して、次の引数を渡し、トピック
test
からメッセージを読み取ります。-F $HOME/.confluent/librdkafka.config
: Confluent Cloud クラスターに接続するための構成ファイル-K ,
: コンマで区切られたキーと値を渡します-e
: 最後のメッセージを受信すると、正常に終了します
kcat -F $HOME/.confluent/librdkafka.config -K , -C -t test1 -e
前の手順で入力したメッセージが表示されます。
% Reading configuration from file $HOME/.confluent/librdkafka.config % Reached end of topic test1 [3] at offset 0 alice,{"count":0} alice,{"count":1} alice,{"count":2} % Reached end of topic test1 [7] at offset 0 % Reached end of topic test1 [4] at offset 0 % Reached end of topic test1 [6] at offset 0 % Reached end of topic test1 [5] at offset 0 % Reached end of topic test1 [1] at offset 0 % Reached end of topic test1 [2] at offset 0 % Reached end of topic test1 [9] at offset 0 % Reached end of topic test1 [10] at offset 0 % Reached end of topic test1 [0] at offset 0 % Reached end of topic test1 [8] at offset 0 % Reached end of topic test1 [11] at offset 3: exiting
コンシューマーコード を表示します。