C(librdkafka): Apache Kafka® のサンプルコード

このチュートリアルでは、Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費する C クライアントアプリケーションを実行します。

チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。

前提条件

クライアント

SSL トラストストアの構成

オペレーティングシステムまたは Linux ディストリビューションによっては、追加の手順を実行して SSL CA ルート証明書をセットアップする必要がある場合があります。システムで SSL CA ルート証明書が正しくセットアップされていない場合は、次のような SSL handshake failed エラーメッセージが表示される場合があります。

%3|1605776788.619|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://...confluent.cloud:9092/bootstr]: sasl_ssl://...confluent.cloud:9092/bootstrap: SSL handshake failed: error:14090086:SSL routines:ssl3_get_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (brew install openssl) (after 258ms in state CONNECT)
Copy

この場合は、検証済み CA ルート証明書のセットを手作業でインストールする必要があります。また、クライアントコードを変更して ssl.ca.location 構成プロパティを設定する必要がある場合もあります(詳細については、このクライアントの基盤である librdkafka のドキュメントを参照してください)。

macOS

より新しい macOS バージョン(10.15 など)では、依存関係の追加が必要な場合があります。

Python クライアントの場合:

pip install certifi
Copy

その他のクライアントの場合:

brew install openssl
Copy

CA ルート証明書をインストールしたら、クライアントコードで ssl.ca.location プロパティを設定します。プロデューサーとコンシューマーのコードファイルを両方とも編集し、プロデューサーおよびコンシューマーのプロパティ内に ssl.ca.location 構成パラメーターを追加します。その値は、ホスト上の適切な CA ルート証明書ファイルの場所に対応している必要があります。

Python クライアントの場合は、certifi.where() を使用して、証明書ファイルの場所を決定します。

ssl.ca.location: certifi.where()
Copy

その他のクライアントの場合は、インストールパスを調べ、それをコードで指定します。

ssl.ca.location: '/usr/local/etc/openssl@1.1/cert.pem'
Copy
CentOS

次の方法で CA ルート証明書をインストールする必要がある場合があります。

sudo yum reinstall ca-certificates
Copy

これで、Kafka クライアントは証明書を見つけることができます。ただし、同じエラーが依然として表示される場合は、クライアントコードで ssl.ca.location プロパティを設定できます。プロデューサーとコンシューマーのコードファイルを両方とも編集し、プロデューサーおよびコンシューマーのプロパティ内に ssl.ca.location 構成パラメーターを追加します。その値は、次の例のように、ホスト上の適切な CA ルート証明書ファイルの場所に対応している必要があります。

ssl.ca.location: '/etc/ssl/certs/ca-bundle.crt'
Copy

Kafka クラスター

このチュートリアルは、Confluent Cloud を使用して取り組むのが最も簡単です。そうすればローカル Kafka クラスターを実行する必要はありません。Confluent Cloud にサインアップするときは、プロモーションコード C50INTEG を適用すると、$50 相当を無料で使用できます(詳細)。Console から LEARN をクリックしてクラスターをプロビジョニングし、Clients をクリックして、クライアントアプリケーションに対して設定するクラスター固有の構成と認証情報を取得します。サポートされている CLIREST API、またはコミュニティでサポートされている Confluent Cloud 向け ccloud-stack ユーティリティ を使用してもかまいません。

Confluent Cloud の使用を希望しない場合は、ローカルホスト や他のリモートサーバーで実行されている Kafka クラスターでこのチュートリアルに取り組むこともできます。

セットアップ

  1. confluentinc/examples GitHub リポジトリのクローンを作成し、6.2.4-post ブランチをチェックアウトします。

    git clone https://github.com/confluentinc/examples
    cd examples
    git checkout 6.2.4-post
    
    Copy
  2. C のサンプルのディレクトリに変更します。

    cd clients/cloud/c/
    
    Copy
  3. Kafka クラスターに接続するための構成パラメーターを含むローカルファイル($HOME/.confluent/librdkafka.config など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}{{CLUSTER_API_KEY }}、および {{ CLUSTER_API_SECRET }} に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。

    • Confluent Cloud のテンプレート構成ファイル

      # Kafka
      bootstrap.servers={{ BROKER_ENDPOINT }}
      security.protocol=SASL_SSL
      sasl.mechanisms=PLAIN
      sasl.username={{ CLUSTER_API_KEY }}
      sasl.password={{ CLUSTER_API_SECRET }}
      
      Copy
    • ローカルホストのテンプレート構成ファイル

      # Kafka
      bootstrap.servers=localhost:9092
      
      Copy

基本プロデューサーおよびコンシューマー

このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice など)を表すキーと、json フォーマットのカウント値({"count": 0} など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。

レコードの生成

  1. プロデューサーコンシューマー のサンプルアプリケーションをビルドします。

    make
    
    Copy
  2. ビルドしたアプリケーションが機能することを確認します。次のように表示されます。

    cc   consumer.c common.c json.c -o consumer -lrdkafka -lm
    cc   producer.c common.c json.c -o producer -lrdkafka -lm
    
    Copy
  3. プロデューサーを実行して、次の引数を渡します。

    • Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
    • トピック名
    ./producer test1 $HOME/.confluent/librdkafka.config
    
    Copy
  4. プロデューサーがすべてのメッセージを送信したことを確認します。次のように表示されます。

    Creating topic test1
    Topic test1 successfully created
    Producing message #0 to test1: alice={ "count": 1 }
    Producing message #1 to test1: alice={ "count": 2 }
    Producing message #2 to test1: alice={ "count": 3 }
    Producing message #3 to test1: alice={ "count": 4 }
    Producing message #4 to test1: alice={ "count": 5 }
    Producing message #5 to test1: alice={ "count": 6 }
    Producing message #6 to test1: alice={ "count": 7 }
    Producing message #7 to test1: alice={ "count": 8 }
    Producing message #8 to test1: alice={ "count": 9 }
    Producing message #9 to test1: alice={ "count": 10 }
    Waiting for 10 more delivery results
    Message delivered to test1 [0] at offset 0 in 22.75ms: { "count": 1 }
    Message delivered to test1 [0] at offset 1 in 22.77ms: { "count": 2 }
    Message delivered to test1 [0] at offset 2 in 22.77ms: { "count": 3 }
    Message delivered to test1 [0] at offset 3 in 22.78ms: { "count": 4 }
    Message delivered to test1 [0] at offset 4 in 22.78ms: { "count": 5 }
    Message delivered to test1 [0] at offset 5 in 22.78ms: { "count": 6 }
    Message delivered to test1 [0] at offset 6 in 22.78ms: { "count": 7 }
    Message delivered to test1 [0] at offset 7 in 22.79ms: { "count": 8 }
    Message delivered to test1 [0] at offset 8 in 22.80ms: { "count": 9 }
    Message delivered to test1 [0] at offset 9 in 22.81ms: { "count": 10 }
    10/10 messages delivered
    
    Copy
  5. プロデューサーコード を表示します。

レコードの消費

  1. コンシューマーを実行して、次の引数を渡します。

    • 前の手順で使用したトピック名
    • Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
    ./consumer test1 $HOME/.confluent/librdkafka.config
    
    Copy
  2. コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。

    Subscribing to test1, waiting for assignment and messages...
    Press Ctrl-C to exit.
    Received message on test1 [0] at offset 0: { "count": 1 }
    User alice sum 1
    Received message on test1 [0] at offset 1: { "count": 2 }
    User alice sum 3
    Received message on test1 [0] at offset 2: { "count": 3 }
    User alice sum 6
    Received message on test1 [0] at offset 3: { "count": 4 }
    User alice sum 10
    Received message on test1 [0] at offset 4: { "count": 5 }
    User alice sum 15
    Received message on test1 [0] at offset 5: { "count": 6 }
    User alice sum 21
    Received message on test1 [0] at offset 6: { "count": 7 }
    User alice sum 28
    Received message on test1 [0] at offset 7: { "count": 8 }
    User alice sum 36
    Received message on test1 [0] at offset 8: { "count": 9 }
    User alice sum 45
    Received message on test1 [0] at offset 9: { "count": 10 }
    User alice sum 55
    
    Copy
  3. Ctrl + C キーを押して終了します("+" はキーを同時に押すことを意味します)。

  4. コンシューマーコード を表示します。