Java Spring Boot: Apache Kafka® のサンプルコード¶
このチュートリアルでは、Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費する Java Spring Boot クライアントアプリケーションを実行します。
チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。
前提条件¶
クライアント¶
- Java 1.8 以降(デモアプリケーションの実行用)
Kafka クラスター¶
- このチュートリアルは、どの環境の Kafka クラスターにも対応しています。
- Confluent Cloud 内
- ローカルホスト 上
- 任意のリモート Kafka クラスター
- Confluent Cloud で実行している場合は、API キーおよびシークレットで Confluent Cloud クラスターにアクセスできることが必要です。
- Confluent Cloud にサインアップし、プロモーションコード
C50INTEG
を使用した最初の 20 名のユーザーは、$50 相当を無料で使用できます(詳細)。 - Confluent Cloud で Kafka クラスター、資格情報、および ACL を自動的に作成する方法については、「Confluent Cloud 向け ccloud-stack ユーティリティ」を参照してください。
- Confluent Cloud にサインアップし、プロモーションコード
セットアップ¶
confluentinc/examples GitHub リポジトリのクローンを作成し、
6.1.5-post
ブランチをチェックアウトします。git clone https://github.com/confluentinc/examples cd examples git checkout 6.1.5-post
Java Spring Boot のサンプルのディレクトリに変更します。
cd clients/cloud/java-springboot/
Kafka クラスターに接続するための構成パラメーターを含むローカルファイル(
$HOME/.confluent/java.config
など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}
、{{CLUSTER_API_KEY }}
、および{{ CLUSTER_API_SECRET }}
に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。Confluent Cloud のテンプレート構成ファイル
# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for Kafka producer to prevent data loss acks=all
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092
Avro と Confluent Cloud Schema Registry¶
このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice
など)を表すキーと、json フォーマットのカウント値({"count": 0}
など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。
Confluent Cloud GUI の「Confluent Cloud におけるスキーマ管理のクイックスタート」にある説明に従って、Confluent Cloud Schema Registry を有効にし、API キーおよびシークレットを作成して接続します。
ご使用の VPC が Confluent Cloud Schema Registry 公衆インターネットエンドポイントに接続できることを確認します。
Schema Registry に接続するための構成パラメーターを含むローカル構成ファイル(
$HOME/.confluent/java.config
など)をアップデートします。Confluent Cloud のテンプレート構成ファイル
# Required connection configs for Kafka producer, consumer, and admin bootstrap.servers={{ BROKER_ENDPOINT }} security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}'; sasl.mechanism=PLAIN # Required for correctness in Apache Kafka clients prior to 2.6 client.dns.lookup=use_all_dns_ips # Best practice for Kafka producer to prevent data loss acks=all # Required connection configs for Confluent Cloud Schema Registry schema.registry.url=https://{{ SR_ENDPOINT }} basic.auth.credentials.source=USER_INFO basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
ローカルホストのテンプレート構成ファイル
# Kafka bootstrap.servers=localhost:9092 # Confluent Schema Registry schema.registry.url=http://localhost:8081
Schema Registry サブジェクトのリストを表示して Confluent Cloud Schema Registry の資格情報を確認します。次の例の
{{ SR_API_KEY }}
、{{ SR_API_SECRET }}
、および{{ SR_ENDPOINT }}
に値を代入します。curl -u {{ SR_API_KEY }}:{{ SR_API_SECRET }} https://{{ SR_ENDPOINT }}/subjects
レコードの生成と消費¶
この Spring Boot アプリケーションには、プロデューサー および コンシューマー という 2 つのコンポーネントがあり、これらは Spring Boot アプリケーションの起動時に初期化されます。プロデューサーは、Kafka クラスター内のトピックに Kafka データを書き込みます。各レコードには、ユーザー名(alice
など)を表す文字列キーと、Avro スキーマ DataRecordAvro.avsc でフォーマットされたカウント値があります。
{"namespace": "io.confluent.examples.clients.cloud",
"type": "record",
"name": "DataRecordAvro",
"fields": [
{"name": "count", "type": "long"}
]
}
次のコマンドで、プロデューサーとコンシューマーを実行します。jar がビルドされ、
spring-kafka
を使用するプロデューサーとコンシューマーが実行されます。./startProducerConsumer.sh
プロデューサーがすべてのメッセージを送信したことを確認します。次のように表示されます。
... 2020-02-13 14:41:57.924 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 20 2020-02-13 14:41:57.927 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 21 2020-02-13 14:41:57.927 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 22 2020-02-13 14:41:57.927 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 23 2020-02-13 14:41:57.928 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 24 2020-02-13 14:41:57.928 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 25 2020-02-13 14:41:57.928 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 26 2020-02-13 14:41:57.929 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 27 2020-02-13 14:41:57.929 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 28 2020-02-13 14:41:57.930 INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample : Produced record to topic test partition 3 @ offset 29 10 messages were produced to topic test ...
コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。
... 2020-02-13 14:41:58.248 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 0} 2020-02-13 14:41:58.248 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 1} 2020-02-13 14:41:58.248 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 2} 2020-02-13 14:41:58.248 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 3} 2020-02-13 14:41:58.249 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 4} 2020-02-13 14:41:58.249 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 5} 2020-02-13 14:41:58.249 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 6} 2020-02-13 14:41:58.249 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 7} 2020-02-13 14:41:58.249 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 8} 2020-02-13 14:41:58.249 INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample : received alice {"count": 9}
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。プロデューサーコード と コンシューマーコード を表示します。
Kafka Streams¶
Kafka Streams API は、同じトピックから読み取り、各レコードの処理中にローリングカウントとステートフルな合計の集計を行います。
Kafka Streams アプリケーションを実行します。
./startStreams.sh
次のように出力されることを確認します。
... [Consumed record]: alice, 0 [Consumed record]: alice, 1 [Consumed record]: alice, 2 [Consumed record]: alice, 3 [Consumed record]: alice, 4 [Consumed record]: alice, 5 [Consumed record]: alice, 6 [Consumed record]: alice, 7 [Consumed record]: alice, 8 [Consumed record]: alice, 9 ... [Running count]: alice, 0 [Running count]: alice, 1 [Running count]: alice, 3 [Running count]: alice, 6 [Running count]: alice, 10 [Running count]: alice, 15 [Running count]: alice, 21 [Running count]: alice, 28 [Running count]: alice, 36 [Running count]: alice, 45 ...
入力し終わったら、
Ctrl + C
キーを押します("+" はキーを同時に押すことを意味します)。Kafka Streams コード を表示します。