Streams の FAQ

注意

API、操作、ドキュメントに関するフィードバックや、エンドユーザーのエクスペリエンスを改善するためのご意見をお待ちしております。お気軽に users@kafka.apache.org まで電子メールでお寄せください。

全般

Kafka Streams は Kafka とは別のプロジェクトですか?

いいえ、そうではありません。Kafka Streams API、または単に Kafka Streams は、Apache Kafka® オープンソースプロジェクトのコンポーネントであり、Kafka 0.10 以降のリリースに含まれています。ソースコードは https://github.com/apache/kafka/tree/trunk/streams から入手できます。

Kafka Streams は Confluent の独占的なライブラリですか?

いいえ、そうではありません。Kafka Streams API、または単に Kafka Streams は、Kafka オープンソースプロジェクトのコンポーネントであり、Kafka 0.10 以降のリリースに含まれています。ソースコードは https://github.com/apache/kafka/tree/trunk/streams から入手できます。

Kafka Streams アプリケーションは Kafka ブローカーの内部で動作するのですか?

いいえ、Kafka ブローカーの内部で動作するのではありません。"Kafka Streams アプリケーション" は、Kafka Streams ライブラリを使用する通常の Java アプリケーションです。これらのアプリケーションは、Kafka クラスターの境界にあるクライアントマシンで実行します。つまり、Kafka Streams アプリケーションは Kafka ブローカー(サーバー)や Kafka クラスターの内部で動作するものではなく、クライアント側アプリケーションです。

Kafka Streams アプリケーションで大量のメモリーが使用される理由は?

Kafka Streams アプリケーションで永続ステートストアを使用している場合は、使い終わった Iterator をすべて終了して、リソースを回収する必要があります。Iterator を終了しないと、メモリー不足の問題につながる可能性があります。

Kafka Streams のシステム依存関係は?

Kafka Streams には、Kafka の他にシステムに関する外部依存関係はありません。

以前の Kafka Streams アプリケーションを最新の Confluent Platform バージョンに移行するには?

アップグレードガイド」で手順を説明しています。

Kafka Streams でサポートされる Kafka クラスターのバージョンは?

次のバージョンがサポートされます。

 
Kafka ブローカー(列)
Streams API(行)
3.0.x / 0.10.0.x
3.1.x / 0.10.1.x および
3.2.x / 0.10.2.x
3.3.x / 0.11.0.x および
4.0.x / 1.0.x および
4.1.x / 1.1.x および
5.0.x / 2.0.x および
5.1.x / 2.1.x および
5.2.x / 2.2.x および
5.3.x / 2.3.x および
5.4.x / 2.4.x および
5.5.x / 2.5.x および
6.0.x / 2.6.x および
7.0.x / 3.0.x および
7.1.x / 3.1.x
3.0.x / 0.10.0.x
互換性あり
互換性あり
互換性あり
3.1.x / 0.10.1.x および
3.2.x / 0.10.2.x
 
互換性あり
互換性あり
3.3.x / 0.11.0.x
 
"厳密に 1 回" がオフの場合に
互換性あり(ブローカーバージョン
Confluent Platform 3.3.x 以降が必要)
互換性あり
4.0.x / 1.0.x および
4.1.x / 1.1.x および
5.0.x / 2.0.x および
5.1.x / 2.1.x および
5.2.0 / 2.2.0 および
5.2.1 / 2.2.0
 
"厳密に 1 回" がオフの場合に
互換性あり(ブローカーバージョン
Confluent Platform 3.3.x 以降が必要)。
メッセージフォーマット 0.10 以降が必要。
メッセージヘッダーはサポート外
(ブローカーバージョン Confluent
Platform 3.3.x 以降およびメッセージ
フォーマット 0.11 以降が必要)
互換性あり。
メッセージフォーマットは
0.10 以降であることが必要。
メッセージヘッダーが
使用される場合、メッセージフォーマット
0.11 以降が必要
5.2.2 / 2.2.1 および
5.3.x / 2.3.x および
5.4.x / 2.4.x および
5.5.x / 2.5.x および
6.0.x / 2.6.x および
7.0.x / 3.0.x および
7.1.x / 3.1.x
   
互換性あり。
メッセージフォーマットは
0.11 以降が必要。
"厳密に 1 回" の v2 には
Confluent Platform
5.4.x 以降が必要
5.4.x 以降が必要

Streams API は、以前のバージョンの Kafka (0.7、0.8、0.9)を実行している Kafka クラスターとは互換性がありません。

サポートされるプログラミング言語は?

Kafka Streams API は Java で実装されています。「開発者ガイド」には、Java 7 および Java 8+ で記述されたアプリケーションの例が用意されています。また、Kafka Streams には、Java を基盤とする Scala ラッパー が付属しています。Kotlin や Clojure など、他の JVM ベースの言語でアプリケーションを作成することもできますが、これらの言語に対するネイティブサポートはありません。

アプリケーションでデータが最初から再処理される理由は?

Kafka では、コンシューマーのオフセットを特別なトピックに保存することでアプリケーションを記憶します。オフセットとは、Kafka ブローカーによってメッセージに割り当てられる番号で、メッセージがブローカーに到達した順序を示します。アプリケーションで最後にコミットされたオフセットが記憶されていれば、そのアプリケーションでは、新しく到達したメッセージだけを処理することができます。

Kafka がオフセットを特別なトピックに記憶する時間の長さは、構成設定 offsets.retention.minutes によって制御できます。デフォルト値は 10,080 分(7 日)です。ただし、以前のバージョンのデフォルト値はわずか 1,440 分(24 時間)でした。

アプリケーションがしばらく停止すると(Kafka クラスターに接続していない状態が続くと)、その間にブローカーによってオフセットが削除され、アプリケーションの再開時にデータの再処理が必要になることがあります。起動時の実際の動作は auto.offest.reset 構成によって異なります。これは "earliest"、"latest"、または "none" に設定できます。

この問題を回避するには、offsets.retention.minutes を適度に大きな値にすることをお勧めします。

拡張性

アプリケーションの並列処理の上限は? 実行できるアプリインスタンスの最大数は?

アプリケーションで実行できる最大限の並列処理は、アプリケーションが 処理トポロジー で読み取っている入力トピックのパーティションの最大数によって決まります。入力パーティションの数により、Kafka Streams が作成するアプリケーションの ストリームタスク の数が決まります。このストリームタスクの数が、アプリケーションの並列処理の上限となります。

例を考えてみましょう。アプリケーションで読み取っている入力トピックに、5 つのパーティションがあるとします。この場合、いくつのアプリインスタンスを実行することができるでしょうか。

簡潔に答えると、このアプリケーションの並列処理の最大数は 5 になるため、アプリケーションのインスタンスを 5 つまで実行できます。5 つを超えるアプリインスタンスを実行した場合、"超過分" のアプリインスタンスも問題なく起動しますが、それらはアイドル状態のままになります。処理中のインスタンスのいずれかが停止すると、アイドル状態のインスタンスの 1 つがその処理を引き継いで再開します。

この例についてより詳しく回答すると、次のようになります。

  • 5 つのストリームタスク が作成され、それぞれが 1 つの入力パーティションを処理します。実際には、アプリケーションの並列処理の上限はストリームタスクの数によって決まります。入力パーティションの数は、ストリームタスクの数を算出するための主要なパラメーターというだけです。

これで、アプリケーションの並列処理に関する理論上の上限がわかりました。では、実際にアプリケーションで最大限の並列処理を実行するにはどうすればよいでしょうか。そのためには、5 つのストリームタスクのすべてを並列に実行する必要があります。つまり、それぞれ 1 つのタスクを実行する 5 つの 処理スレッド が必要です。これを実現するには、いくつかの方法があります。

  • オプション 1: 水平スケーリング(スケールアウト): アプリケーションの 5 つのシングルスレッドインスタンスを実行し、それぞれで 1 つのスレッドまたはタスクを実行します(num.stream.threads1 に設定します。「オプションの構成パラメーター」を参照してください)。このオプションは、たとえば、多数の性能の低いマシンでアプリのインスタンスを実行する場合に使用します。
  • オプション 2: 垂直にスケーリング(スケールアップ): アプリケーションのマルチスレッドインスタンスを 1 つ実行し、そこですべてのスレッドまたはタスクを実行します(num.stream.threads5 に設定します)。このオプションは、たとえば、きわめて性能の高いマシンが 1 台あり、そこでアプリのインスタンスを実行する場合に適しています。
  • オプション 3: オプション 1 と 2 の組み合わせ : アプリケーションの複数のインスタンスを実行し、それぞれで複数のスレッドを実行します。このオプションは、たとえば、アプリケーションを大規模に実行する場合に使用します。マシンごとに複数のアプリインスタンスを実行することもできます。

処理

Streams アプリケーションの処理結果がクリーンアップされないように保持するには?

他の Kafka のトピックと同様に、シンクトピックのブローカー側で log.retention.mslog.retention.minuteslog.retention.hours の各構成を設定すると、これらのトピックに書き込まれた処理結果の保持時間を指定できます。ブローカーは、レコードに関連付けられたタイムスタンプと現在のシステム時刻を比較して、古いデータをクリーンアップするかどうかを決定します。ウィンドウまたはセッションステートについては、コードで Materialized#withRetention() を使用して retention policy を設定することもできます。Streams ライブラリは、ブローカーと同じように、格納されたレコードのタイムスタンプと現在のシステム時刻を比較して、設定されたポリシーを守ります(キーと値のステートストアでは、更新が永続的に保持されるため、保持ポリシーはありません)。

Kafka Streams アプリケーションのデフォルトでは、結果レコードのタイムスタンプは元のソーストピックから変更されません。つまり、過去の時刻のイベントレコードを処理し(累積した古いデータをブートストラップフェーズで処理する場合など)、その結果として 1 つ以上のレコードやステートの更新が発生した場合、結果レコードとステートの更新は、関連付けられたタイムスタンプで示される過去の時刻と同じ時刻のものとして反映されます。このタイムスタンプが、現在のシステム時刻と比較して保持のしきい値よりも古い場合、結果は Kafka のトピックまたはステートストアに書き込まれた直後にクリーンアップされることになります。

バージョン 5.1.x 以降では、必要に応じて、Streams アプリケーションコードで結果レコードのタイムスタンプを変更できます(詳細は 5.1 のアップグレードガイド を参照)。ただし、その意味するところに注意が必要です。ある時刻のイベントを処理することは、実際には別の時刻の結果を得ることになります。

トピック、パーティション、オフセット情報などのレコードメタデータにアクセスするには?

レコードメタデータには Processor API を通じてアクセスできます。また、DSLProcessor API 統合 により、DSL から間接的にアクセスすることもできます。

Processor API では、ProcessorContext を使用してレコードメタデータにアクセスできます。Processor#init() で、コンテキストへの参照をプロセッサーのインスタンスフィールドに格納します。その後、たとえば、Processor#process() でプロセッサーコンテキストを照会できます(Transformer でも同様です)。コンテキストは現在処理中のレコードに合わせて自動的に更新されるため、ProcessorContext#partition() などのメソッドからは、常に現在のレコードのメタデータが返されます。スケジュールされた punctuate() 関数からプロセッサーコンテキストを呼び出す場合も、同じ注意事項が適用されます。詳細については Javadoc を参照してください。

DSL をカスタムの Transformer と組み合わせて使用すると、たとえば、入力レコードの値を変換してパーティションとオフセットのメタデータを含め、その情報を mapfilter などの後続の DSL 操作で利用できます。

DSL の mappeekforeach の違いは?

mappeekforeach の 3 つのメソッドは互いによく似ています。ある意味では、peekforeachmap のバリエーションと言えます。3 つのすべてを明示的にサポートする大きな理由は、それによって、開発者が操作の "意図" を明確に伝えられるようになるからです。

  • map : map を使用する場合、開発者の意図は、ストリームを 変更 し、(変更された)入力の 処理を続行 することにあります。つまり、変更されたデータに対して追加の処理を行えるように、変更されたデータを含む出力ストリームを作成します。
  • foreach : foreach を使用する場合、開発者の意図は、(変更されていない)入力ストリームに基づいて 副作用を生成 した後、処理を終了 することにあります。この理由から foreach は戻り値が void であり、終端操作と呼ばれます。
  • peek : peek を使用する場合、開発者の意図は、(変更されていない)入力データに基づいて 副作用を生成 した後、(変更されていない)入力データの 処理を続行 することにあります。

Java 8 ドキュメントの java.util.Stream も参照してください。このクラスでも map、peek、foreach の各操作がサポートされています。

不要とわかっている再パーティション化を回避するには?

Kafka Streams では、集約や結合といったキーに基づく操作の前に、selectKey()mapflatMap() のようなキーを変更する操作が実行された場合、再パーティション化のステップが挿入されます。

一般には mapValues()flatMapValues() を使用することをお勧めします。これらはキーを変更しないため、再パーティション化のステップを省略できます。

値の変更時には、キーへの読み取り専用アクセスが必要になることがよくあります。この場合は、map() 演算子を使用する代わりに、mapValues()ValueMapperWithKey とともに呼び出すことができます。XxxWithKey 拡張機能は、他のいくつかの演算子にも用意されています。

特殊なケースとして、パーティションが維持される形でキーを変更しようとしているために、実際には再パーティション化の必要がない場合があります。このようなケースでは状況によって、集約の後にキーの変更操作を適用することで、再パーティション化のステップを回避できる可能性があります。この方法は、元のキーと変更後のキーが同じ "データのグループ" に属する場合に適用できます。

最後に、いつでも Processor API にフォールバックし、process()transform()、または transformValues() を使用してカスタムの集約を実行することができます。これらの操作では、直前の操作でキーが変更されたとしても、自動的に再パーティション化がトリガーされることはありません。

Serde の config メソッド

カスタムの Serde を実装し、その Serde を構成プロパティに指定した場合、このクラスに org.apache.kafka.common.Configurable が実装されていると、Serde#configure(...) が自動的に呼び出されます。コードでは、この呼び出しを対応するシリアライザーと逆シリアライザーに転送する必要があります。ライブラリで提供される serde には、このパターンが既に実装されています。

手動で new を呼び出して Serde を作成し、その Serde をメソッド呼び出しに渡す場合、ライブラリとカスタム Serde の configure(...) を手動で呼び出す必要があります。

ちなみに

この手順を忘れないように、手動での configure() の呼び出しは Serde オブジェクトの作成直後に行ってください。

RocksDB を別のストアに置き換えるには?

RocksDB を置き換えるグローバル設定はありません。ただし、オペレーターごとに個別にカスタムストアを設定することは可能です。たとえば、aggregate() (および 内部ストアを持つその他の操作)には Materialized を受け取るオーバーロードがあり、これを通じて KeyValueBytesStoreSupplier を指定できます。これらのオーバーロードを使用すると、別のストア を返すサプライヤーを提供して、対応する操作の RocksDB を置き換えることができます。

注 : すべての操作の RocksDB を置き換えるには、別の XxxByteStoreSupplier を操作ごとに提供する必要があります。

RocksDB ベースのステートストアとは別に、Kafka Streams API はインメモリーストアも備えています。このインメモリーストアは changelog トピックに基づき、完全に フォールトトレラント です。インメモリーサプライヤーは次のように取得できます。

KeyValueBytesStoreSupplier inMemoryStoreSupplier = Stores.inMemoryKeyValueStore("myStoreName-mustBeUniqueForEachOperator");
Materialized materialized = Materialized.as(inMemoryStoreSupplier);

インメモリーストアの フォールトトレランス を無効にすることもできます。そのためには、aggregate() (またはその他の任意の操作)に渡す前に、Materialized#disableLogging() の呼び出しを追加します。

他のサードパーティのストアを利用する には、StoreSupplier および StateStore インターフェイスを実装する必要があります。StateStore インターフェイスは、サードパーティのストアを実際に Kafka Streams に統合するコードです。StoreSupplier インターフェイスは、get() が呼び出されるたびに、実装された独自のストアの新しいインスタンスを返します。

注: サードパーティのストアを統合する場合、フォールトトレランスへの対応はすべて開発者の責任になります。changelog トピックを通じたストアのバックアップは提供されないため、必要に応じて独自に実装する必要があります。

エラーと例外処理

破損レコードや逆シリアル化のエラー("ポイズンピルレコード")を処理するには?

場合によっては、Kafka Streams アプリケーションからの受信レコードの一部が破損していることや、シリアライザーと逆シリアライザーが誤りやバグを含んでいたり、すべての種類のレコードを処理できなかったりすることがあります。このような種類のレコードは "ポイズンピル" と呼ばれます。

org.apache.kafka.streams.errors.DeserializationExceptionHandler インターフェイスを使用すると、このようなポイズンピルレコードの処理方法をカスタマイズできます。ライブラリには、一般的な処理パターンを表すいくつかの実装も含まれています。

オプション 1: エラーを記録してアプリケーションをシャットダウンする

LogAndFailExceptionHandlerDeserializationExceptionHandler の実装であり、Kafka Streams のデフォルト設定です。これはすべての逆シリアル化例外を処理します。例外が発生すると、そのエラーを記録し、致命的エラーをスローして Streams アプリケーションを停止します。アプリケーションが LogAndFailExceptionHandler を使用するように構成されている場合、アプリケーションのインスタンスはフェイルファストとなり、破損レコードを検出すると終了します。

Properties streamsSettings = new Properties();
streamsSettings.put(
  StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
  LogAndFailExceptionHandler.class.getName()
);

オプション 2: 破損レコードをスキップしてエラーを記録する

LogAndContinueExceptionHandler は、もう 1 つの DeserializationExceptionHandler の実装です。破損レコードを検出すると、アプリケーションはエラーメッセージを記録し、そのレコードの処理をスキップして、次のレコードの処理を続行します。スキップしたレコードの割合は、skippedDueToDeserializationError という名前のプロセッサーノードレベルのメトリックに記録されます。モニタリングとアラートに使用できるすべてのメトリクスのリストについては、ドキュメント を参照してください。

Properties streamsSettings = new Properties();
streamsSettings.put(
  StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
  LogAndContinueExceptionHandler.class.getName()
);

オプション 3: 破損レコードを隔離する(配信不能キュー)

独自の DeserializationExceptionHandler の実装を提供できます。たとえば、破損レコードを隔離トピック("配信不能キュー")に転送し、処理を続行する方法があります。これを行うには、 Producer API を使用して、破損レコードを隔離トピックに直接書き込みます。このアプローチの欠点として、"手動" での書き込みは Kafka Streams ランタイムライブラリに認識されない副作用であるため、Streams API の利点であるエンドツーエンドの処理の保証は適用されません。

以下にコード例を示します。

public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
    KafkaProducer<byte[], byte[]> dlqProducer;
    String dlqTopic;

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {

        log.warn("Exception caught during Deserialization, sending to the dead queue topic; " +
                  "taskId: {}, topic: {}, partition: {}, offset: {}",
                  context.taskId(), record.topic(), record.partition(), record.offset(),
                  exception);

        dlqProducer.send(new ProducerRecord<>(dlqTopic, null, record.timestamp(), record.key(), record.value()));

        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        dlqProducer = .. // get a producer from the configs map
        dlqTopic = .. // get the topic name from the configs map
    }
}

Properties streamsSettings = new Properties();
streamsSettings.put(
  StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
  SendToDeadLetterQueueExceptionHandler.class.getName()
);

オプション 4: 破損レコードをセンチネル値として解釈する

DeserializationExceptionHandler の設定を利用するのではなく、 カスタムの serde を実装して、内部で破損レコードを処理することもできます。たとえば、org.apache.kafka.common.serialization.Deserializer を実装して、例外をスローする代わりに、開発者定義のセンチネルレコード(null など)として扱われる "特殊用途" のレコードを返すことができます。そうすることで、逆シリアル化が失敗した場合に、このようなセンチネルレコードをアプリケーションのダウンストリームプロセッサーで認識できるようになります。

以下にコード例を示します。

public class ExceptionHandlingDeserializer implements Deserializer<MyObject> {

    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to do
    }

    public MyObject deserialize(String topic, byte[] data) {
        try {
            // Attempt deserialization
            MyObject deserializedValue = deserialize(topic, data)

            // Ok, the record is valid (not corrupted).
            return deserializedValue;
        } catch (SerializationException e) {
            log.warn("Exception caught during Deserialization: {}", e.getMessage());

            // return the sentinel record upon corrupted data
            return null;
        }
    }

    public void close() {
        // nothing to do
    }
}

重要

null を返す場合は注意が必要: null のレコード値は、テーブルにおいて特別な意味を持ちます。これらは、テーブルからレコードキーを削除するためのトゥームストーンとして解釈されます。

カスタムの serde を実装したら、それをキーや値のデフォルトの serde として使用するように アプリケーションを構成 したり、KStream#to() などの API メソッドを呼び出すときに明示的に使用したり、その両方の使い方を組み合わせたりできます。

破損レコードを隔離トピックまたは配信不能キューに送信するには?

破損レコードや逆シリアル化のエラー("ポイズンピルレコード")を処理するには?」で説明されている「オプション 3: 破損レコードを隔離する(配信不能キュー)」を参照してください。

対話型クエリ

InvalidStateStoreException: "the state store may have migrated to another instance" を処理するには?

ローカルステートストアにアクセスしようとすると、次のようなエラーが発生することがあります。

org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-key-value-store, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699)

これには通常、次の 2 つの原因が考えられます。

  • ローカルの KafkaStreams インスタンスの準備ができていない(ランタイムステートが RUNNING になっていない。「ランタイムステータス情報」を参照)ため、ローカルステートストアに対してまだクエリを実行することができない。
  • ローカルの KafkaStreams インスタンスの準備はできている(ランタイムステートは RUNNING になっている)が、該当するステートストアが内部で別のインスタンスに移行されている。この状況は特に、分散アプリケーションの起動フェーズや、アプリケーションインスタンスを追加または削除するときに発生することがあります。

InvalidStateStoreException の発生を防ぐ方法はいくつかありますが、いずれの場合も、取り扱う情報は特定の時点でしか有効でないという点に注意してください。ステートストアは内部的にどのタイミングでも移行される可能性があるため、アプリケーションでは、ローカルステートストアへのアクセスはそのつど変わることを受け入れる必要があります。

最もシンプルなアプローチは、KafkaStreams#store() を呼び出すときに InvalidStateStoreException を防ぐことです。

// Example: Wait until the store of type T is queryable.  When it is, return a reference to the store.
public static <T> T waitUntilStoreIsQueryable(final String storeName,
                                              final QueryableStoreType<T> queryableStoreType,
                                              final KafkaStreams streams) throws InterruptedException {
  while (true) {
    try {
      return streams.store(storeName, queryableStoreType);
    } catch (InvalidStateStoreException ignored) {
      // store not yet ready for querying
      Thread.sleep(100);
    }
  }
}

エンドツーエンドの例は ValidateStateWithInteractiveQueriesLambdaIntegrationTest に用意されています。

セキュリティ

セキュアな Kafka クラスターに対してアプリケーションを実行した場合のエラー

セキュアな Kafka クラスターからのデータをアプリケーションで処理しようとすると、次のようなエラーメッセージが発生することがあります。

> Could not create internal topics: Could not create topic: <NAME OF TOPIC> due to Cluster authorization failed”

アプリケーションを実行しているプリンシパルの ACL に --cluster --operation Create が設定されていて、内部トピック を作成するアクセス許可がアプリケーションに与えられていることを確認してください。

トラブルシューティングとデバッグ

Java のスタックトレースをより簡単に解釈するには?

Kafka Streams アプリケーションを記述するときには、次のようにメソッド呼び出しを連結することがよくあります。

// Code all on one line.  Unfortunately, this is bad practice when it comes to stacktraces.
myStream.map(...).filter(...).groupByKey(...).count(...);

このようなコードでランタイムエラーがトリガーされると、JVM ではエラーの発生場所を示す行番号情報しか提供されないため("NullPointerException at line 123")、Java スタックトレースはそれほど役に立たない可能性があります。上記の例の場合、連結された map、filter、countByKey の操作の "どこか" でエラーが発生したことは推測できても、行内での正確な場所をスタックトレースから特定することはできません。

簡単な対応策として、メソッドチェーンを複数行に分割すると、スタックトレースによって返される行番号から実際のエラーの場所を特定しやすくなります。

// Split the same code across multiple lines to benefit from more actionable stacktraces.
myStream
  .map(...)
  .filter(...)
  .groupByKey(...)
  .count(...);

トポロジーを視覚化するには?

Kafka Streams アプリケーションのトポロジーは、トポロジーの TopologyDescription にアクセスすることで視覚化できます。TopologyDescription には、処理グラフに関する "静的" な情報が含まれています(つまり、ランタイム情報ではありません)。この情報は、すべてのノード、(グローバル)ストア、そしてノードやストアが互いにどのように接続されているかを表します。各ノードについて、ノード名や入出力トピック(ソースおよびシンクの場合)などの詳細情報を取得できます。さらに、ノードはサブトポロジー(トピックのみを通じて接続されたノードのグループ)でグループ化されます。

// for DSL
StreamsBuilder builder = new StreamsBuilder();
Topology topology = builder.build();
// for Processor API
Topology topology = new Topology();

TopologyDescription description = topology.describe();

// Get sub-topologies
Set<Subtopology> subtopologies = description.subtopologies();
// You can also get all nodes of a sub-topology (can be a `Source`, `Processor`, or `Sink`)
// All nodes have a name and a set of predecessors as well as successors
// Source and Sink also have input/output topic information
// Processor can have stores attached
// Each subtopology has a unique ID
Subtopology subtopology = ...
Set<Node> nodes = subtopology.nodes();
// access node informaton...

// You can also get information about global stores
// a GlobalStore has a source topic and an `Processor`
Set<GlobalStore> globalStores = description.globalStores();
// access global store informaton...

// Or simply print all information at once
System.out.println(description);

WordCountLambdaExampleTopologyDescription を出力すると次のようになります。

Sub-topologies:
  Sub-topology: 0
    Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
    Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
    Source: KSTREAM-SOURCE-0000000000(topics: inputTopic) --> KSTREAM-FLATMAPVALUES-0000000001
    Sink: KSTREAM-SINK-0000000004(topic: Counts-repartition) <-- KSTREAM-FILTER-0000000005
  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000006(topics: Counts-repartition) --> KSTREAM-AGGREGATE-0000000003
    Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
    Sink: KSTREAM-SINK-0000000008(topic: outputTopic) <-- KTABLE-TOSTREAM-0000000007
    Processor: KSTREAM-AGGREGATE-0000000003(stores: [Counts]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
Global Stores:
  none

以前のリリース

Confluent Platform 4.0.x より前のリリースでは、KafkaStreams#toString() メソッドを呼び出すことで Kafka Streams アプリケーションの内部トポロジーを視覚化できます。

KafkaStreams streams = new KafkaStreams(topology, config);
// Start the Kafka Streams threads
streams.start();
// Print the internal topology to stdout
System.out.println(streams.toString());

この呼び出しから出力される WordCountLambdaExample のトポロジーの例を以下に示します。すべてのソース、シンク、それらを支えるトピック、およびすべての中間ノードとそのステートストアが収集され、出力されます。

KafkaStreams processID: 5fe8281e-d756-42ec-8a92-111e4af3d4ca
    StreamsThread appId: wordcount-lambda-integration-test
    StreamsThread clientId: wordcount-lambda-integration-test-5fe8281e-d756-42ec-8a92-111e4af3d4ca
    StreamsThread threadId: wordcount-lambda-integration-test-5fe8281e-d756-42ec-8a92-111e4af3d4ca-StreamThread-1
    Active tasks:
        Running:
            StreamsTask taskId: 0_0
                ProcessorTopology:
                    KSTREAM-SOURCE-0000000000:
                        topics:        [inputTopic]
                        children:    [KSTREAM-FLATMAPVALUES-0000000001]
                    KSTREAM-FLATMAPVALUES-0000000001:
                        children:    [KSTREAM-KEY-SELECT-0000000002]
                    KSTREAM-KEY-SELECT-0000000002:
                        children:    [KSTREAM-FILTER-0000000005]
                    KSTREAM-FILTER-0000000005:
                        children:    [KSTREAM-SINK-0000000004]
                    KSTREAM-SINK-0000000004:
                        topic:        wordcount-lambda-integration-test-Counts-repartition
                Partitions [inputTopic-0]
            StreamsTask taskId: 1_0
                ProcessorTopology:
                    KSTREAM-SOURCE-0000000006:
                        topics:        [wordcount-lambda-integration-test-Counts-repartition]
                        children:    [KSTREAM-AGGREGATE-0000000003]
                    KSTREAM-AGGREGATE-0000000003:
                        states:        [Counts]
                        children:    [KTABLE-TOSTREAM-0000000007]
                    KTABLE-TOSTREAM-0000000007:
                        children:    [KSTREAM-SINK-0000000008]
                    KSTREAM-SINK-0000000008:
                        topic:        outputTopic
                Partitions [wordcount-lambda-integration-test-Counts-repartition-0]

            Suspended:
            Restoring:
            New:
        Standby tasks:
            Running:
            Suspended:
            Restoring:
            New:

KafkaStreams#toString() では TopologyDescription よりも詳しい情報が返され、たとえば、実行中の各ストリームスレッドに現在割り当てられているタスクが含まれます。4.0.x 以降では、このようなランタイム情報は KafkaStreams#localThreadsMetadata() API によって取得できるため、KafkaStreams#toString() は非推奨となっています。

ストリームやテーブルを調べるには?

ストリームまたはテーブルのレコードを調べるには、KStream#print() メソッドを呼び出すことができます。KTable の場合は、KTable#toStream()KTable の changelog ストリームを取得すると、加えられた変更を調査できます。下記のように print() を使用すると、要素を STDOUT に出力できます。または、Printed.toFile("fileName") を使用するとファイルに書き込むことができます。

KStream#print(Printed.toSysOut()) を使用する例を以下に示します。

import java.util.concurrent.TimeUnit;
KStream<String, Long> left = ...;
KStream<String, Long> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left
    .join(right,
      (leftValue, rightValue) -> leftValue + " --> " + rightValue, /* ValueJoiner */
      JoinWindows.of(Duration.ofMinutes(5)),
      Joined.with(
        Serdes.String(), /* key */
        Serdes.Long(),   /* left value */
        Serdes.Long()))   /* right value */
    .print(Printed.toSysOut());

出力は結合後のレコードになります。たとえば、同じキー K を持ち、値が V1V2 である 2 つのレコードを結合する場合、コンソールには K, V1 --> V2 が出力されます。いくつかのサンプルデータでの出力例は次のようになります。

alice, 5 --> 7
bob, 234 --> 19
charlie, 9 --> 10

KTable の内部ストアのコンテンツを調べる場合は、Kafka Streams の対話型クエリ を使用できます。

無効なタイムスタンプの例外

次のような例外が発生した場合は、複数の原因が考えられます。

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Input record {...} has invalid (negative) timestamp. \
        Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, \
        or because the input topic was created before upgrading the Kafka cluster to 0.10+. \
        Use a different TimestampExtractor to process this data.
  at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:62)

このエラーは、Kafka Streams アプリケーションのタイムスタンプエクストラクターが、レコードから有効なタイムスタンプを抽出できなかったことを示します。これは通常、レコードの問題(レコードにタイムスタンプが含まれていないなど)を示唆していますが、アプリケーションで使用されているタイムスタンプエクストラクターの問題やバグを示している場合もあります。

レコードに有効なタイムスタンプが含まれない場合の原因は?

  • デフォルトの FailOnInvalidTimestamp タイムスタンプエクストラクターを使用している場合は、レコードにタイムスタンプが埋め込まれていないことが原因として考えられます(埋め込みのタイムスタンプは、Kafka 0.10 で Kafka のメッセージフォーマットに導入されました)。この状況は、たとえば、以前の Kafka プロデューサークライアント(バージョン 0.9 以前)またはサードパーティのプロデューサークライアントによって書き込まれたトピックを消費している場合に発生する可能性があります。もう 1 つの状況として、この現象は Kafka クラスターを 0.8 または 0.9 から 0.10 にアップグレードした後にも発生することがあります。この場合、0.8 または 0.9 で生成されたすべてのデータには、0.10 のメッセージのタイムスタンプが含まれません。UsePartitionTimeOnInvalidTimestampLogAndSkipOnInvalidTimestamp のような代替のタイムスタンプエクストラクターを使用すると、負のタイムスタンプをより穏便に処理できます。詳細については、「Kafka Streams 開発者ガイド」の タイムスタンプエクストラクター の項を参照してください。
  • カスタムのタイムスタンプエクストラクターを使用している場合は、エクストラクターで無効な(負の)タイムスタンプが適切に処理されることを確認してください。"適切" の定義はアプリケーションのセマンティクスによって異なります。たとえば、レコードから有効なタイムスタンプを直接抽出できない場合は、デフォルトのタイムスタンプや推定のタイムスタンプを返すことができます。
  • WallclockTimestampExtractor を使用して、イベント時刻の処理から処理時刻のセマンティクスに切り替えることもできます。この状況に対してこのようなフォールバックが適切な対応かどうかは、ユースケースによって異なります。

ただし、最初のステップとして、問題のあるレコードが Kafka に書き込まれた根本的な原因を特定して修正することが何よりもまず必要です。問題のあるレコードを取り扱うときに上記のような回避策の適用を検討するのは、修正を試みた後の 2 番目のステップです。別の方法として、正しいタイムスタンプを持つレコードを再生成し、新しい Kafka トピックに書き込むこともできます。

タイムスタンプエクストラクターが問題を引き起こす場合

この状況では、エラーのあるエクストラクターをデバッグして修正する必要があります。Kafka の組み込みのエクストラクターの場合は、Kafka 開発者メーリングリスト(dev@kafka.apache.org)にバグを報告してください(手順については https://kafka.apache.org/contact を参照してください)。解決するまでの間、問題を修正するカスタムのタイムスタンプエクストラクターを作成し、当面はそのエクストラクターを使用するようにアプリケーションを構成できます。

レコードのメタデータにアクセスしようとすると IllegalStateException が発生する理由は?

新しい Processor/Transformer/ValueTransformer を、それぞれに対応するサプライヤーを使用してトポロジーにアタッチする場合、サプライヤーは get() が呼び出されるたびに "新しい" インスタンスを返す必要があります。同じオブジェクトを返すと、単一の Processor/Transformer/ValueTransformer が複数のタスクで共有され、IllegalStateException とエラーメッセージ "This should not happen as topic() should only be called while a record is processed" が発生する結果となります(呼び出したメソッドに応じて、topic() の部分は partition()offset()、または timestamp() になることがあります)。

punctuate() が呼び出されない理由は?

punctuate() 関数を "イベント時刻" (つまり PunctuationType.STREAM_TIME)に基づいてスケジュールすると、この関数は、"ウォールクロック時間" に基づいてトリガーされるのではなく、純粋にデータドリブンになります(つまり、内部で追跡されるイベント時刻によって駆動されます)。イベント時刻は、TimestampExtractor によって抽出されたレコードのタイムスタンプから導かれます。

たとえば、PunctuationType.STREAM_TIME に基づいて 10 秒ごとに punctuate() 関数をスケジュールしたとします。1 秒(最初のレコード)から 60 秒(最後のレコード)までの連続したタイムスタンプを持つ 60 個のレコードのストリームを処理する場合、punctuate() は 6 回呼び出されます。これらのレコードを実際に処理するために必要な時間とは関係なく、60 個のレコードの処理にかかる時間が 1 秒でも、1 分でも、1 時間でも、punctuate() は 6 回呼び出されます。

注意

イベント時刻が進行するのは、すべての入力トピックのすべての入力パーティションに新しいデータ(タイムスタンプが新しいもの)がある場合だけです。1 つ以上のパーティションで新しいデータが利用できない場合、イベント時刻は進行せず、punctuate() もトリガーされません。これはタイムスタンプエクストラクターの構成とは独立した動作です。つまり、WallclockTimestampExtractor を使用しても、実際の時刻に基づいて punctuate() がトリガーされるわけではありません。

Scala: コンパイルエラー "no type parameter"、"Java-defined trait is invariant in type T"

Scala ラッパー を使用せずに Scala を使用すると、次のような Scala コンパイルエラーが発生する場合があります。

Error:(156, 8) no type parameters for method leftJoin:
    (x$1: org.apache.kafka.streams.kstream.KTable[String,VT],
     x$2: org.apache.kafka.streams.kstream.ValueJoiner[_ >: Long, _ >: VT, _ <: VR])
    org.apache.kafka.streams.kstream.KStream[String,VR]
exist so that it can be applied to arguments
    (org.apache.kafka.streams.kstream.KTable[String,String],
     org.apache.kafka.streams.kstream.ValueJoiner[Long,String,(String, Long)] with Serializable)
 --- because ---
argument expression's type is not compatible with formal parameter type;
 found   : org.apache.kafka.streams.kstream.ValueJoiner[Long,String,(String, Long)] with Serializable
 required: org.apache.kafka.streams.kstream.ValueJoiner[_ >: Long, _ >: ?VT, _ <: ?VR]
Note: Long <: Any (and org.apache.kafka.streams.kstream.ValueJoiner[Long,String,(String, Long)]
      with Serializable <: org.apache.kafka.streams.kstream.ValueJoiner[Long,String,(String, Long)]),
      but Java-defined trait ValueJoiner is invariant in type V1.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
Note: String <: Any (and org.apache.kafka.streams.kstream.ValueJoiner[Long,String,(String, Long)]
      with Serializable <: org.apache.kafka.streams.kstream.ValueJoiner[Long,String,(String, Long)]),
      but Java-defined trait ValueJoiner is invariant in type V2.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)

この問題の根本的な原因は、Scala と Java の相互運用性にあります。Kafka Streams API は Java で実装されていますが、アプリケーションは Scala で記述されています。特に、この問題は Java と Scala の型システムの相互作用によって発生します。たとえば、Java のジェネリックワイルドカードが、このような Scala の問題を引き起こすことがよくあります。

この問題を回避するには、 Scala ラッパー を使用することをお勧めします。それが不可能な場合、コードをコンパイルするためには、Scala アプリケーションで明示的に型を宣言する必要があります。たとえば、複数の DSL 操作を連結した単一のステートメントは複数のステートメントに分け、各ステートメントで明示的にそれぞれの戻り値の型を宣言することが必要になる場合があります。StreamToTableJoinScalaIntegrationTest には、戻り変数の型を明示的に宣言する方法の実例が示されています。

集約を実行せずに KStream を KTable に変換するには?

派生した KStream (Kafka トピックから読み取られたのではない KSTream)を KTable に変換するには、次の 3 つの方法があります。

オプション 2: KStream を Kafka に書き込み、KTable として再び読み取る

KStream を Kafka トピックに書き込み、KTable として再び読み取ることができます。一般的な推奨事項として、このトピックは事前に手動で作成して、パーティション数やトピック設定などを適切に構成することをお勧めします。

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream

// You should manually create the dummy topic before starting your application.
//
// Also, because you want to read the topic back as a KTable, you might want to enable
// log compaction for this topic to align the topic's cleanup policy with KTable semantics.
stream.to("dummy-topic", Produced.with(Serdes.String(), Serdes.Long()));
KTable<String, Long> table = builder.table("dummy-topic", Consumed.with(Serdes.String(), Serdes.Long()));

コードに関してはこのアプローチが最も簡単です。ただし、デメリットとして、(a)追加のトピックを管理する必要があり、(b) Kafka に対してデータの書き込みと再読み取りを行うため、追加のネットワークトラフィックが発生します。

オプション 3: ダミーの集約を実行する

オプション 2 の代わりに、ダミーの集約ステップを作成する方法があります。

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream

// Java 8+ example, using lambda expressions
KTable<String, Long> table = stream.groupByKey().reduce(
    (aggValue, newValue) -> newValue);

// Java 7 example
KTable<String, Long> table = stream.groupByKey().reduce(
    new Reducer<Long>() {
      @Override
      public Long apply(Long aggValue, Long newValue) {
        return newValue;
      }
    });

このアプローチでは、オプション 2 と比べてコードは複雑になりますが、(a)手動でトピックを管理する必要がなく、(b)|ak| からデータを再読み取りする必要もないというメリットがあります。

オプション 3 では、Kafka Streams によって内部 changelog トピックが作成され、KTable がフォールトトレランス用にバックアップされます。したがって、どちらのアプローチでも Kafka に追加のストレージが必要となり、ネットワークトラフィックも増加します。全体として、オプション 3 のやや複雑なコードを許容するか、オプション 2 の手動によるトピック管理を許容するかのトレードオフとなります。

1 コア環境での RocksDB の動作

RocksDB には、CPU コアが 1 つしかない環境で実行した場合に既知の問題があります。一部のシナリオで、アプリケーションのパフォーマンスが大幅に低下するか、応答しなくなる現象が発生することがあります。これを回避するには、RocksDB config setter を使用して、特定の RocksDB 構成を次のように設定します。

 public static class CustomRocksDBConfig implements RocksDBConfigSetter {
   @Override
   public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
      // Workaround: We must ensure that the parallelism is set to >= 2.
      int compactionParallelism = Math.max(Runtime.getRuntime().availableProcessors(), 2);
      // Set number of compaction threads (but not flush threads).
      options.setIncreaseParallelism(compactionParallelism);
   }
 }

Properties streamsSettings = new Properties();
streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);