Kafka Streams アプリケーションリセットツール

アプリケーションリセットツールを使用すると、アプリケーションをリセットし、強制的にそのデータを最初から再処理することができます。このツールは、開発とテストやバグの修正時に役立つことがあります。

アプリケーションリセットツールは、アプリケーションをリセットするとき、Kafka Streams の ユーザートピック (入力、出力、および中間トピック)と 内部トピック を別々の方法で処理します。

以下に、それぞれのトピックの種類に対するアプリケーションリセットツールの動作を示します。

  • 入力トピック: オフセットを指定の位置にリセットします。デフォルトでは、トピックの先頭にリセットされます。
  • 中間トピック : トピックの末尾までスキップします。つまり、すべてのパーティションについて、(コンシューマーグループ application.id に対する)アプリケーションのコミット済みのコンシューマーオフセットを、各パーティションの logSize に設定します。
  • 内部トピック: 内部トピックを削除します(これにより、コミットされたオフセットがすべて自動的に削除されます)。

アプリケーションリセットツールでは、以下の操作は行われません。

  • アプリケーションの出力トピックのリセット。いずれかの出力(または中間)トピックがダウンストリームアプリケーションで消費される場合、アップストリームアプリケーションのリセット時にダウンストリームアプリケーションを適切に調整するのは、開発者の責任となります。
  • アプリケーションインスタンスのローカル環境のリセット。アプリケーションインスタンスが実行されていたすべてのマシンでローカルステートを削除するのは、開発者の責任となります。これを行う方法については、「ステップ 2: アプリケーションインスタンスのローカル環境のリセット」セクションの手順を参照してください。
前提条件
  • アプリケーションのすべてのインスタンスが停止している必要があります。停止していないと、アプリケーションが無効なステートになったり、クラッシュしたり、不適切な結果を生成したりする可能性があります。bin/kafka-consumer-groups を使用すると、ID が application.id のコンシューマーグループがアクティブかどうかを確認できます。
  • ツールの使用時には十分な注意を払い、パラメーターをよく確認してください。パラメーターの値が間違っていたり(application.id の誤記など)、矛盾したパラメーターを指定したりすると(アプリケーションの入力トピックが正しくないなど)、ツールによってアプリケーションのステートが無効化されることがあり、他のアプリケーション、コンシューマーグループ、Apache Kafka® トピックに影響が生じる可能性もあります。
  • アプリケーションリセットツールを実行する前に、すべての中間トピックを手動で削除して再作成する必要があります。これにより、Kafka ブローカーのディスクの空き領域を増やすことができます。
  • 以下の状況に該当しない限り、アプリケーションリセットツールを実行する前に、中間トピックを削除して再作成する必要があります。
    • アプリケーションの中間トピックを消費する外部ダウンストリームコンシューマーがある。
    • 手動による中間トピックの削除と再作成が不要な開発環境を使用している。

ステップ 1: アプリケーションリセットツールの実行

コマンドラインからアプリケーションリセットツールを起動します。

<path-to-confluent>/bin/kafka-streams-application-reset

ツールには以下のパラメーターを指定できます。

Option (* = required)                 Description
---------------------                 -----------
* --application-id <String: ID>       The |kstreams| application ID
                                        (application.id).
--bootstrap-servers <String: urls>    Comma-separated list of broker urls with
                                        format: HOST1:PORT1,HOST2:PORT2
                                        (default: localhost:9092)
--by-duration <String: urls>          Reset offsets to offset by duration from
                                      current timestamp. Format: 'PnDTnHnMnS'
--config-file <String: file name>     Property file containing configs to be
                                        passed to admin clients and embedded
                                        consumer.
--dry-run                             Display the actions that would be
                                        performed without executing the reset
                                        commands.
--from-file <String: urls>            Reset offsets to values defined in CSV
                                      file.
--input-topics <String: list>         Comma-separated list of user input
                                        topics. For these topics, the tool will
                                        reset the offset to the earliest
                                        available offset.
--intermediate-topics <String: list>  Comma-separated list of intermediate user
                                        topics (topics used in the through()
                                        method). For these topics, the tool
                                        will skip to the end.
--shift-by <Long: number-of-offsets>  Reset offsets shifting current offset by
                                      'n', where 'n' can be positive or
                                      negative
--to-datetime <String>                Reset offsets to offset from datetime.
                                        Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest                         Reset offsets to earliest offset.
--to-latest                           Reset offsets to latest offset.
--to-offset <Long>                    Reset offsets to a specific offset.
--zookeeper                           Zookeeper option is deprecated by
                                        bootstrap.servers, as the reset tool
                                        would no longer access Zookeeper
                                        directly.

パラメーターは必要に応じて組み合わせることができます。たとえば、内部ステートが空の状態からアプリケーションを再起動し、以前のデータは再処理しない場合は、--input-topics--intermediate-topics の各パラメーターを省略します。

ステップ 2: アプリケーションインスタンスのローカル環境のリセット

アプリケーションを完全にリセットするためには、アプリケーションインスタンスが実行されていたすべてのマシンで、アプリケーションのローカルステートディレクトリを削除する必要があります。これは、同じマシンでアプリケーションインスタンスを再起動する前に実行する必要があります。次のいずれかの方法を使用できます。

  • アプリケーションコードで API メソッド KafkaStreams#cleanUp() を使用します。
  • 対応するローカルステートディレクトリ(デフォルトの場所は /var/lib/kafka-streams/<application.id> )を手動で削除します。詳細については、StreamsConfig クラスの state.dir を参照してください。

この例では、アプリケーションをローカルで開発およびテストし、"実行、リセット、変更" のサイクルを通じてアプリケーションを反復的に改良しようとしています。

package io.confluent.examples.streams;

import ...;

public class ResetDemo {

  public static void main(String[] args) throws Exception {
    // Kafka Streams configuration
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    // ...and so on...

    // Define the processing topology
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("my-input-topic")
        .selectKey(...)
        .through("rekeyed-topic")
        .groupByKey()
        .count("global-count")
        .to("my-output-topic");

    KafkaStreams app = new KafkaStreams(builder.build(), props);

    // Delete the application's local state.
    // Note: In real application you'd call `cleanUp()` only under
    // certain conditions.  See tip on `cleanUp()` below.
    app.cleanUp();

    app.start();

    // Note: In real applications you would register a shutdown hook
    // that would trigger the call to `app.close()` rather than
    // using the sleep-then-close example we show here.
    Thread.sleep(30 * 1000L);
    app.close();
  }

}

ちなみに

反復に伴う回復オーバーヘッドを避けるには、アプリケーションインスタンスの再起動時または再開時に、毎回無条件で cleanUp() を呼び出すことを控える必要があります。たとえば、本稼働環境のアプリケーションでは、コマンドライン引数を使用して、必要に応じて cleanUp() 呼び出しを有効または無効にすることができます。その後、以下のように "実行、リセット、変更" のサイクルを実行できます。

# Run your application
  bin/kafka-run-class io.confluent.examples.streams.ResetDemo

# After stopping all application instances, reset the application
  bin/kafka-streams-application-reset --application-id my-streams-app \
                                      --input-topics my-input-topic \
                                      --intermediate-topics rekeyed-topic

# Now you can modify/recompile as needed and then re-run the application again.
# You can also experiment, for example, with different input data without
# modifying the application.

注釈

このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。