Streams アプリケーションの構成¶
Streams を使用する前に、Apache Kafka® と Kafka Streams の構成オプションを構成する必要があります。Kafka Streams を構成するには、java.util.Properties
インスタンスでパラメーターを指定します。
java.util.Properties
インスタンスを作成します。パラメーター を設定します。以下に例を示します。
import java.util.Properties; import org.apache.kafka.streams.StreamsConfig; Properties props = new Properties(); // Set a few key parameters props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); // Any further settings props.put(... , ...);
構成パラメーターのリファレンス¶
このセクションでは、Streams で最もよく使用される構成パラメーターについて説明します。完全なリファレンスについては、Streams および クライアント の Javadoc を参照してください。
必須の構成パラメーター¶
以下に、Streams の必須の構成パラメーターを示します。
パラメーター名 | 重要度 | 説明 | デフォルト値 |
---|---|---|---|
application.id | 必須 | ストリーム処理アプリケーションの識別子。Kafka クラスター内で一意である必要があります。 | なし |
bootstrap.servers | 必須 | Kafka クラスターへの初期接続の確立に使用するための、ホストとポートのペアのリスト。 | なし |
application.id¶
(必須)アプリケーションの ID。各ストリーム処理アプリケーションには一意の ID が必要です。アプリケーションのすべてのインスタンスには、同じ ID を割り当てる必要があります。使用する文字は、英数字、.
(ドット)、-
(ハイフン)、および _
(アンダースコア)だけにすることをお勧めします。たとえば、"hello_world"
や "hello_world-v1.0.0"
のような ID を使用できます。
この ID は、以下の場所で、アプリケーションによって使用されるリソースを他のリソースから分離するために使用されます。
- デフォルトの Kafka コンシューマーおよびプロデューサーの
client.id
プレフィックス - Kafka コンシューマーの
group.id
の調整 - ステートディレクトリのサブディレクトリ名(
state.dir
を参照) - 内部 Kafka トピック名のプレフィックス
- ヒント:
- アプリケーションをアップデートするときは、内部トピックとステートストアにある既存のデータを再利用する場合を除き、
application.id
を変更する必要があります。たとえば、application.id
にバージョン情報を埋め込んで、my-app-v1.0.0
やmy-app-v1.0.2
のようにすることができます。
オプションの構成パラメーター¶
ここでは、オプションの Streams の構成パラメーター を重要度別に整理して説明します。
- 高: これらのパラメーターは、パフォーマンスに重大な影響を及ぼす可能性があります。パラメーターの値を変更する場合は注意してください。
- 中: これらのパラメーターは、パフォーマンスにある程度の影響を及ぼす可能性があります。パラメーターにどの程度の調整が必要になるかは、具体的な環境によります。
- 低: これらのパラメーターがパフォーマンスに影響を及ぼす範囲はそれほど大きくなく、重大でもありません。
パラメーター名 | 重要度 | 説明 | デフォルト値 |
---|---|---|---|
acceptable.recovery.lag | 中 | インスタンスがキャッチアップされ、アクティブタスクを受け取ることができると見なされる、許容可能な最大ラグ(キャッチアップが必要なオフセットの数)。 | 10,000 |
application.server | 低 | 単一の Kafka Streams アプリケーション内のステートストアの位置を検出するために使用できる、ユーザー定義の組み込みのエンドポイントを指す "ホスト:ポート" のペア。この値は、アプリケーションのインスタンスごとに異なっている必要があります。 | 空の文字列 |
buffered.records.per.partition | 低 | パーティションごとにバッファに保持する最大レコード数。 | 1000 |
cache.max.bytes.buffering | 中 | すべてのスレッドでレコードキャッシュに使用されるメモリーの最大バイト数。 | 10485760 バイト |
client.id | 中 | リクエストを実行する際にサーバーに渡す ID 文字列。(この設定は、Kafka Streams が内部で使用するコンシューマークライアントやプロデューサークライアントに渡されます)。 | 空の文字列 |
commit.interval.ms | 低 | タスクの位置(ソーストピック内のオフセット)を保存する頻度。 | 30000 ミリ秒(at_least_once )または 100 ミリ秒(exactly_once および exactly_once_beta ) |
default.deserialization.exception.handler | 中 | DeserializationExceptionHandler インターフェイスを実装する例外処理クラス。 |
「default.deserialization.exception.handler」を参照 |
default.production.exception.handler | 中 | ProductionExceptionHandler インターフェイスを実装する例外処理クラス。 |
「default.production.exception.handler」を参照 |
default.key.serde | 中 | Serde インターフェイスを実装する、レコードキーに対するデフォルトのシリアライザーおよび逆シリアライザークラス(value.serde も参照)。 |
Serdes.ByteArray().getClass().getName() |
default.value.serde | 中 | Serde インターフェイスを実装する、レコード値に対するデフォルトのシリアライザーおよび逆シリアライザークラス(key.serde も参照)。 |
Serdes.ByteArray().getClass().getName() |
default.windowed.key.serde.inner | 中 | Serde インターフェイスを実装する、レコードキーに対するデフォルトの内部シリアライザーおよび逆シリアライザークラス。default.key.serde がウィンドウ化された serde の場合にのみ効果があります。 |
Serdes.ByteArray().getClass().getName() |
default.windowed.value.serde.inner | 中 | Serde インターフェイスを実装する、レコード値に対するデフォルトの内部シリアライザーおよび逆シリアライザークラス。 default.value.serde がウィンドウ化された serde の場合にのみ効果があります。 |
Serdes.ByteArray().getClass().getName() |
default.timestamp.extractor | 中 | TimestampExtractor インターフェイスを実装するデフォルトのタイムスタンプエクストラクタークラス。 |
「タイムスタンプエクストラクター」を参照 |
max.task.idle.ms | 中 | パーティションバッファの一部にしかレコードが含まれていない場合に、ストリームタスクをアイドル状態にしておく最大時間。 | 0 ミリ秒 |
max.warmup.replicas | 中 | 一度に割り当てることができるウォームアップレプリカ(構成されている num.standbys を超える追加のスタンバイ)の最大数。 | 2 |
metric.reporters | 低 | Metrics Reporter として使用するクラスのリスト。 | 空のリスト |
metrics.num.samples | 低 | メトリクスの計算用に維持されるサンプルの数。 | 2 |
metrics.recording.level | 低 | メトリクスの最高記録レベル。 | INFO |
metrics.sample.window.ms | 低 | メトリクスサンプルが計算される時間枠。 | 30000 ミリ秒 |
num.standby.replicas | 中 | タスクごとのスタンバイレプリカの数。 | 0 |
num.stream.threads | 中 | ストリーム処理を実行するスレッドの数。 | 1 |
poll.ms | 低 | 入力を待機してブロックする時間(ミリ秒)。 | 100 ミリ秒 |
probing.rebalance.interval.ms | 低 | 十分にキャッチアップされたウォームアップレプリカを探すために、バランス調整をトリガーするまでに待機する最大時間。 | 600000 ミリ秒(10 分) |
processing.guarantee | 中 | 処理モード。at_least_once (デフォルト)、exactly_once (EOS バージョン 1 用)、exactly_once_beta (EOS バージョン 2 用)のいずれかを指定できます。 |
「処理の保証」を参照 |
replication.factor | 高 | アプリケーションによって作成された changelog トピックと再パーティショントピックのレプリケーション係数。ブローカークラスターがバージョン Confluent Platform 5.4.x (Kafka 2.4.x) 以降の場合、-1 を設定することで、ブローカーのデフォルトのレプリケーション係数を使用することができます。 | 1 |
retries | 中 | ブローカーリクエストが再試行可能なエラーを返した場合の再試行回数。 | 0 |
retry.backoff.ms | 中 | リクエストが再試行されるまでの時間(ミリ秒)。retries パラメーターが 0 より大きい値に構成されている場合に適用されます。 |
100 |
rocksdb.config.setter | 中 | RocksDB 構成。 | |
state.cleanup.delay.ms | 低 | パーティションの移行時にステートを削除するまでの待ち時間(ミリ秒)。 | 600000 ミリ秒 |
state.dir | 高 | ステートストアのディレクトリの場所。 | /var/lib/kafka-streams |
topology.optimization | 低 | トポロジーの最適化を有効または無効にします。文字列 none または all を指定できます。 |
none |
upgrade.from | 中 | ローリングアップグレードでのアップグレード元のバージョン。 | 「アップグレード元」を参照 |
windowstore.changelog.additional.retention.ms | 低 | データが不完全な状態でログから削除されるのを防ぐために、ウィンドウの maintainMs に追加されます。クロックドリフトを許容します。 | 86400000 ミリ秒(1 日) |
window.size.ms | 低 | ウィンドウの終了時間を計算するために、逆シリアライザーのウィンドウサイズを設定します。 | null |
acceptable.recovery.lag¶
インスタンスがキャッチアップされ、アクティブタスクを受け取ることができると見なされる、許容可能な最大ラグ(changelog からのキャッチアップが必要なオフセットの合計数)。Streams は、ステートストアが許容可能なリカバリーラグの範囲内にあるインスタンスが存在する場合にのみ、それらにステートフルなアクティブタスクを割り当てます。まだキャッチアップされていないインスタンスについては、ウォームアップレプリカを割り当ててバックグラウンドでステートを復元します。任意のワークロードについて、この設定に相当するリカバリー時間は 1 分未満でなければなりません。0 以上に設定する必要があります。
default.deserialization.exception.handler¶
デフォルトの逆シリアル化例外ハンドラーを使用すると、逆シリアル化に失敗したレコードの例外を管理できます。例外は、破損したデータ、不適切な逆シリアル化ロジック、または処理できないレコードの種類が原因で発生する可能性があります。例外ハンドラーの実装では、レコードとスローされた例外に応じて FAIL
または CONTINUE
を返す必要があります。FAIL
を返すと Streams にシャットダウンを指示し、CONTINUE
を返すと、Streams に問題を無視して処理を続行するように指示することになります。デフォルトの実装クラスは LogAndFailExceptionHandler です。以下の例外ハンドラーを使用できます。
- LogAndContinueExceptionHandler: このハンドラーは、逆シリアル化例外を記録し、レコードの処理を続行するように処理パイプラインに指示します。記録してスキップするこの戦略では、逆シリアル化できないレコードがある場合でも、Kafka Streams を失敗させずに先へ進めることができます。
- LogAndFailExceptionHandler。このハンドラーは、逆シリアル化例外を記録し、それ以上のレコードの処理を停止するように処理パイプラインに指示します。
ライブラリによって提供されるハンドラーの他に、ニーズに合わせてカスタマイズした独自の例外ハンドラーを用意することもできます。カスタマイズした例外ハンドラーの実装例については、FAQ の「エラーと例外処理」を参照してください。
default.production.exception.handler¶
デフォルトの生成例外ハンドラーでは、生成しようとしたレコードが大きすぎる場合など、ブローカーとのやり取りの試行時にトリガーされた例外を管理できます。デフォルトでは、Kafka が提供する DefaultProductionExceptionHandler が使用されます。このハンドラーは、これらの例外の発生時に常にエラーを生成します。
各例外ハンドラーでは、レコードとスローされた例外に応じて FAIL
または CONTINUE
を返すことができます。FAIL
を返すと Streams にシャットダウンを指示し、CONTINUE
を返すと、Streams に問題を無視して処理を続行するように指示することになります。たとえば、大きすぎるレコードを常に無視する例外ハンドラーを用意する場合、以下のようなコードを実装できます。
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
public void configure(Map<String, Object> config) {}
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
if (exception instanceof RecordTooLargeException) {
return ProductionExceptionHandlerResponse.CONTINUE;
} else {
return ProductionExceptionHandlerResponse.FAIL;
}
}
}
Properties settings = new Properties();
// other various kafka streams settings, e.g. bootstrap servers, application ID, etc
settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
IgnoreRecordTooLargeHandler.class);
default.key.serde¶
レコードキーに対するデフォルトの内部シリアライザーおよび逆シリアライザークラス。Kafka Streams では、データを具現化する必要が生じるたびにシリアル化と逆シリアル化が行われます。たとえば、以下のような場合が該当します。
- "Kafka のトピック" との間でデータの読み書きが行われるとき(
StreamsBuilder#stream()
メソッドやKStream#to()
メソッドの使用時など)。 - "ステートストア" との間でデータの読み書きが行われるとき。
詳細については、「データ型とシリアル化」を参照してください。
default.value.serde¶
レコード値に対するデフォルトのシリアライザーおよび逆シリアライザークラス。Kafka Streams では、データを具現化する必要が生じるたびにシリアル化と逆シリアル化が行われます。たとえば、以下のような場合が該当します。
- "Kafka のトピック" との間でデータの読み書きが行われるとき (
KStreamBuilder#stream()
メソッドやKStream#to()
メソッドの使用時など)。 - "ステートストア" との間でデータの読み書きが行われるとき。
詳細については、「データ型とシリアル化」を参照してください。
default.timestamp.extractor¶
タイムスタンプエクストラクターは、ConsumerRecord のインスタンスからタイムスタンプを取得します。タイムスタンプは、ストリームの進行状況を制御するために使用されます。
デフォルトのエクストラクターは FailOnInvalidTimestamp です。このエクストラクターは、Kafka バージョン 0.10 以降で Kafka プロデューサークライアントによって自動的に Kafka メッセージに埋め込まれる、組み込みのタイムスタンプを取得します。このエクストラクターは、Kafka のサーバー側の log.message.timestamp.type
ブローカーパラメーターおよび message.timestamp.type
トピックパラメーターの設定に応じて、次のいずれかを提供します。
- イベント時刻 の処理セマンティクス。これは、
log.message.timestamp.type
がCreateTime
つまり "プロデューサーの時刻" に設定されている場合(デフォルト)に提供されます。これは、Kafka プロデューサーが元のメッセージを送信した時刻を表します。Kafka の公式のプロデューサークライアント、または Confluent のいずれかのプロデューサークライアントを使用している場合、タイムスタンプはエポックからのミリ秒数を表します。 - インジェスト時刻 の処理セマンティクス。これは、
log.message.timestamp.type
がLogAppendTime
つまり "ブローカーの時刻" に設定されている場合に提供されます。これは、Kafka ブローカーが元のメッセージを受信した時刻を、エポックからのミリ秒数で表します。
レコードに含まれている組み込みのタイムスタンプが無効(負)の場合、Kafka Streams ではそのレコードが処理されず、警告なしで破棄されるため、FailOnInvalidTimestamp
エクストラクターは例外をスローします。無効な組み込みのタイムスタンプが発生する原因には、さまざまなものが考えられます。たとえば、0.10 より前の Kafka プロデューサークライアント、または新しい Kafka 0.10 のメッセージフォーマットをまだサポートしていないサードパーティ製プロデューサークライアントによって書き込まれたトピックを消費した場合に発生します。別の状況として、Kafka クラスターを 0.9
から 0.10
にアップグレードした後にも発生する可能性があります。この場合、0.9
で生成されたデータには、いずれも 0.10
のメッセージタイムスタンプが含まれていないためです。
タイムスタンプが無効でもデータを処理する場合は、代わりに使用できるエクストラクターが 2 つあります。どちらも組み込みのタイムスタンプを取得しますが、無効なタイムスタンプの処理方法が異なります。
- LogAndSkipOnInvalidTimestamp: このエクストラクターは、警告メッセージを記録し、無効なタイムスタンプを Kafka Streams に返します。その結果、レコードは処理されずに破棄されます。記録してスキップするこの戦略では、入力データに無効な組み込みのタイムスタンプを持つレコードがある場合でも、Kafka Streams を失敗させずに先へ進めることができます。
- UsePartitionTimeOnInvalidTimestamp。このエクストラクターは、レコードの組み込みのタイムスタンプが有効であれば(負でなければ)、そのタイムスタンプを返します。レコードに有効な組み込みのタイムスタンプが含まれていない場合、エクストラクターは、同じトピックパーティションのレコードから抽出された直前の有効なタイムスタンプを、現在のレコードの推定タイムスタンプとして返します。タイムスタンプを推定できない場合は、例外がスローされます。
もう 1 つの組み込みのエクストラクターとして WallclockTimestampExtractor があります。このエクストラクターは、消費されたレコードから実際にタイムスタンプを "抽出" するのではなく、システムクロックから現在の時刻をミリ秒で返します(System.currentTimeMillis()
)。これは実質的に、Streams がイベントの 処理時刻 に基づいて動作することを意味します。
独自のタイムスタンプエクストラクターを提供することもできます。たとえば、メッセージのペイロードに埋め込まれたタイムスタンプを取得するエクストラクターを作成できます。有効なタイムスタンプを抽出できない場合は、例外をスローするか、負のタイムスタンプを返すか、または推定のタイムスタンプを返すことができます。負のタイムスタンプを返すと、対応するレコードは処理されずに警告なしで破棄されるため、データの損失が発生します。新しいタイムスタンプを推定する場合は、previousTimestamp
で提供される値を使用できます(これは Kafka Streams によるタイムスタンプの推定と同じです)。カスタムの TimestampExtractor
の実装を以下に示します。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
// Extracts the embedded timestamp of a record (giving you "event-time" semantics).
public class MyEventTimeExtractor implements TimestampExtractor {
@Override
public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
// `Foo` is your own custom class, which we assume has a method that returns
// the embedded timestamp (milliseconds since midnight, January 1, 1970 UTC).
long timestamp = -1;
final Foo myPojo = (Foo) record.value();
if (myPojo != null) {
timestamp = myPojo.getTimestampInMillis();
}
if (timestamp < 0) {
// Invalid timestamp! Attempt to estimate a new timestamp,
// otherwise fall back to wall-clock time (processing-time).
if (previousTimestamp >= 0) {
return previousTimestamp;
} else {
return System.currentTimeMillis();
}
}
return timestamp;
}
}
その後、以下のように Streams 構成でカスタムのタイムスタンプエクストラクターを定義します。
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);
max.task.idle.ms¶
パーティションバッファの一部にしかレコードが含まれていない場合に、複数の入力ストリームで順序外のレコード処理が発生する可能性を避けるために、タスクをアイドル状態にしておく最大時間。タスクの入力トピックパーティションの一部にしか処理するデータがない場合、空のパーティションから次のレコードのタイムスタンプが取得されることは期待できません。データのあるパーティションのレコードを処理し続けると、順序外のデータ処理が発生するリスクがあります。つまり、以前のタイムスタンプを持つレコードが後から受信され、新しいタイムスタンプを持つ他のレコードの後に処理される可能性があります。max.task.idle.ms
を大きい値に設定すると、アプリケーションである程度の処理レイテンシを許容する代わりに、順序外のデータ処理が発生する可能性を低減できます。Kafka Streams は、使用可能な既存のレコードの処理を一時停止し、空のトピックパーティションからのフェッチを続行します。
max.warmup.replicas¶
ウォームアップレプリカの最大数。ウォームアップレプリカは、構成されている num.standbys
を超える追加のスタンバイです。これらは、いずれかのインスタンスでタスクを利用可能な状態に保つために割り当てられることがあり、それまでは再割り当て先の別のインスタンスでウォームアップしています。この構成は、高可用性を目的とした追加のブローカートラフィックとクラスターステートを抑えるために使用されます。値を大きくすると、Kafka Streams で同時にウォームアップできるタスクの数が増えるため、再割り当てされているウォームアップをアクティブタスクに移行するときに、適切なステートに復元する時間が短縮されます。1 以上に設定する必要があります。
num.standby.replicas¶
スタンバイレプリカの数。スタンバイレプリカは、ローカルステートストアのシャドウコピーです。Kafka Streams は、指定された数のレプリカをストアごとに作成し、十分な数のインスタンスが実行されている間、各レプリカを最新の状態に維持しようと試みます。スタンバイレプリカは、タスクのフェールオーバーのレイテンシを最小化するために使用されます。障害が発生したインスタンスで以前に実行されていたタスクは、スタンバイレプリカのあるインスタンスで再起動するのが望ましく、そうすれば changelog からローカルステートストアを復元するプロセスが最小限に抑えられます。フェールオーバー時のタスクの再開コストを最小化するために Kafka Streams でスタンバイレプリカが使用される方法の詳細については、「ステート」セクションで説明されています。
注釈
n 個のスタンバイレプリカを構成した場合、n+1 個の KafkaStreams
インスタンスをプロビジョニングする必要があります。
num.stream.threads¶
Kafka Streams アプリケーションのインスタンスあたりのストリームスレッドの数を指定します。これらのスレッドでストリーム処理コードが実行されます。Kafka Streams のスレッドモデルの詳細については、「スレッドモデル」を参照してください。
probing.rebalance.interval.ms¶
バランス調整をトリガーするまでの最大待ち時間。バランス調整では、キャッチアップされていると見なせる状態に復元されたウォームアップレプリカが探索されます。Kafka Streams は、 acceptable.recovery.lag の範囲内にあり、キャッチアップされているインスタンスが存在する場合にのみ、それらにステートフルなアクティブタスクを割り当てます。バランス調整の探索は、ウォームアップレプリカの最新の合計ラグをクエリで照会し、準備ができたものをアクティブタスクに移行するために使用されます。これは、ウォームアップタスクがある限り、割り当てのバランスが取れるまでトリガーされ続けます。1 分以上に設定する必要があります。
processing.guarantee¶
使用する処理の保証。指定可能な値は、"at_least_once"
(デフォルト)、"exactly_once"
、および "exactly_once_beta"
です。"exactly_once"
を使用するには、ブローカーバージョン 0.11.0 以降が必要です。 "exactly_once_beta"
を使用するには、ブローカーバージョン 2.5 以降が必要です。"厳密に 1 回" の処理を有効にすると、commit.interval.ms
パラメーターのデフォルト値は 100ms
に変更されます。また、デフォルトでは、コンシューマーの構成は isolation.level="read_committed"
、プロデューサーの構成は enable.idempotence=true
になります。"exactly_once"
の処理では、デフォルトで 3 つ以上のブローカーからなるクラスターが必要とされることに注意してください。これは本稼働環境で推奨される設定です。開発環境では、ブローカーの設定を調整し、使用するブローカーの数を transaction.state.log.replication.factor
と transaction.state.log.min.isr
に指定することで、これを変更できます。詳細については、「処理の保証」を参照してください。
replication.factor¶
ローカルステートが使用されるとき、またはストリームが集約のために再パーティション化されるときに Kafka Streams が作成する内部トピックの、レプリケーション係数を指定します。レプリケーションはフォールトトレランスを維持するために重要です。レプリケーションを行わない場合、ブローカーで障害が 1 つのブローカーで障害が発生しただけで、ストリーム処理アプリケーションを続行できなくなる可能性があります。ソーストピックと同様のレプリケーション係数を使用することをお勧めします。
- 推奨事項:
- 内部 Kafka Streams トピックで 最大 2 つのブローカーの障害を許容できるように、レプリケーション係数を 3 に増やすことをお勧めします。ただし、必要なストレージ領域も増えることに注意してください(レプリケーション係数が 3 の場合は 3 倍)。
rocksdb.config.setter¶
RocksDB 構成。Kafka Streams では、永続ストアのデフォルトのストレージエンジンとして RocksDB が使用されます。RocksDB のデフォルト構成を変更するには、RocksDBConfigSetter
を実装し、そのカスタムクラスを rocksdb.config.setter に指定します。
以下に、RocksDB で消費されるメモリーサイズを調整する例を示します。
public static class CustomRocksDBConfig implements RocksDBConfigSetter {
// This object should be a member variable so it can be closed in RocksDBConfigSetter#close.
private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(16 * 1024L * 1024L);
@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
// See #1 below.
BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
tableConfig.setBlockCache(cache);
// See #2 below.
tableConfig.setBlockSize(16 * 1024L);
// See #3 below.
tableConfig.setCacheIndexAndFilterBlocks(true);
options.setTableFormatConfig(tableConfig);
// See #4 below.
options.setMaxWriteBufferNumber(2);
}
@Override
public void close(final String storeName, final Options options) {
// See #5 below.
cache.close();
}
}
Properties streamsSettings = new Properties();
streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
- 例に関する注記:
BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
新しく作成するのではなく、既存のTableFormatConfig
への参照を取得します。これにより、重要な最適化設定であるBloomFilter
などのデフォルト値を誤って上書きする心配がなくなります。tableConfig.setBlockSize(16 * 1024L);
RocksDB GitHub(『Indexes and filter blocks』) に記載されている手順に従って、 デフォルト値 を変更します。tableConfig.setCacheIndexAndFilterBlocks(true);
インデックスブロックとフィルターブロックが制限なく増大するのを防ぎます。詳細については、RocksDB GitHub(インデックスブロックとフィルターブロックのキャッシュ) を参照してください。options.setMaxWriteBufferNumber(2);
RocksDB GitHub で高度なオプションを参照してください。cache.close();
メモリーリークを防ぐために、org.rocksdb.RocksObject を継承するすべての構築済みオブジェクトを終了する必要があります。詳細については、RocksJava のドキュメント を参照してください。
state.dir¶
ステートディレクトリ。Kafka Streams は、ステートディレクトリにローカルステートを永続化します。アプリケーションごとに、そのアプリケーションをホストしているマシンのステートディレクトリの下にサブディレクトリが作成されます。サブディレクトリの名前はアプリケーション ID になります。このサブディレクトリの下に、アプリケーションに関連付けられたステートストアが作成されます。
topology.optimization¶
Kafka Streams でトポロジーの最適化を適用する必要があることを示します。現在、最適化の設定はすべてを有効にするか無効にするかの切り替えしかなく、デフォルトでは無効になっています。これらの最適化には、再パーティショントピックを移動および削減する機能と、ソーストピックをソース KTable の changelog として再利用する機能があります。このオプションを有効にすることをお勧めします。
2.3 の時点では、最適化を有効にするには 2 つの作業が必要です。この構成を StreamsConfig.OPTIMIZE
に設定することに加えて、トポロジーの構築時に、オーバーロードされた StreamsBuilder.build(Properties)
メソッドを使用して構成プロパティを渡す必要があります。
KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties)
upgrade.from¶
アップグレード元のバージョン。アップグレードガイドに記載されているように、特定のバージョンへのローリングアップグレードを実行する場合、この構成を設定することが重要です。この構成を適切なバージョンに設定してから、インスタンスを再起動して新しいバージョンにアップグレードする必要があります。すべてが新しいバージョンになったら、この構成を削除し、2 回目のローリング再起動を実行します。この構成を設定して 2 回の再起動を伴うアップグレードパスに従う必要があるのは、2.0 より前のバージョンからアップグレードする場合か、2.4 より前のバージョンから 2.4 以降にアップグレードする場合だけです。
Kafka コンシューマー、プロデューサー、および管理クライアントの構成パラメーター¶
内部で使用される Kafka コンシューマー、プロデューサー、および 管理クライアント のパラメーターを指定できます。コンシューマー、プロデューサー、および管理クライアントの設定は、StreamsConfig
インスタンスのパラメーターを指定することで定義されます。
以下の例では、Streams の設定で、Kafka コンシューマーのセッションタイムアウト を 60000 ミリ秒に構成します。
Properties streamsSettings = new Properties();
// Example of a "normal" setting for Kafka Streams
streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Customize the Kafka consumer settings of your Streams application
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
名前の指定¶
コンシューマー、プロデューサー、および管理クライアントのいくつかの構成パラメーターでは、同じパラメーター名が使用されます。たとえば、TCP バッファの構成には send.buffer.bytes
と receive.buffer.bytes
が使用されます。 request.timeout.ms
と retry.backoff.ms
は、クライアントリクエストの再試行を制御します。名前の重複を避けるには、パラメーター名に consumer.
、producer.
、または admin
というプレフィックスを付けます( consumer.send.buffer.bytes
や producer.send.buffer.bytes
など)。
Properties streamsSettings = new Properties();
// same value for consumer and producer
streamsSettings.put("PARAMETER_NAME", "value");
// different values for consumer, producer, and admin client
streamsSettings.put("consumer.PARAMETER_NAME", "consumer-value");
streamsSettings.put("producer.PARAMETER_NAME", "producer-value");
streamsSettings.put("admin.PARAMETER_NAME", "admin-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.consumerPrefix("PARAMETER_NAME"), "consumer-value");
streamsSettings.put(StreamsConfig.producerPrefix("PARAMETER_NAME"), "producer-value");
streamsSettings.put(StreamsConfig.adminClientPrefix("PARAMETER_NAME"), "admin-value");
メインコンシューマー、復元コンシューマー、およびグローバルコンシューマーのパラメーター名には、先頭に以下のプレフィックスが付きます。
main.consumer.
-- メインコンシューマーの場合。メインコンシューマーは、ストリームソースのデフォルトのコンシューマーです。restore.consumer.
-- 復元コンシューマーの場合。復元コンシューマーは、ステートストアの回復を管理します。global.consumer.
-- グローバルコンシューマーの場合。グローバルコンシューマーは、グローバル KTable の構築に使用されます。
これらのプレフィックスを持つパラメーターの値を設定すると、consumer
パラメーターの設定値がオーバーライドされます。たとえば、以下の構成では consumer.max.poll.record
の値がオーバーライドされます。
consumer.max.poll.record = 5
main.consumer.max.poll.record = 100
restore.consumer.max.poll.record = 50
初期化中に、各コンシューマーではこれらの設定が以下のように反映されます。
コンシューマーの種類 | max.poll.record value | 理由 |
---|---|---|
コンシューマー | 5 | すべての種類のコンシューマーでデフォルト値は 5 になります。 |
メインコンシューマー | 100 | main.consumer. プレフィックスで対象として指定されています。 |
復元コンシューマー | 50 | restore.consumer. プレフィックスでデフォルト値がオーバーライドされます。 |
グローバルコンシューマー | 5 | global.consumer プレフィックスは指定されていないため、デフォルト値が使用されます。 |
たとえば、復元コンシューマーだけを構成し、他のコンシューマーの設定は変更しない場合は、restore.consumer.
を使用して構成を設定できます。
Properties streamsSettings = new Properties();
// same config value for all consumer types
streamsSettings.put("consumer.PARAMETER_NAME", "general-consumer-value");
// set a different restore consumer config. This would make restore consumer take restore-consumer-value,
// while main consumer and global consumer stay with general-consumer-value
streamsSettings.put("restore.consumer.PARAMETER_NAME", "restore-consumer-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.restoreConsumerPrefix("PARAMETER_NAME"), "restore-consumer-value");
内部トピックパラメーター¶
内部の再パーティショントピックや changelog トピックを構成するには、topic.
プレフィックスの後に標準のトピック構成プロパティを続けます。
Properties streamsSettings = new Properties();
// Override default for both changelog and repartition topics
streamsSettings.put("topic.PARAMETER_NAME", "topic-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");
デフォルト値¶
Kafka Streams では、基盤となるクライアント構成の一部に対してさまざまなデフォルト値が使用されます。以下は、これらを簡単にまとめたものです。これらの構成の詳細については、「プロデューサーの構成」と「コンシューマーの構成」を参照してください。
パラメーター名 | 対応するクライアント | Streams のデフォルト |
---|---|---|
auto.offset.reset | コンシューマー | earliest |
linger.ms | プロデューサー | 100 |
max.poll.records | コンシューマー | 1000 |
Kafka Streams によって制御されるパラメーター¶
以下の構成パラメーターは Kafka Streams によって割り当てられます。Kafka Streams アプリケーションでは、allow.auto.create.topics
を変更しようとしてもユーザーの値は無視され、設定の効果はありません。他のパラメーターは設定できます。Kafka Streams が設定するデフォルト値は、プレーンな KafkaConsumer
とは異なります。
Kafka Streams は、client.id
パラメーターを使用して内部クライアント用のクライアント ID を生成します。client.id
が指定されていない場合は、Kafka Streams によって <application.id>-<random-UUID>
に設定されます。
注釈
Kafka Streams による構成パラメーターへの値の割り当てに関しては、いくつかの特別な考慮事項があります。
- グローバルコンシューマーは Kafka Streams インスタンスごとに 1 つだけ存在します。
- 復元コンシューマーはスレッドごとに 1 つ存在します。
- プロデューサーの
client.id
: 値は構成されている処理の保証によって異なります。- EOS が無効または EOS バージョン 2 が有効: プロデューサーはスレッドごとに 1 つだけ存在します。
- EOS バージョン 1 が有効: プロデューサーはタスクごとに 1 つだけ存在します。
partition.assignment.strategy
: 割り当て方法のパラメーターは、メインコンシューマーにのみ影響します。グローバルコンシューマーと復元コンシューマーでは、"トピックサブスクリプション" の代わりに "パーティション割り当て" が使用されます。また、コンシューマーグループは形成されないため、PartitionAssignor
が使用されることはありません。
パラメーター名 | 対応するクライアント | Kafka Streams によって割り当てられる値 |
---|---|---|
allow.auto.create.topics | コンシューマー | false |
auto.offset.reset | グローバルコンシューマー | なし |
auto.offset.reset | 復元コンシューマー | なし |
client.id | 管理 | <client.id>-admin |
client.id | コンシューマー | <client.id>-StreamThread-<threadIndex>-consumer |
client.id | グローバルコンシューマー | <client.id>-global-consumer |
client.id | 復元コンシューマー | <client.id>-StreamThread-<threadIndex>-restore-consumer |
client.id | プロデューサー |
|
enable.auto.commit | コンシューマー | false |
group.id | コンシューマー | application.id と同じです。 |
group.id | グローバルコンシューマー | null |
group.id | 復元コンシューマー | null |
group.instance.id | コンシューマー | ユーザーが指定した設定に -<threadIndex> というサフィックスが追加されます。 |
partition.assignment.strategy | コンシューマー | 常に StreamsPartitionAssignor に設定されます。 |
enable.auto.commit¶
コンシューマーの自動コミット。"少なくとも 1 回" の処理セマンティクスを保証し、自動コミットを無効にするために、Kafka Streams はこのコンシューマー構成値を false
にオーバーライドします。コンシューマーのコミットは、Kafka Streams ライブラリまたはユーザーが現在の処理ステートをコミットすることを決定したときに、明示的に commitSync の呼び出しを通じてのみ行われます。
回復性を維持するために推奨される構成パラメーター¶
ブローカーで障害が発生した場合の回復性を維持するには、Kafka および Kafka Streams の構成オプションをいくつか明示的に構成する必要があります。
パラメーター名 | 対応するクライアント | デフォルト値 | 推奨される設定 |
---|---|---|---|
acks | プロデューサー | acks=1 |
acks=all |
replication.factor | Streams | 1 |
3 |
min.insync.replicas | ブローカー | 1 |
2 |
レプリケーション係数を 3 に増やすと、内部 Kafka Streams トピックで 最大 2 つのブローカーの障害を許容できるようになります。acks の設定を "all" に変更すると、少なくとも 1 つのレプリカが存続している限り、レコードは失われないことが保証されます。ただし、デフォルト値から推奨値に変更する場合のトレードオフとして、回復性を高めるために、パフォーマンスにある程度の負荷がかかるとともに、より多くのストレージ領域(レプリケーション係数が 3 の場合は 3 倍)が必要になります。
acks¶
リクエストが完了したと見なされるまでにリーダーが受け取る必要のある確認応答の数。この設定は、送信されるレコードの永続性を制御します。指定可能な値は以下のとおりです。
acks=0
プロデューサーはサーバーからの確認応答を待機しません。レコードはすぐにソケットバッファに追加され、送信済みと見なされます。この場合、サーバーがレコードを受信したかどうかの保証はなく、retries
構成は効果を持ちません(通常はクライアントでエラーの発生を知ることができないため)。各レコードに対して返されるオフセットは、常に-1
に設定されます。acks=1
リーダーはレコードをローカルログに書き込み、すべてのフォロワーからの完全な確認応答を待たずに応答します。レコードを確認した直後、フォロワーでレコードがレプリケートされる前にリーダーで障害が発生すると、レコードは失われます。acks=all
リーダーは、一連のすべての同期レプリカを待機してレコードを確認します。これにより、少なくとも 1 つの同期レプリカが存続している限り、レコードは失われないことが保証されます。これが最も強力な保証の設定です。
詳細については、Kafka プロデューサーのドキュメント を参照してください。
replication.factor¶
こちらの説明 を参照してください。
これらの設定は StreamsConfig
を通じて定義します。
Properties streamsSettings = new Properties();
streamsSettings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamsSettings.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2);
streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
注釈
将来のバージョンの Kafka Streams では、開発者が Properties
インスタンスを使用してアプリ固有の構成設定を独自に設定し、それらの設定には ProcessorContext を通じてアクセスできるようになる予定です。
注釈
このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。