Replicator の調整

本セクションには、最小限の数の Connect ワーカーとマシンでレプリケーションの最大スループットを引き出すためのアドバイスが記載されています。

ここでは、Replicator が実行されている Connect クラスターが既に存在すると想定しています。また、本セクションの趣旨に沿って、Replicator 専用の Connect クラスターが実行されていること、つまり、同じクラスターで別の Connect が実行されておらず、そのクラスターのすべてのリソースを Replicator で利用可能であることも想定しています。

Replicator クラスターのサイズ変更

Replicator クラスターのサイズ変更でよく疑問点として挙げられるのは次の 2 つです。

  • クラスターに必要なノードの数とノードの規模
  • 実行する必要がある Replicator タスクの数

タスクの数を決定する方法、およびその情報を使用して必要となるノードの数を決定する方法について説明します。

最初のステップは、タスクあたりのスループットを割り出すことです。そのためには、短時間 1 つのタスクのみを実行して、スループットを確認します。1 つの方法として、送信元クラスターで 1 つのトピックを大量のデータで満たし、Replicator をシングルタスク(構成パラメーター tasks.max=1)で起動し、イベントが送信先クラスターに書き込まれるレートを測定します。そうすると、シングルタスクで達成できるスループットがわかります。

期待するスループット(クラスター間でレプリケートするのに必要となる MBps 数)をタスクあたりのスループットで除算した数値が、必要となるタスク数です。

たとえば、Replicator をシングルタスクで実行して、30 MBps のレートでレプリケートできたとします。すべてのトピック間でレプリケーションに 100 MBps 必要であることがわかっています。したがって、Replicator タスクが 4 つ以上必要であることになります。

この方法には次の 2 つの注意事項があります。

  • タスク数をパーティション数より多くすることはできません。期待するスループットを実現するために必要なタスク数がパーティション数を上回ることがわかった場合は、パーティションを追加する必要があります。1 つのタスクは複数のパーティションから消費できるため、パーティション数はタスク数より多くしても構いません。
  • 1 台のマシンで実行するタスク数が多すぎると、マシンのリソースが飽和状態になります。この場合のリソースとは、ネットワークまたは CPU のどちらかです。タスク数を増やしてもスループットが増えない場合は、おそらくどちらかのリソースが飽和状態になっています。簡単な解決方法は、ノードを追加して利用可能なリソースを増やすことです。

推奨ガイドライン:

  • タスクあたりのパーティション数は 150 以下
  • ワーカーあたりのタスク数は 20 以下

Replicator タスクでのスループットの向上

Replicator のスループットを増やす方法の 1 つは、より多くの Connect ワーカーノードでより多くのタスクを実行することです。もう 1 つの方法は、各 Replicator タスクから引き出すスループットを増やすことです。実際には、調整ではほとんどの場合にその両方を組み合わせることになります。まずシングルタスクを調整し、次に複数のタスクを並列実行することで、期待するスループットを実現します。

Connect タスクのパフォーマンスを調整する場合は、各タスクの CPU 使用率を少なくするか、ネットワークをより効率的に使用することを試します。まずどちらがボトルネックになっているかを確認してから、該当するリソースを調整することをお勧めします。どちらがボトルネックであるかを確認する方法の 1 つは、シングルノードで実行している Replicator に、タスクを追加してもスループットが増えなくなるまで、タスクを追加することです。その時点で CPU 使用率が高ければ、Connect タスクの CPU 使用率を改善します。CPU 使用率は低いのにタスクを追加してもスループットが増えない場合は、ネットワークの使用率を改善します。

Connect タスクでの CPU 使用率の改善

  • Java のガベージコレクションログを有効化してチェックし、ガベージコレクションの過度の一時停止が発生していないことを確認します。G1GC を使用し、適度なヒープサイズ(通常は 4 GB で始めるのが適切)を設定します。
  • Replicator で key.convertervalue.converterio.confluent.connect.replicator.util.ByteArrayConverter に設定されていることを確認します。src.key.convertersrc.value.converter が設定されている場合は、それらも io.confluent.connect.replicator.util.ByteArrayConverter (デフォルト値)に設定されている必要があります。これで、イベントを Connect の内部データ形式に変換しバイト値に戻すという、コストの高い変換が行われなくなります。
  • CRC チェックを無効にします。これはデータ破損を引き起こすことがあるため推奨されていませんが、データ整合性のための CRC チェックでは CPU が使用されます。Replicator の構成で src.consumer.check.crcs=false を設定することによって、パフォーマンスの改善のために CRC チェックを無効にすることができます。

Connect タスクでのネットワーク使用率の改善

Replicator は 2 つのデータセンター間でイベントを読み取るまたは書き込むことが多いため、通常はレイテンシが非常に大きくなっています。ネットワークのレイテンシが大きいと、スループットの低下につながります。下記の推奨事項はすべて、そのような環境でスループットが向上するように TCP スタックを構成することを目的としています。それは、Apache Kafka® クラスターと同じデータセンター内で実行されているアプリケーションを構成することとは異なります。データセンター内では通常、レイテンシとスループットのバランスを取って、どちらを調整するかを決めることができます。2 つのデータセンター間でイベントをレプリケートする場合、通常は高いレイテンシに対処する必要があります。

ネットワークリンクのプロパティに基づいて TCP バッファサイズを算出するために、https://www.switch.ch/network/tools/tcp_throughput/?do+new+calculation=do+new+calculation のようなツールを使用します。TCP バッファサイズを大きくすると、ネットワークが高帯域幅かつ高レイテンシで処理されます(ほとんどのデータセンター間リンクも含まれます)。これを次の 2 つの場所で行う必要があります。

  • リクエストの送信および受信で使用されるバッファサイズを設定することによって、アプリケーションレベルでリクエストされるサイズを大きくします。プロデューサーとコンシューマーでは send.buffer.bytesreceive.buffer.bytes を使用します。ブローカーでは socket.send.buffer.bytessocket.receive.buffer.bytes を使用します。これらは、アプリケーションによってリクエストされるレベルの設定であるため、2 番目のステップに従っていない場合は、オペレーティングシステムによって警告なしで上書きされることがあります。
  • オペレーティングシステムレベルの TCP バッファサイズ(net.core.rmem_defaultnet.core.rmem_maxnet.core.wmem_defaultnet.core.wmem_maxnet.core.optmem_max)を増やします。現在のセッションに対してテストする場合は sysctl を使用することもことができますが、永続的に設定するには /etc/sysctl.conf 内で値を設定する必要があります。
  • 重要: 実際に効果があったことを詳しく確認するために、ログ記録を有効にしておきます。この設定は、オペレーティングシステムによって警告なしで上書きまたは無視される場合があります。log4j.logger.org.apache.kafka.common.network.Selector=DEBUG

さらに、次の 2 つのネットワーク最適化を試すこともできます。

  • 自動ウィンドウスケーリングを有効にします(sysctl -w net.ipv4.tcp_window_scaling=1 を実行するか、/etc/sysctl.confnet.ipv4.tcp_window_scaling=1 を追加する)。こうすると、TCP バッファを通常の最大値である 64K より大きくすることができます(レイテンシで許容される場合)。
  • TCP のスロースタート時間を小さく(/proc/sys/net/ipv4/tcp_slow_start_after_idle を「0」に設定)すると、ネットワーク接続が最大容量に達するのが早くなります。
  • より大きいバッチを読み取るおよび書き込むことによってスループットを改善するには、プロデューサーの batch.sizelinger.ms、およびコンシューマーの fetch.max.bytesfetch.min.bytesfetch.max.wait の値を大きくします。

重要

Replicator を実行するために使用する 方法 に応じて、これらの設定をどこでどのように構成するかが異なります。

  • Replicator をコネクタとして実行 している場合は、Replicator の構成ファイルのコンシューマー設定および Connect ワーカーの構成のプロデューサー設定を変更します。たとえば、このタイプのデプロイでは、コンシューマーの fetch.max.bytes を調整するには、Replicator の構成ファイルの src.consumer.fetch.max.bytes を設定します。プロデューサーの batch.size を調整するには、Connect ワーカーの構成で producer.batch.size を設定します。このシナリオでは、Replicator は送信元クラスターから消費されていて、レコードを Connect ワーカーに渡し、そのワーカーが送信先クラスターに対して生成しているので、これらの構成ファイルが使用されています。
  • Replicator を実行可能ファイルとして実行 している場合は、consumer.properties のコンシューマー設定と producer.properties のプロデューサー設定を変更します。このシナリオでは、Replicator 内に組み込まれている送信元コンシューマーの構成は consumer.properties ファイルによって制御されていますが、Connect フレームワーク内のプロデューサーの設定は producer.properties によって制御されているので、これらの構成ファイルが使用されています。

データ負荷が増えた場合にパフォーマンスを改善するための圧縮の設定

データ負荷が増えると、処理に時間がかかって Replicator のパフォーマンスが低下することがあります。送信元トピックと送信先トピックの両方でデータ圧縮を使用することによって、この問題に対処できます。送信元トピックのみで圧縮を設定しても通常は十分な効果が得られないので、送信先 Replicator のコネクタまたはワーカーの構成でも次のように明示的に圧縮を設定する必要があります。