重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

永続性のための最適化

永続性とは、メッセージが失われる可能性を減らすことです。Confluent Cloud では複製係数 3 を使用してデータの永続性を確保しています。

このページの構成パラメーターには、広範囲の値が指定できるものがあります。設定できる値は、必要な要件や、メッセージの平均サイズ、パーティション数といったその他の要因、さらには環境内で生じる他のさまざまな差によって影響を受けます。したがって、ベンチマークテスト を行うとアプリケーションおよび環境の構成の検証に役立ちます。

プロデューサーの acks

プロデューサーでは、acks 構成パラメーターを使用して、Kafka に書き込まれるメッセージの永続性を制御できます。acks パラメーターは、スループットとレイテンシの最適化にも使用できますが、基本的には永続性の設定に使用します。高い永続性を得るために最適化するには、このパラメーターを acks=allacks=-1 と同等)に設定してください。これは、同期レプリカ(ISR)の全セットがメッセージの受領応答を返してメッセージをコミット済みと見なすまで、リーダーが待機することを意味します。この設定が、最も高い保証を提供する設定です。同期レプリカが少なくとも 1 つ残存している限りレコードが失われないことが保証されます。リーダーブローカーが各レプリカからの受領応答を待ってからプロデューサーに応答するので、トレードオフとしてレイテンシが高くなります。

重複と順番

プロデューサーは、送信に失敗した場合に、データが失われないようにするためにメッセージを再送信するという方法で永続性を高めることもできます。プロデューサーは、構成パラメーター retries (デフォルトは MAX_INT)で指定された回数、および構成パラメーター delivery.timeout.ms (デフォルトは 120000)で指定された時間に達するまで、メッセージを自動的に再送信します。後者のパラメーターは KIP-91 で導入されました。delivery.timeout.ms は、メッセージを送信してから受領応答をブローカーから受け取るまでの合計時間として希望する上限時間に調整できます。これは、メッセージの有効期間についてのビジネス要件を反映した時間に設定します。

プロデューサーの自動再試行については、考慮しなければならないことが 2 つあります。

  1. 重複: Confluent Cloud の一時的な障害が原因でプロデューサーが再試行した場合は、重複するメッセージが Confluent Cloud に送信される可能性があります。
  2. 順序: 同時に複数の送信試行が「未完了」になると、先に失敗したメッセージ送信の再試行が、それより新しいメッセージの送信に成功した後に実行されることがあります。

この両方に対処するには、プロデューサーにべき等性を構成します(enable.idempotence=true)。こうすると、TCP と同じように、Confluent Cloud のブローカーが、増分していくシーケンス番号を使用してメッセージを追跡するようになります。べき等性を持つプロデューサーは、要求がパイプライン処理されていても、重複メッセージに対処し、かつメッセージの順番を保持することができます。ブローカーは重複するシーケンス番号を無視するので、メッセージの重複は発生しません。また、障害が発生した場合には、順序付けが回復するまでプロデューサーが未完了状態のメッセージを 1 つに制限するので、メッセージの順番も保持されます。べき等性の保証が満たされない場合、プロデューサーは致命的エラーを生成し、それ以上の送信を拒否します。つまり、プロデューサーにべき等性を構成する場合は、アプリケーション開発者が、致命的エラーをキャッチして適切にエラー処理を行う必要があります。

プロデューサーにべき等性を構成しないが、ビジネス要件でべき等性が求められるという場合は、発生しうるメッセージの重複と順序の問題に別の方法で対処する必要があります。Confluent Cloud で一時的な障害が発生した場合に起きる可能性があるメッセージの重複に対処するには、重複メッセージを処理するためのコンシューマーアプリケーションロジックを構築してください。失敗したメッセージの再送信を可能にしつつメッセージの順番を保つには、構成パラメーター max.in.flight.requests.per.connection=1 を設定して、一度に 1 つだけブローカーに要求を送信できるようにします。要求のパイプライン処理を可能にしつつメッセージの順番を保つには、多少のメッセージ損失を許容できるアプリケーションであれば、構成パラメーターを retries=0 に設定します。

失敗したメッセージの送信をプロデューサー自体で再試行する代わりに、プロデューサークライアントに返される例外に対してアクションをコーディングすることもできます(たとえば、Java クライアントの Callback インターフェイスの onCompletion() メソッドなど)。再試行処理を手動で行う場合は、retries=0 に設定して自動再試行を無効にしてください。プロデューサーのべき等性は、メッセージのシーケンス番号を追跡するものであるため、自動再試行を有効にする場合にしか意味がありません。有効にせずに、retries=0 を設定し、失敗したメッセージをアプリケーションで手動で再送信する場合は、新しいシーケンス番号が生成されるだけであるため、重複の検出は機能しません。自動再試行を無効にすると、個々の送信の失敗が原因でメッセージにギャップが発生する可能性がありますが、ブローカーは、受信した書き込みの順番を保持します。

最小同期レプリカ数

Confluent Cloud は、複数のブローカー間でデータを複製して永続性を実現します。パーティションごとに、データのコピーが置かれるレプリカ(またはブローカー)の割り当てのリストがあります。リーダーと "同期" している、このリストのレプリカを同期レプリカ(ISR)と呼びます。パーティションごとに、リーダーブローカーが、ISR リストに含まれている他のブローカーにメッセージを自動的に複製します。プロデューサーで acks=all または acks=-1 を設定した場合は、構成パラメーター min.insync.replicas で ISR リストのレプリカ数の最小しきい値を指定します。この最小数が満たされない場合、プロデューサーは例外を生成します。min.insync.replicasacks を一緒に使用すると、より高い永続性を実現できます。一般的なシナリオとしては replication.factor=3 でトピックを作成し、トピック構成で min.insync.replicas=2 をオーバーライドし、プロデューサーに acks=all を設定して、レプリカの過半数が書き込みを受信しなかった場合に、プロデューサーで例外を生成するようにします。

コンシューマーのオフセットと自動コミット

予期しないエラーがコンシューマー側で発生した場合にメッセージがどうなるかについても考慮して、処理段階でメッセージが失われないようにする必要があります。コンシューマーのオフセットによって、どのメッセージが既に取り込まれたのかが追跡されるので、いつ、どのようにコンシューマーでメッセージのオフセットをコミットするかが永続性にとって重要になります。コンシューマーがメッセージのオフセットをコミットしてそのメッセージの処理を開始した後に、予期せず失敗するような状況を回避してください。同じパーティションから読み取りを開始した次のコンシューマーは、既にオフセットがコミットされているメッセージを再処理しないからです。

デフォルトでは、オフセットは、一定間隔のコンシューマーによる poll() 呼び出し時にコミットされるように構成されます。通常、ほとんどのユースケースはこの構成で十分です。しかし、コンシューマーがトランザクションチェーンの一部であり、しっかりとしたメッセージ送達保証が必要な場合は、コンシューマーがメッセージを完全に処理した後にのみ、オフセットをコミットすることが必要になります。構成パラメーター enable.auto.commit で、これらのコンシューマーのコミットを自動的に実行するか手動で実行するかを構成できます。高い永続性を実現するために、enable.auto.commit=false を設定して自動コミットを無効にし、コンシューマーのコードでいずれかのコミットメソッド(たとえば、commitSync()commitAsync() など)を明示的に呼び出すことができます。

正確に 1 回(Exactly Once)のセマンティクス(EOS)

より強い保証を実現するために、EOS トランザクションを行うようにアプリケーションを構成することができます。これにより、多くの Kafka トピックおよびパーティションへの自動書き込みが有効になります。ログの一部のメッセージは、さまざまなトランザクション状態をとるので、コンシューマーは、構成パラメーター isolation.level を設定して、受信すべきメッセージのタイプを定義できます。isolation.level=read_committed を設定すると、コンシューマーは、非トランザクションメッセージまたはコミット済みのトランザクションメッセージのみを受け取り、未完了のトランザクションや異常終了したトランザクションからのメッセージは受け取らなくなります。consume-process-produce パターンでトランザクションのセマンティクスを使用し、各メッセージが正確に 1 回処理されるようにするには、クライアントアプリケーションで enable.auto.commit=false を設定し、KafkaProducer インターフェイスの sendOffsetsToTransaction() メソッドを使用して、手動でオフセットをコミットする必要があります。イベントストリーミングアプリケーションも、構成パラメーター processing.guarantee を設定して、正確に 1 回 を有効にすることができます。

永続性を最適化するための構成のまとめ

プロデューサー

  • replication.factor=3
  • acks=all (デフォルト: all - Kafka 3.0 より前のデフォルト: 1
  • enable.idempotence=true (デフォルト: true - Kafka 3.0 より前のデフォルト: false)- 重複メッセージおよび順番どおりでないメッセージを防止する
  • max.in.flight.requests.per.connection=1 (デフォルトは 5)- べき等なプロデューサーを使用しない場合に、順番どおりでないメッセージを防止する

コンシューマー

  • enable.auto.commit=false(デフォルトは ``true
  • isolation.level=read_committed (EOS トランザクションを使用する場合)

ストリーム

  • StreamsConfig.REPLICATION_FACTOR_CONFIG: 3 (デフォルトは 1
  • StreamsConfig.PROCESSING_GUARANTEE_CONFIG: StreamsConfig.EXACTLY_ONCE (デフォルトは StreamsConfig.AT_LEAST_ONCE
  • ストリームアプリケーションにはプロデューサーとコンシューマーが組み込まれているので、これらの推奨構成についても確認してください。