Streams アプリケーションの作成

Kafka Streams ライブラリを利用する Java アプリケーションは、すべて Kafka Streams アプリケーションと見なされます。Kafka Streams アプリケーションの計算ロジックは、 プロセッサートポロジー として定義されます。プロセッサートポロジーとは、ストリームプロセッサー(ノード)とストリーム(エッジ)を表す図式です。

プロセッサートポロジーは、Kafka Streams API を使用して定義できます。

Kafka Streams DSL
最もよく使用されるデータ変換操作を提供する 高レベル API。 mapfilterjoinaggregations などの操作を、特別な設定を行うことなくすぐに実行できます。DSL は、Kafka Streams を初めて使用する開発者に推奨される開始点であり、さまざまなユースケースとストリーム処理のニーズに対応します。
Processor API
プロセッサーの追加と接続や、ステートストアとの直接的な対話を可能にする低レベル API。Processor API では DSL よりも高い柔軟性が提供されますが、その一方で、アプリケーション開発者側により多くの手動作業(コード行数の増加など)が求められます。

ライブラリと Maven アーティファクト

このセクションでは、Kafka Streams アプリケーションの作成に使用できる、Kafka Streams 関連のライブラリの一覧を示します。

これらのライブラリに対応する Maven アーティファクトは、Confluent の Maven リポジトリにあります。

<!-- Example pom.xml snippet when using Maven to build your Java applications. -->
<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

Kafka Streams アプリケーションでは、以下のライブラリに対する依存関係を定義できます。

グループ ID アーティファクト ID バージョン 説明
org.apache.kafka kafka-streams 6.2.4-ccs (必須)|kstreams| の基本ライブラリ。
org.apache.kafka kafka-streams-scala_2.11 または kafka-streams-scala_2.12 6.2.4-ccs Kafka Streams 用 Scala API。これはオプションです。
org.apache.kafka kafka-clients 6.2.4-ccs (必須)|ak-tm| クライアントライブラリ。組み込みのシリアライザーと逆シリアライザーが含まれます。
org.apache.avro avro 1.8.2 Apache Avro ライブラリ。これはオプションです(Avro を使用する場合にのみ必要です)。
io.confluent kafka-streams-avro-serde 6.2.4 Confluent の Avro シリアライザーと逆シリアライザー。これはオプションです(Avro を使用する場合にのみ必要です)。

ちなみに

シリアライザーと逆シリアライザーの詳細については、「Kafka Streams のデータ型とシリアル化」のセクションを参照してください。

以下は、Maven を使用する場合の pom.xml のスニペットの例です。

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>6.2.4-ccs</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>6.2.4-ccs</version>
    </dependency>

    <!-- For Scala developers -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-scala_2.11</artifactId>
        <!-- or
        <artifactId>kafka-streams-scala_2.12</artifactId>
        -->
        <version>6.2.4-ccs</version>
    </dependency>

    <!-- Dependencies below are required/recommended only when using Apache Avro. -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>6.2.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.8.2</version>
    </dependency>
</dependencies>

Maven Project Object Model(POM)の完全なセットアップについては、Confluent のサンプルリポジトリにある Kafka Streams のサンプル を参照してください。

アプリケーションコード内での Kafka Streams の使用

Kafka Streams はアプリケーションコードのどこからでも呼び出すことができますが、これらの呼び出しは通常、アプリケーションの main() メソッドまたはそのバリアントの中で行われます。以下では、アプリケーション内で処理トポロジーを定義するための基本要素について説明します。

まず、KafkaStreams のインスタンスを作成する必要があります。

  • KafkaStreams コンストラクターの第 1 引数には、トポロジー(DSL では StreamsBuilder#build()Processor API では Topology )を指定します。これを使用してトポロジーが定義されます。
  • 第 2 引数は、その特定のトポロジーの構成を定義する StreamsConfig のインスタンスです。

以下にコード例を示します。

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.StreamsBuilder;
import org.apache.kafka.streams.processor.Topology;

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.  We will cover this in detail in the subsequent
// sections of this Developer Guide.

StreamsBuilder builder = ...;  // when using the DSL
Topology topology = builder.build();
//
// OR
//
Topology topology = ...; // when using the Processor API

// Use the configuration properties to tell your application where the Kafka cluster is,
// which Serializers/Deserializers to use by default, to specify security settings,
// and so on.
Properties props = ...;

KafkaStreams streams = new KafkaStreams(topology, props);

この時点で内部構造は初期化されますが、処理はまだ開始されていません。KafkaStreams#start() メソッドを呼び出して、明示的に Kafka Streams スレッドを開始する必要があります。

// Start the Kafka Streams threads
streams.start();

このストリーム処理アプリケーションのインスタンスが他の場所(別のマシンなど)で実行されている場合、Kafka Streams は、既存のインスタンスから新しく開始されたインスタンスに透過的にタスクを再割り当てします。詳細については、「ストリームパーティションとタスク」と「スレッドモデル」を参照してください。

予期しない例外をすべてキャッチするには、アプリケーションの起動前に java.lang.Thread.UncaughtExceptionHandler を設定できます。このハンドラーは、ストリームスレッドが予期しない例外で終了するたびに呼び出されます。

// Java 8+, using lambda expressions
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
  // here you should examine the throwable/exception and perform an appropriate action!
});


// Java 7
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  public void uncaughtException(Thread thread, Throwable throwable) {
    // here you should examine the throwable/exception and perform an appropriate action!
  }
});

アプリケーションインスタンスを停止するには、KafkaStreams#close() メソッドを呼び出します。

// Stop the Kafka Streams threads
streams.close();

SIGTERM に応答してアプリケーションを適切にシャットダウンできるようにするには、シャットダウンフックを追加して KafkaStreams#close を呼び出すことをお勧めします。

  • 以下に、Java 8 以降でのシャットダウンフックの例を示します。

    // Add shutdown hook to stop the Kafka Streams threads.
    // You can optionally provide a timeout to `close`.
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    
  • 以下に、Java 7 でのシャットダウンフックの例を示します。

    // Add shutdown hook to stop the Kafka Streams threads.
    // You can optionally provide a timeout to `close`.
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      @Override
      public void run() {
          streams.close();
      }
    }));
    

アプリケーションが停止すると、Kafka Streams は、このインスタンスで実行されていたすべてのタスクを、利用可能な残りのインスタンスに移行します。

注釈

このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。