Java: Apache Kafka® のサンプルコード¶
このチュートリアルでは、Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費する Java クライアントアプリケーションを実行します。
チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。
前提条件¶
クライアント¶
- Java 1.8 以降(デモアプリケーションの実行用)
- Maven(デモアプリケーションのコンパイル用)
Kafka クラスター¶
- このチュートリアルは、どの環境の Kafka クラスターにも対応しています。
- Confluent Cloud 内
- ローカルホスト 上
- 任意のリモート Kafka クラスター
- Confluent Cloud で実行している場合は、API キーおよびシークレットで Confluent Cloud クラスターにアクセスできることが必要です。
- Confluent Cloud にサインアップし、プロモーションコード
C50INTEG
を使用した最初の 20 名のユーザーは、$50 相当を無料で使用できます(詳細)。 - Confluent Cloud で Kafka クラスター、資格情報、および ACL を自動的に作成する方法については、「Confluent Cloud 向け ccloud-stack ユーティリティ」を参照してください。
- Confluent Cloud にサインアップし、プロモーションコード
セットアップ¶
confluentinc/examples GitHub リポジトリのクローンを作成し、
6.1.5-post
ブランチをチェックアウトします。git clone https://github.com/confluentinc/examples cd examples git checkout 6.1.5-post
Java のサンプルのディレクトリに変更します。
cd clients/cloud/java/
Kafka クラスターに接続するための構成パラメーターを含むローカルファイル(
$HOME/.confluent/java.config
など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}
、{{CLUSTER_API_KEY }}
、および{{ CLUSTER_API_SECRET }}
に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。Confluent Cloud のテンプレート構成ファイル
# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for Kafka producer to prevent data loss acks=all
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092
基本プロデューサーおよびコンシューマー、Kafka Streams¶
このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice
など)を表すキーと、json フォーマットのカウント値({"count": 0}
など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。
レコードの生成¶
Java コードをコンパイルします。
mvn clean package
プロデューサーを実行して、次の引数を渡します。
- Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
- トピック名
mvn exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ProducerExample" \ -Dexec.args="$HOME/.confluent/java.config test1"
プロデューサーがすべてのメッセージを送信したことを確認します。次のように表示されます。
... Producing record: alice {"count":0} Producing record: alice {"count":1} Producing record: alice {"count":2} Producing record: alice {"count":3} Producing record: alice {"count":4} Producing record: alice {"count":5} Producing record: alice {"count":6} Producing record: alice {"count":7} Producing record: alice {"count":8} Producing record: alice {"count":9} Produced record to topic test1 partition [0] @ offset 0 Produced record to topic test1 partition [0] @ offset 1 Produced record to topic test1 partition [0] @ offset 2 Produced record to topic test1 partition [0] @ offset 3 Produced record to topic test1 partition [0] @ offset 4 Produced record to topic test1 partition [0] @ offset 5 Produced record to topic test1 partition [0] @ offset 6 Produced record to topic test1 partition [0] @ offset 7 Produced record to topic test1 partition [0] @ offset 8 Produced record to topic test1 partition [0] @ offset 9 10 messages were produced to topic test1 ...
プロデューサーコード を表示します。
レコードの消費¶
コンシューマーを実行して、次の引数を渡します。
- Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
- 前の手順で使用したトピック名
mvn exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ConsumerExample" \ -Dexec.args="$HOME/.confluent/java.config test1"
コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。
... Polling Consumed record with key alice and value {"count":0}, and updated total count to 0 Consumed record with key alice and value {"count":1}, and updated total count to 1 Consumed record with key alice and value {"count":2}, and updated total count to 3 Consumed record with key alice and value {"count":3}, and updated total count to 6 Consumed record with key alice and value {"count":4}, and updated total count to 10 Consumed record with key alice and value {"count":5}, and updated total count to 15 Consumed record with key alice and value {"count":6}, and updated total count to 21 Consumed record with key alice and value {"count":7}, and updated total count to 28 Consumed record with key alice and value {"count":8}, and updated total count to 36 Consumed record with key alice and value {"count":9}, and updated total count to 45 Polling ...
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。コンシューマーコード を表示します。
Kafka Streams¶
Kafka Streams アプリケーションを実行して、次の引数を渡します。
- Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
- 前の手順で使用したトピック名
mvn exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.StreamsExample" \ -Dexec.args="$HOME/.confluent/java.config test1"
Kafka Streams アプリケーションがすべてのメッセージを処理したことを確認します。次のように表示されます。
... [Consumed record]: alice, 0 [Consumed record]: alice, 1 [Consumed record]: alice, 2 [Consumed record]: alice, 3 [Consumed record]: alice, 4 [Consumed record]: alice, 5 [Consumed record]: alice, 6 [Consumed record]: alice, 7 [Consumed record]: alice, 8 [Consumed record]: alice, 9 ... [Running count]: alice, 0 [Running count]: alice, 1 [Running count]: alice, 3 [Running count]: alice, 6 [Running count]: alice, 10 [Running count]: alice, 15 [Running count]: alice, 21 [Running count]: alice, 28 [Running count]: alice, 36 [Running count]: alice, 45 ...
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。Kafka Streams コード を表示します。
Avro と Confluent Cloud Schema Registry¶
このサンプルは、前のサンプルと似ていますが、値は Avro フォーマットです。また、Confluent Cloud Schema Registry と統合されています。Confluent Cloud Schema Registry を使用する前に、その 可用性と制限 を確認してください。
Confluent Cloud GUI の「Confluent Cloud におけるスキーマ管理のクイックスタート」にある説明に従って、Confluent Cloud Schema Registry を有効にし、API キーおよびシークレットを作成して接続します。
ご使用の VPC が Confluent Cloud Schema Registry 公衆インターネットエンドポイントに接続できることを確認します。
Schema Registry に接続するための構成パラメーターを含むローカル構成ファイル(
$HOME/.confluent/java.config
など)をアップデートします。Confluent Cloud のテンプレート構成ファイル
# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for Kafka producer to prevent data loss acks=all # Required connection configs for Confluent Cloud Schema Registry schema.registry.url=https://{{ SR_ENDPOINT }} basic.auth.credentials.source=USER_INFO basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092 # Confluent Schema Registry schema.registry.url=http://localhost:8081
Schema Registry サブジェクトのリストを表示して Confluent Cloud Schema Registry の資格情報を確認します。次の例の
{{ SR_API_KEY }}
、{{ SR_API_SECRET }}
、および{{ SR_ENDPOINT }}
に値を代入します。curl -u {{ SR_API_KEY }}:{{ SR_API_SECRET }} https://{{ SR_ENDPOINT }}/subjects
Avro レコードの生成¶
Avro プロデューサーを実行して、次の引数を渡します。
- Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
- トピック名
mvn exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ProducerAvroExample" \ -Dexec.args="$HOME/.confluent/java.config test2"
プロデューサー Avro コード を表示します。
Avro レコードの消費¶
Avro コンシューマーを実行して、次の引数を渡します。
- Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
- トピック名
mvn exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.ConsumerAvroExample" \ -Dexec.args="$HOME/.confluent/java.config test2"
コンシューマー Avro コード を表示します。
Avro Kafka Streams¶
Avro Kafka Streams アプリケーションを実行して、次の引数を渡します。
- Kafka クラスターに接続するための構成パラメーターを含むローカルファイル
- 前の手順で使用したものと同じトピック名
mvn exec:java -Dexec.mainClass="io.confluent.examples.clients.cloud.StreamsAvroExample" \ -Dexec.args="$HOME/.confluent/java.config test2"
Kafka Streams Avro コード を表示します。
Confluent Cloud Schema Registry によるスキーマ進化¶
Confluent Cloud Schema Registry に登録されたスキーマサブジェクトを表示します。次の出力の
<SR API KEY>
、<SR API SECRET>
、および<SR ENDPOINT>
に値を代入します。curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects
サブジェクト
test2-value
が存在することを確認します。["test2-value"]
サブジェクト test2-value のスキーマ情報を表示します。次の出力の
<SR API KEY>
、<SR API SECRET>
、および<SR ENDPOINT>
に値を代入します。curl -u <SR API KEY>:<SR API SECRET> https://<SR ENDPOINT>/subjects/test2-value/versions/1
サブジェクト
test2-value
のスキーマ情報を確認します。{"subject":"test2-value","version":1,"id":100001,"schema":"{\"name\":\"io.confluent.examples.clients.cloud.DataRecordAvro\",\"type\":\"record\",\"fields\":[{\"name\":\"count\",\"type\":\"long\"}]}"}
スキーマを進化させるために、Confluent Cloud Schema Registry で、スキーマの新しいバージョンと古いバージョンの間の スキーマの互換性をテスト することができます。pom.xml では、スキーマレジストリのサブジェクト名が
test2-value
にハードコーディングされます。トピック名test2
を使用しなかった場合は、これを変更してください。続けて、DataRecordAvro2a.avsc のローカルスキーマ互換性をテストすると、これは失敗します。DataRecordAvro2b.avsc のローカルスキーマ互換性をテストすると、これは成功します。# DataRecordAvro2a.avsc compatibility test: FAIL mvn schema-registry:test-compatibility "-DschemaRegistryUrl=https://{{ SR_ENDPOINT }}" "-DschemaRegistryBasicAuthUserInfo={{ SR_API_KEY }}:{{ SR_API_SECRET }}" "-DschemaLocal=src/main/resources/avro/io/confluent/examples/clients/cloud/DataRecordAvro2a.avsc" # DataRecordAvro2b.avsc compatibility test: PASS mvn schema-registry:test-compatibility "-DschemaRegistryUrl=https://{{ SR_ENDPOINT }}" "-DschemaRegistryBasicAuthUserInfo={{ SR_API_KEY }}:{{ SR_API_SECRET }}" "-DschemaLocal=src/main/resources/avro/io/confluent/examples/clients/cloud/DataRecordAvro2b.avsc"