Kafka Streams DSL トポロジーの名前付け

Kafka Streams DSL を使用する場合は、プロセッサーに名前を付けることができます。PAPI には ProcessorsState Stores があり、それぞれに明示的に名前を付ける必要があります。

DSL レイヤーには演算子があります。1 つの DSL 演算子は、複数の ProcessorsState Stores、および必要に応じて repartition topics にコンパイルされます。ただし、Kafka Streams DSL では、これらの名前がすべて自動的に生成されます。生成されるプロセッサー名、ステートストア名(および changelog トピック名)、再パーティショントピック名の間には関連性があります。ステートストア、changelog トピック、および再パーティショントピックの名前は "ステートフル" ですが、プロセッサー名は "ステートレス" です。

このステートフルとステートレスな名前の区別は、トポロジーをアップデートするときに重要な意味を持ちます。内部での名前付けによって DSL でのトポロジーの作成は大幅に単純化されますが、これにはトレードオフが 2 つあります。最初のトレードオフは、わかりやすさの問題です。もう 1 つのトレードオフはもっと厳しいもので、DSL の演算子と、生成された ProcessorsState Stores、changelog トピック、再パーティショントピックとの関連性により、名前の変更が生じる場合があります。

わかりやすさの問題

わかりやすさのトレードオフは、トポロジーの記述を表示するときの問題です。Topology#desribe() メソッドを通じてトポロジーの文字列記述を表示すると、プロセッサーを確認することができます。ただし、ビジネス上の目的に関するコンテキストはわかりません。たとえば、次のような単純なトポロジーがあるとします。

KStream<String,String> stream = builder.stream("input");
stream.filter((k,v) -> !v.equals("invalid_txn"))
      .mapValues((v) -> v.substring(0,5))
      .to("output")

Topology#describe() を実行すると、以下の文字列が生成されます。

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-FILTER-0000000001
    Processor: KSTREAM-FILTER-0000000001 (stores: [])
      --> KSTREAM-MAPVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-FILTER-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: output)
      <-- KSTREAM-MAPVALUES-0000000002

このレポートから複数の演算子を確認できますが、より全体的なコンテキストはどうなっているのでしょうか。たとえば、KSTREAM-FILTER-0000000001 を見るとフィルター操作であることがわかります。つまり、指定された述語に一致しないレコードは破棄されます。しかし、その述語はどのようなものでしょうか。さらに、ソースノードとシンクノードのトピック名が表示されていますが、トピックに意味のある名前が付いていなかったらどうでしょうか。その場合、トピックの背後にあるビジネス上の目的は想像するしかありません。番号にも注目してください。ソースノードには 0000000000 というサフィックスが付いていて、トポロジーの最初のプロセッサーであることを示します。フィルターのサフィックスは 0000000001 で、トポロジーの 2 番目のプロセッサーであることを示します。Kafka Streams では、KStreamKTable のどちらにも、新しいパラメーター Named を受け取る新しいオーバーロードメソッドがあります。DSL の Named クラスを使用すると、トポロジー内のプロセッサーに意味のある名前を指定できます。

すべてのプロセッサーに名前を指定した場合、トポロジーは次のようになります。

KStream<String,String> stream =
builder.stream("input", Consumed.as("Customer_transactions_input_topic"));
stream.filter((k,v) -> !v.equals("invalid_txn"), Named.as("filter_out_invalid_txns"))
      .mapValues((v) -> v.substring(0,5), Named.as("Map_values_to_first_6_characters"))
      .to("output", Produced.as("Mapped_transactions_output_topic"));
Topologies:
  Sub-topology: 0
   Source: Customer_transactions_input_topic (topics: [input])
     --> filter_out_invalid_txns
   Processor: filter_out_invalid_txns (stores: [])
     --> Map_values_to_first_6_characters
     <-- Customer_transactions_input_topic
   Processor: Map_values_to_first_6_characters (stores: [])
     --> Mapped_transactions_output_topic
     <-- filter_out_invalid_txns
   Sink: Mapped_transactions_output_topic (topic: output)
     <-- Map_values_to_first_6_characters

このトポロジーの記述を確認すると、トポロジー内での各プロセッサーの役割がよくわかります。プロセッサーノードに名前を付ける理由は他にもあります。Kafka Streams アプリケーション、ステートストア、changelog トピック、再パーティショントピックの再起動をまたいで残るステートフルな演算子がある場合、生成された名前をプロセッサーノードで使用していると、名前が変わる可能性があるためです。

名前の変更

生成される名前には、トポロジーのどこに組み込まれているかを表す番号が付けられます。名前の生成方法は KSTREAM|KTABLE->operator name<->number suffix< というパターンに従います。番号はグローバルに 1 ずつ増加する値で、トポロジー内での演算子の順序を表します。生成された番号は、常に 10 文字の文字列になるように先頭が "0" で埋められます。したがって、操作を追加または削除したり、操作の順序を入れ替えたりすると、プロセッサーの位置が変わり、結果としてプロセッサーの名前も変わります。 ほとんど のプロセッサーはメモリー内にのみ存在するため、多くのトポロジーでは、この名前の変更が問題になることはありません。ただし、ステートフルな演算子や再パーティショントピックを持つトポロジーは、この名前の変更の影響を受けます。例として、ステートを使用する別のトポロジーを以下に示します。

KStream<String,String> stream = builder.stream("input");
stream.groupByKey()
      .count()
      .toStream()
      .to("output");

このトポロジーの記述は次のようになります。

Topologies:
  Sub-topology: 0
   Source: KSTREAM-SOURCE-0000000000 (topics: [input])
    --> KSTREAM-AGGREGATE-0000000002
   Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
    --> KTABLE-TOSTREAM-0000000003
    <-- KSTREAM-SOURCE-0000000000
   Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
    --> KSTREAM-SINK-0000000004
    <-- KSTREAM-AGGREGATE-0000000002
   Sink: KSTREAM-SINK-0000000004 (topic: output)
    <-- KTABLE-TOSTREAM-0000000003

上記のトポロジーの記述から、ステートストアの名前は KSTREAM-AGGREGATE-STATE-STORE-0000000002 であることがわかります。ここで、集約によってレコードの一部を抽出するフィルターを追加した場合にどうなるかを見てみましょう。

KStream<String,String> stream = builder.stream("input");
stream.filter((k,v)-> v !=null && v.length() >= 6 )
      .groupByKey()
      .count()
      .toStream()
      .to("output");

対応するトポロジーは次のようになります。

Topologies:
  Sub-topology: 0
   Source: KSTREAM-SOURCE-0000000000 (topics: [input])
    --> KSTREAM-FILTER-0000000001
   Processor: KSTREAM-FILTER-0000000001 (stores: [])
     --> KSTREAM-AGGREGATE-0000000003
     <-- KSTREAM-SOURCE-0000000000
   Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])
     --> KTABLE-TOSTREAM-0000000004
     <-- KSTREAM-FILTER-0000000001
   Processor: KTABLE-TOSTREAM-0000000004 (stores: [])
     --> KSTREAM-SINK-0000000005
     <-- KSTREAM-AGGREGATE-0000000003
    Sink: KSTREAM-SINK-0000000005 (topic: output)
     <-- KTABLE-TOSTREAM-0000000004

count 操作の "前" に別の操作を追加したため、ステートストア(および changelog トピック)の名前が変わったことに注意してください。この名前の変更は、トポロジーをアップデートしたときにローリング再デプロイを実行できないことを意味します。さらに、 Streams リセットツール を使用して集約を再計算する必要もあります。これは、changelog トピックが起動時に変更され、新しい changelog トピックには何もデータが含まれていないためです。さいわい、この状況は簡単な方法で修復できます。つまり、名前を生成するのではなく、ユーザー定義の名前をステートストアに付けることで、トポロジーの変更に伴うステートストアの名前の変更を気にする必要がなくなります。JoinedStreamJoinedGrouped の各クラスでは再パーティショントピックの名前を指定でき、Materialized ではステートストアと changelog トピックの名前を指定できます。繰り返しになりますが、これらの DSL トポロジー操作に名前を付けることには重要な意味があります。ステートストアに特定の名前を指定する場合、DSL コードは次のようになります。

KStream<String,String> stream = builder.stream("input");
stream.filter((k, v) -> v != null && v.length() >= 6)
      .groupByKey()
      .count(Materialized.as("Purchase_count_store"))
      .toStream()
      .to("output");

トポロジーは次のようになります。

Topologies:
 Sub-topology: 0
  Source: KSTREAM-SOURCE-0000000000 (topics: [input])
    --> KSTREAM-FILTER-0000000001
  Processor: KSTREAM-FILTER-0000000001 (stores: [])
    --> KSTREAM-AGGREGATE-0000000002
    <-- KSTREAM-SOURCE-0000000000
  Processor: KSTREAM-AGGREGATE-0000000002 (stores: [Purchase_count_store])
    --> KTABLE-TOSTREAM-0000000003
    <-- KSTREAM-FILTER-0000000001
  Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
    --> KSTREAM-SINK-0000000004
    <-- KSTREAM-AGGREGATE-0000000002
  Sink: KSTREAM-SINK-0000000004 (topic: output)
    <-- KTABLE-TOSTREAM-0000000003

ステートストアの前にプロセッサーを追加しても、ストア名とその changelog トピック名は変わりません。これにより、プロセッサーの追加と削除に伴う変更に対して、トポロジーの堅牢性と回復性が高まります。

結論

DSL の使用時には、処理ノードに名前を付けることをお勧めします。再パーティショントピックやステートストア(および付随する changelog トピック)などの "ステートフル" なプロセッサーがアプリケーションにある場合は、これがいっそう重要になります。以下に、DSL トポロジーに名前を付けるときに覚えておくべき 2 つのポイントを示します。

  1. "既存のトポロジー" があり、ステートストア(および changelog トピック)に名前を付けていない場合は、ベストプラクティスとして名前を付けることをお勧めします。ただし、これはトポロジーの重大な変更になるため、すべてのアプリケーションインスタンスをシャットダウンし、変更を行って、 Streams リセットツール を実行する必要があります。最初は不便を強いられるかもしれませんが、トポロジーの変更による予期しないエラーからアプリケーションを保護するためには労力を費やす価値があります。
  2. "新しいトポロジー" の場合は、トポロジーの永続的な部分であるステートストア(changelog トピック)と再パーティショントピックに名前を付けるようにします。これにより、デプロイされた Kafka Streams アプリケーションを、名前がなければ重大な影響を生じるようなトポロジーの変更から保護できます。ステートレスなプロセッサーについては、最初から名前を付けたくない場合はそのままでもかまいません。いつでも戻って後から名前を付けることができます。

Kafka Streams アプリケーションの重要な部分の名前付けについて、以下にクイックリファレンスを示します。この方法で、トポロジーの名前の変更によるアプリケーションへの重大な影響を防ぐことができます。

操作 名前付けクラス
集約の再パーティショントピック Grouped
KStream-KStream 結合の再パーティショントピック StreamJoined
KStream-KTable 結合の再パーティショントピック Joined
KStream-KStream 結合のステートストア StreamJoined
ステートストア(集約および KTable-KTable 結合用) Materialized
ストリーム/テーブルの非ステートフル操作 Named

注釈

このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。