Kafka Streams アプリケーションリセットツール¶
アプリケーションリセットツールを使用すると、アプリケーションをリセットし、強制的にそのデータを最初から再処理することができます。このツールは、開発とテストやバグの修正時に役立つことがあります。
アプリケーションリセットツールは、アプリケーションをリセットするとき、Kafka Streams の ユーザートピック (入力、出力、および中間トピック)と 内部トピック を別々の方法で処理します。
以下に、それぞれのトピックの種類に対するアプリケーションリセットツールの動作を示します。
- 入力トピック: オフセットを指定の位置にリセットします。デフォルトでは、トピックの先頭にリセットされます。
- 中間トピック : トピックの末尾までスキップします。つまり、すべてのパーティションについて、(コンシューマーグループ
application.id
に対する)アプリケーションのコミット済みのコンシューマーオフセットを、各パーティションのlogSize
に設定します。 - 内部トピック: 内部トピックを削除します(これにより、コミットされたオフセットがすべて自動的に削除されます)。
アプリケーションリセットツールでは、以下の操作は行われません。
- アプリケーションの出力トピックのリセット。いずれかの出力(または中間)トピックがダウンストリームアプリケーションで消費される場合、アップストリームアプリケーションのリセット時にダウンストリームアプリケーションを適切に調整するのは、開発者の責任となります。
- アプリケーションインスタンスのローカル環境のリセット。アプリケーションインスタンスが実行されていたすべてのマシンでローカルステートを削除するのは、開発者の責任となります。これを行う方法については、「ステップ 2: アプリケーションインスタンスのローカル環境のリセット」セクションの手順を参照してください。
- 前提条件
- アプリケーションのすべてのインスタンスが停止している必要があります。停止していないと、アプリケーションが無効なステートになったり、クラッシュしたり、不適切な結果を生成したりする可能性があります。
bin/kafka-consumer-groups
を使用すると、ID がapplication.id
のコンシューマーグループがアクティブかどうかを確認できます。 - ツールの使用時には十分な注意を払い、パラメーターをよく確認してください。パラメーターの値が間違っていたり(
application.id
の誤記など)、矛盾したパラメーターを指定したりすると(アプリケーションの入力トピックが正しくないなど)、ツールによってアプリケーションのステートが無効化されることがあり、他のアプリケーション、コンシューマーグループ、Apache Kafka® トピックに影響が生じる可能性もあります。 - アプリケーションリセットツールを実行する前に、すべての中間トピックを手動で削除して再作成する必要があります。これにより、Kafka ブローカーのディスクの空き領域を増やすことができます。
- 以下の状況に該当しない限り、アプリケーションリセットツールを実行する前に、中間トピックを削除して再作成する必要があります。
- アプリケーションの中間トピックを消費する外部ダウンストリームコンシューマーがある。
- 手動による中間トピックの削除と再作成が不要な開発環境を使用している。
- アプリケーションのすべてのインスタンスが停止している必要があります。停止していないと、アプリケーションが無効なステートになったり、クラッシュしたり、不適切な結果を生成したりする可能性があります。
ステップ 1: アプリケーションリセットツールの実行¶
コマンドラインからアプリケーションリセットツールを起動します。
<path-to-confluent>/bin/kafka-streams-application-reset
ツールには以下のパラメーターを指定できます。
Option (* = required) Description
--------------------- -----------
* --application-id <String: ID> The |kstreams| application ID
(application.id).
--bootstrap-servers <String: urls> Comma-separated list of broker urls with
format: HOST1:PORT1,HOST2:PORT2
(default: localhost:9092)
--by-duration <String: urls> Reset offsets to offset by duration from
current timestamp. Format: 'PnDTnHnMnS'
--config-file <String: file name> Property file containing configs to be
passed to admin clients and embedded
consumer.
--dry-run Display the actions that would be
performed without executing the reset
commands.
--from-file <String: urls> Reset offsets to values defined in CSV
file.
--input-topics <String: list> Comma-separated list of user input
topics. For these topics, the tool will
reset the offset to the earliest
available offset.
--intermediate-topics <String: list> Comma-separated list of intermediate user
topics (topics used in the through()
method). For these topics, the tool
will skip to the end.
--shift-by <Long: number-of-offsets> Reset offsets shifting current offset by
'n', where 'n' can be positive or
negative
--to-datetime <String> Reset offsets to offset from datetime.
Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest Reset offsets to earliest offset.
--to-latest Reset offsets to latest offset.
--to-offset <Long> Reset offsets to a specific offset.
--zookeeper Zookeeper option is deprecated by
bootstrap.servers, as the reset tool
would no longer access Zookeeper
directly.
パラメーターは必要に応じて組み合わせることができます。たとえば、内部ステートが空の状態からアプリケーションを再起動し、以前のデータは再処理しない場合は、--input-topics
と --intermediate-topics
の各パラメーターを省略します。
ステップ 2: アプリケーションインスタンスのローカル環境のリセット¶
アプリケーションを完全にリセットするためには、アプリケーションインスタンスが実行されていたすべてのマシンで、アプリケーションのローカルステートディレクトリを削除する必要があります。これは、同じマシンでアプリケーションインスタンスを再起動する前に実行する必要があります。次のいずれかの方法を使用できます。
- アプリケーションコードで API メソッド
KafkaStreams#cleanUp()
を使用します。 - 対応するローカルステートディレクトリ(デフォルトの場所は
/var/lib/kafka-streams/<application.id>
)を手動で削除します。詳細については、StreamsConfig クラスの state.dir を参照してください。
例¶
この例では、アプリケーションをローカルで開発およびテストし、"実行、リセット、変更" のサイクルを通じてアプリケーションを反復的に改良しようとしています。
package io.confluent.examples.streams;
import ...;
public class ResetDemo {
public static void main(String[] args) throws Exception {
// Kafka Streams configuration
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
// ...and so on...
// Define the processing topology
StreamsBuilder builder = new StreamsBuilder();
builder.stream("my-input-topic")
.selectKey(...)
.through("rekeyed-topic")
.groupByKey()
.count("global-count")
.to("my-output-topic");
KafkaStreams app = new KafkaStreams(builder.build(), props);
// Delete the application's local state.
// Note: In real application you'd call `cleanUp()` only under
// certain conditions. See tip on `cleanUp()` below.
app.cleanUp();
app.start();
// Note: In real applications you would register a shutdown hook
// that would trigger the call to `app.close()` rather than
// using the sleep-then-close example we show here.
Thread.sleep(30 * 1000L);
app.close();
}
}
ちなみに
反復に伴う回復オーバーヘッドを避けるには、アプリケーションインスタンスの再起動時または再開時に、毎回無条件で cleanUp()
を呼び出すことを控える必要があります。たとえば、本稼働環境のアプリケーションでは、コマンドライン引数を使用して、必要に応じて cleanUp()
呼び出しを有効または無効にすることができます。その後、以下のように "実行、リセット、変更" のサイクルを実行できます。
# Run your application
bin/kafka-run-class io.confluent.examples.streams.ResetDemo
# After stopping all application instances, reset the application
bin/kafka-streams-application-reset --application-id my-streams-app \
--input-topics my-input-topic \
--intermediate-topics rekeyed-topic
# Now you can modify/recompile as needed and then re-run the application again.
# You can also experiment, for example, with different input data without
# modifying the application.
注釈
このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。