Replicator のモニタリング¶
Replicator をモニタリングするための推奨される方法は、Confluent Control Center と Replicator モニタリング拡張機能を使用することです。モニタリング拡張機能の詳細については、「 Replicator モニタリング拡張機能」を参照してください。
また、以下のツールを使用して Replicator をモニタリングすることもできます。
- Replicator のメトリクス
- コンシューマーのメトリクス
- プロデューサーのメトリクス
- レプリケーションラグ
Replicator モニタリング拡張機能¶
Confluent Replicator モニタリング拡張機能を使用すると、公開されている REST API を使用して、Replicator タスクから詳細なメトリクスを収集できます。これらのエンドポイントは、以下の情報を提供します。
- スループット: 1 秒あたりにレプリケートされたメッセージの数
- メッセージラグ: 送信元クラスターに対して生成された一方で、送信先に対してまだレプリケートされていないメッセージの数
- レイテンシ: 送信元クラスターに対するメッセージ生成から送信先クラスターに対するメッセージ生成までの平均時間
メトリクスは、コネクター、タスク、トピックとパーティションを基準に細分化されます。
これらのメトリクスは、Control Center に統合してクラスター内のレプリケーションを一元的に可視化できるように設計されています。
インストール¶
モニタリング拡張機能は、Replicator とともにインストールされます。インストールの詳細については、「 Replicator のダウンロードおよびインストール」を参照してください。
拡張機能のアクティブ化¶
インストールが完了したら、インストールされた拡張機能を認識するように Kafka Connect を構成します。最初に、CLASSPATH
環境変数を設定して、拡張機能ライブラリを Connect ワーカーのクラスパスに追加します。
export CLASSPATH=<path to replicator install>/replicator-rest-extension-<version>.jar
ちなみに
- この REST 拡張機能 JAR ファイルの場所は、プラットフォームと Confluent インストール のタイプによって異なります。たとえば、zip.tar を使用した Mac OS インストールでの場合、デフォルトでは、
replicator-rest-extension-<version>.jar
は Confluent ディレクトリの/share/java/kafka-connect-replicator/
にあります。デフォルトの Ansible 設定でインストールする場合、JAR ファイルは/usr/share/java/kafka-connect-replicator/
に格納されます。 - CLASSPATH を固定するには、これをシェル構成ファイル(
.bash_profile
、.bashrc
、.zshrc
など)に追加し、このプロファイルをオープンコマンドシェルで送信元として指定し、echo $CLASSPATH
でチェックします。
次に、拡張機能をアクティブ化するには、Replicator を起動するために使用する構成ファイル(my-examples/replication.properties
、etc/kafka/replication.properties
、/etc/kafka/connect-distributed.properties
など)の拡張クラスのリストに以下のプロパティを追加します。
rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension
拡張機能がアクティブ化されたら、Control Center を使用して Replicator をモニタリングできます。Replicator チュートリアルの「 Control Center を使用した Replicator のモニタリング」および Control Center ユーザーガイドの「Replicator」を参照してください。
セキュアなエンドポイント向けの追加構成¶
Replicator をホストしている Connect 分散クラスターに SSL 認証が有効にされている(https)REST エンドポイントがある場合、Replicator モニタリング拡張機能がクラスター内の他の Connect ノードと通信するために使用する SSL キーストアとトラストストア用のセキュリティプロパティを構成する必要があります。これらは、Java 仮想マシンレベルで以下の環境変数を使用して設定します。
export KAFKA_OPTS: -Djavax.net.ssl.trustStore=/etc/kafka/secrets/kafka.connect.truststore.jks
-Djavax.net.ssl.trustStorePassword=<password>
-Djavax.net.ssl.keyStore=/etc/kafka/secrets/kafka.connect.keystore.jks
-Djavax.net.ssl.keyStorePassword=<password>
参考
この構成の動作例については、Confluent Platform デモ を参照してください。構成リファレンスについては、デモの docker-compose.yml ファイル を参照してください。
また、こちらの説明 に従って、listeners.https.
プロパティも設定する必要があります。以下に例を示します。
"listeners.https.ssl.keystore.location": "/etc/kafka/connect.jks"
"listeners.https.keystore.password": "xxxxx"
systemctl コマンド構成¶
ちなみに
systemctl
を使用しない場合、これらの手順はスキップしてください。
systemctl
コマンドを使用してモニタリングサービスを開始する場合は、sudo systemctl edit confluent-kafka-connect
を使用するか /etc/systemd/system/confluent-kafka-connect.service.d/<service.conf>
ファイルを編集して環境変数を構成する必要があります。この方法を使用する場合、systemctl
の環境変数を適切に構成しないと、Connect は開始できません。
このユースケースの手順は以下のとおりです。
前述のように、Replicator を起動するために使用する構成ファイル(
my-examples/replication.properties
、etc/kafka/replication.properties
、/etc/kafka/connect-distributed.properties
など)に以下のプロパティを追加して、拡張機能をアクティブ化します。rest.extension.classes=io.confluent.connect.replicator.monitoring.ReplicatorMonitoringExtension
sudo systemctl edit confluent-kafka-connect
を使用するか、/etc/systemd/system/confluent-kafka-connect.service.d/<service.conf>
ファイルを編集して、環境変数を構成します。[Service] Environment="CLASSPATH=<path to replicator install>/replicator-rest-extension-6.0.0.jar"
systemd 構成を再ロードし、コンポーネントを再起動します。
sudo systemctl reload sudo systemctl restart confluent-kafka-connect
Replicator モニタリング拡張機能 API リファレンス¶
-
GET
/WorkerMetrics/(string: connector_name)
¶ 特定のコネクター用のこの Connect ワーカーで実行されているタスクのメトリクスを取得します。
パラメーター: - connector_name (string) -- コネクターの名前
リクエストの例:
GET /WorkerMetrics/replicate-topic HTTP/1.1 Host: kafkaconnect.example.com
応答の例:
HTTP/1.1 200 OK Content-Type: application/json { "connectorName": "replicate-topic", "srcClusterId": "qhS9FOVFQkmokuIvo1ZZMA", "destClusterId": "qhS9FOVFQkmokuIvo1ZZMA", "tasks": [{ "id": "replicate-topic-0", "workerId": null, "state": null, "metrics": [{ "timestamp": 1570742038165, "throughput": 0.1075268817204301, "messageLag": 0.0, "latency": 12.764705882352942, "srcTopic": "wikipedia.parsed", "srcPartition": 1, "destTopic": "wikipedia.parsed.replica" }, { "timestamp": 1570742038166, "throughput": 0.10976948408342481, "messageLag": 0.0, "latency": 16.433333333333334, "srcTopic": "wikipedia.parsed", "srcPartition": 0, "destTopic": "wikipedia.parsed.replica" }] }] }
-
GET
/ReplicatorMetrics
¶ この Connect クラスターで実行されているすべての Replicator のメトリクスを取得します。
リクエストの例:
GET /ReplicatorMetrics HTTP/1.1 Host: kafkaconnect.example.com
応答の例:
HTTP/1.1 200 OK Content-Type: application/json { "connectors": [{ "name": "replicate-topic", "srcClusterBootstrapServers": "kafka1:9091", "destClusterBootstrapServers": "kafka1:9091", "srcClusterId": "qhS9FOVFQkmokuIvo1ZZMA", "destClusterId": "qhS9FOVFQkmokuIvo1ZZMA", "tasks": [{ "id": "0", "workerId": "connect:8083", "state": "RUNNING", "metrics": [{ "timestamp": 1570742222631, "throughput": 0.11024142872891633, "messageLag": 0.0, "latency": 34.86666666666667, "srcTopic": "wikipedia.parsed", "srcPartition": 1, "destTopic": "wikipedia.parsed.replica" }, { "timestamp": 1570742222631, "throughput": 0.10187449062754686, "messageLag": 0.0, "latency": 15.307692307692308, "srcTopic": "wikipedia.parsed", "srcPartition": 0, "destTopic": "wikipedia.parsed.replica" }] }] }] }
JMX メトリクスのモニタリング¶
Kafka ブローカーと同様に、Kafka Connect は JMX を通じてメトリクスをレポートします。Connect および Replicator をモニタリングするには、Connect ワーカーを起動する前に JMX_PORT
環境変数を設定します。次に、通常のモニタリングツールを使用してレポートされたメトリクスを収集します。JMXTrans、Graphite、および Grafana は、Kafka から JMX メトリクスを収集してレポートするための一般的な組み合わせです。
JMX を使用してレポートされたメトリクスを確認すると、Connect が Replicator のコンシューマーメトリクスと Connect のプロデューサーメトリクスを公開していることがわかります。メトリクスの完全なリストは、「 Kafka のモニタリング」で確認できます。以下に、重要なメトリクスの一部とそれぞれの意味を示します。
重要な Replicator メトリクス¶
MBean: confluent.replicator:type=confluent-replicator-task-metrics,confluent-replicator-task=([-.w]+),confluent-replicator-topic-name=([-.w]+),confluent-replicator-task-topic-partition=([-.w]+)
confluent-replicator-source-cluster
- Replicator のレプリケート元である送信元クラスターの ID です。
confluent-replicator-destination-cluster
- Replicator のレプリケート先である送信先クラスターの ID です。
confluent-replicator-destination-topic-name
- Replicator のレプリケート先である送信先トピックの名前です。
confluent-replicator-task-topic-partition-message-lag
- 送信元クラスターに対して生成されたものの、送信先クラスターにまだ到達していないメッセージの数です。
confluent-replicator-task-topic-partition-throughput
- 送信元クラスターから送信先クラスターにレプリケートされた 1 秒あたりのメッセージ数です。
confluent-replicator-task-topic-partition-byte-throughput
- 送信元クラスターから送信先クラスターにレプリケートされた 1 秒あたりのバイト数です。
confluent-replicator-task-topic-partition-latency
- 送信元クラスターに対するメッセージ生成から送信先クラスターに対するメッセージ生成までの平均時間です。
重要なプロデューサーメトリクス¶
MBean: kafka.producer:type=producer-metrics,client-id=([-.w]+)
io-ratio
またはio-wait-ratio
io-ratio
が低いか、またはio-wait-ratio
が高い場合、プロデューサーがそれほどビジーではなく、ボトルネックになる可能性が低いことを意味しています。outgoing-byte-rate
- 送信先 Kafka への書き込み時のプロデューサーのスループットをレポートします。
batch-size-avg
およびbatch-size-max
- これらが構成済みの
batch.size
に常に近い場合、可能な限り最大のレートで生成を行っているため、バッチサイズを増やしてバッチ処理を向上させる必要があります。 record-retry-rate
およびrecord-error-rate
- 1 つのトピックに関するレコード送信の再試行とレコード送信の失敗の 1 秒あたりの平均回数です。これらの数字が高い場合、送信先クラスターへの書き込みに関する問題がある可能性があります。
produce-throttle-time-avg
およびproduce-throttle-time-max
- 送信先クラスターで構成されているクォータを満たすために生成リクエストがスロットリングされている可能性があります。これらがゼロ以外の場合、送信先ブローカーによってプロデューサーの処理が遅くなっているため、クォータ構成を確認する必要があることを示しています。クォータの詳細については、「 クライアントクォータの適用」を参照してください。
waiting-threads
およびbufferpool-wait-time
- ゼロ以外の値は、メモリ不足を示しています。Connect プロデューサーはイベントをあまり高速で送信できません。そのため、メモリーバッファがいっぱいになり、Replicator スレッドのブロックが発生します。
重要なコンシューマーメトリクス¶
MBean: kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.w]+)
io-ratio
またはio-wait-ratio
io-ratio
が低いか、またはio-wait-ratio
が高い場合、コンシューマーがそれほどビジーではなく、ボトルネックになる可能性が低いことを意味しています。bytes-consumed-rate
- 送信元クラスターからイベントを読み取る Replicator のスループットを示します。
fetch-size-avg
およびfetch-size-max
- これらが構成済みの最大フェッチサイズに常に近い場合、Replicator が可能な限り最大のレートで読み取りを行っていることを意味しています。最大フェッチサイズを増やして、タスクあたりのスループットが向上するかどうかを確認してください。
records-lag-max
- 任意のパーティションのラグ(レコード数)の最大値です。この値が時間の経過とともに増加する場合は、送信元クラスターにイベントが書き込まれるレートに Replicator が追いついていないことを示しています。
fetch-rate
、fetch-size-avg
、およびfetch-size-max
- fetch-rate は高いが、fetch-size-avg と fetch-size-max が構成済みの最大フェッチサイズに近くない場合、コンシューマーが頻繁に処理を "繰り返している" 可能性があります。
fetch.min.bytes
およびfetch.max.wait
構成値を増やしてみてください。これにより、コンシューマーのバッチ処理が効率化される可能性があります。 fetch-throttle-time-max
およびfetch-throttle-time-avg
- 送信元クラスターで構成されているクォータを満たすためにフェッチクエストがスロットリングされている可能性があります。これらがゼロ以外の場合、送信元ブローカーによってコンシューマーの処理が遅くなっているため、クォータ構成を確認する必要があることを示しています。クォータの詳細については、「 クライアントクォータの適用」を参照してください。
Replicator ラグのモニタリング¶
重要
Replicator バージョン 5.4.0 以降では、前述の JMX メトリクスを使用して Replicator ラグをモニタリングすることをお勧めします。その方がコンシューマーグループのラグツールを使用するより正確です。Replicator ラグをモニタリングする以下の方法は、Replicator バージョン 5.4.0 より前のバージョンを使用している場合にのみ推奨されます。
コンシューマーグループのコマンドツール (kafka-consumer-groups
)を使用すると、Replicator ラグをモニタリングできます。この機能を使用するには、Replicator offset.topic.commit
構成を true
(デフォルト値)に設定する必要があります。
ちなみに
Replicator は、コンシューマーグループを使用して消費せず、手動でパーティションを割り当てます。offset.topic.commit
が true の場合、Replicator はコンシューマーオフセットを(再度手動で)コミットしますが、これらは参照用であり、アクティブなコンシューマーグループを表していません。Replicator はオフセットのみをコミットし、実際にはコンシューマーグループを形成しないため、kafka-consumer-groups
コマンドの出力では、グループ内のアクティブなメンバーが(正しく)表示されません。コミットされたオフセットのみを表示します。これは Replicator で予期されている動作です。メンバーシップ情報を確認するには、kafka-consumer-groups
ではなく Connect ステータスエンドポイントを使用します。
レプリケーションラグは、送信元クラスターに対して生成されたものの、送信先クラスターにまだ到達していないメッセージの数です。これは、現在メッセージが送信元から送信先にレプリケートされるのに要する時間として測定することもできます。なんらかの理由で Replicator が遅れており、追い付くのに時間を要する場合、このラグは 2 つのデータセンター間のレイテンシより高くなる可能性があります。
レプリケーションラグをモニタリングする主な理由は以下のとおりです。
- 送信元から送信先にフェイルオーバーする必要があり、かつ "送信元をリストアできない場合"、送信元に対して生成され、送信先にレプリケートされていないすべてのイベントは失われます(送信元をリストア "できる" 場合、イベントは失われません)。
- 送信先で発生するあらゆるイベント処理はラグによって遅延します。
ラグは通常、わずか数百ミリ秒にすぎません(2 つのデータセンター間のネットワークレイテンシによって異なります)。ただし、ネットワークパーティションまたは構成の変更によりレプリケーションが一時的に中断し、Replicator が追い付く必要が生じた場合、ラグが大きくなる可能性があります。レプリケーションラグが大きくなり続ける場合、Replicator のスループットが送信元クラスターに対して生成されるものより低いため、追加の Replicator タスクまたは Connect ワーカーが必要であることを示しています。たとえば、プロデューサーが送信元クラスターに 100 MBps で書き込んでいる一方で、Replicator は 50 MBps でしかレプリケートしていない場合などです。
ちなみに
スループットを高くするには、Replicator とブローカーの TCP ソケットバッファを増やす必要があります。Replicator が送信先クラスターで実行されている場合(推奨)、以下の値も増やす必要があります。
- 送信元クラスターのブローカーの TCP 送信ソケットバッファ(
socket.send.buffer.bytes
)。 - コンシューマーの受信 TCP ソケットバッファ(
socket.receive.buffer.bytes
)。妥当な値は 512 KB ですが、必要に応じて最大 12 MB まで試してみてください。
Linux を使用している場合、Kafka 設定を有効にするためにデフォルトのソケットバッファ最大値を変更する必要が生じることがあります。バッファのチューニングの詳細については、こちらの記事 を参照してください。