Rust: Apache Kafka® のサンプルコード¶
このチュートリアルでは、Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費する Rust クライアントアプリケーションを実行します。
チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。
前提条件¶
クライアント¶
- Apache Kafka 向け Rust クライアント をマシンにインストール済み
Kafka クラスター¶
- このチュートリアルは、どの環境の Kafka クラスターにも対応しています。
- Confluent Cloud 内
- ローカルホスト 上
- 任意のリモート Kafka クラスター
- Confluent Cloud で実行している場合は、API キーおよびシークレットで Confluent Cloud クラスターにアクセスできることが必要です。
- Confluent Cloud にサインアップし、プロモーションコード
C50INTEG
を使用した最初の 20 名のユーザーは、$50 相当を無料で使用できます(詳細)。 - Confluent Cloud で Kafka クラスター、資格情報、および ACL を自動的に作成する方法については、「Confluent Cloud 向け ccloud-stack ユーティリティ」を参照してください。
- Confluent Cloud にサインアップし、プロモーションコード
セットアップ¶
confluentinc/examples GitHub リポジトリのクローンを作成し、
6.1.5-post
ブランチをチェックアウトします。git clone https://github.com/confluentinc/examples cd examples git checkout 6.1.5-post
Rust のサンプルのディレクトリに変更します。
cd clients/cloud/rust/
Kafka クラスターに接続するための構成パラメーターを含むローカルファイル(
$HOME/.confluent/librdkafka.config
など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}
、{{CLUSTER_API_KEY }}
、および{{ CLUSTER_API_SECRET }}
に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。Confluent Cloud のテンプレート構成ファイル
# Kafka bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.mechanisms=PLAIN sasl.username={{ CLUSTER_API_KEY }} sasl.password={{ CLUSTER_API_SECRET }}
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092
基本コンシューマーおよびプロデューサー¶
このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice
など)を表すキーと、json フォーマットのカウント値({"count": 0}
など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。
レコードの生成¶
プロデューサーとコンシューマーのバイナリをビルドします。
cargo build
次のように表示されます。
Compiling rust_kafka_client_example v0.1.0 (/path/to/repo/examples/clients/cloud/rust) Finished dev [unoptimized + debuginfo] target(s) in 2.85s
プロデューサーを実行して、次の引数を渡します。
- Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
- トピック名
./target/debug/producer --config $HOME/.confluent/librdkafka.config --topic test1
プロデューサーがすべてのメッセージを送信したことを確認します。次のように表示されます。
Preparing to produce record: alice 0 Preparing to produce record: alice 1 Preparing to produce record: alice 2 Preparing to produce record: alice 3 Preparing to produce record: alice 4 Preparing to produce record: alice 5 Preparing to produce record: alice 6 Preparing to produce record: alice 7 Preparing to produce record: alice 8 Successfully produced record to topic test1 partition [5] @ offset 117 Successfully produced record to topic test1 partition [5] @ offset 118 Successfully produced record to topic test1 partition [5] @ offset 119 Successfully produced record to topic test1 partition [5] @ offset 120 Successfully produced record to topic test1 partition [5] @ offset 121 Successfully produced record to topic test1 partition [5] @ offset 122 Successfully produced record to topic test1 partition [5] @ offset 123 Successfully produced record to topic test1 partition [5] @ offset 124 Successfully produced record to topic test1 partition [5] @ offset 125
プロデューサーコード を表示します。
レコードの消費¶
コンシューマーを実行して、次の引数を渡します。
- Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
- 前の手順で使用したトピック名
./target/debug/consumer --config $HOME/.confluent/librdkafka.config --topic test1
コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。
Consumed record from topic test1 partition [5] @ offset 117 with key alice and value 0 Consumed record from topic test1 partition [5] @ offset 118 with key alice and value 1 Consumed record from topic test1 partition [5] @ offset 119 with key alice and value 2 Consumed record from topic test1 partition [5] @ offset 120 with key alice and value 3 Consumed record from topic test1 partition [5] @ offset 121 with key alice and value 4 Consumed record from topic test1 partition [5] @ offset 122 with key alice and value 5 Consumed record from topic test1 partition [5] @ offset 123 with key alice and value 6 Consumed record from topic test1 partition [5] @ offset 124 with key alice and value 7 Consumed record from topic test1 partition [5] @ offset 125 with key alice and value 8
コンシューマーコード を表示します。