Kafka Streams トポロジーの最適化

最適化に関する FAQ

このセクションでは、Kafka Streams アプリケーションでの最適化の使用方法に関するクイックガイダンスを提供します。最適化の詳細については後のセクションで説明します。

最適化とは何ですか?
Kafka Streams の最適化とは、Kafka Streams アプリケーションの初期構造に基づいてトポロジーを再編成することで、自動的に Kafka Streams アプリケーションの効率を高める試みです。現在実行できる最適化には 2 種類があります。1 つ目は、入力トピックから直接作成された KTable で、ソーストピックを changelog トピックとして再利用するものです。2 つ目の最適化は、キーの変更を伴う操作の後に複数のグループ化または結合操作が行われる場合に、複数の再パーティショントピックを 1 つの再パーティショントピックにマージするものです。
既存のアプリケーションで最適化を有効にする必要はありますか?
これは状況しだいです。ソース KTable プロセッサーがある場合や、キーを変更した後でグループ化操作、集約操作、または結合を行っている場合、答えは "はい" になります。これらのどちらのシナリオもアプリケーションに当てはまらない場合、現時点では最適化を有効にする必要はありませんが、有効にしても問題はなく、トポロジーへの影響もありません。
既存のアプリケーションで最適化を有効にするにはどうすればよいですか?

最適化を有効にするには、2 つの手順を実行する必要があります。まず、プロパティに properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); という行を追加します。次に、KafkaStreams インスタンスを構築するとき、トポロジーの作成時に、オーバーロードされた StreamsBuilder.build(Properties) メソッドを使用して構成プロパティを渡す必要があります。たとえば、KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties) とします。

最適化はデフォルトで無効になっているため、最適化を有効にしない場合は何もする必要はありません。

新しいアプリケーションで最適化を有効にする必要はありますか?
既存のアプリケーションと同じガイダンスが適用されますが、異なる点として、 Streams リセットツール を使用する必要はありません。最初から最適化を有効にする場合は、ローリングデプロイを行うことをお勧めします。
既存のアプリケーションで最適化を有効にしました。追加で必要な手順はありますか?

既存のアプリケーションではローリングアップグレードを実行できません。以下の手順を実行する必要があります。

  1. すべてのアプリケーションインスタンスを停止します。
  2. すべてのアプリケーションインスタンスで、前述のように構成をアップデートします。
  3. Streams リセットツール を実行して、データを再処理します。これは厳密には必須ではありませんが、マージされた再パーティショントピックには未処理のデータが含まれていることがあるため、データを再処理するとレコードの損失を防ぐことができます。すべてのレコードを処理することがビジネスニーズの要件でない場合は、この再処理の手順を省略できます。

最初の再デプロイの後、アプリケーションの最適化に追加の変更が加えられることはありません。また、最初の再デプロイの後に最適化可能なプロセッサーを追加した場合、再パーティショントピックは既にマージされることになっているため、追加で必要な手順はありません。

最適化の詳細

Confluent 5.1 では Kafka Streams に新機能が追加され、構成フラグ StreamsConfig.TOPOLOGY_OPTIMIZATIONStreamsConfig.OPTIMIZE に設定することで、ユーザーが Kafka Streams フレームワークによるトポロジーの最適化を有効にできるようになりました。StreamsConfig.TOPOLOGY_OPTIMIZATION のデフォルト設定は StreamsConfig.NO_OPTIMIZATION です。したがって、最適化を有効にしない場合は何もする必要はありません。最適化は DSL でのみ利用できます。 Processor API では、ユーザー独自の最適化を有効にするために必要な柔軟性が既に提供されているため、最適化の構成は Processor API には必要ありません。

利用可能な最適化について説明する前に、この最適化機能が追加された背景を紹介します。CP 5.1/AK 2.1 より前のバージョンでは、DSL を使用してトポロジーを構築するとき、 DSL の各呼び出しが行われるとすぐに、Kafka Streams でトポロジーの物理プランのパーツが構築されていました。物理プランの構築をすぐに開始すると、トポロジーの効率を高めるための変更を加える機会が失われます。

現在は、DSL でトポロジーを構築するとき、各呼び出しはトポロジーの中間表現または論理プランにキャプチャされ、有向非巡回グラフ(DAG)として表されます。トポロジーの物理プランは、StreamsBuilder.build(properties) 呼び出しが実行されるまで開始されません。論理プランを物理プランに変換する過程で、適用できる最適化がすべて適用されます。引数なしの StreamsBuilder.build() メソッドを使用すると、Kafka Streams はトポロジーの最適化を一切行いません。

現在、Kafka Streams で実行される最適化(有効な場合)には以下の 2 つがあります。

  1. ソース KTable で、ソーストピックを changelog トピックとして再利用します。
  2. 可能であれば、Kafka Streams で複数のパーティショントピックを 1 つの再パーティショントピックにまとめます。

最初の最適化は単純なもので、これ以上説明することはありません。2 番目の最適化はもう少し複雑であり、説明が必要でしょう。

以下のトポロジーについて考えます。

final KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

// Because we are changing the key, a "needs repartitioning" flag is set.
// The repartitioning will be performed only when a subsequent operation truly needs it.
final KStream<String, String> mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v));
    
// triggers the first repartitioning grouping by the changed key 
mappedStream.groupByKey().windowedBy(...).aggregate(...)...;
   
// triggers the second repartitioning grouping by the changed key again
KStream<String, Long> countStream = mappedStream.groupByKey().count(...).toStream();
   
// triggers the third reparition for joining the mappedStream with the changed key
KStream<String, String> joinedStream = mappedStream.join(countStream.....);


config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "SomeStreamsApplication");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"Some IP Address");

// tell Kafka Streams to optimize the topology
config.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

// Since we've configured Streams to use optimizations, the topology is optimized during the build
// And because optimizations are enabled, the resulting topology will no longer need to perform
// three explicit repartitioning steps, but only one.
final Topology topology = builder.build(config);
final KafkaStreams streams = new KafkaStreams(topology, config);

streams.start();

KStream.selectKey() メソッドでキーを変更しているため、再パーティションを示すブール値のフラグが必要になる可能性があります。再パーティションは、Kafka Streams アプリケーションでキーを変更した後に自動的に行われるわけではありません。再パーティションが実行されるのは、変更された可能性のあるキーが操作で必要とされる場合だけです。

前の例では、元のストリームのキーが変更された可能性がある状態で、キーを必要とする 3 つの操作(2 回の KStream.groupByKey() と 1 回の KStream.join())を実行しているため、Kafka Streams によって 3 つの再パーティショントピックが作成されます。ここで注目に値するのは、KStream.mapKStream.transform、および KStream.flatMap を使用すると、物理的にキーを変更していなくても、常に "再パーティションが必要" フラグが設定されるという点です。このため、キーを変更する必要がないとわかっている場合は、KStream.xxxValues を代わりに使用してください。

ただし、3 つの再パーティショントピックにはすべて実質的に同じデータが格納されるため、3 つの再パーティショントピックのうち 2 つは冗長です。最適化を有効にすると、Kafka Streams は 2 回の KStream.groupByKey() 呼び出しと 1 回の KStream.join() 呼び出しから再パーティション化を削除し、KStream.selectKey() の直後に "1 つ" の再パーティショントピックを使用します。これにより、再パーティショントピックの負荷が 3 から 1 に軽減されます。

最後に、再パーティショントピックに関するトポロジーの変更の範囲により、最適化が有効な場合はローリングアップグレードを実行できません。アプリケーションのすべてのインスタンスを停止する必要があります。マージされた再パーティショントピックには少量のデータが未処理の状態で残っている場合があるため、Streams リセットツール を使用して以前のアプリケーションをクリーンアップし、新しいアプリケーションを最初から開始してデータを再処理することをお勧めします。そうしないと、少量のレコードが失われる可能性があります。

注釈

このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。