Python: Apache Kafka® のサンプルコード¶
このチュートリアルでは、Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費する Python クライアントアプリケーションを実行します。
チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。
前提条件¶
クライアント¶
Apache Kafka 向け Confluent Python クライアント をインストールした、正常に機能する Python 環境。
使用する
confluent-kafka
ライブラリのバージョンを確認してください。requirements.txt ファイルには、ここで示す最新のシリアル化 API に必要なバージョン 1.4.2 以降のconfluent-kafka
ライブラリが指定されています。このライブラリを手作業で、またはグローバルにインストールする場合は、同じバージョン要件が適用されます。Virtualenv を使用し、次のコマンドを実行して、インストールされているクライアントで仮想環境を作成することができます。
virtualenv ccloud-venv source ./ccloud-venv/bin/activate pip install -r requirements.txt
SSL トラストストアの構成¶
オペレーティングシステムまたは Linux ディストリビューションによっては、追加の手順を実行して SSL CA ルート証明書をセットアップする必要がある場合があります。システムで SSL CA ルート証明書が正しくセットアップされていない場合は、次のような SSL handshake failed
エラーメッセージが表示される場合があります。
%3|1605776788.619|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://...confluent.cloud:9092/bootstr]: sasl_ssl://...confluent.cloud:9092/bootstrap: SSL handshake failed: error:14090086:SSL routines:ssl3_get_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (brew install openssl) (after 258ms in state CONNECT)
この場合は、検証済み CA ルート証明書のセットを手作業でインストールする必要があります。また、クライアントコードを変更して ssl.ca.location
構成プロパティを設定する必要がある場合もあります(詳細については、このクライアントの基盤である librdkafka のドキュメントを参照してください)。
macOS¶
より新しい macOS バージョン(10.15 など)では、依存関係の追加が必要な場合があります。
Python クライアントの場合:
pip install certifi
その他のクライアントの場合:
brew install openssl
CA ルート証明書をインストールしたら、クライアントコードで ssl.ca.location
プロパティを設定します。プロデューサーとコンシューマーのコードファイルを両方とも編集し、プロデューサーおよびコンシューマーのプロパティ内に ssl.ca.location
構成パラメーターを追加します。その値は、ホスト上の適切な CA ルート証明書ファイルの場所に対応している必要があります。
Python クライアントの場合は、certifi.where()
を使用して、証明書ファイルの場所を決定します。
ssl.ca.location: certifi.where()
その他のクライアントの場合は、インストールパスを調べ、それをコードで指定します。
ssl.ca.location: '/usr/local/etc/openssl@1.1/cert.pem'
CentOS¶
次の方法で CA ルート証明書をインストールする必要がある場合があります。
sudo yum reinstall ca-certificates
これで、Kafka クライアントは証明書を見つけることができます。ただし、同じエラーが依然として表示される場合は、クライアントコードで ssl.ca.location
プロパティを設定できます。プロデューサーとコンシューマーのコードファイルを両方とも編集し、プロデューサーおよびコンシューマーのプロパティ内に ssl.ca.location
構成パラメーターを追加します。その値は、次の例のように、ホスト上の適切な CA ルート証明書ファイルの場所に対応している必要があります。
ssl.ca.location: '/etc/ssl/certs/ca-bundle.crt'
Kafka クラスター¶
- このチュートリアルは、どの環境の Kafka クラスターにも対応しています。
- Confluent Cloud 内
- ローカルホスト 上
- 任意のリモート Kafka クラスター
- Confluent Cloud で実行している場合は、API キーおよびシークレットで Confluent Cloud クラスターにアクセスできることが必要です。
- Confluent Cloud にサインアップし、プロモーションコード
C50INTEG
を使用した最初の 20 名のユーザーは、$50 相当を無料で使用できます(詳細)。 - Confluent Cloud で Kafka クラスター、資格情報、および ACL を自動的に作成する方法については、「Confluent Cloud 向け ccloud-stack ユーティリティ」を参照してください。
- Confluent Cloud にサインアップし、プロモーションコード
セットアップ¶
confluentinc/examples GitHub リポジトリのクローンを作成し、
6.1.5-post
ブランチをチェックアウトします。git clone https://github.com/confluentinc/examples cd examples git checkout 6.1.5-post
Python のサンプルのディレクトリに変更します。
cd clients/cloud/python/
Kafka クラスターに接続するための構成パラメーターを含むローカルファイル(
$HOME/.confluent/librdkafka.config
など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}
、{{CLUSTER_API_KEY }}
、および{{ CLUSTER_API_SECRET }}
に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。Confluent Cloud のテンプレート構成ファイル
# Kafka bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.mechanisms=PLAIN sasl.username={{ CLUSTER_API_KEY }} sasl.password={{ CLUSTER_API_SECRET }}
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092
基本プロデューサーおよびコンシューマー¶
このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice
など)を表すキーと、json フォーマットのカウント値({"count": 0}
など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。
レコードの生成¶
プロデューサーを実行して、次の引数を渡します。
- Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
- トピック名
./producer.py -f $HOME/.confluent/librdkafka.config -t test1
プロデューサーがすべてのメッセージを送信したことを確認します。次のように表示されます。
Producing record: alice {"count": 0} Producing record: alice {"count": 1} Producing record: alice {"count": 2} Producing record: alice {"count": 3} Producing record: alice {"count": 4} Producing record: alice {"count": 5} Producing record: alice {"count": 6} Producing record: alice {"count": 7} Producing record: alice {"count": 8} Producing record: alice {"count": 9} Produced record to topic test1 partition [0] @ offset 0 Produced record to topic test1 partition [0] @ offset 1 Produced record to topic test1 partition [0] @ offset 2 Produced record to topic test1 partition [0] @ offset 3 Produced record to topic test1 partition [0] @ offset 4 Produced record to topic test1 partition [0] @ offset 5 Produced record to topic test1 partition [0] @ offset 6 Produced record to topic test1 partition [0] @ offset 7 Produced record to topic test1 partition [0] @ offset 8 Produced record to topic test1 partition [0] @ offset 9 10 messages were produced to topic test1!
プロデューサーコード を表示します。
レコードの消費¶
コンシューマーを実行して、次の引数を渡します。
- Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
- 手順 1 で使用したものと同じトピック名
./consumer.py -f $HOME/.confluent/librdkafka.config -t test1
コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。
... Waiting for message or event/error in poll() Consumed record with key alice and value {"count": 0}, and updated total count to 0 Consumed record with key alice and value {"count": 1}, and updated total count to 1 Consumed record with key alice and value {"count": 2}, and updated total count to 3 Consumed record with key alice and value {"count": 3}, and updated total count to 6 Consumed record with key alice and value {"count": 4}, and updated total count to 10 Consumed record with key alice and value {"count": 5}, and updated total count to 15 Consumed record with key alice and value {"count": 6}, and updated total count to 21 Consumed record with key alice and value {"count": 7}, and updated total count to 28 Consumed record with key alice and value {"count": 8}, and updated total count to 36 Consumed record with key alice and value {"count": 9}, and updated total count to 45 Waiting for message or event/error in poll() ...
コンシューマーコード を表示します。
Avro と Confluent Cloud Schema Registry¶
このサンプルは、前のサンプルと似ていますが、値は Avro フォーマットです。また、Confluent Cloud Schema Registry と統合されています。Confluent Cloud Schema Registry を使用する前に、その 可用性と制限 を確認してください。
これらのサンプルでは、confluent-kafka
ライブラリが提供する最新のシリアライザー API を使用します。シリアライザー API は、従来の AvroProducer および AvroConsumer クラスに代わって機能し、JSON、Protobuf、および Avro データフォーマットの追加サポートを含む、より柔軟性の高い API を提供します。詳細については、最新の confluent-kafka のドキュメント を参照してください。
Confluent Cloud GUI の「Confluent Cloud におけるスキーマ管理のクイックスタート」にある説明に従って、Confluent Cloud Schema Registry を有効にし、API キーおよびシークレットを作成して接続します。
ご使用の VPC が Confluent Cloud Schema Registry 公衆インターネットエンドポイントに接続できることを確認します。
Schema Registry に接続するための構成パラメーターを含むローカル構成ファイル(
$HOME/.confluent/librdkafka.config
など)をアップデートします。Confluent Cloud のテンプレート構成ファイル
# Kafka bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.mechanisms=PLAIN sasl.username={{ CLUSTER_API_KEY }} sasl.password={{ CLUSTER_API_SECRET }} # Confluent Cloud Schema Registry schema.registry.url=https://{{ SR_ENDPOINT }} basic.auth.credentials.source=USER_INFO basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092 # Confluent Schema Registry schema.registry.url=http://localhost:8081
Schema Registry サブジェクトのリストを表示して Confluent Cloud Schema Registry の資格情報を確認します。次の例の
{{ SR_API_KEY }}
、{{ SR_API_SECRET }}
、および{{ SR_ENDPOINT }}
に値を代入します。curl -u {{ SR_API_KEY }}:{{ SR_API_SECRET }} https://{{ SR_ENDPOINT }}/subjects
Avro レコードの生成¶
Avro プロデューサーを実行して、次の引数を渡します。
- Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
- トピック名
./producer_ccsr.py -f $HOME/.confluent/librdkafka.config -t test2
プロデューサーがすべてのメッセージを送信したことを確認します。次のように表示されます。
Producing Avro record: alice 0 Producing Avro record: alice 1 Producing Avro record: alice 2 Producing Avro record: alice 3 Producing Avro record: alice 4 Producing Avro record: alice 5 Producing Avro record: alice 6 Producing Avro record: alice 7 Producing Avro record: alice 8 Producing Avro record: alice 9 Produced record to topic test2 partition [0] @ offset 0 Produced record to topic test2 partition [0] @ offset 1 Produced record to topic test2 partition [0] @ offset 2 Produced record to topic test2 partition [0] @ offset 3 Produced record to topic test2 partition [0] @ offset 4 Produced record to topic test2 partition [0] @ offset 5 Produced record to topic test2 partition [0] @ offset 6 Produced record to topic test2 partition [0] @ offset 7 Produced record to topic test2 partition [0] @ offset 8 Produced record to topic test2 partition [0] @ offset 9 10 messages were produced to topic test2!
プロデューサー Avro コード を表示します。
Avro レコードの消費¶
Avro コンシューマーを実行して、次の引数を渡します。
- Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
- 手順 5 で使用したものと同じトピック名
./consumer_ccsr.py -f $HOME/.confluent/librdkafka.config -t test2
コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。
./consumer_ccsr.py -f $HOME/.confluent/librdkafka.config -t test2 ... Waiting for message or event/error in poll() Consumed record with key alice and value 0, and updated total count to 0 Consumed record with key alice and value 1, and updated total count to 1 Consumed record with key alice and value 2, and updated total count to 3 Consumed record with key alice and value 3, and updated total count to 6 Consumed record with key alice and value 4, and updated total count to 10 Consumed record with key alice and value 5, and updated total count to 15 Consumed record with key alice and value 6, and updated total count to 21 Consumed record with key alice and value 7, and updated total count to 28 Consumed record with key alice and value 8, and updated total count to 36 Consumed record with key alice and value 9, and updated total count to 45 ...
コンシューマー Avro コード を表示します。
Confluent Cloud Schema Registry¶
Confluent Cloud Schema Registry に登録されたスキーマサブジェクトを表示します。次の出力の
<SR API KEY>
、<SR API SECRET>
、および<SR ENDPOINT>
に値を代入します。curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects
サブジェクト
test2-value
が存在することを確認します。["test2-value"]
サブジェクト test2-value のスキーマ情報を表示します。次の出力の
<SR API KEY>
、<SR API SECRET>
、および<SR ENDPOINT>
に値を代入します。curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects/test2-value/versions/1
サブジェクト
test2-value
のスキーマ情報を確認します。{"subject":"test2-value","version":1,"id":100001,"schema":"{\"name\":\"io.confluent.examples.clients.cloud.DataRecordAvro\",\"type\":\"record\",\"fields\":[{\"name\":\"count\",\"type\":\"long\"}]}"}
Docker でのすべてのコードの実行¶
前のコードをすべて、Docker 内で実行することもできます。
$HOME/.confluent/librdkafka.config
で Kafka クラスターに接続するための構成パラメーターを含むローカルファイルを作成していることを確認します。カスタム Docker イメージをビルドする Dockerfile を表示します。
FROM python:3.7-slim COPY requirements.txt /tmp/requirements.txt RUN pip3 install -U -r /tmp/requirements.txt COPY *.py ./
次のコマンドを実行して Docker イメージをビルドします。
docker build -t cloud-demo-python .
次のコマンドを実行して Docker イメージを実行します。
docker run -v $HOME/.confluent/librdkafka.config:/root/.confluent/librdkafka.config -it --rm cloud-demo-python bash
コンテナーシェル内から Python アプリケーションを実行します。詳細については、前のセクションを参照してください。
root@6970a2a9e65b:/# ./producer.py -f $HOME/.confluent/librdkafka.config -t test1 root@6970a2a9e65b:/# ./consumer.py -f $HOME/.confluent/librdkafka.config -t test1 root@6970a2a9e65b:/# ./producer_ccsr.py -f $HOME/.confluent/librdkafka.config -t test2 root@6970a2a9e65b:/# ./consumer_ccsr.py -f $HOME/.confluent/librdkafka.config -t test2