Kafka Streams Processor API

Processor API を使用すると、開発者は、カスタムプロセッサーを定義して接続したり、ステートストアとやり取りしたりできます。Processor API では、一度に 1 つの受信レコードを処理する任意のストリームプロセッサーを定義し、それらのプロセッサーおよび関連付けられたステートストアを接続して、カスタマイズされた処理ロジックを表すプロセッサートポロジーを構成できます。

概要

Processor API は、ステートレスステートフル のどちらの操作を実装するためにも使用できます。ステートフルな操作は、ステートストア を使用することで実現されます。

ちなみに

DSL と Processor API: の組み合わせ :プロセッサーとトランスフォーマーの適用(Processor API の統合)」セクションで説明されているように、DSL の利便性と、Processor API のパワーおよび柔軟性を組み合わせることもできます。

使用できる API 機能の網羅的なリストについては、 Kafka Streams API ドキュメント を参照してください。

ストリーム処理の定義

ストリームプロセッサー とは、プロセッサートポロジー内で単一の処理ステップを表すノードです。Processor API を使用すると、受信したレコードを一度に 1 つずつ処理する任意のストリームプロセッサーを定義し、それらのプロセッサーを関連するステートストアに接続して、プロセッサートポロジーを構成することができます。

カスタマイズされたストリームプロセッサーを定義するには、process() API メソッドを提供する Processor インターフェイスを実装します。process() メソッドは、受信したレコードのそれぞれに対して呼び出されます。

Processor インターフェイスには init() メソッドがあり、タスクの構築フェーズで Kafka Streams ライブラリによって呼び出されます。Processor インスタンスは、必要なすべての初期化をこのメソッド内で実行する必要があります。init() メソッドには、現在の処理対象レコードのメタデータへのアクセスを提供する ProcessorContext インスタンスが渡されます。これには、ソースの Apache Kafka® トピックとパーティション、対応するメッセージオフセットなどの情報が含まれます。このコンテキストインスタンスを使用して、区切り関数をスケジュールしたり( ProcessorContext#schedule())、新しいレコードをキーと値のペアとしてダウンストリームプロセッサーに転送したり( ProcessorContext#forward())、現在の処理の進行状況をコミットしたり( ProcessorContext#commit())できます。

特に ProcessorContext#schedule() は、ユーザーの Punctuator コールバックインターフェイスを受け取り、その punctuate() API メソッドを PunctuationType に基づいて定期的にトリガーします。PunctuationType は、区切りのスケジュールに使用される時間の概念が イベント時刻 と実際の時刻のどちらであるかを決定します(デフォルトの構成はイベント時刻で、TimestampExtractor で抽出されるイベント時刻を表します)。イベント時刻を使用する場合、punctuate() は純粋にデータによってトリガーされます。イベント時刻は、入力データから取り出されたタイムスタンプによって決定される(および進められる)ためです。新しい入力データがなければ、イベント時刻は進行しないため、punctuate() は呼び出されません。

たとえば、Punctuator 関数を PunctuationType.STREAM_TIME に基づいて 10 秒ごとにスケジュールし、1 秒(最初のレコード)から 60 秒(最後のレコード)までの連続したタイムスタンプを持つ 60 個のレコードのストリームを処理する場合、punctuate() は 6 回呼び出されます。この呼び出しは、レコードを実際に処理するために必要な時間とは関係なく発生します。60 個のレコードの処理にかかる時間が 1 秒でも、1 分でも、1 時間でも、punctuate() は 6 回呼び出されます。

実際の時刻(つまり PunctuationType.WALL_CLOCK_TIME)を使用する場合、punctuate() は純粋に実際の時刻に基づいてトリガーされます。上記の例を再度使用して、Punctuator 関数を PunctuationType.WALL_CLOCK_TIME に基づいてスケジュールする場合、同じ 60 個のレコードが 20 秒で処理されるとすると、punctuate() は 2 回(10 秒ごとに 1 回)呼び出されます。これらの 60 個のレコードが 5 秒で処理された場合、punctuate() は 1 回も呼び出されません。init() メソッドの中から ProcessorContext#schedule() を複数回呼び出すと、同じプロセッサー内で、異なる PunctuationType の種類に対して複数の Punctuator コールバックをスケジュールできます。

注意

イベント時刻が進行するのは、すべての入力トピックのすべての入力パーティションに新しいデータ(タイムスタンプが新しいもの)がある場合だけです。1 つ以上のパーティションで新しいデータが利用できなければ、イベント時刻は進行せず、PunctuationType.STREAM_TIME が指定されている場合は punctuate() もトリガーされません。これはタイムスタンプエクストラクターの構成とは独立した動作です。つまり、WallclockTimestampExtractor を使用しても、実際の時刻に基づいて punctuate() がトリガーされるわけではありません。

以下に、単語数をカウントする単純なアルゴリズムを定義する Processor の例を示します。この例では、次のアクションが実行されます。

  • init() メソッド内で、1 秒ごとの区切りをスケジュールし(サポートされる最小の時間単位は 1 ミリ秒)、"Counts" という名前のローカルステートストアを取得します。
  • process() メソッド内で、受け取ったレコードごとに値の文字列を単語に分割し、ステートストアの単語数を更新します(これについては、このセクションで後述します)。

init() でスケジュールされる Punctuator オブジェクトは、punctuate() を定義します。このメソッドでは、ローカルステートストアを反復処理し、集約されたカウントをダウンストリームプロセッサーに送信して(ダウンストリームプロセッサーについてはこのセクションで後述します)、現在のストリームのステートをコミットします。

注意

ここに示すコードは単純化した例で、単一パーティションの入力トピックでのみ動作します。一般的な Processor API のワードカウントでは、2 つのプロセッサーが必要になります。1 つ目は、各行を単語に分割し、単語をレコードキーとして設定するステートレスなプロセッサーです。1 つ目のプロセッサーの結果は、2 つ目の(ステートフルな)プロセッサーによって消費される追加トピックに書き戻されます。実際のカウントを実行するのは、2 つ目のプロセッサーです。単語をトピックに書き戻す処理は、同じ単語をグループ化して、同じ単語が 2 つ目のプロセッサーの同じインスタンスに届くようにするために必要です。これは機能的に、Map-Reduce 計算のシャッフルのフェーズに似ています。

public class WordCountProcessor implements Processor<String, String> {

  private ProcessorContext context;
  private KeyValueStore<String, Long> kvStore;

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
      // keep the processor context locally because we need it in punctuate() and commit()
      this.context = context;

      // retrieve the key-value store named "Counts"
      kvStore = (KeyValueStore) context.getStateStore("Counts");

      // schedule a punctuate() method every second based on event-time
      this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, (timestamp) -> {
          KeyValueIterator<String, Long> iter = this.kvStore.all();
          while (iter.hasNext()) {
              KeyValue<String, Long> entry = iter.next();
              context.forward(entry.key, entry.value.toString());
          }
          iter.close();

          // commit the current processing progress
          context.commit();
      });
  }

  @Override
  public void process(String dummy, String line) {
      String[] words = line.toLowerCase(Locale.getDefault()).split(" ");

      for (String word : words) {
          Integer oldValue = this.kvStore.get(word);

          if (oldValue == null) {
              this.kvStore.put(word, 1);
          } else {
              this.kvStore.put(word, oldValue + 1);
          }
      }
  }

  @Override
  public void close() {
      // nothing to do
  }

}

注釈

ステートストアを使用するステートフルな処理 : 上記で定義されている WordCountProcessor では、process() メソッドの中で現在の受信レコードにアクセスでき、ステートストア を利用して処理ステートを維持できます。たとえば、集約や結合のようなステートフルな処理で使用するために、最近到着したレコードを記憶することができます。詳細については、ステートストア のドキュメントを参照してください。

プロセッサーコンテキストへのアクセス

前述 のとおり、ProcessorContext は処理ワークフローを制御し、区切り関数のスケジュールや、現在処理中のステートのコミットなどを行います。このオブジェクトは、applicationIdtaskId、タスクのステートを格納するための stateDir など、アプリケーションに関連するメタデータにアクセスするためにも使用できます。さらに、topicpartitionoffsettimestamp など、現在処理中のレコードのメタデータにもアクセスできます。

以下の例の process() 関数では、レコードのコンテキストに応じて異なる情報でレコードを修飾します。

public class EnrichProcessor implements Processor<String, String> {

  private ProcessorContext context;

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
      // keep the processor context locally because we need it in process()
      this.context = context;
  }

  @Override
  public void process(String key, String value) {
      switch(context.topic()) {
          case "alerts":
              context.forward(key, decorateWithHighPriority(value));
          case "notifications":
              context.forward(key, decorateWithMediumPriority(value));
          default:
              context.forward(key, decorateWithLowPriority(value));
      }
  }
}

注釈

レコードのコンテキストメタデータ : 現在処理されているレコードのメタデータは常に利用できるとは限りません。たとえば、現在処理されているレコードがソーストピックからパイプされものではなく、区切り関数から生成された場合、そのメタデータの topic フィールドは空になり、partitionoffset はセンチネル値(この場合は -1 )になります。一方、timestamp フィールドは、このレコードを生成した区切り関数がトリガーされた時刻を表します。

ステートストア

ステートフルProcessorTransformer を実装するには、1 つ以上のステートストアをプロセッサーまたはトランスフォーマーに提供する必要があります("ステートレス" なプロセッサーやトランスフォーマーには、ステートストアは必要ありません)。ステートストアは、最近受信した入力レコードの保存、ローリング集約の追跡、入力レコードの重複排除などを実行するために使用できます。ステートストアのもう 1 つの機能として、他のアプリケーションからの 対話型クエリ に応答することができます。たとえば、NodeJS ベースのダッシュボードや、Scala または Go で実装されたマイクロサービスから対話型クエリを実行できます。

Kafka Streams で 使用できるステートストアの種類 では、デフォルトで フォールトトレランス が有効になっています。

ステートストアの定義と作成

使用可能なストアの種類のいずれかを使用するか、 独自のカスタムストアの種類を実装 することができます。一般的には、Stores ファクトリを通じて既存のストアを利用します。

Kafka Streams を使用する場合は、通常、コード内で直接ステートストアを作成したりインスタンス化したりはしないことに注意してください。代わりに StoreBuilder を作成して、ステートストアを間接的に定義します。このビルダーは Kafka Streams によってファクトリとして使用され、必要時に必要な場所で、実際のステートストアをアプリケーションインスタンス内でローカルにインスタンス化します。

以下のストアの種類が標準で使用可能です。

ストアの種類 ストレージエンジン フォールトトレランス 説明
永続 KeyValueStore<K, V> RocksDB 対応(デフォルトで有効)
// Creating a persistent key-value store:
// here, we create a `KeyValueStore<String, Long>` named "persistent-counts".
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StoreBuilder countStoreBuilder =
  Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("persistent-counts"),
    Serdes.String(),
    Serdes.Long()
  );

詳細なファクトリオプションについては、PersistentKeyValueStore を参照してください。

インメモリー KeyValueStore<K, V> - 対応(デフォルトで有効)
// Creating an in-memory key-value store:
// here, we create a `KeyValueStore<String, Long>` named "inmemory-counts".
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StateStoreBuilder countStoreBuilder =
  Stores.keyValueStoreBuilder(
    Stores.inMemoryKeyValueStore("inmemory-counts"),
    Serdes.String(),
    Serdes.Long()
  );

詳細なファクトリオプションについては、InMemoryKeyValueStore を参照してください。

タイムスタンプ付きのステートストア

KTable では、デフォルトで常にタイムスタンプが格納されます。タイムスタンプ付きのステートストアでは、ストリーム処理のセマンティクスが強化され、ソース KTables 内の順序外のデータを管理できます。順序外の結合と集約が検出され、対話型クエリでは最新の更新のタイムスタンプが取得されます。

タイムスタンプ付きのステートストアに対するクエリは、タイムスタンプを使用して実行することも、使用せずに実行することもできます。

注釈

各インスタンスは 1 回のローリング再起動でアップグレードできます。

  • Processor API を使用している場合、既存のアプリケーションに変更はなく、タイムスタンプ付きのストアが使用可能になります。
  • DSL 演算子の場合、ストアデータはバックグラウンドでゆっくりとアップグレードされます。
  • カスタムの XxxBytesStoreSupplier を提供している場合、アップグレードは行われませんが、TimestampedBytesStore インターフェイスを実装することでオプトインできます。この場合は以前のフォーマットが維持され、Kafka Streams では、読み書きの際にタイムスタンプを追加または削除するプロキシストアが使用されます。

フォールトトレラントなステートストア

ステートストアでフォールトトレランスを実現し、データの損失なくステートストアを移行できるようにするには、バックグラウンドで継続的にステートストアを Kafka トピックにバックアップする必要があります。これにより、たとえば、 アプリケーションの処理容量を柔軟に追加または削除 するときに、1 台のマシンから別のマシンにステートフルなストリームタスクを移行することが可能になります。バックアップのトピックは、ステートストアに関連付けられた "changelog トピック"、またはステートストアの "changelog" と呼ばれます。たとえば、マシンで障害が発生したら、ステートストアとアプリケーションのステートを changelog から完全に復元できます。このステートストアの バックアップ機能は、有効または無効にする ことができます。

デフォルトでは、永続的なキーと値のストアはフォールトトレラントになります。これらは 圧縮 された changelog トピックにバックアップされます。このトピックを圧縮する目的は、トピックのサイズが無制限に増大するのを防ぎ、関連付けられている Kafka クラスターで消費されるストレージを減らすとともに、ステートストアを changelog トピックから復元する必要がある場合に、復旧にかかる時間を最小限に抑えるためです。

同様に、永続的なウィンドウストアもフォールトトレラントです。これらは、圧縮と削除の両方を使用するトピックにバックアップされます。ウィンドウストアの changelog トピックでは、changelog トピックに送信されるメッセージキーの構造に対応するために、削除と圧縮を組み合わせる必要があります。ウィンドウストアのメッセージキーは、"通常" のキーとウィンドウのタイムスタンプを含む複合キーです。このような複合キーの場合、changelog トピックが無制限に増大するのを防ぐには、圧縮を有効にするだけでは不十分です。削除を有効にすると、ログセグメントが期限切れになるにつれて、期限の切れた古いウィンドウが Kafka のログクリーナーによってクリアされます。デフォルトの保持設定は Materialized#withRetention() + 1 日です。この設定は、StreamsConfigStreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG を設定することでオーバーライドできます。

ステートストアから Iterator を開いた場合は、使い終わったときに反復子の close() を呼び出して、リソースを回収する必要があります。または、try-with-resources ステートメントの中で反復子を使用することもできます。反復子を終了しないと、メモリー不足エラーが発生する可能性があります。

ステートストアのフォールトトレランス(ストアの Changelog)の有効化または無効化

ステートストアのフォールトトレランスを有効または無効にするには、withLoggingEnabled()withLoggingDisabled() を使用して、ストアの変更記録を有効または無効にします。必要に応じて、関連付けられたトピックの構成を細かく調整することもできます。

フォールトトレランスを無効にする例を以下に示します。

import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore("Counts"),
    Serdes.String(),
    Serdes.Long())
  .withLoggingDisabled(); // disable backing up the store to a changelog topic

注意

changelog を無効にすると、関連付けられたステートストアがフォールトトレラントではなくなり、 スタンバイレプリカ も用意されなくなります。

フォールトトレランスを有効にして、追加の changelog トピック構成を指定する例を以下に示します。kafka.log.LogConfig から任意のログ構成を追加できます。認識できない構成は無視されます。

import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

Map<String, String> changelogConfig = new HashMap<>();
// override min.insync.replicas
changelogConfig.put("min.insync.replicas", "1");

StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore("Counts"),
    Serdes.String(),
    Serdes.Long())
  .withLoggingEnabled(changelogConfig); // enable changelogging, with custom changelog settings

カスタムステートストアの実装

組み込みのステートストアの種類 を使用するだけでなく、独自のストアを実装することもできます。ストアを実装するためのプライマリインターフェイスは org.apache.kafka.streams.processor.StateStore です。Kafka Streams には、KeyValueStore などの継承されたインターフェイスもいくつか用意されています。

また、org.apache.kafka.streams.state.StoreBuilder インターフェイスを実装して、ストアの "ファクトリ" を提供する必要もあります。Kafka Streams は、これを使用してストアのインスタンスを作成します。

Scala でのステートストアの実装例が提供されているため、これを独自のストアの開始点として使用できます。

プロセッサーとステートストアの接続

プロセッサー (WordCountProcessor)とステートストアが定義されたら、それらのプロセッサーとステートストアを接続して、プロセッサートポロジーを構築できます。これには Topology インターフェイスを使用します。さらに、ソースプロセッサーおよび指定した Kafka トピックを追加して、トポロジーへの入力データストリームを生成し、シンクプロセッサーおよび指定した Kafka トピックを追加して、トポロジーからの出力データストリームを生成します。

実装例を以下に示します。

Topology builder = new Topology();

// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")

    // add the WordCountProcessor node which takes the source processor as its upstream processor
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")

    // add the count store associated with the WordCountProcessor processor
    .addStateStore(countStoreSupplier, "Process")

    // add the sink processor node that takes Kafka topic "sink-topic" as output
    // and the WordCountProcessor node as its upstream processor
    .addSink("Sink", "sink-topic", "Process");

この例について簡単に説明します。

  • addSource メソッドにより、"Source" という名前のソースプロセッサーノードがトポロジーに追加されます。このノードは、"source-topic" という 1 つの Kafka トピックからデータを読み取ります。オプションとして、 Confluent の GenericAvroSerde や SpecificAvroSerde など、ソースから読み取るためのキーと値の逆シリアライザーを指定できます。
  • 次に addProcessor メソッドにより、"Process" という名前のプロセッサーノードと定義済みの WordCountProcessor ロジックが、"Source" ノードのダウンストリームプロセッサーとして追加されます。
  • countStoreSupplier を使用して、定義済みの永続的なキーと値のステートストアが作成され、"Process" ノードに関連付けられます。
  • addSink メソッドにより、シンクプロセッサーノードが追加されてトポロジーが完成します。このノードは、アップストリームプロセッサーとして "Process" ノードを受け取り、"sink-topic" という別個の Kafka トピックに書き込みます。または、オーバーロードされたもう 1 つの addSink のバリアントを使用すると、アップストリームプロセッサーから受け取ったレコードごとに、書き込み先の Kafka トピックを動的に決定することもできます。

このトポロジーでは、"Process" ストリームプロセッサーノードは "Source" ノードのダウンストリームプロセッサーであると同時に、"Sink" ノードのアップストリームプロセッサーと見なされます。したがって、Kafka から新しくフェッチされたレコードを "Source" ノードがダウンストリームの "Process" ノードに転送するたびに、WordCountProcessor#process() メソッドがトリガーされてレコードが処理され、関連付けられたステートストアが更新されます。WordCountProcessor#punctuate() メソッド内で context#forward() が呼び出されると、集約のキーと値のペアが "Sink" プロセッサーノードを通じて Kafka トピック "sink-topic" に送信されます。WordCountProcessor の実装では、キーと値のストアにアクセスするときに、同じ "Counts" というストア名を参照する必要があることに注意してください。そうでない場合、実行時にステートストアが見つからないという例外がスローされます。また、Topology コード内のプロセッサーに関連付けられていないステートストアにプロセッサーの init() メソッドからアクセスした場合も、実行時に例外がスローされ、このプロセッサーからはステートストアにアクセスできないことが通知されます。

これでアプリケーションのプロセッサートポロジー全体が定義されたので、 Kafka Streams アプリケーション を実行する手順に進むことができます。

トポロジーの記述

Topology が指定されたら、対応する DAG の記述を取得できます。これには、TopologyDescription を返す #describe() を使用します。TopologyDescription には、追加されたすべてのソース、プロセッサー、シンクノードと、関連付けられているすべてのストアが含まれます。ソースノードとシンクノードについて、指定された入力および出力のトピック名とパターンにアクセスできます。プロセッサーノードでは、関連付けられているストアが記述に追加されます。また、すべてのノードには、接続されている後続ノードと先行ノードのリストが含まれています。このため、TopologyDescription では、指定されたトポロジーの DAG 構造を取得できます。グローバルストアは、明示的に接続しなくてもすべてのノードからアクセスできるため、明確に示されます。さらに、ノードは SubTopology でグループ化されます。 SubTopology とは、互いに直接接続されているプロセッサーノードのグループです(つまり、トピック以外の直接接続またはストアの共有によって接続されています)。実行中、各 SubTopology は 1 つ以上のタスクによって処理されます。したがって、各 SubTopology は、異なるスレッドで並列に実行できる独立した作業単位を表します。トポロジーが指定されたストリームアプリケーションでは、起動前に Topology を記述すると、タスクの数と並列処理の上限を推測するために役立ちます(作成したアプリケーションの実行方法についてはこのセクションで後述します)。トポロジーが上記のように直接指定されたのではなく、Kafka Streams DSL を通じて指定されている場合も、Topology についてのインサイトを得ることは有用です。

注釈

このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。