Rust: Apache Kafka® のサンプルコード

このチュートリアルでは、Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費する Rust クライアントアプリケーションを実行します。

チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。

前提条件

クライアント

Kafka クラスター

このチュートリアルは、Confluent Cloud を使用して取り組むのが最も簡単です。そうすればローカル Kafka クラスターを実行する必要はありません。Confluent Cloud にサインアップするときは、プロモーションコード C50INTEG を適用すると、$50 相当を無料で使用できます(詳細)。Console から LEARN をクリックしてクラスターをプロビジョニングし、Clients をクリックして、クライアントアプリケーションに対して設定するクラスター固有の構成と認証情報を取得します。サポートされている CLIREST API、またはコミュニティでサポートされている Confluent Cloud 向け ccloud-stack ユーティリティ を使用してもかまいません。

Confluent Cloud の使用を希望しない場合は、ローカルホスト や他のリモートサーバーで実行されている Kafka クラスターでこのチュートリアルに取り組むこともできます。

セットアップ

  1. confluentinc/examples GitHub リポジトリのクローンを作成し、6.2.4-post ブランチをチェックアウトします。

    git clone https://github.com/confluentinc/examples
    cd examples
    git checkout 6.2.4-post
    
  2. Rust のサンプルのディレクトリに変更します。

    cd clients/cloud/rust/
    
  3. Kafka クラスターに接続するための構成パラメーターを含むローカルファイル($HOME/.confluent/librdkafka.config など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}{{CLUSTER_API_KEY }}、および {{ CLUSTER_API_SECRET }} に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。

    • Confluent Cloud のテンプレート構成ファイル

      # Kafka
      bootstrap.servers={{ BROKER_ENDPOINT }}
      security.protocol=SASL_SSL
      sasl.mechanisms=PLAIN
      sasl.username={{ CLUSTER_API_KEY }}
      sasl.password={{ CLUSTER_API_SECRET }}
      
    • ローカルホストのテンプレート構成ファイル

      # Kafka
      bootstrap.servers=localhost:9092
      

基本コンシューマーおよびプロデューサー

このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice など)を表すキーと、json フォーマットのカウント値({"count": 0} など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。

レコードの生成

  1. プロデューサーとコンシューマーのバイナリをビルドします。

    cargo build
    

    次のように表示されます。

    Compiling rust_kafka_client_example v0.1.0 (/path/to/repo/examples/clients/cloud/rust)
    Finished dev [unoptimized + debuginfo] target(s) in 2.85s
    
  2. プロデューサーを実行して、次の引数を渡します。

    • Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
    • トピック名
    ./target/debug/producer --config $HOME/.confluent/librdkafka.config --topic test1
    
  3. プロデューサーがすべてのメッセージを送信したことを確認します。次のように表示されます。

    Preparing to produce record: alice 0
    Preparing to produce record: alice 1
    Preparing to produce record: alice 2
    Preparing to produce record: alice 3
    Preparing to produce record: alice 4
    Preparing to produce record: alice 5
    Preparing to produce record: alice 6
    Preparing to produce record: alice 7
    Preparing to produce record: alice 8
    Successfully produced record to topic test1 partition [5] @ offset 117
    Successfully produced record to topic test1 partition [5] @ offset 118
    Successfully produced record to topic test1 partition [5] @ offset 119
    Successfully produced record to topic test1 partition [5] @ offset 120
    Successfully produced record to topic test1 partition [5] @ offset 121
    Successfully produced record to topic test1 partition [5] @ offset 122
    Successfully produced record to topic test1 partition [5] @ offset 123
    Successfully produced record to topic test1 partition [5] @ offset 124
    Successfully produced record to topic test1 partition [5] @ offset 125
    
  4. プロデューサーコード を表示します。

レコードの消費

  1. コンシューマーを実行して、次の引数を渡します。

    • Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
    • 前の手順で使用したトピック名
    ./target/debug/consumer --config $HOME/.confluent/librdkafka.config --topic test1
    
  2. コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。

    Consumed record from topic test1 partition [5] @ offset 117 with key alice and value 0
    Consumed record from topic test1 partition [5] @ offset 118 with key alice and value 1
    Consumed record from topic test1 partition [5] @ offset 119 with key alice and value 2
    Consumed record from topic test1 partition [5] @ offset 120 with key alice and value 3
    Consumed record from topic test1 partition [5] @ offset 121 with key alice and value 4
    Consumed record from topic test1 partition [5] @ offset 122 with key alice and value 5
    Consumed record from topic test1 partition [5] @ offset 123 with key alice and value 6
    Consumed record from topic test1 partition [5] @ offset 124 with key alice and value 7
    Consumed record from topic test1 partition [5] @ offset 125 with key alice and value 8
    
  3. コンシューマーコード を表示します。