Java Spring Boot: Apache Kafka® のサンプルコード

このチュートリアルでは、Apache Kafka® クラスターにメッセージを生成し、このクラスターからメッセージを消費する Java Spring Boot クライアントアプリケーションを実行します。

チュートリアルの実行後に、提供された ソースコード をリファレンスとして使用して、独自の Kafka クライアントアプリケーションを開発します。

前提条件

クライアント

  • Java 1.8 以降(デモアプリケーションの実行用)

Kafka クラスター

このチュートリアルは、Confluent Cloud を使用して取り組むのが最も簡単です。そうすればローカル Kafka クラスターを実行する必要はありません。Confluent Cloud にサインアップするときは、プロモーションコード C50INTEG を適用すると、$50 相当を無料で使用できます(詳細)。Console から LEARN をクリックしてクラスターをプロビジョニングし、Clients をクリックして、クライアントアプリケーションに対して設定するクラスター固有の構成と認証情報を取得します。サポートされている CLIREST API、またはコミュニティでサポートされている Confluent Cloud 向け ccloud-stack ユーティリティ を使用してもかまいません。

Confluent Cloud の使用を希望しない場合は、ローカルホスト や他のリモートサーバーで実行されている Kafka クラスターでこのチュートリアルに取り組むこともできます。

セットアップ

  1. confluentinc/examples GitHub リポジトリのクローンを作成し、6.2.4-post ブランチをチェックアウトします。

    git clone https://github.com/confluentinc/examples
    cd examples
    git checkout 6.2.4-post
    
  2. Java Spring Boot のサンプルのディレクトリに変更します。

    cd clients/cloud/java-springboot/
    
  3. Kafka クラスターに接続するための構成パラメーターを含むローカルファイル($HOME/.confluent/java.config など)を作成します。以下のテンプレートのいずれかをベースとして、クラスターへの接続情報でこのファイルをカスタマイズします。{{ BROKER_ENDPOINT }}{{CLUSTER_API_KEY }}、および {{ CLUSTER_API_SECRET }} に値を代入します(この値を手作業で見つける方法については、「Configure Confluent Cloud Clients」を参照してください。 または、Confluent Cloud 向け ccloud-stack ユーティリティ を使用して値を自動作成します)。

    • Confluent Cloud のテンプレート構成ファイル

      # Required connection configs for Kafka producer, consumer, and admin
      bootstrap.servers={{ BROKER_ENDPOINT }}
      security.protocol=SASL_SSL
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
      sasl.mechanism=PLAIN
      # Required for correctness in Apache Kafka clients prior to 2.6
      client.dns.lookup=use_all_dns_ips
      
      # Best practice for Kafka producer to prevent data loss 
      acks=all
      
    • ローカルホストのテンプレート構成ファイル

      # Kafka
      bootstrap.servers=localhost:9092
      

Avro と Confluent Cloud Schema Registry

このサンプルでは、プロデューサーアプリケーションが Kafka クラスターのトピックに Kafka データを書き込みます。Kafka クラスターにトピックがまだ存在しない場合、プロデューサーアプリケーションは Kafka Admin Client API を使用してトピックを作成します。Kafka に書き込まれる各レコードには、ユーザー名(alice など)を表すキーと、json フォーマットのカウント値({"count": 0} など)があります。コンシューマーアプリケーションは、同じ Kafka トピックを読み取り、各レコードの処理中にカウントのローリング合計を保持します。

  1. Confluent Cloud Console の「Confluent Cloud におけるスキーマ管理のクイックスタート」にある説明に従って、Confluent Cloud Schema Registry を有効にし、API キーおよびシークレットを作成して接続します。

  2. ご使用の VPC が Confluent Cloud Schema Registry 公衆インターネットエンドポイントに接続できることを確認します。

  3. Schema Registry に接続するための構成パラメーターを含むローカル構成ファイル($HOME/.confluent/java.config など)をアップデートします。

    • Confluent Cloud のテンプレート構成ファイル

      # Required connection configs for Kafka producer, consumer, and admin
      bootstrap.servers={{ BROKER_ENDPOINT }}
      security.protocol=SASL_SSL
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
      sasl.mechanism=PLAIN
      # Required for correctness in Apache Kafka clients prior to 2.6
      client.dns.lookup=use_all_dns_ips
      
      # Best practice for Kafka producer to prevent data loss 
      acks=all
      
      # Required connection configs for Confluent Cloud Schema Registry
      schema.registry.url=https://{{ SR_ENDPOINT }}
      basic.auth.credentials.source=USER_INFO
      basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
      
    • ローカルホストのテンプレート構成ファイル

      # Kafka
      bootstrap.servers=localhost:9092
      
      # Confluent Schema Registry
      schema.registry.url=http://localhost:8081
      
  4. Schema Registry サブジェクトのリストを表示して Confluent Cloud Schema Registry の資格情報を確認します。次の例の {{ SR_API_KEY }}{{ SR_API_SECRET }}、および {{ SR_ENDPOINT }} に値を代入します。

    curl -u {{ SR_API_KEY }}:{{ SR_API_SECRET }} https://{{ SR_ENDPOINT }}/subjects
    

レコードの生成と消費

この Spring Boot アプリケーションには、プロデューサー および コンシューマー という 2 つのコンポーネントがあり、これらは Spring Boot アプリケーションの起動時に初期化されます。プロデューサーは、Kafka クラスター内のトピックに Kafka データを書き込みます。各レコードには、ユーザー名(alice など)を表す文字列キーと、Avro スキーマ DataRecordAvro.avsc でフォーマットされたカウント値があります。

{"namespace": "io.confluent.examples.clients.cloud",
 "type": "record",
 "name": "DataRecordAvro",
 "fields": [
     {"name": "count", "type": "long"}
 ]
}
  1. 次のコマンドで、プロデューサーとコンシューマーを実行します。jar がビルドされ、spring-kafka を使用するプロデューサーとコンシューマーが実行されます。

    ./startProducerConsumer.sh
    
  2. プロデューサーがすべてのメッセージを送信したことを確認します。次のように表示されます。

    ...
    2020-02-13 14:41:57.924  INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample     : Produced record to topic test partition 3 @ offset 20
    2020-02-13 14:41:57.927  INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample     : Produced record to topic test partition 3 @ offset 21
    2020-02-13 14:41:57.927  INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample     : Produced record to topic test partition 3 @ offset 22
    2020-02-13 14:41:57.927  INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample     : Produced record to topic test partition 3 @ offset 23
    2020-02-13 14:41:57.928  INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample     : Produced record to topic test partition 3 @ offset 24
    2020-02-13 14:41:57.928  INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample     : Produced record to topic test partition 3 @ offset 25
    2020-02-13 14:41:57.928  INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample     : Produced record to topic test partition 3 @ offset 26
    2020-02-13 14:41:57.929  INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample     : Produced record to topic test partition 3 @ offset 27
    2020-02-13 14:41:57.929  INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample     : Produced record to topic test partition 3 @ offset 28
    2020-02-13 14:41:57.930  INFO 44191 --- [ad | producer-1] i.c.e.c.c.springboot.ProducerExample     : Produced record to topic test partition 3 @ offset 29
    10 messages were produced to topic test
    ...
    
  3. コンシューマーがすべてのメッセージを受信したことを確認します。次のように表示されます。

    ...
    2020-02-13 14:41:58.248  INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample     : received alice {"count": 0}
    2020-02-13 14:41:58.248  INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample     : received alice {"count": 1}
    2020-02-13 14:41:58.248  INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample     : received alice {"count": 2}
    2020-02-13 14:41:58.248  INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample     : received alice {"count": 3}
    2020-02-13 14:41:58.249  INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample     : received alice {"count": 4}
    2020-02-13 14:41:58.249  INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample     : received alice {"count": 5}
    2020-02-13 14:41:58.249  INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample     : received alice {"count": 6}
    2020-02-13 14:41:58.249  INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample     : received alice {"count": 7}
    2020-02-13 14:41:58.249  INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample     : received alice {"count": 8}
    2020-02-13 14:41:58.249  INFO 44191 --- [ntainer#0-0-C-1] i.c.e.c.c.springboot.ConsumerExample     : received alice {"count": 9}
    
  4. 入力し終わったら、Ctrl + C キーを押します("+" はキーを同時に押すことを意味します)。

  5. プロデューサーコードコンシューマーコード を表示します。

Kafka Streams

Kafka Streams API は、同じトピックから読み取り、各レコードの処理中にローリングカウントとステートフルな合計の集計を行います。

  1. Kafka Streams アプリケーションを実行します。

    ./startStreams.sh
    
  2. 次のように出力されることを確認します。

    ...
    [Consumed record]: alice, 0
    [Consumed record]: alice, 1
    [Consumed record]: alice, 2
    [Consumed record]: alice, 3
    [Consumed record]: alice, 4
    [Consumed record]: alice, 5
    [Consumed record]: alice, 6
    [Consumed record]: alice, 7
    [Consumed record]: alice, 8
    [Consumed record]: alice, 9
    ...
    [Running count]: alice, 0
    [Running count]: alice, 1
    [Running count]: alice, 3
    [Running count]: alice, 6
    [Running count]: alice, 10
    [Running count]: alice, 15
    [Running count]: alice, 21
    [Running count]: alice, 28
    [Running count]: alice, 36
    [Running count]: alice, 45
    ...
    
  3. 入力し終わったら、Ctrl + C キーを押します("+" はキーを同時に押すことを意味します)。

  4. Kafka Streams コード を表示します。