Streams アップグレードガイド

Confluent Platform 6.0.x から Confluent Platform 6.2.4 へのアップグレード

互換性

Confluent Platform 6.2.4 でビルドされた Kafka Streams アプリケーションは、特定の Kafka クラスターとの前方互換性と後方互換性を備えています。

Confluent Platform 6.2.4 までの新しいクラスターに対する前方互換性:
Confluent Platform 3.0.x、Confluent Platform 3.1.x、Confluent Platform 3.2.x、Confluent Platform 3.3.x、Confluent Platform 4.0.x、Confluent Platform 4.1.x、Confluent Platform 5.0.x、Confluent Platform 5.1.x、Confluent Platform 5.2.x、Confluent Platform 5.3.x、Confluent Platform 5.4.x、または Confluent Platform 5.5.x でビルドされた既存の Kafka Streams アプリケーションは、Confluent Platform 6.2.4 を実行するアップグレードされた Kafka クラスターと互換性があります。
Confluent Platform 3.1.x までの古いクラスターに対する後方互換性:
Confluent Platform 6.2.4 でビルドされた新しい Kafka Streams アプリケーションは、Confluent Platform 3.3.x、Confluent Platform 4.0.x、Confluent Platform 4.1.x、Confluent Platform 5.0.x、Confluent Platform 5.1.x、Confluent Platform 5.2.x、Confluent Platform 5.3.x、Confluent Platform 5.4.x、または Confluent Platform 5.5.x を実行する以前の Kafka クラスターと互換性があります。Confluent Platform 3.0.x、Confluent Platform 3.1.x、または Confluent Platform 3.2.x を実行する Kafka クラスターは、新しい Confluent Platform 6.2.4 Kafka Streams アプリケーションとは互換性がありません。

注釈

Confluent Platform 5.2.2 および Confluent Platform 5.3.0 では、Kafka Streams にメッセージフォーマット 0.11 以降が必要です。このため、ブローカーを Confluent Platform 3.3 以降のバージョンにアップグレードするときに以前のメッセージフォーマットのままにすると、Kafka Streams Confluent Platform 5.2.2 または Confluent Platform 5.3.0 が機能しなくなります。Kafka Streams アプリケーションを Confluent Platform 5.2.2、Confluent Platform 5.3.0、またはそれ以降にアップグレードする前に、メッセージフォーマットを 0.11 にアップグレードする必要があります。

注釈

Confluent Platform 4.0.0 では、Kafka Streams にメッセージフォーマット 0.10 以降が必要です。このため、ブローカーを Confluent Platform 3.1 以降のバージョンにアップグレードするときに以前のメッセージフォーマットのままにすると、Kafka Streams Confluent Platform 6.2.4 が機能しなくなります。Kafka Streams アプリケーションを Confluent Platform 6.2.4 以降にアップグレードする前に、メッセージフォーマットを 0.10 にアップグレードする必要があります。

互換性マトリックス

 
Kafka ブローカー(列)
Streams API(行)
3.0.x / 0.10.0.x
3.1.x / 0.10.1.x および
3.2.x / 0.10.2.x
3.3.x / 0.11.0.x および
4.0.x / 1.0.x および
4.1.x / 1.1.x および
5.0.x / 2.0.x および
5.1.x / 2.1.x および
5.2.x / 2.2.x および
5.3.x / 2.3.x および
5.4.x / 2.4.x および
5.5.x / 2.5.x および
6.0.x / 2.6.x
3.0.x / 0.10.0.x
互換性あり
互換性あり
互換性あり
3.1.x / 0.10.1.x および
3.2.x / 0.10.2.x
 
互換性あり
互換性あり
3.3.x / 0.11.0.x
 
"厳密に 1 回" がオフの場合に
互換性あり(ブローカーバージョン
Confluent Platform 3.3.x 以降が必要)
互換性あり
4.0.x / 1.0.x および
4.1.x / 1.1.x および
5.0.x / 2.0.x および
5.1.x / 2.1.x および
5.2.0 / 2.2.0 および
5.2.1 / 2.2.0
 
"厳密に 1 回" がオフの場合に
互換性あり(ブローカーバージョン
Confluent Platform 3.3.x 以降が必要)。
メッセージフォーマット 0.10 以降が必要。
メッセージヘッダーはサポート外
(ブローカーバージョン Confluent
Platform 3.3.x 以降およびメッセージ
フォーマット 0.11 以降が必要)
互換性あり。
メッセージフォーマットは
0.10 以降であることが必要。
メッセージヘッダーが
使用される場合、メッセージフォーマット
0.11 以降が必要
5.2.2 / 2.2.1 および
5.3.x / 2.3.x および
5.4.x / 2.4.x および
5.5.x / 2.5.x および
6.0.x / 2.6.x
   
互換性あり。
メッセージフォーマットは
0.11 以降が必要。
"厳密に 1 回" の v2 には
Confluent Platform
5.4.x 以降が必要

Streams API は、以前のバージョンの Kafka (0.7、0.8、0.9)を実行している Kafka クラスターとは互換性がありません。

Kafka Streams アプリケーションの Confluent Platform 6.2.4 へのアップグレード

Confluent Platform 6.2.4 を利用するには、アプリケーションの Kafka Streams の依存関係をアップデートして、バージョン番号として 6.2.4-ccs を使用する必要があります。さらに、コードに簡単な変更を加え(詳細は後述)、アプリケーションを再コンパイルすることが必要な場合もあります。

たとえば、pom.xml ファイルで次のように指定します。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <!-- update version to 6.2.4-ccs -->
    <version>6.2.4-ccs</version>
</dependency>

6.2.4 リリースでは、MacOS 10.14 以降を必要とするバージョンの RocksDB に Kafka Streams が依存しています。

注釈

Confluent Platform 5.4.0 以降では、Kafka Streams はデフォルトで新しいインクリメンタル協調バランス調整プロトコルを使用します。Confluent Platform 5.4.0 より前のバージョンから安全にローリングアップグレードを実行するには、2 回のローリング再起動を伴う特定のアップグレードパスに従う必要があります。

  1. UPGRADE_FROM 構成をアップグレード元のバージョンに設定してから、初回のローリング再起動を実行し、新しいバイトコードにアップグレードします。
  2. すべてのメンバーが新しいバージョンになったら、UPGRADE_FROM 構成を削除し、2 回目のローリング再起動を実行して、協調バランス調整を有効にします。

以前の積極的バランス調整プロトコルを引き続き使用する場合は、UPGRADE_FROM 構成を残しておくことができます。この構成が 2.3 以前に設定されている場合、Streams は協調バランス調整を使用しません。ただし、少なくとも 2.0 以降にすることをお勧めします。それよりも前のバージョンでは、極端に古いバージョンからのアップグレードに伴う他の副作用が生じるためです。

注釈

Confluent Platform 4.0.0 では、ソーストピックを再利用する代わりに changelog トピックを保持するようにソース KTable のインスタンスが変更されましたが、そのときにトポロジーに関するリグレッションが生じました。Confluent Platform 5.0.0 において、KTable インスタンスでソーストピックを changelog として再利用する動作が修復されました。ただし、これはオプションであり、StreamsConfig.TOPOLOGY_OPTIMIZATIONStreamsConfig.OPTIMIZE に設定して構成する必要があります。

これにより、アップグレード元とアップグレード先の組み合わせに応じて、いくつかの異なるシナリオが生じます。

  • Kafka 2.0.x-cp1 から Kafka 6.2.4-ccs にアップグレードする場合は、既存の最適化構成を維持することをお勧めします。最適化レベルを変更すると、アプリケーションが短時間だけデータ損失の可能性にさらされることがあります。
  • StreamsBuilder を使用するバージョンの Kafka 1.0.x-cp1/1.1.x-cp1 から Kafka 6.2.4-ccs にアップグレードする場合、最適化を有効にすることはできますが、トポロジーが変更されるため、アプリケーションを新しいアプリケーション ID で再起動する必要があります。また、ローリングアップグレードを実行する場合は、最適化を有効にしないことをお勧めします。最適化を有効にしないのであれば、何も変更する必要はありません。
  • KStreamBuilder を使用するバージョンの Kafka 1.0.x-cp1/1.1.x-cp1 から StreamsBuilder Kafka 6.2.4-ccs にアップグレードする場合、最適化を有効にしてもトポロジーは変更されないため、有効にすることをお勧めします。最適化を有効にしない場合は、キーごとに 1 つのレコードが changelog トピックに格納されるまでの短時間、データ損失の可能性があることに注意してください。

また、新しいアプリケーション ID で起動すると、アプリケーション ID の変更が原因でデータが再処理される可能性があります。レコードの再処理が望ましくない場合は、ダウンストリームユーザーが管理された方法で移行できるように、新しい出力トピックを作成する必要があります。

Confluent Platform 6.2.0 での Streams API の変更点

指定された述語に基づいて KStream を 1 つ以上の KStream インスタンスに分岐または分割するメソッドとしては、branch() メソッドに代わって、タイプセーフの split() メソッドが使用されます。

次のコードは、Confluent Platform 6.2.0 より前のバージョンで branch メソッドを使用した例です。

final KStream<String, Event>[] branches = events.branch(
    (id, event) -> event.getTransactionValue() >= FRAUD_LIMIT,
    (id, event) -> event.getTransactionValue() < FRAUD_LIMIT
);
branches[0].to(suspiciousTransactionsTopicName);
branches[1].to(validatedTransactionsTopicName);

次のコード例は、前のコードを split メソッドを使用して書き換える方法を示しています。

// Canonical rewrite from the branch() method.
final Map<String, Event> branches = events.split(Named.as("branch-"))
    .branch((id, event) -> event.getTransactionValue() >= FRAUD_LIMIT)
    .branch((id, event) -> event.getTransactionValue() < FRAUD_LIMIT));
branches.get("branch-1").to(suspiciousTransactionsTopicName);
branches.get("branch-2").to(validatedTransactionsTopicName);

// Rewrite to exploit the new API.
events.split()
    .branch(
        (id, event) -> event.getTransactionValue() >= FRAUD_LIMIT,
        Branched.withConsumer(ks -> ks.to(suspiciousTransactionsTopicName)))
    .branch(
        (id, event) -> event.getTransactionValue() < FRAUD_LIMIT,
        Branched.withConsumer(ks -> ks.to(validatedTransactionsTopicName)));

Confluent Platform 6.1.0 での Streams API の変更点

スライディングウィンドウ

Confluent Platform 6.1 では、KIP-450 で説明されているように、windowedBy() のウィンドウ化された集約のオプションとして SlidingWindows が追加されます。スライディングウィンドウは、一定の期間内のデータを扱うためのウィンドウ(時間幅)で、柔軟かつ効率的にウィンドウ化された集約を行うことができます。詳細については、「開発者ガイド」を参照してください。

TRACE レベルのエンドツーエンドのレイテンシメトリクス

6.0 で導入されたエンドツーエンドのレイテンシメトリクスが拡張されており、ストアレベルのメトリクスが含まれています。新しいストアレベルのメトリクスは、新しいメトリクスの記録レベルである TRACE レベルで記録されます。TRACE レベルのメトリクスを有効にすると、より高いレベル(INFO および DEBUG)も自動的に有効になります。詳細については、KIP-613 を参照してください。

以前の Kafka Streams アプリケーションの Confluent Platform 6.1.x へのアップグレード

Confluent Platform 6.0.0 での Streams API の変更点

  • "厳密に 1 回" の保証を使用するアプリケーションの拡張性を高める新しい処理モード( KIP-447
  • ステートストアの高可用性( KIP-441
  • KStream.through() を置き換える新しい KStream.repartition() 演算子( KIP-221
  • 新しいエンドツーエンドのレイテンシメトリクスの追加( KIP-613
  • StreamsResetter の新しい --force オプションによる残存メンバーの強制削除( KIP-571

"厳密に 1 回" の処理の強化

Confluent Platform 6.0 以降では、新しい処理モード exactly_once_v2 を利用できます。これは、processing.guarantee パラメーターで構成できます。この新機能を使用するには、ブローカーが Confluent Platform 5.5.x または Kafka 2.5 以降のバージョンで動作している必要があります。

この実装では、クライアントスレッドや使用されるネットワーク接続など、クライアントとブローカーのリソースの使用率が削減されるため、より効率が良く、スループットと拡張性も向上します。ブローカーと Kafka Streams の内部動作の詳細については、KIP-447 を参照してください。

また、KIP-447 の実装の一環で、トランザクションのタイムアウトが 60 秒から 10 秒に短縮されました。

EOS アプリケーションを以前のバージョンからアップグレードしてバージョン 6.0+ でこの機能を有効にするには、まず、アプリケーションを exactly_once のままでバージョン 6.0.x にアップグレードする必要があります。その後、2 回目のローリング再起動を実行して exactly_once_v2 に切り替えます。

EOS アプリケーションを古いバージョン(Kafka 2.6 より前)からバージョン 2.6 ~ 2.8 にアップグレードする場合は、同じ手順に従いますが、構成を exactly_once_beta に置き換えます。exactly_once_beta を使用するアプリケーションをバージョン 2.6+ から 3.0 以降にアップグレードする場合は、特別な手順は必要ありません。ローリングアップグレード中に、構成を exactly_once_beta から exactly_once_v2 に変更するだけです。

ダウングレードの場合は逆の手順を実行します。まず、構成を exactly_once_v2 から exactly_once に切り替えて、2.6.x アプリケーションでこの機能を無効にします。その後、アプリケーションを 2.6.x より前のバージョンにダウングレードできます。

ステートストアの高可用性

ステートフルアプリケーションの可用性を高めるために、タスクの割り当てアルゴリズムが変更され、ステートフルなアクティブタスクをインスタンスに移動するときに、タスクのステートにまだ追い付いていないインスタンスへの移動を遅らせるようになりました。代わりに、あるインスタンスから別のインスタンスにタスクを移行するとき(スケールアウト時など)、Kafka Streams はターゲットインスタンスにウォームアップレプリカを割り当てます。これにより、既にタスクがあったインスタンスでアクティブタスクを利用可能な状態に保ちながら、ステートの復元を開始できます。インスタンスのウォームアップタスクは、準備ができたら Kafka Streams がバックグラウンドでアクティブタスクを新しい所有者に移動できるように、進行状況をグループに伝達します。この新機能を制御する新しい構成を含む詳細については、KIP-441 を参照してください。

エンドツーエンドのレイテンシメトリクス

新しいエンドツーエンドのレイテンシメトリクスが追加されました。これらのタスクレベルのメトリクスは INFO レベルで記録され、レコードがタスクの開始(ソース)ノードから終了(ターミナル)ノードに至るまでの、エンドツーエンドの最小および最大レイテンシを報告します。詳細については、KIP-613 を参照してください。

repartition() 演算子による through() の置き換え

演算子 KStream.through() は非推奨になり、新しい KStream.repartition() 演算子に置き換えられました( KIP-221 より)。KStream.repartition()KStream.through() と似ていますが、Kafka Streams が自動的にトピックを管理します。自分で管理するトピックとの間で読み書きを行う必要がある場合は、KStream.to()StreamsBuilder#stream() と組み合わせて使用する方法にフォールバックできます。トピックには "副出力" として書き込むだけで処理を続行する場合は、myStream.xxx(); myStream.to() を使用してファンアウトのデータフローにすると、クラスターからデータを読み取り直す必要がなくなります。

Confluent Platform 5.5.0 での Streams API の変更点

Confluent Platform 5.5 では、KStream を KTable に変換する KStream.toTable() などの新しい API、複数のストリームを単一の操作で集約できる新しい cogroup() 演算子、入力トピックからの null キーまたは null 値を表す新しい Serde 型 Void、スタンバイレプリカのメタデータを公開する新しい queryMetadataForKey()、特定のパーティションと特定のタスクの種類に対して Kafka Streams の対話型クエリ を実行できる新しい KafkaStreams.store(StoreQueryParameters) API が追加されています。また、UsePreviousTimeOnInvalidTimestamp が非推奨になり、UsePartitionTimeOnInvalidTimeStamp に置き換えられています。

コグループ

Confluent Platform 5.5 では、複数のストリームを単一の操作で集約できる新しい cogroup() オペレーター( KIP-150 より)が追加されています。コグループ化されたストリームは、集約前にウィンドウ化することもできます。詳細については、「開発者ガイド」を参照してください。

スタンバイのステートデータに対するクエリの実行

Confluent Platform 5.5 では、Kafka Streams の対話型クエリ のパフォーマンスと高可用性を強化する新しい API が追加されています。KafkaStreams.queryMetadataForKey(store, key, serializer) API は、特定のキーをホストするアクティブとスタンバイの両方を含む KeyQueryMetadata オブジェクトを返します。 KafkaStreams.allLocalStorePartitionLags() は、特定のストリームインスタンスの各ストアパーティションに関するラグや陳腐化の情報を含む LagInfo オブジェクトのマップを提供します。これらの API を組み合わせると、KafkaStreams.store(StoreQueryParameters) API を細かく制御して、特定のキーのパーティションだけを返すようにストアにフィルターをかけたり、古いストアをクエリの対象にするかどうかを制御したりできます。

新しい toTable() API

Confluent Platform 5.5 リリースでは、KIP-523 により、 Kafka Streams を直接 KTable に変換できます。この API により、イベントストリームを changelog ストリームとして取得する必要のあるシナリオが簡素化されます。入力 KStream からの各レコードは、対応するキーに対する挿入、更新、または削除(値が null の場合)として結果の KTable に適用されます。null キーを持つレコードは破棄されます。

非推奨になった API

Confluent Platform 5.5 では、KIP-530 によって UsePreviousTimeOnInvalidTimestamp が非推奨となり、UsePartitionTimeOnInvalidTimeStamp に置き換えられました。これは、ストリームの時間トラッキングが改善され、パーティション時刻の定義が修正されたためです。

KIP-562 によって KafkaStreams.store(String, QueryableStoreType) が非推奨となり、KafkaStreams.store(StoreQueryParameters) に置き換えられました。新しい API では、クエリを目的のレプリカやパーティションにルーティングするためのオプションが追加されています。

KIP-535 によって KafkaStreams.metadataForKey(...) API が非推奨となり、同等の KafkaStreams.queryMetadataForKey(...) に置き換えられました。新しい API では、対話型クエリのルーティングに役立つスタンバイレプリカの情報も提供され、高可用性が向上します。

Confluent Platform 5.4.0 での Streams API の変更点

Confluent Platform 5.4.x では、トポロジーを指定する新しい API と、アプリケーションをモニタリングするための新しいメトリクスが追加されています。単体テストユーティリティの API が強化され、いくつかの API は非推奨になりました。さらに、Confluent Platform 5.4.x ではインクリメンタル協調バランス調整が導入されました。以下に詳細を示します。

外部キーによる KTable-KTable 結合

以前のバージョンでは、レコードキーによるテーブルの結合しか実行できず、両方のテーブルに同じキースペースが必要でした。

Confluent Platform 5.4.0 以降では、外部キー参照による多対一の KTable-KTable 結合を実行できます。この機能を使用するには、Function<V, KO> foreignKeyExtractor 引数を受け取る KTable#join のオーバーロードを指定します。必要な再パーティション操作は Kafka Streams によって内部で自動的にセットアップされ、適切な結合結果が計算されます。

設計の詳細については、設計ドキュメント KIP-213 を参照してください。

演算子の名前付け

Confluent Platform 5.4.x リリースでは、KIP-307 により、 Kafka Streams DSL トポロジー内のすべての演算子に名前を付けることができます。独自の演算子に意味のある名前を付けると、トポロジーの記述( Topology#describe()#toString() )がわかりやすくなり、Kafka Streams アプリケーションの動作の詳細なコンテキストを理解しやすくなります。

KStreamKTable のほとんどのメソッドには、Named オブジェクトを受け取る新しいオーバーロードがあります。通常、DSL 操作に名前を指定するには Named.as("my operator name") を使用します。集約操作の再パーティショントピックの名前付けには、引き続き Grouped を使用します。結合操作の場合は Joined か、または新しい StreamJoined オブジェクトを使用します。

KStream-KStream 結合でのステートストアの名前付け

Confluent Platform 5.4.x より前のバージョンの Kafka Streams では、DSL のユーザーは、ストリームとストリームの結合にかかわるステートストアに名前を付けることができませんでした。ユーザーがトポロジーを変更し、結合の前に演算子を追加すると、ステートストアの内部名が変わり、再度デプロイするときにアプリケーションのリセットが必要となりました。Confluent Platform 5.4.x リリースでは、Kafka Streams に StreamJoined クラスが追加され、結合にかかわる結合プロセッサー、再パーティショントピック(再パーティションが必要な場合)、およびステートストアにユーザーが名前を指定できるようになっています。また、ステートストアに名前を付けると、そのステートストアを支える changelog トピックにも名前が付けられます。重要な点として、ストアに名前を付けても、それらのストアが Kafka Streams の対話型クエリ でクエリ可能になるわけでは ありません

StreamJoined によってもたらされるもう 1 つの機能に、結合で使用されるステートストアの種類の構成があります。ストリームとストリームの結合について、インメモリーストアまたはカスタムステートストアを使用するか、組み込みの RocksDB ストアを引き続き使用するかを選択できます。ただし、指定したストアに対して Kafka Streams の対話型クエリ でクエリを実行することはできません。StreamJoined の追加に伴って、Joined を使用したストリームとストリームの結合操作は非推奨になりました。新しいオーバーロードメソッドを使用したストリームとストリームの結合方法に切り替えることをお勧めします。

// Joined class, now deprecated

 left.join(right,
            (value1, value2) -> value1 + value2,
            JoinWindows.of(ofMillis(100)),                     
            Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
                  // only names repartition topic (if needed)
                  .named("my-join"));



 // StreamJoined 

 left.join(right,
            (value1, value2) -> value1 + value2,
            JoinWindows.of(ofMillis(100)),                                            
            StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer())
                         // names the state store.
                        .withStoreName("join-store")
                        // names the processor and repartition topic (if needed)
                        .withName("my-join"));

詳細については、KIP-479 を参照してください。

メトリックの強化

RocksDB ステートストアをモニタリングするメトリクスが追加されました。新しく追加された RocksDB メトリクスでは、RocksDB インスタンスから提供される特定の統計情報を収集します。RocksDB メトリクスについては、 モニタリングのセクション で説明されています。

Kafka Streams アプリケーションをクライアントレベルでモニタリングするクライアントメトリクスが追加されました。新しいクライアントメトリクスについては、モニタリングのセクション で説明されています。

テストの改良

KIP-470 により、Kafka Streams の単体テストユーティリティが改良されました。ConsumerRecordFactory で生成され、ヘルパークラス OutputVerifier で検証される低レベルの ConsumerRecord<byte[],byte[]>ProducerRecord<byte[],byte[]> を使用する代わりに、新しいクラス TestInputTopicTestOutputTopic、および TestRecord が追加されました。これらの新しいクラスが提供する強化された API によって、単一(またはリスト)の値、キーと値のペア、または TestRecord のインスタンスの読み書きが可能になります。新しい TestRecord クラスでは、ヘッダーやタイムスタンプなどのレコードメタデータを設定または検証できます。さらに、新しい API では、ユーザーが選択した任意のアサーションライブラリを使用できるため、慣用的なテストコードを記述できます。

// old API

// use a record factory to pipe data through the test driver
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
    "input-topic",
    new StringSerializer(),
    new LongSerializer()
);
testDriver.pipe(factory.create("key", 42L));


// get output record from test driver and verify result
ProducerRecord<String, Integer> outputRecord = testDriver.readOutput(
    "output-topic",
    new StringDeserializer(),
    new LongDeserializer()
);
// throws AssertionError if key or value does not match
OutputVerifier.compareKeyValue(outputRecord, "key", 42L);

// advance wall-clock time using primitive long time (in ms)
testDriver.advanceWallClockTime(10L);


// new API

// use a test-input-topic to pipe data through the test driver
TestInputTopic<String, Long> inputTopic = testDriver.createInputTopic(
    "input-topic",
    new StringSerializer(),
    new LongSerializer()); 
inputTopic.pipeInput("key", 42L);

// use a test-output-topic to get output records
TestOutputTopic<String, Long> outputTopic = testDriver.createOutputTopic(
    "result-topic",
    new StringDeserializer(),
    new LongDeserializer());
// user assertion library of your choice to verify output
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));

// advance wall-clock time using expressive Duration
testDriver.advanceWallClockTime(Duration.ofMillis(10L));

ステートのバランス調整

インクリメンタル協調バランス調整の導入により、Streams では、バランス調整の開始時にすべてのタスクを取り消す必要がなくなりました。代わりに、バランス調整の完了時に、全体的な負荷分散のために別のコンシューマーに移行する必要のあるタスクだけが取り消され、終了されます。これによって StateListener のセマンティクスが少し変わり、バランス調整の開始時に必ずしも REBALANCING に移行するとは限らなくなります。これは、ステートの復元中を除いて常に、バランス調整の進行中でも、対話型クエリが利用可能になることを意味します。バランス調整の開始時に復元が実行されている場合は、協調バランス調整の実行中も、ステートストアやプロセスのスタンバイタスクの復元がアクティブに続行されます。この新しいバランス調整プロトコルでは、1 回目のバランス調整に続けて、すべてのタスクが問題なく分散されるようにする 2 回目のバランス調整が行われる場合があります。協調バランス調整はデフォルトで有効になっています。「Kafka Streams アプリケーションの Confluent Platform 6.2.4 へのアップグレード」で説明されているように、ローリングアップグレードを安全に実行するには、特定のアップグレードパスが必要です。新しい協調バランス調整プロトコルの詳細については、KIP-429 を参照してください。

非推奨になった API

Confluent Platform 5.4.x では、WindowStore#put(K key, V value) は非推奨になり、使用することはできません。代わりに既存の WindowStore#put(K key, V value, long windowStartTimestamp) を使用してください。

詳細については、KIP-474 を参照してください。

// old API

// similar for Transformer interfaces
class MyProcessor implements Processor {
  // omit some methods for brevity

  WindowStore store;

  public void init(ProcessorContext context) {
    store = (WindowStore) context.getStateStore("myStore");
  }

  public void process(K key, V value) {
    store.put(key, value); // deprecated
  }
}


// new API

// similar for Transformer interfaces
class MyProcessor implements Processor {
  // omit some methods for brevity

  WindowStore store;

  public void init(ProcessorContext context) {
    store = (WindowStore) context.getStateStore("myStore");
  }

  public void process(K key, V value) {
    long windowStartTimestamp = ... // compute window start timestamp explicitly
    store.put(key, value, windowStartTimestamp);
  }
}

さらに、PartitionGrouper インターフェイスおよび対応する構成パラメーター partition.grouper が非推奨になり( KIP-528 )、Apache Kafka の次のメジャーリリースで削除される予定です( KAFKA-7785 )。したがって、この機能は将来サポートされなくなるため、次のメジャーリリースにアップグレードする前にコードをアップデートする必要があります。カスタムの PartitionGrouper を使用している場合、その使用を停止すると、作成されるタスクが変わる可能性があります。このため、アプリケーションをリセットしてアップグレードする必要があります。

Confluent Platform 5.3.0 での Streams API の変更点

Confluent Platform 5.3.x では、いくつかの新しい API が追加され、一部のデフォルト構成が変更されました。さらに、RocksDB の依存関係がアップデートされました。以下に詳細を示します。

Scala の Suppress 演算子

kafka-streams-scala KTable API に Suppress 演算子が追加されました。

インメモリーのウィンドウストアとセッションストア

Streams で、RocksDB に基づく永続的なストアに加えて、インメモリーのウィンドウストアとセッションストアが提供されるようになりました。Stores には、新しいパブリックインターフェイス inMemoryWindowStore()inMemorySessionStore() が追加されています。これにより、組み込みのインメモリーウィンドウストアとセッションストアが提供されます。

Serde の close() と configure()

SerializerDeserializer、および Serdeclose()configure() にデフォルトの実装が追加され、ラムダ式で実装できるようになりました。詳細については、KIP-331 を参照してください。

RocksDB のストアのタイムスタンプ

演算子のセマンティクスを改善するために、キーと値のペアまたはウィンドウごとに追加のタイムスタンプを格納できる新しい種類のストアが追加されています。一部の DSL 演算子(KTables など)では、これらの新しいストアが使用されています。したがって、QueryableStoreType として TimestampedKeyValueStoreType または TimestampedWindowStoreType を指定すると、対話型クエリによって最新の更新タイムスタンプを取得できます。この変更はほとんどの場合は透過的ですが、コードの変更が必要となる特殊なケースもいくつかあります。型指定のないストアを受け取ってキャストを使用している場合は、必要に応じて、正しい型にキャストするようにコードをアップデートする必要があります。そうしないと、ストアから値を取得するときに java.lang.ClassCastException: class org.apache.kafka.streams.state.ValueAndTimestamp cannot be cast to class YOUR-VALUE-TYPE のような例外が発生することがあります。また、TopologyTestDriver#getStateStore() では組み込み以外のストアだけが返され、組み込みのストアにアクセスした場合は例外がスローされます。詳細については、KIP-258 を参照してください。

新しい flatTransformValues() 演算子

型の安全性を強化するために、新しい演算子 KStream#flatTransformValues が追加されました。詳細については、KIP-313 を参照してください。

max.poll.interval のデフォルト値

以前の Kafka Streams では、構成パラメーター max.poll.interval.msInteger.MAX_VALUE に設定されていました。このデフォルト値が削除され、Kafka Streams はコンシューマーのデフォルト値を使用するようになりました。詳細については、KIP-442 を参照してください。

再パーティショントピックのデフォルト

再パーティショントピックのデフォルト構成が変更されました。インデックスファイルのセグメントサイズ segment.index.bytes は 50 MB ではなく、クラスターのデフォルト値が使用されます。同様に、構成 segment.ms は 10 分ではなく、クラスターのデフォルト構成が使用されます。最後に、保持時間(retention.ms)は Long.MAX_VALUE から -1` (無限)に変更されました。詳細については、KIP-443 を参照してください。

RocksDBConfigSetter close()

メモリーリークを防ぐために、RocksDBConfigSetter に新しい close() メソッドが追加されました。これはシャットダウン時に呼び出されます。ユーザーはこのメソッドを実装し、その中で RocksDB 構成オブジェクトを終了して、これらのオブジェクトで使用されているすべてのメモリーを解放する必要があります。詳細については、 こちら を参照してください。

RocksDB の v5.18.3 へのアップグレード

RocksDB の依存関係がバージョン 5.18.3 にアップデートされました。新しいバージョンでは、より多くの RocksDB 構成を指定できるようになっています。これには、RocksDB のオフヒープメモリーの使用量を制限するために役立つ WriteBufferManager などが含まれます。詳細については、「メモリー管理」を参照してください。

Confluent Platform 5.2.0 での Streams API の変更点

Confluent Platform 5.2.x では、いくつかの新しい API が追加されているほか、一部の既存の機能が強化されています。

KafkaStreams の移行の簡素化

Confluent Platform 5.2.0 では、KafkaStreams#state の移行ダイアグラムが簡素化されています。以前のバージョンでは、ステートは CREATED から RUNNING に移行し、その後 REBALANCING に移行して最初のストリームタスクの割り当てを取得してから、RUNNING に戻ります。これに対して 5.2.0 以降では、CREATED から直接 REBALANCING に移行し、その後 RUNNING になります。ステートの移行イベントをキャプチャする StateListener を登録している場合は、この簡素化に合わせて、リスナーの実装の調整が必要になることがあります(実際には、リスナーのロジックが影響を受けることはめったにありません)。

TimeWindowSerde

WindowedSerdes には、ウィンドウサイズの構成が可能な TimeWindowSerde を返す新しい静的コンストラクターがあります。これを使用すると、時間ウィンドウの serde を構築して、時間ウィンドウ化されたストアの changelog から直接読み取ることができます。詳細については、KIP-393 を参照してください。

AutoCloseable

Confluent Platform 5.2.0 では、KafkaStreams などの一部のパブリックインターフェイスが AutoCloseable を実装するように拡張され、try-with-resource ステートメントで使用できるようになりました。影響を受けるパブリックインターフェイスの完全なリストについては、KIP-376 を参照してください。

Confluent Platform 5.1.0 での Streams API の変更点

新しい構成クラス Grouped

新しいクラス Grouped が追加され、Serialized は非推奨になりました。Grouped が追加された意図は、集約操作の実行時に作成される再パーティショントピックに名前を付けることにあります。ユーザーは Grouped#as() メソッドを使用して、再パーティショントピックが作成される場合の名前を指定できます。このメソッドは、再パーティショントピック名の一部として使用される String を受け取ります。結果の再パーティショントピック名は、${application-id}->name<-repartition というパターンに従います。Grouped クラスは、KStream#groupByKey()KStream#groupBy()、および KTable#groupBy()Serialized よりも優先されます。集約操作の再パーティショントピックは、Kafka Streams によって自動的に作成されるわけではない点に注意が必要です。また、Joined クラスもアップデートされ、新しいメソッド Joined#withName が追加されています。このメソッドにより、ユーザーは、ストリームとストリーム、またはストリームとテーブルの結合を実行するために必要な再パーティショントピックに名前を付けることができます。

// old API
KStream<String, String> stream = ...

stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))


stream.groupBy( (key, value) -> ... , Grouped.with(Serdes.String(), Serdes.String()))


KStream<String, String> streamII = ...
streamII.join(stream, (valueII, valueI) -> ... , JoinWindows.of(20000), Joined.with(Serdes.String(), 
	                                                                             Serdes.String(), 
	                                                                             Serdes.String()))
KTable<String, String> table = ...
table.groupBy( (key, value) -> ... , Grouped.with(Serdes.String(), Serdes.String()))

// new API
KStream<String, String> stream = ...

stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))

                            // providing name for possible reparitiion topic
                            // using a name for the repartition topic is optional
stream.groupByKey(Grouped.with("repartition-topic-name", 
                               Serdes.String(), 
                               Serdes.String())) 

stream.groupBy( (key, value) -> ... , Grouped.with(Serdes.String(), Serdes.String()))

                                      // providing name for possible reparitiion topic
                                      // using a name for the repartition topic is optional
stream.groupBy( (key, value) -> ... , Grouped.with("reparition-topic-name", 
                                                   Serdes.String(), 
                                                   Serdes.String())) 
KStream<String, String> streamII = ...


streamII.join(stream, (valueII, valueI) -> ... , JoinWindows.of(20000), Joined.with(Serdes.String(), 
	                                                                             Serdes.String(), 
	                                                                             Serdes.String()))
                                                              
streamII.join(stream, (valueII, valueI) -> ... , JoinWindows.of(20000), Joined.with(Serdes.String(), 
	                                                                             Serdes.String(), 
	                                                                             Serdes.String(), 
	                                                                             "join-repartition-topic-name")) 
                                                                    // providing name for possible reparitiion topic
                                                                    // using a name for the repartiton topic is optional
KTable<String, String> table = ...

table.groupBy( (key, value) -> ... , Grouped.with(Serdes.String(), Serdes.String()))

                                        // providing name for possible reparitiion topic
                                        // using a name for the repartiton topic is optional
table.groupBy( (key, value) -> ... , Grouped.with("repartition-topic-name", 
                                                  Serdes.String(), 
                                                  Serdes.String())) 

新しい UUID Serde

UUID 用の新しい serde( Serdes.UUIDSerde )が追加されました。これは Serdes.UUID() を通じて使用できます。

API のセマンティクスの改善

long 引数をタイムスタンプ(固定小数点数)または期間(時間)として受け取るメソッドのリストがアップデートされ、セマンティクスを改善するために、Instant パラメーターと Duration パラメーターに置き換えられました。long を使用するいくつかの以前のメソッドは非推奨になったため、コードをアップデートすることをお勧めします。

具体的に、集約ウィンドウ(ホッピング、タンブリング、時間制限なしのウィンドウおよびセッションウィンドウ)と結合ウィンドウは、ウィンドウサイズ、ホップ、ギャップの各パラメーターを指定する Duration 引数を受け取ります。Stores クラスでも、ウィンドウサイズと保持時間を Duration 型で指定します。Window クラスには新しいメソッド #startTime()#endTime() があり、ウィンドウの開始と終了のタイムスタンプを Instant として返します。対話型クエリには、Instant 引数を受け取る新しい #fetch(...) オーバーロードが用意されています。さらに、区切りの登録には ProcessorContext#schedule(Duration interval, ...) を使用します。

KafkaStreams#close(...) は非推奨になり、単一のタイムアウト引数を受け取る KafkaStreams#close(Duration) に置き換えられました。

注釈

新しい KafkaStreams#close(Duration) メソッドのセマンティクスは以前よりも強化されています(ただし、わずかな違いがあります)。タイムアウトにゼロが指定されても、無限にブロックすることはありません。また、負のタイムアウト値は許容されません。ブロックする動作が望ましい場合は、Duration.ofMillis(Long.MAX_VALUE) を渡す必要があります。

// old API

KStream stream = ...

// Tumbling/Hopping Windows
stream.groupByKey().
      .windowedBy(TimeWindows.of(5 * 60 * 1000)
                             .advanceBy(TimeUnit.MINUTES.toMillis(1)),
                  ...);

// Unlimited Windows
stream.groupByKey().
      .windowedBy(UnlimitedWindows.startOn(System.currentTimeMillis()), ...);

// Session Windows
stream.groupByKey().
      .windowedBy(SessionWindows.with(100), ...);

// Joining Streams
KStream secondStream = ...
stream.join(stream2, JoinWindows.of(5000)
                                .before(10000)
                                .after(10000), ...)

// Accessing Window start/end-timestamp
KTable<Windowed<K>, V> table = ...
table.toStream()
     .foreach((wKey, value) -> {
       // can still be used for performance reasons (eg, in a Processor)
       long windowStartTimeMs = wKey.window().start();
       long windowEndTimeMs = wKey.window().end();
       ...
     });

// Registering a Punctuation
MyProcessor implements Processor { // same for Transformer[WithKey] and ValueTransformer[WithKey]
  // other methods omitted for brevity

  void init(ProcessorContext context) {
    context.schedule(1000, PunctuationType.STREAM_TIME, timestamp -> {...});
  }
}

// Interactive Queries
KafkaStreams streams = ...
ReadOnlyWindowStore windowStore = streams.store(...);

long now = System.currentTimeMillis();
WindowStoreIterator<V> it = windowStore.fetch(key, now - 5000, now);
WindowStoreIterator<V> all = windowStore.all(now - 5000, now);

// Creating State Stores
Stores.persistentWindowStore("storeName",
                             24 * 3600 * 1000, // retention period in ms
                             3,                // number of segments
                             10 * 60 * 1000,   // window size in ms
                             false);           // retain duplicates
Stores.persistentSessionStore("storeName",
                              24 * 3600 * 1000); // retention period in ms

// Closing KafkaStreams
KafkaStreams streams = ...
streams.close(30, TimeUnit.SECONDS);
streams.close(0, TimeUnit.SECONDS); // block forever



// new API

KStream stream = ...

// Tumbling/Hopping Windows
stream.groupByKey().
      .windowedBy(TimeWindow.of(Duration.ofMinutes(5)
                            .advanceBy(Duration.ofMinutes(1)),
                  ...);

// Unlimited Windows
stream.groupByKey().
      .windowedBy(UnlimitedWindows.startOn(Instant.now()), ...);

// Session Windows
stream.groupByKey().
      .windowedBy(SessionWindows.with(Duration.ofMillis(100)), ...);

// Joining Streams
KStream secondStream = ...
stream.join(stream2, JoinWindows.of(Duration.ofSeconds(5)
                                .before(Duration.ofSeconds(10))
                                .after(Duration.ofSeconds(10)), ...)

// Accessing Window start/end-timestamp
KTable<Windowed<K>, V> table = ...
table.toStream()
     .foreach((wKey, value) -> {
       // better semantics with new API
       Instant windowStartTime = wKey.window().startTime();
       Instant windowEndTime = wKey.window().endTime();
       // can still be used for performance reasons (eg, in a Processor)
       long windowStartTimeMs = wKey.window().start();
       long windowEndTimeMs = wKey.window().end();
       ...
     });

// Registering a Punctuation
MyProcessor implements Processor { // same for Transformer[WithKey] and ValueTransformer[WithKey]
  // other methods omitted for brevity

  void init(ProcessorContext context) {
    context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {...});
  }
}

// Interactive Queries
KafkaStreams streams = ...
ReadOnlyWindowStore windowStore = streams.store(...);

Instant now = Instant.now();
WindowStoreIterator<V> it = windowStore.fetch(key, now.minus(Duration.ofSeconds(5), now);
WindowStoreIterator<V> all = windowStore.all(now.minus(Duration.ofSeconds(5), now);

// Creating State Stores
Stores.persistentWindowStore("storeName",
                             Duration.ofDay(1),      // retention period
                             // number of segments is removed
                             Duration.ofMinutes(10), // window size
                             false);                 // retain duplicates
Stores.persistentSessionStore("storeName",
                              Duration.ofDay(1)); // retention period

// Closing KafkaStreams
KafkaStreams streams = ...
streams.close(Duration.ofSeconds(30));
streams.close(Duration.ofMillis(Long.MAX_VALUE)); // block forever

ストアセグメントの簡素化

ウィンドウストアの "セグメント数" の概念が廃止され、"セグメント間隔" に置き換えられました。以前の API では、セグメント間隔は min( retention-period / (number-of-segments - 1) , 60_000L) (セグメントサイズの下限は 60 秒)として計算されていました。新しい API では、セグメント間隔に制限はなく(最小のセグメント間隔は 1 ミリ秒)、セグメント数は 1 + (retention-period / segment-interval) になります。

保持時間が 24 時間のセグメントをストアで 3 つ使用していた場合、新しい API で同じ動作を実現するには、セグメント間隔を 12 時間にする必要があります(12 = 24 / (3 - 1))。

メソッド Windows#segments() と変数 Windows#segments は非推奨になりました。同様に、WindowBytesStoreSupplier#segments() も非推奨になり、WindowBytesStoreSupplier#segmentInterval() に置き換えられました。カスタムのウィンドウまたはウィンドウストアを実装している場合は、これらのメソッドの削除時にコードをアップデートする必要が生じることに注意してください。

最後に、Stores#persistentWindowStore(...) は非推奨になり、セグメント数の指定を受け付けない新しいオーバーロードに置き換えられました。セグメント数は、保持時間とセグメント間隔に基づいて自動的に計算されます。永続的なウィンドウストアを明示的に作成している場合は、この変更に合わせてコードをアップデートする必要があります。

// Old API

Stores.persistentWindowStore("storeName",
                             24 * 3600 * 1000, // retention period in ms
                             3,                // number of segments
                             10 * 60 * 1000,   // window size in ms
                             false);           // retain duplicates
// New API

Stores.persistentWindowStore("storeName",
                             Duration.ofDay(1),      // retention period (updated from `long` to `Duration`)
                             // number of segments is removed
                             Duration.ofMinutes(10), // window size (updated from `long` to `Duration`)
                             false);                 // retain duplicates

順序外のデータの処理

複数の Kafka のトピックパーティションを処理する可能性のあるタスク内で、順序外のデータの処理方法をユーザーが指定できるように、新しい構成( max.task.idle.ms)が追加されました(詳細については、「順序外の処理」セクションを参照してください)。デフォルト値は 0 に設定されており、処理のレイテンシが最小限に抑えられます。順序外のデータを処理するリスクを軽減するために、タスクのトピックパーティションの一部にしかデータがない場合に処理を待つようにするには、この構成をオーバーライドして大きな値に設定します。

AdminClient メトリクスの公開

KafkaStream#metrics() メソッドの呼び出しで取得される利用可能なメトリクスに、新しく公開された AdminClient メトリクスが含められるようになりました。Streams アプリケーションのモニタリングの詳細については、「Streams アプリケーションのモニタリング」を参照してください。

トポロジーの記述の強化

<code>TopologyDescription</code> API がアップデートされ、ランタイムチェックが強化されました。<code>TopologyDescription.Source</code> ノードでは、<code>#topics()</code> が非推奨になったため、代わりに <code>#topicSet()</code> と <code>#topicPattern()</code> を適切に使用することをお勧めします。同様に、<code>TopologyDescription.Sink</code> ノードの記述を取得するには、<code>#topic()</code> と <code>#topicNameExtractor()</code> を使用します。

// old API

TopologyDescription.Source source = ... // get Source node from a TopologyDescription
TopologyDescription.Sink sink = ... // get Sink node from a TopologyDescription

String topics = source.topics(); // comma separated list of topic names or pattern (as String)

String topic = sink.topic(); // return the output topic name (never null)


// new API

TopologyDescription.Source source = ... // get Source node from a TopologyDescription
TopologyDescription.Sink sink = ... // get Sink node from a TopologyDescription


Set<String> topics = source.topicSet(); // set of all topic names (can be null if pattern subscription is uses)
// or
Pattern pattern = source.topicPattern(); // topic pattern (can be null)

String topic = sink.topic(); // return the output topic name (can be null if dynamic topic routing is used)
// or
TopicNameExtractor topicNameExtractor = sink.topicNameExtractor(); // return the use TopicNameExtractor (can be null)

StreamsBuilder#build メソッドのオーバーロード

Kafka Streams 2.0 で追加された StreamsConfig#TOPOLOGY_OPTIMIZATION 構成を使用するために、StreamsBuilder#build メソッドに java.util.Properties のインスタンスを受け取るオーバーロードが追加されました。2.1 より前のバージョンでは、DSL を使用してトポロジーを構築するとき、ユーザーが DSL で呼び出しを行うに従って Kafka Streams が物理プランを書き込みます。現在は、StreamsConfig#TOPOLOGY_OPTIMIZATION 構成が StreamsConfig#OPTIMIZE に設定されていれば、StreamsBuilder#build 呼び出しの実行時に java.util.Properties インスタンスを渡すことで、Kafka Streams でトポロジーの物理プランを最適化できます。StreamsConfig#OPTIMIZE を設定すると、ソーストピックを changelog トピックとして再利用するように KTable が最適化されるだけでなく、冗長な複数の再パーティショントピックを 1 つの再パーティショントピックにマージするようにトポロジーが最適化されます。トポロジーの最適化が不要な場合は、パラメーターを受け取らない元の StreamsBuilder#build を引き続き使用できます。トポロジーの最適化を有効にすると、アプリケーションを再度デプロイするときに、アプリケーションのリセットが必要になることがあります。詳細については、「Kafka Streams の最適化」を参照してください。

Confluent Platform 5.0 での Streams API の変更点

Confluent Platform 5.0.x リリースでは、いくつかの新しい Streams 構成とパブリックインターフェイスが追加されています。また、いくつかの非推奨の API が Confluent Platform 5.0.x リリースで削除されています。

スキップされたレコードのメトリクスのリファクタリング

Confluent Platform 5.0.0 以降では、Kafka Streams は skippedDueToDeserializationError-rateskippedDueToDeserializationError-total の各メトリクスを報告しません。

逆シリアル化エラーと、レコードがスキップされるその他の原因はすべて、既存のメトリクスである skipped-records-rateskipped-records-total に含められるようになりました。レコードがスキップされた場合、そのイベントは WARN レベルで記録されます。これらのメトリクスは主に、予期しないイベントをモニタリングするものです。系統的な問題が原因となって多数の処理できないレコードがスキップされ、警告ログが大量に生じる場合は、レコードをスキップするセマンティクスに頼るのではなく、処理できないレコードをフィルター処理することを検討する必要があります。詳細については、KIP-274 を参照してください。

現在、レコードがスキップされる原因には次のようなものがあります。

  • テーブルソース内の null キー。
  • テーブルとテーブルの内部結合、左結合、外部結合、右結合における null キー。
  • ストリームとテーブルの結合における null のキーまたは値。
  • ストリームとストリームの結合における null のキーまたは値。
  • グループ化されたストリームの集約、縮小、カウントにおける null のキーまたは値。
  • ウィンドウ化されたストリームの集約、縮小、カウントにおける null キー。
  • セッションウィンドウ化されたストリームの集約、縮小、カウントにおける null キー。
  • 結果の生成時にエラーが発生し、構成されている default.production.exception.handlerCONTINUE を返した場合(デフォルトは FAIL で例外をスロー)。
  • レコードの逆シリアル化中にエラーが発生し、構成されている default.deserialization.exception.handlerCONTINUE を返した場合(デフォルトは FAIL で例外をスロー)。これは、以前には skippedDueToDeserializationError メトリクスで捕捉されたケースです。
  • フェッチされたレコードに負のタイムスタンプが含まれていた場合。

ウィンドウストアインターフェイスの新しい関数

Confluent Platform で ReadOnlyWindowStore の新しいメソッドがサポートされ、単一のウィンドウのキーと値のペアを照会できるようになりました。このインターフェイスでウィンドウストアの実装をカスタマイズしている場合は、コードをアップデートして、新しく追加されたメソッドを実装する必要があります。詳細については、KIP-261 を参照してください。

KafkaStreams コンストラクターの簡素化

KafakStreams のコンストラクターが簡素化されました。ユーザーがボイラープレートの StreamsConfig オブジェクトを作成する必要はありません。代わりに、実際のユーザー構成を指定する Properties オブジェクトを、コンストラクターに直接渡すことができるようになりました。

StreamsBuilder builder = new StreamsBuilder();
// define processing logic
Topology topology = builder.build();

// or

Topology topology = new Topology();
// define processing logic


Properties props = new Properties();
// define configuration


// old API

KafkaStream stream = new KafkaStreams(topology, new StreamsConfig(props));
KafkaStream stream = new KafkaStreams(topology, new StreamsConfig(props), /* pass in KafkaClientSupplier or Time */);


// new API

KafkaStream stream = new KafkaStreams(topology, props);
KafkaStream stream = new KafkaStreams(topology, props, /* pass in KafkaClientSupplier or Time */);

シンクでのダイナミックルーティングのサポート

このリリースでは、レコードをダイナミックに Kafka のトピックにルーティングできるようになりました。具体的には、低レベルの Topology#addSink と高レベルの KStream#to の両方の API に、String 型の固有のトピック名ではなく TopicNameExtractor インスタンスを受け取るバリアントが追加されました。TopicNameExtractor は、アップストリームプロセッサーから受信したレコードごとに、レコードのキーと値およびレコードのコンテキストに基づいて、どの Kafka のトピックに書き込むかをダイナミックに決定します。すべての出力 Kafka のトピックはユーザートピックと見なされるため、事前に作成されている必要があります。また、StreamPartitioner インターフェイスが変更され、トピック名のパラメーターが追加されました。これは、トピック名が事前に知られているとは限らないためです。このインターフェイスの実装をカスタマイズしている場合は、アプリケーションのアップグレード時にコードをアップデートする必要があります。

メッセージヘッダーのサポート

このリリースでは、Processor API でメッセージヘッダーがサポートされます。具体的には、新しい API ProcessorContext#headers() が追加されています。この API は、処理中のソーストピックのメッセージヘッダーを追跡する Headers オブジェクトを返します。ユーザーは、このオブジェクトを通じて、プロセッサートポロジー全体に伝達されているヘッダーのマップを操作することもできます。たとえば、Headers#add(String key, byte[] value)Headers#remove(String key) を使用できます。Streams DSL を使用すると、ユーザーは processtransform を呼び出すことができます。その中で ProcessorContext にアクセスして、メッセージヘッダーにアクセスして操作することもできます。ユーザーがヘッダーを操作しない場合もヘッダーは保持され、レコードがプロセッサートポロジーを通過するのに伴って転送されます。結果のレコードがシンクトピックに送信されるとき、保持されたメッセージヘッダーも送信されるレコード内にエンコードされます。

KTable での値の変換のサポート

このリリースでは、もう 1 つの新しい API、KTable#transformValues が追加されました。詳細については、KIP-292 を参照してください。

ウィンドウ化された Serde のサポートの強化

ヘルパークラス WindowedSerdes が追加され、ウィンドウのシリアル化と逆シリアル化の仕組みを知らなくても、時間やセッションでウィンドウ化された serde を作成できるようになりました。作成されたウィンドウ serde によって、内部のキーまたは値のデータ型に対してユーザーが提供する serde がラップされます。さらに、2 つの新しい構成 default.windowed.key.serde.innerdefault.windowed.value.serde.inner が追加され、ウィンドウ化された型の内部で使用されるデフォルトのキーおよび値の serde を指定できるようになりました。ただし、これらの新しい構成は、default.key.serde または default.value.serde で、ウィンドウ化された serde( WindowedSerdes.TimeWindowedSerde または WindowedSerdes.SessionWindowedSerde )が指定されている場合にのみ有効になります。

タイムスタンプの操作に対応

Processor API を使用して、出力メッセージのタイムスタンプを明示的に設定できるようになりました。この変更により、ProcessorContext#forward() メソッドがアップデートされています。いくつかの既存のメソッドが非推奨になり、新しいメソッドに置き換えられました。特に、レコードをインデックスに基づいてダウンストリームプロセッサーに送信することはできなくなりました。

// old API

public class MyProcessor implements Processor<String, Integer> {
  private ProcessorContext context;

  @Override
  public void init(ProcessorContext context) {
    this.context = context;
  }

  @Override
  public void process(String key, Integer value) {
    // do some computation

    // send record to all downstream processors
    context.forward(newKey, newValue);
    // send record to particular downstream processors (if it exists; otherwise drop record)
    context.forward(newKey, newValue, "downstreamProcessorName");
    // send record to particular downstream processors per index (throws if index is invalid)
    int downStreamProcessorIndex = 2;
    context.forward(newKey, newValue, downstreamProcessorIndex);
  }

  @Override
  public void close() {} // nothing to do
}


// new API

public class MyProcessor implements Processor<String, Integer> {
  // omit other methods that don't change for brevity

  @Override
  public void process(String key, Integer value) {
    // do some computation

    // send record to all downstream processors
    context.forward(newKey, newValue); // same as old API
    context.forward(newKey, newValue, To.all()); // new; same as line above
    // send record to particular downstream processors (if it exists; otherwise drop record)
    context.forward(newKey, newValue, To.child("downstreamProcessorName"));
    // send record to particular downstream processors per index (throws if index is invalid)
    // -> not supported in new API

    // new: set record timestamp
    long outputRecordTimestamp = 42L;
    context.forward(newKey, newValue, To.all().withTimestamp(outputRecordTimestamp));
    context.forward(newKey, newValue, To.child("downstreamProcessorName").withTimestamp(outputRecordTimestamp));
  }
}

パブリックな Test-Utils アーティファクト

Confluent Platform に、kafka-streams-test-uitls アーティファクトが付属するようになりました。このアーティファクトには、Kafka Streams アプリケーションの単体テストを行うユーティリティクラスが含まれています。詳細については、「Streams コードのテスト」のセクションを参照してください。

Scala API

Confluent Platform に、Kafka Streams 用の Apache Kafka Scala API が付属するようになりました。次のように、Scala 2.11 または 2.12 のアーティファクトへの依存関係を追加できます。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-scala_2.11</artifactId>
    <!-- or Scala 2.12
    <artifactId>kafka-streams-scala_2.12</artifactId>
    -->
    <version>2.0.0-cp1</version>
</dependency>

非推奨の API の削除

以下の API は非推奨で、Confluent Platform 5.0.0 で削除されました。

  1. KafkaStreams#toString は、トポロジーとランタイムメタデータを返さなくなりました。トポロジーメタデータを取得するには Topology#describe() を呼び出し、スレッドのランタイムメタデータを取得するには KafkaStreams#localThreadsMetadata を呼び出すことができます(Confluent Platform 4.0.0 から非推奨)。コードをアップデートする方法の詳しいガイダンスについては、 こちら を参照してください。
  2. TopologyBuilderKStreamBuilder は削除され、それぞれ TopologyStreamsBuidler に置き換えられました(Confluent Platform 4.0.0 から非推奨)。
  3. StateStoreSupplier は削除され、StoreBuilder に置き換えられました(Confluent Platform 4.0.0 から非推奨)。対応する Stores#create と、これを使用する KStream、KTable、KGroupedStream のオーバーロード関数も削除されました。
  4. serde などの指定を明示的に必要とする KStream、KTable、KGroupedStream のオーバーロード関数が削除され、Consumed, Produced, Serialized, Materialized, Joined を使用する類似するオーバーロード関数に置き換えられました(Confluent Platform 4.0.0 から非推奨)。
  5. Processor#punctuateValueTransformer#punctuateValueTransformer#punctuate 、および RecordContext#schedule(long) は削除され、RecordContext#schedule(long, PunctuationType, Punctuator) に置き換えられました(Confluent Platform 4.0.0 から非推奨)。
  6. ProcessorContext#registerboolean 型の第 2 パラメーター loggingEnabled が削除されました。ステートストアの作成時に StoreBuilder#withLoggingEnabled, #withLoggingDisabled を使用すると、同じ動作を指定できます(Confluent Platform 3.3.0 から非推奨)。
  7. KTable#writeAs、#print、#foreach、#to、#through は、セマンティクスが有用性よりも混乱を招くという理由から削除されました。代わりに、同じ目的で KTable#tostream()#writeAs などを呼び出すことができます(Confluent Platform 3.3.0 から非推奨)。
  8. StreamsConfig#KEY_SERDE_CLASS_CONFIG、#VALUE_SERDE_CLASS_CONFIG、#TIMESTAMP_EXTRACTOR_CLASS_CONFIG は削除され、それぞれ StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG, #DEFAULT_VALUE_SERDE_CLASS_CONFIG, #DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG に置き換えられました(Confluent Platform 3.3.0 から非推奨)。
  9. Streams で ZooKeeper の依存関係が不要になったため、 StreamsConfig#ZOOKEEPER_CONNECT_CONFIG は削除されました(Confluent Platform 3.2.0 から非推奨)。

Confluent Platform 4.1 での Streams API の変更点

Confluent Platform 4.1.x リリースでは、いくつかの新しい Streams 構成とパブリックインターフェイスが追加されています。

bin/kafka-streams-application-reset の変更点

KIP-171 に従って、リセットする入力トピックのオフセットを指定するオプションが追加されました。

組み込みの管理クライアント構成

Streams アプリケーションの内部で、組み込みの管理クライアントをカスタマイズできるようになりました。管理クライアントは、内部トピックの作成など、あらゆる管理リクエストを Kafka ブローカーに送信するために使用されます。カスタマイズは、追加された KafkaClientSupplier#getAdminClient(Map<String, Object>) インターフェイスを通じて行われます。たとえば、統合テストで独自の AdminClient の実装を提供して、デフォルトの実装をオーバーライドできます。また、返される AdminClient を構成するために、KafkaClientSupplier#getAdminClient(Map<String, Object>) に渡される構成をオーバーライドすることもできます。このようなオーバーライド構成を指定するには、StreamsConfig#adminClientPrefix(String) でプレフィックスを付けた管理構成を StreamsConfig に追加します。管理クライアント構成でない構成はすべて無視されます。

以下に例を示します。

Properties streamProps = ...;
// use retries=10 for the embedded admin client
streamsProps.put(StreamsConfig.adminClientPrefix("retries"), 10);

Confluent Platform 4.0 での Streams API の変更点

Kafka Streams とその API は、Confluent Platform 4.0.x リリースで改良および変更されています。これらの変更はすべて後方互換性があるため、Kafka Streams アプリケーションのコードをすぐにアップデートする必要はありません。ただし、一部のメソッドが非推奨になったため、今後のアップグレードに対応できるように、最終的にはコードをアップデートすることをお勧めします。このセクションでは、非推奨になった API について説明します。

トポロジーの構築と実行

トポロジーを指定する 2 つの主なクラス、KStreamBuilderTopologyBuilder が非推奨になり、StreamsBuilderTopology に置き換えられました。新しいクラスは両方ともパッケージ org.apache.kafka.streams に含まれています。 StreamsBuilderTopology の拡張ではなく、クラス階層が別になっていることに注意してください。この変更の影響は KafkaStreams のコンストラクターにも及び、コンストラクターは Topology のみを受け入れるようになりました。StreamsBuilder を使用する場合は、構築されたトポロジーを StreamsBuilder#build() で取得できます。

新しいクラスには基本的に、DSL または Processor API を通じてトポロジーを構築する、以前と同じメソッドが用意されています。ただし、KStreamBuilderTopologyBuilder では、一部の内部メソッドがパブリックになっていましたが、実際の API を構成しているわけではありませんでした。このような内部メソッドは、新しいクラスには含められなくなりました。

// old API

KStreamBuilder builder = new KStreamBuilder(); // for DSL
// or
TopologyBuilder builder = new TopologyBuilder(); // for Processor API

Properties props = new Properties();
KafkaStreams streams = new KafkaStreams(builder, props);

// new API

StreamsBuilder builder = new StreamsBuilder(); // for DSL
// ... specify computational logic
Topology topology = builder.build();
// or
Topology topology = new Topology(); // for Processor API

Properties props = new Properties();
KafkaStreams streams = new KafkaStreams(topology, props);

トポロジーとストリームタスクのメタデータの記述

KafkaStreams#toString()KafkaStreams#toString(final String indent) は、これまでユーザーが指定したプロセッサートポロジー情報と、ランタイムのストリームタスクのメタデータを取得するために使用されていましたが、4.0.0 で非推奨になりました。代わりに、KafkaStreams に新しいメソッド localThreadsMetadata() が追加されました。このメソッドは、各ローカルストリームスレッドについて、スレッドのランタイムステートと現在割り当てられているタスクのメタデータを表す org.apache.kafka.streams.processor.ThreadMetadata オブジェクトを返します。このような情報は、ストリームアプリケーションのデバッグやモニタリングを行う場合に非常に役立ちます。指定したプロセッサートポロジーの情報を取得するには、Topology#describe() を呼び出すことができます。これにより、トポロジーの詳細な記述を含む org.apache.kafka.streams.TopologyDescription オブジェクトが返されます(DSL ユーザーは、最初に StreamsBuilder#build() を呼び出して Topology オブジェクトを取得する必要があります)。

KStreams のマージ

上記のとおり、KStreamBuilder は非推奨になり、StreamsBuilder に置き換えられました。さらに、KStreamBuilder#merge(KStream...)KStream#merge(KStream) に置き換えられたため、StreamsBuilder には merge() メソッドがありません。以前の API は任意の数の KStream インスタンスを単一の KStream にマージしていましたが、新しい #merge() メソッドは異なり、1 つの KStream だけを受け取ります。つまり、2 つの KStream インスタンスを 1 つにマージします。3 つ以上の KStream インスタンスをマージする場合は、KStream#merge() を複数回呼び出すことができます。

// old API

KStreamBuilder builder = new KStreamBuilder();

KStream<Long, String> firstStream = ...;
KStream<Long, String> secondStream = ...;
KStream<Long, String> thirdStream = ...;

KStream<Long, String> mergedStream = builder.merge(
  firstStream,
  secondStream,
  thirdStream);

// new API

StreamsBuilder builder = new StreamsBuilder();

KStream<Long, String> firstStream = ...;
KStream<Long, String> secondStream = ...;
KStream<Long, String> thirdStream = ...;

KStream<Long, String> mergedStream = firstStream.merge(secondStream)
                                                .merge(thirdStream);

Punctuation 関数

Processor API は、ユーザーが イベント時刻PunctuationType.STREAM_TIME )または "ウォールクロック時刻"( PunctuationType.WALL_CLOCK_TIME )に基づいて punctuate 関数をスケジュールできるように拡張されました。これまでは、"イベント時刻" に基づいてスケジュールすることしかできなかったため、punctuate 関数は常にデータドリブンでした。この変更によって、元の ProcessorContext#schedule は非推奨になり、新しいオーバーロード関数に置き換えられます。さらに、Processor 内の punctuate 関数も非推奨になり、新しく追加された Punctuator#punctuate インターフェイスに置き換えられます。

// old API (punctuate defined in Processor, and schedule only with stream-time)

public class WordCountProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // keep the processor context locally because we need it in punctuate() and commit()
        this.context = context;

        // call this processor's punctuate() method every 1000 milliseconds
        this.context.schedule(1000);

        // retrieve the key-value store named "Counts"
        kvStore = (KeyValueStore) context.getStateStore("Counts");
    }

    @Override
    public void punctuate(long timestamp) {
        KeyValueIterator<String, Long> iter = this.kvStore.all();
        while (iter.hasNext()) {
            KeyValue<String, Long> entry = iter.next();
            context.forward(entry.key, entry.value.toString());
        }
        iter.close();

        // commit the current processing progress
        context.commit();
    }

    // .. other functions
}


// new API (punctuate defined in Punctuator, and schedule can be either stream-time or wall-clock-time)

public class WordCountProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // keep the processor context locally because we need it in punctuate() and commit()
        this.context = context;

        // retrieve the key-value store named "Counts"
        kvStore = (KeyValueStore) context.getStateStore("Counts");

        // schedule a punctuate() method every 1000 milliseconds based on stream time
        this.context.schedule(1000, PunctuationType.STREAM_TIME, (timestamp) -> {
            KeyValueIterator<String, Long> iter = this.kvStore.all();
            while (iter.hasNext()) {
                KeyValue<String, Long> entry = iter.next();
                context.forward(entry.key, entry.value.toString());
            }
            iter.close();

            // commit the current processing progress
            context.commit();
        });
    }

    // .. other functions
}

Streams の構成

内部の再パーティショントピックと changelog トピックの作成に使用される構成をオーバーライドできるようになりました。これらの構成を指定するには、StreamsConfig#topicPrefix(String) でプレフィックスを付けたトピック構成を StreamsConfig に追加します。内部トピックの作成時には、StreamsConfig に含まれている、このプレフィックスを持つすべてのプロパティが適用されます。トピック構成でない構成はすべて無視されます。既に StateStoreSupplier または Materialized を使用して changelogs の構成を指定している場合は、そちらが優先されます。

以下に例を示します。

Properties streamProps = ...;
// use cleanup.policy=delete for internal topics
streamsProps.put(StreamsConfig.topicPrefix("cleanup.policy"), "delete");

オプションの DSL パラメーター用の新しいクラス

DSL でのオーバーロードを減らすことができるように、SerializedConsumedProduced など、いくつかの新しいクラスが導入されました。これらのクラスには通常、Serialized.with(Serdes.Long(), Serdes.String()) のように、インスタンスを作成する静的メソッド with が用意されています。

Scala ユーザーは、with をバッククォートで囲む必要があることに注意してください。

以下に例を示します。

// When using Scala: enclose "with" with backticks
Serialized.`with`(Serdes.Long(), Serdes.String())

Confluent Platform 3.3 での Streams API の変更点

Kafka Streams とその API は、Confluent Platform 3.2.x リリースから改良および変更されています。これらの変更はすべて後方互換性があるため、Kafka Streams アプリケーションのコードをすぐにアップデートする必要はありません。ただし、一部のメソッドと構成パラメーターが非推奨になったため、今後のアップグレードに対応できるように、最終的にはコードをアップデートすることをお勧めします。このセクションでは、非推奨になった API について説明します。

Streams の構成

以下の構成パラメーターの名前が変更され、以前の名前は非推奨になりました。

  • key.serdedefault.key.serde に変更
  • value.serdedefault.value.serde に変更
  • timestamp.extractordefault.timestamp.extractor に変更

このため、StreamsConfig#KEY_SERDE_CONFIGStreamsConfig#VALUE_SERDE_CONFIG、および StreamsConfig#TIMESTAMP_EXTRACTOR_CONFIG も非推奨になりました。

また、メソッドに以下の変更が加えられました。

  • メソッド keySerde() が非推奨になり、defaultKeySerde() に移行
  • メソッド valueSerde() が非推奨になり、defaultValueSerde() に移行
  • 新しいメソッド defaultTimestampExtractor() を追加

ローカルタイムスタンプエクストラクター

Streams API は、ストリームやテーブルごとのタイムスタンプエクストラクターをユーザーが指定できるように拡張されました。これにより、異なるストリームやテーブルに対して異なるタイムスタンプエクストラクターロジックを使用する手順が簡単になります。これまで、異なる入力トピックに異なるロジックを適用するには、デフォルトのタイムスタンプエクストラクター内で if-then-else パターンを使用する必要がありました。以前の動作では不要な依存関係が生じ、コードのモジュール性とコードの再利用が制限される結果となっていました。

この新機能を有効にするために、KStreamBuilder#stream()KStreamBuilder#table()KStream#globalTable()TopologyBuilder#addSource()、および TopologyBuilder#addGlobalStore() の各メソッドに新しいオーバーロードが追加され、対応する入力トピックに単独で適用される "ローカル" なタイムスタンプエクストラクターを指定できるようになりました。

// old API (single default TimestampExtractor that is applied globally)

public class MyTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord record, long previousTimestamp) {
        long timestamp;

        String topic = record.topic();
        switch (topic) {
            case "streamInputTopic":
                timestamp = record.value().getDataTimestamp(); // assuming that value type has a method #getDataTimestamp()
                break;
            default:
                timestamp = record.timestamp();
        }

        if (timestamp < 0) {
            throw new RuntimeException("Invalid negative timestamp.");
        }

        return timestamp;
    }
}

KStreamBuilder builder = new KStreamBuilder();
KStream stream = builder.stream(keySerde, valueSerde, "streamInputTopic");
KTable table= builder.table("tableInputTopic");

Properties props = new Properties(); // omitting mandatory configs for brevity
// set MyTimestampExtractor as global default extractor for all topics
config.set("default.timestamp.extractor", MyTimestampExtractor.class);

KafkaStreams streams = new KafkaStreams(builder, props);


// new API (custom TimestampExtractor for topic "streamInputTopic" only; returns value embedded timestamp)

public class StreamTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord record, long previousTimestamp) {
        long timestamp = record.value().getDataTimestamp(); // assuming that value type has a method #getDataTimestamp()

        if (timestamp < 0) {
            throw new RuntimeException("Invalid negative timestamp.");
        }

        return timestamp;
    }
}

KStreamBuilder builder = new KStreamBuilder();
// set StreamTimestampExtractor explicitly for "streamInputTopic"
KStream stream = builder.stream(new StreamTimestampExtractor(), keySerde, valueSerde, "streamInputTopic");
KTable table= builder.table("tableInputTopic");

Properties props = new Properties(); // omitting mandatory configs for brevity

KafkaStreams streams = new KafkaStreams(builder, props);

KTable の変更点

KTable インターフェイスでは、以下のメソッドが非推奨になりました。

  • void foreach(final ForeachAction<? super K, ? super V> action)
  • void print()
  • void print(final String streamName)
  • void print(final Serde<K> keySerde, final Serde<V> valSerde)
  • void print(final Serde<K> keySerde, final Serde<V> valSerde, final String streamName)
  • void writeAsText(final String filePath)
  • void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde)
  • void writeAsText(final String filePath, final String streamName)
  • void writeAsText(final String filePath, final String streamName, final Serde<K> keySerde, final Serde<V> valSerde)

これらのメソッドは非推奨になり、代わりに 対話型クエリ API を使用することをお勧めします。

KTable を支えるステートストアの現在のコンテンツに対してクエリを実行するには、次のアプローチを使用します。

  • KafkaStreams.store(String storeName, QueryableStoreType<T> queryableStoreType) を呼び出した後、ReadOnlyKeyValueStore.all() を呼び出して、KTable のすべてのキーを反復処理します。

KTable の changelog ストリームを表示するには、次のような操作を実行します。

  • KTable.toStream() を呼び出した後、KStream#print() を呼び出します。

Confluent Platform 3.2 での Streams API の変更点

Kafka Streams とその API は、Confluent Platform 3.1.x リリースから改良および変更されています。これらの変更のいくつかは以前の動作と互換性がなく、Kafka Streams アプリケーションのコードのアップデートが必要になります。このセクションでは、このような影響を伴う変更についてのみ取り上げます。

負のタイムスタンプの処理とタイムスタンプエクストラクターインターフェイス

無効な(負の)タイムスタンプに関する Kafka Streams の動作が改善されました。デフォルトでは、以前と同様に、無効なタイムスタンプが検出されると例外が発生します。ただし、アプリケーションを再構成すると、無効なタイムスタンプに対して以前にはできなかった穏便な対応が可能になります。

変更された TimestampExtractor インターフェイスには以前との互換性がないため、カスタムのタイムスタンプエクストラクターを使用しない場合でも、アプリケーションコードの再コンパイルが必要です。

負のタイムスタンプに関する Kafka Streams の内部の動作も変更されています。タイムスタンプエクストラクターが負のタイムスタンプを返した場合、例外が生成されるのではなく、対応するレコードが警告なしで破棄され、処理から除外されるようになりました。これにより、有効なタイムスタンプを返さないレコードがあっても、その数が少なければトピックを処理することができます。さらに、TimestampExtractor インターフェイスが変更され、パラメーターが 1 つ追加されました。このパラメーターにはタイムスタンプを指定します。それは、たとえば、現在のレコードから有効なタイムスタンプを直接抽出できない場合に、推定のタイムスタンプを返すために使用されます。

以前のデフォルトのタイムスタンプエクストラクター ConsumerRecordTimestampExtractorFailOnInvalidTimestamp に置き換えられ、レコードの組み込みのタイムスタンプを抽出する 2 つの新しいエクストラクター( LogAndSkipOnInvalidTimestamp および UsePreviousTimeOnInvalidTimestamp )が追加されました。新しいデフォルトのエクストラクター( FailOnInvalidTimestamp )では、レコードの組み込みのタイムスタンプが負の場合に例外が生成され、Kafka Streams のデフォルトの動作(負のタイムスタンプでフェイルファスト)が維持されます。新しく追加された 2 つのエクストラクターは、記録してスキップする戦略とタイムスタンプを推定する戦略を実装し、負のタイムスタンプをより穏便に処理できます。

// old interface
public class TimestampExtractor {
  // returning -1 results in an exception
  long extract(ConsumerRecord<Object, Object> record);
}

// new interface
public class TimestampExtractor {
  // provides a timestamp that could be used as a timestamp estimation,
  // if no valid timestamp can be extracted from the current record
  //
  // allows to return -1, which tells Kafka Streams to not process the record (it will be dropped silently)
  long extract(ConsumerRecord<Object, Object> record, long previousTimestamp);
}

メトリクス

StreamsMetrics インターフェイスを実装してカスタムメトリクスを提供している場合、このインターフェイスには以前よりも高精度のメトリクスを登録できる新しいメソッドが多数追加されているため、コードのアップデートが必要です。詳細については、KIP-114 を参照してください。

// old interface
public interface StreamsMetrics {
  // Add the latency sensor.
  Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);

  // Record the given latency value of the sensor.
  void recordLatency(Sensor sensor, long startNs, long endNs);
}

// new interface
public interface StreamsMetrics {
  // Get read-only handle on global metrics registry.
  Map<MetricName, ? extends Metric> metrics();

  // Add a latency and throughput sensor for a specific operation
  Sensor addLatencyAndThroughputSensor(final String scopeName,
                                       final String entityName,
                                       final String operationName,
                                       final Sensor.RecordingLevel recordingLevel,
                                       final String... tags);

  // Record the given latency value of the sensor.
  void recordLatency(final Sensor sensor,
                     final long startNs,
                     final long endNs);

  // Add a throughput sensor for a specific operation:
  Sensor addThroughputSensor(final String scopeName,
                             final String entityName,
                             final String operationName,
                             final Sensor.RecordingLevel recordingLevel,
                             final String... tags);

  // Record the throughput value of a sensor.
  void recordThroughput(final Sensor sensor,
                        final long value);

  // Generic method to create a sensor.
  Sensor addSensor(final String name,
                   final Sensor.RecordingLevel recordingLevel);

  // Generic method to create a sensor with parent sensors.
  Sensor addSensor(final String name,
                   final Sensor.RecordingLevel recordingLevel,
                   final Sensor... parents);

  // Remove a sensor.
  void removeSensor(final Sensor sensor);
}

Scala

0.10.2.0 以降では、アプリケーションが Scala で記述されている場合、コードをコンパイルするためには型の明示的な宣言が必要になることがあります。StreamToTableJoinScalaIntegrationTest には、戻り変数の型を明示的に宣言する例が含まれています。

Confluent Platform 3.1 での Streams API の変更点

ストリームのグループ化と集約

グループ化(再パーティション化)と集約を行う KStream API は、KTable API に合わせて大幅に変更されました。グループ化と集約は、多数のパラメーターを持つ 1 つのメソッドを使用するのではなく、2 つのステップに分かれる形になりました。まず、KStreamKGroupedStream に変換されます。これは、元の KStream の再パーティション化されたコピーです。その後、KGroupedStream に対して集約を実行すると、集約結果を含む新しい KTable を得ることができます。

このため、メソッド KStream#aggregateByKey(...)KStream#reduceByKey(...)、および KStream#countByKey(...) は、KGroupedStream を返す KStream#groupBy(...)KStream#groupByKey(...) に置き換えられました。KStream#groupByKey(...) は現在のキーでグループ化するのに対して、KStream#groupBy(...) は新しいキーを設定し、データを再パーティション化して、新しいキーでグループを構築します。新しいクラス KGroupedStream は、それぞれに対応するメソッド aggregate(...)reduce(...)、および count(...) を提供します。

KStream stream = builder.stream(...);
Reducer reducer = new Reducer() { /* ... */ };

// old API
KTable newTable = stream.reduceByKey(reducer, name);

// new API, Group by existing key
KTable newTable = stream.groupByKey().reduce(reducer, name);
// or Group by a different key
KTable otherTable = stream.groupBy((key, value) -> value).reduce(reducer, name);

自動再パーティション化

以前には、キーが変更される KStream#map(...)KStream#flatMap(...)KStream#selectKey(...) のような操作の後に KStream#join(...)KStream#outerJoin(...)、または KStream#leftJoin(...) 操作を実行する場合、開発者は KStream#through(...) を呼び出して、マップされた KStream を再パーティション化する必要がありましたが、これは不要になりました。現在、再パーティション化はすべての結合操作で自動的に行われます。

KStream streamOne = builder.stream(...);
KStream streamTwo = builder.stream(...);
KeyValueMapper selector = new KeyValueMapper() { /* ... */ };
ValueJoiner joiner = new ValueJoiner { /* ... */ };
JoinWindows windows = JoinWindows.of("the-join").within(60 * 1000);

// old API
KStream oldJoined = streamOne.selectKey(selector)
                             .through("repartitioned-topic")
                             .join(streamTwo,
                                   joiner,
                                   windows);

// new API
KStream newJoined = streamOne.selectKey((key,value) -> value)
                             .join(streamTwo,
                                   joiner,
                                   windows);

TopologyBuilder

TopologyBuilder で、2 つのパブリックメソッド TopologyBuilder#sourceTopics(String applicationId)TopologyBuilder#topicGroups(String applicationId) のシグネチャが変更されました。これらのメソッドは、パラメーターとして applicationId を受け取らなくなりました。代わりに、メソッドを呼び出す前に TopologyBuilder#setApplicationId(String applicationId) を呼び出す必要があります。

TopologyBuilder builder = new TopologyBuilder();
...

// old API
Set<String> topics = topologyBuilder.sourceTopics("applicationId");
Map<Integer, TopicsInfo> topicGroups = topologyBuilder.topicGroups("applicationId");

// new API
topologyBuilder.setApplicationId("applicationId");
Set<String> topics = topologyBuilder.sourceTopics();
Map<Integer, TopicsInfo> topicGroups = topologyBuilder.topicGroups();

DSL: ステートストア名を指定する新しいパラメーター

Apache Kafka 0.10.1 では、Kafka Streams の対話型クエリ が導入され、Kafka Streams アプリケーションのステートストアに対してクエリを直接実行できるようになりました。この新機能のために、DSL の演算子にいくつかの変更が加えられました。Kafka 0.10.1 以降では、ステートストアに常に "名前を付ける" 必要があります。これは、明示的に使用されるステートストア(ユーザー定義のストアなど)でも、内部使用のステートストア(count() のような操作によって内部的に作成されるストアなど)でも同様です。この名前付けは、ステートストアをクエリ可能にするための前提条件です。この結果、以前の "オペレーター名" はステートストア名に変更されました。この変更は、KStreamBuilder#table(...) と、"ウィンドウ化" された集約操作 KGroupedStream#count(...)#reduce(...)、および #aggregate(...) に影響します。

// old API
builder.table("topic");
builder.table(keySerde, valSerde, "topic");

table2 = table1.through("topic");

stream.countByKey(TimeWindows.of("windowName", 1000)); // window has a name

// new API
builder.table("topic", "storeName"); // requires to provide a store name to make KTable queryable
builder.table(keySerde, valSerde, "topic", "storeName"); // requires to provide a store name to make KTable queryable

table2 = table1.through("topic", "storeName"); // requires to provide a store name to make KTable queryable

// for changes of countByKey() -> groupByKey().count(...), please see example above
// for changes of TimeWindows.of(...), please see example below
stream.groupByKey().count(TimeWindows.of(1000), "countStoreName"); // window name removed, store name added

ウィンドウ化

JoinWindows の API が改良されました。デフォルトサイズ(ゼロ)のウィンドウを定義することはできなくなりました。また、ウィンドウに名前が与えられなくなりました。正確には、このような名前付けはすべてステートストアで行われます。上の「DSL: ステートストア名を指定する新しいパラメーター」のセクションを参照してください。

// old API
JoinWindows.of("name"); // defines window with size zero
JoinWindows.of("name").within(60 * 1000L);

TimeWindows.of("name", 60 * 1000L);
UnlimitedWindows.of("name", 60 * 1000L);

// new API, no name, requires window size
JoinWindows.of(0); // no name; set window size explicitly to zero
JoinWindows.of(60 * 1000L); // no name

TimeWindows.of(60 * 1000L); // not required to specify a name anymore
UnlimitedWindows.of(); // not required to specify a name anymore