Streams アプリケーションの構成

Streams を使用する前に、Apache Kafka® と Kafka Streams の構成オプションを構成する必要があります。Kafka Streams を構成するには、java.util.Properties インスタンスでパラメーターを指定します。

  1. java.util.Properties インスタンスを作成します。

  2. パラメーター を設定します。以下に例を示します。

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    
    Properties props = new Properties();
    // Set a few key parameters
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
    // Any further settings
    props.put(... , ...);
    

構成パラメーターのリファレンス

このセクションでは、Streams で最もよく使用される構成パラメーターについて説明します。完全なリファレンスについては、Streams および クライアント の Javadoc を参照してください。

必須の構成パラメーター

以下に、Streams の必須の構成パラメーターを示します。

パラメーター名 重要度 説明 デフォルト値
application.id 必須 ストリーム処理アプリケーションの識別子。Kafka クラスター内で一意である必要があります。 なし
bootstrap.servers 必須 Kafka クラスターへの初期接続の確立に使用するための、ホストとポートのペアのリスト。 なし

application.id

(必須)アプリケーションの ID。各ストリーム処理アプリケーションには一意の ID が必要です。アプリケーションのすべてのインスタンスには、同じ ID を割り当てる必要があります。使用する文字は、英数字、. (ドット)、- (ハイフン)、および _ (アンダースコア)だけにすることをお勧めします。たとえば、"hello_world""hello_world-v1.0.0" のような ID を使用できます。

この ID は、以下の場所で、アプリケーションによって使用されるリソースを他のリソースから分離するために使用されます。

  • デフォルトの Kafka コンシューマーおよびプロデューサーの client.id プレフィックス
  • Kafka コンシューマーの group.id の調整
  • ステートディレクトリのサブディレクトリ名(state.dir を参照)
  • 内部 Kafka トピック名のプレフィックス
ヒント:
アプリケーションをアップデートするときは、内部トピックとステートストアにある既存のデータを再利用する場合を除き、application.id を変更する必要があります。たとえば、application.id にバージョン情報を埋め込んで、my-app-v1.0.0my-app-v1.0.2 のようにすることができます。

bootstrap.servers

(必須)|ak| ブートストラップサーバー。これは、基盤となるプロデューサークライアントおよびコンシューマークライアントで Kafka クラスターへの接続に使用される 設定 と同じです。たとえば、"kafka-broker1:9092,kafka-broker2:9092" のように指定します。

ヒント:
Kafka Streams アプリケーションは、この構成値で指定された単一の Kafka クラスターとのみ通信できます。

オプションの構成パラメーター

ここでは、オプションの Streams の構成パラメーター を重要度別に整理して説明します。

  • 高: これらのパラメーターは、パフォーマンスに重大な影響を及ぼす可能性があります。パラメーターの値を変更する場合は注意してください。
  • 中: これらのパラメーターは、パフォーマンスにある程度の影響を及ぼす可能性があります。パラメーターにどの程度の調整が必要になるかは、具体的な環境によります。
  • 低: これらのパラメーターがパフォーマンスに影響を及ぼす範囲はそれほど大きくなく、重大でもありません。
パラメーター名 重要度 説明 デフォルト値
acceptable.recovery.lag インスタンスがキャッチアップされ、アクティブタスクを受け取ることができると見なされる、許容可能な最大ラグ(キャッチアップが必要なオフセットの数)。 10,000
application.server 単一の Kafka Streams アプリケーション内のステートストアの位置を検出するために使用できる、ユーザー定義の組み込みのエンドポイントを指す "ホスト:ポート" のペア。この値は、アプリケーションのインスタンスごとに異なっている必要があります。 空の文字列
buffered.records.per.partition パーティションごとにバッファに保持する最大レコード数。 1000
cache.max.bytes.buffering すべてのスレッドでレコードキャッシュに使用されるメモリーの最大バイト数。 10485760 バイト
client.id リクエストを実行する際にサーバーに渡す ID 文字列。(この設定は、Kafka Streams が内部で使用するコンシューマークライアントやプロデューサークライアントに渡されます)。 空の文字列
commit.interval.ms

タスクの位置(ソーストピック内のオフセット)を保存する頻度。

  • For at-least-once processing, committing means saving the position (offsets) of the processor.
  • For exactly-once processing, it means to commit the transaction, which includes saving the position.
30000 ミリ秒(at_least_once)または 100 ミリ秒(exactly_once_v2
default.deserialization.exception.handler DeserializationExceptionHandler インターフェイスを実装する例外処理クラス。 default.deserialization.exception.handler」を参照
default.production.exception.handler ProductionExceptionHandler インターフェイスを実装する例外処理クラス。 default.production.exception.handler」を参照
default.key.serde Serde インターフェイスを実装する、レコードキーに対するデフォルトのシリアライザーおよび逆シリアライザークラス(value.serde も参照)。 null
default.value.serde Serde インターフェイスを実装する、レコード値に対するデフォルトのシリアライザーおよび逆シリアライザークラス(key.serde も参照)。 null
default.windowed.key.serde.inner Serde インターフェイスを実装する、レコードキーに対するデフォルトの内部シリアライザーおよび逆シリアライザークラス。default.key.serde がウィンドウ化された serde の場合にのみ効果があります。 Serdes.ByteArray().getClass().getName()
default.windowed.value.serde.inner Serde インターフェイスを実装する、レコード値に対するデフォルトの内部シリアライザーおよび逆シリアライザークラス。 default.value.serde がウィンドウ化された serde の場合にのみ効果があります。 Serdes.ByteArray().getClass().getName()
default.timestamp.extractor TimestampExtractor インターフェイスを実装するデフォルトのタイムスタンプエクストラクタークラス。 タイムスタンプエクストラクター」を参照
max.task.idle.ms 順番どおりの処理のセマンティクスを維持するために、Streams がデータをフェッチする前に待機する時間の最大値。 0 ミリ秒
max.warmup.replicas 一度に割り当てることができるウォームアップレプリカ(構成されている num.standbys を超える追加のスタンバイ)の最大数。 2
metric.reporters Metrics Reporter として使用するクラスのリスト。 空のリスト
metrics.num.samples メトリクスの計算用に維持されるサンプルの数。 2
metrics.recording.level メトリクスの最高記録レベル。 INFO
metrics.sample.window.ms メトリクスサンプルが計算される時間枠。 30000 ミリ秒
num.standby.replicas タスクごとのスタンバイレプリカの数。 0
num.stream.threads ストリーム処理を実行するスレッドの数。 1
poll.ms 入力を待機してブロックする時間(ミリ秒)。 100 ミリ秒
probing.rebalance.interval.ms 十分にキャッチアップされたウォームアップレプリカを探すために、バランス調整をトリガーするまでに待機する最大時間。 600000 ミリ秒(10 分)
processing.guarantee 処理モード。at_least_once (デフォルト)、または exactly_once_v2 (EOS バージョン 2 用。Confluent Platform バージョン 5.5x または Kafka バージョン 2.5.x 以降が必要)のいずれかを指定できます。exactly_once (EOS バージョン 1 用)および exactly_once_beta (EOS バージョン 2 用)は非推奨の構成オプションです。 処理の保証」を参照
replication.factor アプリケーションによって作成された changelog トピックと再パーティショントピックのレプリケーション係数。ブローカークラスターがバージョン Confluent Platform 5.4.x (Kafka 2.4.x) 以降の場合、-1 を設定することで、ブローカーのデフォルトのレプリケーション係数を使用することができます。 1
retries ブローカーリクエストが再試行可能なエラーを返した場合の再試行回数。 0
retry.backoff.ms リクエストが再試行されるまでの時間(ミリ秒)。retries パラメーターが 0 より大きい値に構成されている場合に適用されます。 100
rocksdb.config.setter RocksDB 構成。  
state.cleanup.delay.ms パーティションの移行時にステートを削除するまでの待ち時間(ミリ秒)。 600000 ミリ秒
state.dir ステートストアのディレクトリの場所。 /var/lib/kafka-streams
topology.optimization トポロジーの最適化を有効または無効にします。文字列 none または all を指定できます。 none
upgrade.from ローリングアップグレードでのアップグレード元のバージョン。 アップグレード元」を参照
windowstore.changelog.additional.retention.ms データが不完全な状態でログから削除されるのを防ぐために、ウィンドウの maintainMs に追加されます。クロックドリフトを許容します。 86400000 ミリ秒(1 日)
window.size.ms ウィンドウの終了時間を計算するために、逆シリアライザーのウィンドウサイズを設定します。 null

acceptable.recovery.lag

インスタンスがキャッチアップされ、アクティブタスクを受け取ることができると見なされる、許容可能な最大ラグ(changelog からのキャッチアップが必要なオフセットの合計数)。Streams は、ステートストアが許容可能なリカバリーラグの範囲内にあるインスタンスが存在する場合にのみ、それらにステートフルなアクティブタスクを割り当てます。まだキャッチアップされていないインスタンスについては、ウォームアップレプリカを割り当ててバックグラウンドでステートを復元します。任意のワークロードについて、この設定に相当するリカバリー時間は 1 分未満でなければなりません。0 以上に設定する必要があります。

default.deserialization.exception.handler

デフォルトの逆シリアル化例外ハンドラーを使用すると、逆シリアル化に失敗したレコードの例外を管理できます。例外は、破損したデータ、不適切な逆シリアル化ロジック、または処理できないレコードの種類が原因で発生する可能性があります。例外ハンドラーの実装では、レコードとスローされた例外に応じて FAIL または CONTINUE を返す必要があります。FAIL を返すと Streams にシャットダウンを指示し、CONTINUE を返すと、Streams に問題を無視して処理を続行するように指示することになります。デフォルトの実装クラスは LogAndFailExceptionHandler です。以下の例外ハンドラーを使用できます。

  • LogAndContinueExceptionHandler: このハンドラーは、逆シリアル化例外を記録し、レコードの処理を続行するように処理パイプラインに指示します。記録してスキップするこの戦略では、逆シリアル化できないレコードがある場合でも、Kafka Streams を失敗させずに先へ進めることができます。
  • LogAndFailExceptionHandler。このハンドラーは、逆シリアル化例外を記録し、それ以上のレコードの処理を停止するように処理パイプラインに指示します。

ライブラリによって提供されるハンドラーの他に、ニーズに合わせてカスタマイズした独自の例外ハンドラーを用意することもできます。カスタマイズした例外ハンドラーの実装例については、FAQ の「エラーと例外処理」を参照してください。

default.production.exception.handler

デフォルトの生成例外ハンドラーでは、生成しようとしたレコードが大きすぎる場合など、ブローカーとのやり取りの試行時にトリガーされた例外を管理できます。デフォルトでは、Kafka が提供する DefaultProductionExceptionHandler が使用されます。このハンドラーは、これらの例外の発生時に常にエラーを生成します。

各例外ハンドラーでは、レコードとスローされた例外に応じて FAIL または CONTINUE を返すことができます。FAIL を返すと Streams にシャットダウンを指示し、CONTINUE を返すと、Streams に問題を無視して処理を続行するように指示することになります。たとえば、大きすぎるレコードを常に無視する例外ハンドラーを用意する場合、以下のようなコードを実装できます。

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;

class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    public void configure(Map<String, Object> config) {}

    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}

Properties settings = new Properties();

// other various kafka streams settings, e.g. bootstrap servers, application ID, etc

settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
             IgnoreRecordTooLargeHandler.class);

default.key.serde

レコードキーに対するデフォルトのシリアライザーおよび逆シリアライザークラス。ユーザーが設定するまでは null です。Kafka Streams では、データを具現化する必要が生じるたびにシリアル化と逆シリアル化が行われます。たとえば、以下のような場合が該当します。

  • "Kafka のトピック" との間でデータの読み書きが行われるとき(StreamsBuilder#stream() メソッドや KStream#to() メソッドの使用時など)。
  • "ステートストア" との間でデータの読み書きが行われるとき。

詳細については、「データ型とシリアル化」を参照してください。

default.value.serde

レコード値に対するデフォルトのシリアライザーおよび逆シリアライザークラス。ユーザーが設定するまでは null です。Kafka Streams では、データを具現化する必要が生じるたびにシリアル化と逆シリアル化が行われます。たとえば、以下のような場合が該当します。

  • "Kafka のトピック" との間でデータの読み書きが行われるとき (KStreamBuilder#stream() メソッドや KStream#to() メソッドの使用時など)。
  • "ステートストア" との間でデータの読み書きが行われるとき。

詳細については、「データ型とシリアル化」を参照してください。

default.timestamp.extractor

タイムスタンプエクストラクターは、ConsumerRecord のインスタンスからタイムスタンプを取得します。タイムスタンプは、ストリームの進行状況を制御するために使用されます。

デフォルトのエクストラクターは FailOnInvalidTimestamp です。このエクストラクターは、Kafka バージョン 0.10 以降で Kafka プロデューサークライアントによって自動的に Kafka メッセージに埋め込まれる、組み込みのタイムスタンプを取得します。このエクストラクターは、Kafka のサーバー側の log.message.timestamp.type ブローカーパラメーターおよび message.timestamp.type トピックパラメーターの設定に応じて、次のいずれかを提供します。

  • イベント時刻 の処理セマンティクス。これは、log.message.timestamp.typeCreateTime つまり "プロデューサーの時刻" に設定されている場合(デフォルト)に提供されます。これは、Kafka プロデューサーが元のメッセージを送信した時刻を表します。Kafka の公式のプロデューサークライアント、または Confluent のいずれかのプロデューサークライアントを使用している場合、タイムスタンプはエポックからのミリ秒数を表します。
  • インジェスト時刻 の処理セマンティクス。これは、log.message.timestamp.typeLogAppendTime つまり "ブローカーの時刻" に設定されている場合に提供されます。これは、Kafka ブローカーが元のメッセージを受信した時刻を、エポックからのミリ秒数で表します。

レコードに含まれている組み込みのタイムスタンプが無効(負)の場合、Kafka Streams ではそのレコードが処理されず、警告なしで破棄されるため、FailOnInvalidTimestamp エクストラクターは例外をスローします。無効な組み込みのタイムスタンプが発生する原因には、さまざまなものが考えられます。たとえば、0.10 より前の Kafka プロデューサークライアント、または新しい Kafka 0.10 のメッセージフォーマットをまだサポートしていないサードパーティ製プロデューサークライアントによって書き込まれたトピックを消費した場合に発生します。別の状況として、Kafka クラスターを 0.9 から 0.10 にアップグレードした後にも発生する可能性があります。この場合、0.9 で生成されたデータには、いずれも 0.10 のメッセージタイムスタンプが含まれていないためです。

タイムスタンプが無効でもデータを処理する場合は、代わりに使用できるエクストラクターが 2 つあります。どちらも組み込みのタイムスタンプを取得しますが、無効なタイムスタンプの処理方法が異なります。

  • LogAndSkipOnInvalidTimestamp: このエクストラクターは、警告メッセージを記録し、無効なタイムスタンプを Kafka Streams に返します。その結果、レコードは処理されずに破棄されます。記録してスキップするこの戦略では、入力データに無効な組み込みのタイムスタンプを持つレコードがある場合でも、Kafka Streams を失敗させずに先へ進めることができます。
  • UsePartitionTimeOnInvalidTimestamp。このエクストラクターは、レコードの組み込みのタイムスタンプが有効であれば(負でなければ)、そのタイムスタンプを返します。レコードに有効な組み込みのタイムスタンプが含まれていない場合、エクストラクターは、同じトピックパーティションのレコードから抽出された直前の有効なタイムスタンプを、現在のレコードの推定タイムスタンプとして返します。タイムスタンプを推定できない場合は、例外がスローされます。

もう 1 つの組み込みのエクストラクターとして WallclockTimestampExtractor があります。このエクストラクターは、消費されたレコードから実際にタイムスタンプを "抽出" するのではなく、システムクロックから現在の時刻をミリ秒で返します(System.currentTimeMillis())。これは実質的に、Streams がイベントの 処理時刻 に基づいて動作することを意味します。

独自のタイムスタンプエクストラクターを提供することもできます。たとえば、メッセージのペイロードに埋め込まれたタイムスタンプを取得するエクストラクターを作成できます。有効なタイムスタンプを抽出できない場合は、例外をスローするか、負のタイムスタンプを返すか、または推定のタイムスタンプを返すことができます。負のタイムスタンプを返すと、対応するレコードは処理されずに警告なしで破棄されるため、データの損失が発生します。新しいタイムスタンプを推定する場合は、previousTimestamp で提供される値を使用できます(これは Kafka Streams によるタイムスタンプの推定と同じです)。カスタムの TimestampExtractor の実装を以下に示します。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record (giving you "event-time" semantics).
public class MyEventTimeExtractor implements TimestampExtractor {

  @Override
  public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
    // `Foo` is your own custom class, which we assume has a method that returns
    // the embedded timestamp (milliseconds since midnight, January 1, 1970 UTC).
    long timestamp = -1;
    final Foo myPojo = (Foo) record.value();
    if (myPojo != null) {
      timestamp = myPojo.getTimestampInMillis();
    }
    if (timestamp < 0) {
      // Invalid timestamp!  Attempt to estimate a new timestamp,
      // otherwise fall back to wall-clock time (processing-time).
      if (previousTimestamp >= 0) {
        return previousTimestamp;
      } else {
        return System.currentTimeMillis();
      }
    }
    return timestamp;
  }

}

その後、以下のように Streams 構成でカスタムのタイムスタンプエクストラクターを定義します。

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);

max.task.idle.ms

順番どおりの処理のセマンティクスを維持するために、Kafka Streams がデータをフェッチする前に待機する時間の長さを制御できます。

max.task.idle.ms の設定により、結合およびマージで順不同の結果が生じることを許容するかどうかを制御できます。この構成値は、プロデューサーからの追加レコードの送信を待っている間に、ストリームタスクが一部(すべてではない)の入力パーティションに完全に追いつきながらアイドル状態を維持する最長時間(単位: ミリ秒)です。このアイドル時間により、複数の入力ストリームがある場合に、順不同のレコード処理が行われることを回避できます。

デフォルトは 0 です。デフォルトに設定した場合、ストリームはプロデューサーから送信される追加レコードを待ちません。この場合、ストリームはブローカー上に既に存在するレコードをフェッチする前に待機します。これにより、既にブローカーに存在するレコードは、タイムスタンプの順序で Kafka Streams によって処理されます。

-1 に設定した場合、アイドル状態は無効になり、順不同の処理が生じるかどうかに関係なく、ローカルで使用可能なデータがあればすべて処理されます。

結合やマージのように、複数の入力パーティションがあるタスクの処理では、Kafka Streams で次にレコードを処理するパーティションを選択する必要があります。すべての入力パーティションにローカルのバッファに格納されたデータがある場合、Kafka Streams は次のレコードとしてタイムスタンプが最も古いパーティションを選択します。この決定により、入力パーティションがタイムスタンプの順番になり、ストリーミングの結合やマージで好ましい結果が得られます。

しかし、Kafka Streams で、ローカルのバッファにデータが格納されていないパーティションが 1 つある場合、そのパーティションの次のレコードのタイムスタンプが残りのパーティションのレコードより古いか新しいかを判定することができません。

ここで次の 2 つのケースについて考える必要があります。Kafka Streams でまだフェッチしていないデータがブローカーのパーティションに存在する場合と、Kafka Streams がパーティションに完全に追いついていて、Kafka Streams が最後のバッチのポーリングをしてからプロデューサーで新しいレコードが生成されていない場合です。

デフォルト値の 0 の場合、ローカルのバッファにはパーティションのデータが格納されておらず、ブローカーにはデータが存在することが判明すると、Kafka Streams はタスクの処理を遅らせます。これは、ローカルのバッファに空のパーティションがあり、Kafka Streams ではそのパーティションのラグがゼロではない状態です。ただし、Kafka Streams がブローカーに追いつくと、データのないパーティションがあっても、処理すべき新しいデータの到着を待たずに処理を続行します。このデフォルトの動作は、スループットを多少犠牲にしても、正しい結合のセマンティクスを維持することを意図したものです。

max.task.idle.ms にゼロより大きい任意の値を設定することで、処理が追いついていて空のパーティションがある場合に Kafka Streams が待機する時間をミリ秒単位で指定できます。ゼロより大きな値は、入力パーティションに新しいデータが生成されるのを待つ待機時間となり、それにより、プロデューサーの処理が遅い場合にもデータを順番どおりに処理できます。

max.task.idle.ms に -1 を設定すると、Kafka Streams が、タイムスタンプで次のレコードを選択する前に空のパーティションのバッファリングを待つことはありません。スループットが向上しますが、順不同の処理が発生する可能性が生じます。

max.warmup.replicas

ウォームアップレプリカの最大数。ウォームアップレプリカは、構成されている num.standbys を超える追加のスタンバイです。これらは、いずれかのインスタンスでタスクを利用可能な状態に保つために割り当てられることがあり、それまでは再割り当て先の別のインスタンスでウォームアップしています。この構成は、高可用性を目的とした追加のブローカートラフィックとクラスターステートを抑えるために使用されます。値を大きくすると、Kafka Streams で同時にウォームアップできるタスクの数が増えるため、再割り当てされているウォームアップをアクティブタスクに移行するときに、適切なステートに復元する時間が短縮されます。1 以上に設定する必要があります。

num.standby.replicas

スタンバイレプリカの数。スタンバイレプリカは、ローカルステートストアのシャドウコピーです。Kafka Streams は、指定された数のレプリカをストアごとに作成し、十分な数のインスタンスが実行されている間、各レプリカを最新の状態に維持しようと試みます。スタンバイレプリカは、タスクのフェールオーバーのレイテンシを最小化するために使用されます。障害が発生したインスタンスで以前に実行されていたタスクは、スタンバイレプリカのあるインスタンスで再起動するのが望ましく、そうすれば changelog からローカルステートストアを復元するプロセスが最小限に抑えられます。フェールオーバー時のタスクの再開コストを最小化するために Kafka Streams でスタンバイレプリカが使用される方法の詳細については、「ステート」セクションで説明されています。

推奨事項:
即時フェイルオーバーのために、スタンバイ数を 1 に増やします(高可用性)。スタンバイ数を増やすと、クライアント側でより多くのストレージ容量が必要になります。たとえば、スタンバイが 1 個の場合、2 倍の容量が必要です。

注釈

n 個のスタンバイレプリカを構成した場合、n+1 個の KafkaStreams インスタンスをプロビジョニングする必要があります。

num.stream.threads

Kafka Streams アプリケーションのインスタンスあたりのストリームスレッドの数を指定します。これらのスレッドでストリーム処理コードが実行されます。Kafka Streams のスレッドモデルの詳細については、「スレッドモデル」を参照してください。

probing.rebalance.interval.ms

バランス調整をトリガーするまでの最大待ち時間。バランス調整では、キャッチアップされていると見なせる状態に復元されたウォームアップレプリカが探索されます。Kafka Streams は、 acceptable.recovery.lag の範囲内にあり、キャッチアップされているインスタンスが存在する場合にのみ、それらにステートフルなアクティブタスクを割り当てます。バランス調整の探索は、ウォームアップレプリカの最新の合計ラグをクエリで照会し、準備ができたものをアクティブタスクに移行するために使用されます。これは、ウォームアップタスクがある限り、割り当てのバランスが取れるまでトリガーされ続けます。1 分以上に設定する必要があります。

processing.guarantee

使用する処理の保証。指定可能な値は、at_least_once (デフォルト)および exactly_once_v2 です。exactly_once_v2 を使用するには、Confluent Platform バージョン 5.5.x または Kafka バージョン 2.5.x 以降が必要です。"厳密に 1 回" の処理を有効にすると、commit.interval.ms パラメーターのデフォルト値は 100ms に変更されます。また、デフォルトでは、コンシューマーの構成は isolation.level="read_committed"、プロデューサーの構成は enable.idempotence=true です。exactly_once_v2 の処理では、デフォルトで 3 つ以上のブローカーからなるクラスターが必要とされることに注意してください。これは本稼働環境で推奨される設定です。開発環境では、ブローカーの設定を調整し、使用するブローカーの数を transaction.state.log.replication.factortransaction.state.log.min.isr に指定することで、これを変更できます。詳細については、「処理の保証」を参照してください。

replication.factor

ローカルステートが使用されるとき、またはストリームが集約のために再パーティション化されるときに Kafka Streams が作成する内部トピックの、レプリケーション係数を指定します。レプリケーションはフォールトトレランスを維持するために重要です。レプリケーションを行わない場合、ブローカーで障害が 1 つのブローカーで障害が発生しただけで、ストリーム処理アプリケーションを続行できなくなる可能性があります。ソーストピックと同様のレプリケーション係数を使用することをお勧めします。

推奨事項:
内部 Kafka Streams トピックで 最大 2 つのブローカーの障害を許容できるように、レプリケーション係数を 3 に増やすことをお勧めします。ただし、必要なストレージ領域も増えることに注意してください(レプリケーション係数が 3 の場合は 3 倍)。

rocksdb.config.setter

RocksDB 構成。Kafka Streams では、永続ストアのデフォルトのストレージエンジンとして RocksDB が使用されます。RocksDB のデフォルト構成を変更するには、RocksDBConfigSetter を実装し、そのカスタムクラスを rocksdb.config.setter に指定します。

以下に、RocksDB で消費されるメモリーサイズを調整する例を示します。

public static class CustomRocksDBConfig implements RocksDBConfigSetter {

  // This object should be a member variable so it can be closed in RocksDBConfigSetter#close.
  private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(16 * 1024L * 1024L);

  @Override
  public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
    // See #1 below.
    BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
    tableConfig.setBlockCache(cache);
    // See #2 below.
    tableConfig.setBlockSize(16 * 1024L);
    // See #3 below.
    tableConfig.setCacheIndexAndFilterBlocks(true);
    options.setTableFormatConfig(tableConfig);
    // See #4 below.
    options.setMaxWriteBufferNumber(2);
  }

  @Override
  public void close(final String storeName, final Options options) {
    // See #5 below.
    cache.close();
  }
}

Properties streamsSettings = new Properties();
streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
例に関する注記:
  1. BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig(); 新しく作成するのではなく、既存の TableFormatConfig への参照を取得します。これにより、重要な最適化設定である BloomFilter などのデフォルト値を誤って上書きする心配がなくなります。
  2. tableConfig.setBlockSize(16 * 1024L); RocksDB GitHub(『Indexes and filter blocks』) に記載されている手順に従って、 デフォルト値 を変更します。
  3. tableConfig.setCacheIndexAndFilterBlocks(true); インデックスブロックとフィルターブロックが制限なく増大するのを防ぎます。詳細については、RocksDB GitHub(インデックスブロックとフィルターブロックのキャッシュ) を参照してください。
  4. options.setMaxWriteBufferNumber(2); RocksDB GitHub で高度なオプションを参照してください。
  5. cache.close(); メモリーリークを防ぐために、org.rocksdb.RocksObject を継承するすべての構築済みオブジェクトを終了する必要があります。詳細については、RocksJava のドキュメント を参照してください。

state.dir

ステートディレクトリ。Kafka Streams は、ステートディレクトリにローカルステートを永続化します。アプリケーションごとに、そのアプリケーションをホストしているマシンのステートディレクトリの下にサブディレクトリが作成されます。サブディレクトリの名前はアプリケーション ID になります。このサブディレクトリの下に、アプリケーションに関連付けられたステートストアが作成されます。

topology.optimization

Kafka Streams でトポロジーの最適化を適用する必要があることを示します。現在、最適化の設定はすべてを有効にするか無効にするかの切り替えしかなく、デフォルトでは無効になっています。これらの最適化には、再パーティショントピックを移動および削減する機能と、ソーストピックをソース KTable の changelog として再利用する機能があります。このオプションを有効にすることをお勧めします。

2.3 の時点では、最適化を有効にするには 2 つの作業が必要です。この構成を StreamsConfig.OPTIMIZE に設定することに加えて、トポロジーの構築時に、オーバーロードされた StreamsBuilder.build(Properties) メソッドを使用して構成プロパティを渡す必要があります。

KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties)

upgrade.from

アップグレード元のバージョン。アップグレードガイドに記載されているように、特定のバージョンへのローリングアップグレードを実行する場合、この構成を設定することが重要です。この構成を適切なバージョンに設定してから、インスタンスを再起動して新しいバージョンにアップグレードする必要があります。すべてが新しいバージョンになったら、この構成を削除し、2 回目のローリング再起動を実行します。この構成を設定して 2 回の再起動を伴うアップグレードパスに従う必要があるのは、2.0 より前のバージョンからアップグレードする場合か、2.4 より前のバージョンから 2.4 以降にアップグレードする場合だけです。

Kafka コンシューマー、プロデューサー、および管理クライアントの構成パラメーター

内部で使用される Kafka コンシューマープロデューサー、および 管理クライアント のパラメーターを指定できます。コンシューマー、プロデューサー、および管理クライアントの設定は、StreamsConfig インスタンスのパラメーターを指定することで定義されます。

以下の例では、Streams の設定で、Kafka コンシューマーのセッションタイムアウト を 60000 ミリ秒に構成します。

Properties streamsSettings = new Properties();
// Example of a "normal" setting for Kafka Streams
streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Customize the Kafka consumer settings of your Streams application
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);

名前の指定

コンシューマー、プロデューサー、および管理クライアントのいくつかの構成パラメーターでは、同じパラメーター名が使用されます。たとえば、TCP バッファの構成には send.buffer.bytesreceive.buffer.bytes が使用されます。 request.timeout.msretry.backoff.ms は、クライアントリクエストの再試行を制御します。名前の重複を避けるには、パラメーター名に consumer.producer.、または admin というプレフィックスを付けます( consumer.send.buffer.bytesproducer.send.buffer.bytes など)。

Properties streamsSettings = new Properties();
// same value for consumer and producer
streamsSettings.put("PARAMETER_NAME", "value");
// different values for consumer, producer, and admin client
streamsSettings.put("consumer.PARAMETER_NAME", "consumer-value");
streamsSettings.put("producer.PARAMETER_NAME", "producer-value");
streamsSettings.put("admin.PARAMETER_NAME", "admin-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.consumerPrefix("PARAMETER_NAME"), "consumer-value");
streamsSettings.put(StreamsConfig.producerPrefix("PARAMETER_NAME"), "producer-value");
streamsSettings.put(StreamsConfig.adminClientPrefix("PARAMETER_NAME"), "admin-value");

メインコンシューマー、復元コンシューマー、およびグローバルコンシューマーのパラメーター名には、先頭に以下のプレフィックスが付きます。

  • main.consumer. -- メインコンシューマーの場合。メインコンシューマーは、ストリームソースのデフォルトのコンシューマーです。
  • restore.consumer. -- 復元コンシューマーの場合。復元コンシューマーは、ステートストアの回復を管理します。
  • global.consumer. -- グローバルコンシューマーの場合。グローバルコンシューマーは、グローバル KTable の構築に使用されます。

これらのプレフィックスを持つパラメーターの値を設定すると、consumer パラメーターの設定値がオーバーライドされます。たとえば、以下の構成では consumer.max.poll.records の値がオーバーライドされます。

consumer.max.poll.records = 5
main.consumer.max.poll.records = 100
restore.consumer.max.poll.records = 50

初期化中に、各コンシューマーではこれらの設定が以下のように反映されます。

コンシューマーの種類 max.poll.records 値 | 理由
コンシューマー 5 すべての種類のコンシューマーでデフォルト値は 5 になります。
メインコンシューマー 100 main.consumer. プレフィックスで対象として指定されています。
復元コンシューマー 50 restore.consumer. プレフィックスでデフォルト値がオーバーライドされます。
グローバルコンシューマー 5 global.consumer プレフィックスは指定されていないため、デフォルト値が使用されます。

たとえば、復元コンシューマーだけを構成し、他のコンシューマーの設定は変更しない場合は、restore.consumer. を使用して構成を設定できます。

Properties streamsSettings = new Properties();
// same config value for all consumer types
streamsSettings.put("consumer.PARAMETER_NAME", "general-consumer-value");
// set a different restore consumer config. This would make restore consumer take restore-consumer-value,
// while main consumer and global consumer stay with general-consumer-value
streamsSettings.put("restore.consumer.PARAMETER_NAME", "restore-consumer-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.restoreConsumerPrefix("PARAMETER_NAME"), "restore-consumer-value");

内部トピックパラメーター

内部の再パーティショントピックや changelog トピックを構成するには、topic. プレフィックスの後に標準のトピック構成プロパティを続けます。

Properties streamsSettings = new Properties();
// Override default for both changelog and repartition topics
streamsSettings.put("topic.PARAMETER_NAME", "topic-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");

デフォルト値

Kafka Streams では、基盤となるクライアント構成の一部に対してさまざまなデフォルト値が使用されます。以下は、これらを簡単にまとめたものです。これらの構成の詳細については、「プロデューサーの構成」と「コンシューマーの構成」を参照してください。

パラメーター名 対応するクライアント Streams のデフォルト
auto.offset.reset コンシューマー earliest
linger.ms プロデューサー 100
max.poll.records コンシューマー 1000

Kafka Streams によって制御されるパラメーター

以下の構成パラメーターは Kafka Streams によって割り当てられます。Kafka Streams アプリケーションでは、allow.auto.create.topics を変更しようとしてもユーザーの値は無視され、設定の効果はありません。他のパラメーターは設定できます。Kafka Streams が設定するデフォルト値は、プレーンな KafkaConsumer とは異なります。

Kafka Streams は、client.id パラメーターを使用して内部クライアント用のクライアント ID を生成します。client.id が指定されていない場合は、Kafka Streams によって <application.id>-<random-UUID> に設定されます。

注釈

Kafka Streams による構成パラメーターへの値の割り当てに関しては、いくつかの特別な考慮事項があります。

  • グローバルコンシューマーは Kafka Streams インスタンスごとに 1 つだけ存在します。
  • 復元コンシューマーはスレッドごとに 1 つ存在します。
  • プロデューサーの client.id: 値は構成されている処理の保証によって異なります。
    • EOS が無効または EOS バージョン 2 が有効: プロデューサーはスレッドごとに 1 つだけ存在します。
    • EOS バージョン 1 が有効: プロデューサーはタスクごとに 1 つだけ存在します。
  • partition.assignment.strategy: 割り当て方法のパラメーターは、メインコンシューマーにのみ影響します。グローバルコンシューマーと復元コンシューマーでは、"トピックサブスクリプション" の代わりに "パーティション割り当て" が使用されます。また、コンシューマーグループは形成されないため、StreamsPartitionAssignor が使用されることはありません。
パラメーター名 対応するクライアント Kafka Streams によって割り当てられる値
allow.auto.create.topics コンシューマー false
auto.offset.reset グローバルコンシューマー なし
auto.offset.reset 復元コンシューマー なし
client.id 管理 <client.id>-admin
client.id コンシューマー <client.id>-StreamThread-<threadIndex>-consumer
client.id グローバルコンシューマー <client.id>-global-consumer
client.id 復元コンシューマー <client.id>-StreamThread-<threadIndex>-restore-consumer
client.id プロデューサー
  • EOS v1 の場合 : <client.id>-StreamThread-<threadIndex>-<taskId>-producer
  • EOS 以外および EOS v2 の場合 : <client.id>-StreamThread-<threadIndex>-producer
enable.auto.commit コンシューマー false
group.id コンシューマー application.id と同じです。
group.id グローバルコンシューマー null
group.id 復元コンシューマー null
group.instance.id コンシューマー ユーザーが指定した設定に -<threadIndex> というサフィックスが追加されます。
partition.assignment.strategy コンシューマー 常に StreamsPartitionAssignor に設定されます。

enable.auto.commit

コンシューマーの自動コミット。"少なくとも 1 回" の処理セマンティクスを保証し、自動コミットを無効にするために、Kafka Streams はこのコンシューマー構成値を false にオーバーライドします。コンシューマーのコミットは、Kafka Streams ライブラリまたはユーザーが現在の処理ステートをコミットすることを決定したときに、明示的に commitSync の呼び出しを通じてのみ行われます。

注釈

このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。