Kafka Connect とコネクターのモニタリング¶
JMX および REST インターフェイスを使用して、Connect、コネクター、およびクライアントを管理およびモニタリングすることができます。
REST インターフェイスの使用¶
Kafka Connect の REST API を使用して、クラスターを管理することができます。これには、コネクターの構成とそのタスクのステータスを表示する API や、コネクターの現在の動作(構成の変更やタスクの再起動など)を変更する API が含まれます。
Kafka Connect クラスターが起動したら、モニタリングしたり変更したりすることができます。このセクションでは、REST API を使用して行われるいくつかの一般的な管理タスクについて説明します。
Kafka Connect は、サービスとして実行できるように設計されており、コネクターを管理するための REST API もサポートされています。デフォルトでは、このサービスはポート 8083
で実行されます。分散モードで実行する場合、REST API はクラスターに対するプライマリインターフェイスになります。リクエストは、どのクラスターメンバーに対しても行うことができます。REST API は必要に応じてリクエストを自動的に転送します。
コマンドラインでコネクターを送信するだけでスタンドアロンモードを使用することができますが、それによって REST インターフェイスも実行されます。これは、処理を停止せずにステータス情報を取得したり、コネクターを追加したり削除したりするのに便利です。
コネクターとタスクのステータス¶
REST API を使用して、コネクターとそのタスクの現在のステータスや、各コネクターが割り当てられているワーカーの ID を表示することができます。
コネクターとそのタスクは、クラスター内のすべてのワーカーがモニタリングする共有トピックに対してステータスのアップデートをパブリッシュします(この共有トピックは status.storage.topic
で構成されます)。ワーカーはこのトピックを非同期で消費するため、通常は、ステータス API でステートの変更を確認できるまでに短い遅延が発生します。コネクターまたはそのタスクの 1 つは、次のいずれかのステートになります。
UNASSIGNED
: コネクター/タスクは、まだワーカーに割り当てられていません。RUNNING
: コネクター/タスクは実行中です。PAUSED
: コネクター/タスクは、管理のために一時停止中です。FAILED:
コネクター/タスクが失敗しました(通常は、例外の発生によるもので、これはステータス出力で報告されます)。
ほとんどの場合、コネクターとタスクのステートは一致しますが、変更が生じていたりタスクが失敗したりした場合などは、一時的に不一致の状態になる可能性があります。たとえば、コネクターが最初に起動されてからコネクターとそのタスクがすべて RUNNING
ステートに遷移するまでには、大幅な遅延が発生することがあります。Connect では失敗したタスクが自動的に再起動されないため、タスクが失敗したときにもステートの不一致が生じます。
コネクターのメッセージ処理を一時的に停止することが役立つ場合があります。たとえば、リモートシステムがメンテナンス中の場合は、大量の例外でログが埋まることを避けるため、ソースコネクターでは新しいデータを確認するためのポーリングを停止することが望ましいでしょう。このユースケースに対応できるよう、Connect では一時停止/再開の API が提供されています。ソースコネクターが一時停止されている間、Connect は追加レコードの取得のためのポーリングを停止します。シンクコネクターが一時停止されている間、Connect はシンクコネクターへの新しいメッセージのプッシュを停止します。一時停止ステートは永続的であるため、クラスターを再起動しても、タスクが再開されるまでコネクターはメッセージ処理を再開しません。タスクが一時停止されたときに実行中だった処理が完了するまでに時間がかかる場合があるため、すべてのコネクターのタスクが PAUSED
ステートに移行するまでに遅延が生じる場合があることに注意してください。また、失敗したタスクは再起動されるまで PAUSED
ステートには遷移しません。
一般的な REST の例¶
以下に、REST API を介して行われるいくつかの一般的なアクティビティを紹介します。以下に示す例では、デフォルトの構成を使用して localhost 上で実行されているワーカーと s3-connector
という名前のコネクターを使用しています。この例では、応答のフォーマットにユーティリティー jq
が使用されていますが、これは必須ではありません。ここでは、例を意図的にシンプルにしています。REST API の高度な使い方については、 Kafka Connect REST インターフェイス を参照してください。
ワーカークラスター ID、バージョン、および git ソースコードのコミット ID を取得します。
curl localhost:8083/ | jq
出力例:
{ "version": "5.5.0-ce", "commit": "2bedb2c6980ba7a8", "kafka_cluster_id": "wxOOb2vrTPCLJl28PI18KA" }
ワーカーで使用可能なコネクタープラグインのリストを表示します。
curl localhost:8083/connector-plugins | jq
出力例:
[ { "class": "io.confluent.connect.activemq.ActiveMQSourceConnector", "type": "source", "version": "5.5.0" }, { "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "type": "sink", "version": "5.5.0" }, { "class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector", "type": "source", "version": "5.5.0" }, { "class": "io.confluent.connect.jdbc.JdbcSinkConnector", "type": "sink", "version": "5.5.0" }, { "class": "io.confluent.connect.jdbc.JdbcSourceConnector", "type": "source", "version": "5.5.0" }, { "class": "io.confluent.connect.jms.JmsSourceConnector", "type": "source", "version": "5.5.0" }, ... omitted
ワーカーのアクティブなコネクターのリストを表示します。
curl localhost:8083/connectors
出力例:
["s3-connector"]
コネクターを再起動します(コマンドが成功しても出力は生成されません)。
curl -X POST localhost:8083/connectors/s3-connector/restart
注釈
コネクターを再起動してもタスクは再開されません。
コネクターのタスクを取得します。
curl localhost:8083/connectors/s3-connector/tasks | jq
出力例:
[ { "id": { "connector": "s3-connector", "task": 0 }, "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat", "task.class": "io.confluent.connect.s3.S3SinkTask", "flush.size": "1000", "topics": "passengers", "name": "s3-connector", "aws.access.key.id": "omitted", "rotate.interval.ms": "-3", "aws.secret.access.key": "omitted", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "s3.bucket.name": "s3-test-bucket" } } ]
タスクを再起動します(コマンドが成功しても何も出力されません)。
curl -X POST localhost:8083/connectors/s3-connector/tasks/0/restart
コネクターを一時停止します(コマンドが成功しても何も出力されません)。
curl -X PUT localhost:8083/connectors/s3-connector/pause
ちなみに
コネクターとやり取りするシステムのサービスを、メンテナンスのために一時的に停止する必要がある場合に、このコマンドが役立ちます。
一時停止されているコネクターを再開します(コマンドが成功しても何も出力されません)。
curl -X PUT localhost:8083/connectors/s3-connector/resume
コネクターの構成を更新します。
curl -X PUT -H "Content-Type: application/json" --data '{"connector.class":"io.confluent.connect.s3.S3SinkConnector","format.class":"io.confluent.connect.s3.format.bytearray.ByteArrayFormat","flush.size":"1000","s3.bucket.name":"s3-test-bucket","storage.class":"io.confluent.connect.s3.storage.S3Storage","tasks.max":"2","topics":"passengers","name":"s3-connector"}' localhost:8083/connectors/s3-connector/config | jq
コマンド例では、タスクを 1 から 2 に更新しています。
{ "name": "s3-connector", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat", "flush.size": "1000", "s3.bucket.name": "s3-test-bucket", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "tasks.max": "2", "topics": "passengers", "name": "s3-connector" }, "tasks": [ { "connector": "s3-connector", "task": 0 }, { "connector": "s3-connector", "task": 1 } ], "type": "sink" }
コネクターのステータスを取得します。
curl localhost:8083/connectors/s3-connector/status | jq
出力例:
{ "name": "s3-connector", "connector": { "state": "RUNNING", "worker_id": "192.168.86.66:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "192.168.86.66:8083" }, { "id": 1, "state": "RUNNING", "worker_id": "192.168.86.66:8083" } ], "type": "sink" }
コネクターの構成、タスク、およびコネクターのタイプを取得します。
curl localhost:8083/connectors/s3-connector | jq
出力例:
{ "name": "s3-connector", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat", "flush.size": "1000", "tasks.max": "2", "topics": "passengers", "name": "s3-connector", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "s3.bucket.name": "s3-test-bucket" }, "tasks": [ { "connector": "s3-connector", "task": 0 }, { "connector": "s3-connector", "task": 1 } ], "type": "sink" }
コネクターの構成のみを取得するには、以下のコマンドを使用します。
curl localhost:8083/connectors/s3-connector/config | jq
コネクターを削除します(コマンドが成功しても何も出力されません)。
curl -X DELETE localhost:8083/connectors/s3-connector
JMX を使用した Connect のモニタリング¶
Connect では、JMX(Java Management Extensions)を通じてさまざまなメトリクスが報告されます。metric.reporters
構成オプションで、プラグ可能な統計情報レポーターを追加して、統計情報が報告されるように Connect を構成することができます。以下のセクションでは MBeans、メトリクス、および説明を示します。
コネクターのメトリクス¶
MBean: kafka.connect:type=connector-metrics,connector="{([-.w]+)
Metric | 説明 |
---|---|
connector-type | コネクターのタイプ: ソースまたはシンク |
connector-class | コネクタークラスの名前 |
connector-version | コネクタークラスのバージョン(コネクターによって報告される) |
status | コネクターのステータス: running、paused、または stopped |
共通タスクのメトリクス¶
MBean: kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
Metric | 説明 |
---|---|
status | 現在のタスクのステータス: unassigned、running、paused、failed、または destroyed |
pause-ratio | タスクが paused ステートで経過した時間の割合 |
running-ratio | タスクが running ステートで経過した時間の割合 |
offset-commit-success-percentage | タスクによるオフセットコミットの試行のうち、成功した試行の平均比率(パーセンテージ) |
offset-commit-failure-percentage | タスクによるオフセットのコミットの試行のうち、失敗またはエラーになった試行の平均比率(パーセンテージ) |
offset-commit-max-time-ms | タスクがオフセットをコミットする際にかかった最大時間(単位: ミリ秒) |
offset-commit-avg-time-ms | タスクがオフセットをコミットするためにかかった平均時間(単位: ミリ秒) |
batch-size-max | コネクターで処理されるバッチの最大サイズ |
batch-size-avg | コネクターで処理されるバッチの平均サイズ |
ワーカーのメトリクス¶
MBean: kafka.connect:type=connect-worker-metrics
Metric | 説明 |
---|---|
task-count | このワーカーで実行されたタスクの数 |
connector-count | このワーカーで実行されたコネクターの数 |
connector-startup-attempts-total | このワーカーがコネクターの起動を試行した回数の合計 |
connector-startup-success-total | コネクターが正常に起動した回数の合計 |
connector-startup-success-percentage | ワーカーのコネクターの起動回数のうち、成功した回数の平均比率(パーセンテージ) |
connector-startup-failure-total | コネクターの起動に失敗した回数の合計 |
connector-startup-failure-percentage | ワーカーのコネクターの起動回数のうち、失敗した回数の平均比率(パーセンテージ) |
task-startup-attempts-total | ワーカーがタスクの起動を試行した回数の合計 |
task-startup-success-total | タスクの起動に成功した回数の合計 |
task-startup-success-percentage | ワーカーのタスクの起動回数のうち、成功した回数の平均比率(パーセンテージ) |
task-startup-failure-total | タスクの起動に失敗した回数の合計 |
task-startup-failure-percentage | ワーカーのタスクの起動回数のうち、失敗した回数の平均比率(パーセンテージ) |
ワーカーのバランス調整のメトリクス¶
MBean: kafka.connect:type=connect-worker-rebalance-metrics
Metric | 説明 |
---|---|
leader-name | グループリーダーの名前 |
epoch | ワーカーのエポックまたは世代番号 |
completed-rebalances-total | ワーカーによって実行されたバランス調整の合計数 |
rebalancing | ワーカーが現在バランス調整を行っているかどうか |
rebalance-max-time-ms | ワーカーがバランス調整に費やした最大時間(単位: ミリ秒) |
rebalance-avg-time-ms | ワーカーがバランス調整に費やした平均時間(単位: ミリ秒) |
time-since-last-rebalance-ms | 直近でワーカーのバランス調整が行われてから経過した時間(単位: ミリ秒) |
ソースタスクのメトリクス¶
MBean: kafka.connect:type=source-task-metrics,connector=([-.w]+),task=([d]+)
Metric | 説明 |
---|---|
source-record-write-total | (タスクが最後に再開されて以降)変換によって出力され、ワーカー内の名前付きソースコネクターに属するタスクに対して Kafka に書き込まれたレコードの件数 |
source-record-write-rate | 変換の適用後に、変換によって出力され、ワーカー内の名前付きソースコネクターに属するタスクに対して Kafka に書き込まれたレコードの 1 秒あたりの平均件数(変換時のフィルターで除外されたレコードは除きます) |
source-record-poll-total | (タスクが最後に再開されて以降)変換の適用前に、ワーカー内の名前付きソースコネクターに属するタスクによって生成またはポーリングされたレコードの件数 |
source-record-poll-rate | 変換の適用前に、ワーカー内の名前付きソースコネクターに属するタスクによって生成またはポーリングされたレコードの 1 秒あたりの平均件数 |
source-record-active-count-max | タスクによってポーリングされ、まだ Kafka への書き込みが完了していないレコードの最大件数 |
source-record-active-count-avg | タスクによってポーリングされ、まだ Kafka への書き込みが完了していないレコードの平均件数 |
source-record-active-count | タスクによってポーリングされ、まだ Kafka への書き込みが完了していないレコードの最新の件数 |
poll-batch-max-time-ms | このタスクでソースレコードのバッチをポーリングする際にかかった最大時間(単位: ミリ秒) |
poll-batch-avg-time-ms | このタスクでソースレコードのバッチをポーリングする際にかかった平均時間(単位: ミリ秒) |
シンクタスクのメトリクス¶
MBean: kafka.connect:type=sink-task-metrics,connector=([-.w]+),task=([d]+)
Metric | 説明 |
---|---|
sink-record-read-rate | 変換の適用前に、ワーカー内の名前付きシンクコネクターに属するタスクに対して Kafka から読み取られたレコードの 1 秒あたりの平均件数 |
sink-record-read-total | 変換適用前に、ワーカーの指定シンクコネクターに属するタスクで生成またはポーリングされるレコードの合計数(タスクの直近の再開以降) |
sink-record-send-rate | 変換の適用後に、変換によって出力され、ワーカー内の名前付きシンクコネクターに属するタスクに送信されたレコードの 1 秒あたりの平均件数(変換時のフィルターで除外されたレコードは除きます) |
sink-record-send-total | 変換によって出力されてワーカーの指定シンクコネクターに属するタスクに送信されるレコードの合計数(タスクの直近の再開以降) |
sink-record-active-count | シンクタスクが Kafka から読み取り、まだコミット、フラッシュ、または確認応答を完了していないレコードの最新の件数 |
sink-record-active-count-max | シンクタスクが Kafka から読み取り、まだコミット、フラッシュ、または確認応答を完了していないレコードの最大件数 |
sink-record-active-count-avg | シンクタスクが Kafka から読み取り、まだコミット、フラッシュ、または確認応答を完了していないレコードの平均件数 |
partition-count | ワーカー内の名前付きシンクコネクターに属する、タスクに割り当てられたトピックパーティションの数 |
offset-commit-seq-no | オフセットコミットの現在のシーケンス番号 |
offset-commit-completion-rate | 正常に完了したオフセットのコミットの 1 秒あたりの平均完了件数 |
offset-commit-completion-total | 正常に完了したオフセットのコミットの合計完了件数 |
offset-commit-skip-rate | 受信が遅すぎてスキップまたは無視されたオフセットコミットの 1 秒あたりの平均完了件数 |
offset-commit-skip-total | 受信が遅すぎてスキップまたは無視されたオフセットコミットの合計完了件数 |
put-batch-max-time-ms | このタスクでシンクレコードのバッチを配置する際にかかった最大時間(単位: ミリ秒) |
put-batch-avg-time-ms | このタスクでシンクレコードのバッチを配置するためにかかった平均時間(単位: ミリ秒) |
クライアントのメトリクス¶
MBean: kafka.connect:type=connect-metrics,client-id=
Metric | 説明 |
---|---|
connection-close-rate | ウィンドウにおける、1 秒あたりの終了された接続の数 |
connection-count | 現在アクティブな接続の数 |
connection-creation-rate | ウィンドウにおける、1 秒あたりの新規接続の確立数 |
failed-authentication-rate | 認証に失敗した接続 |
incoming-byte-rate | すべてのソケットから読み取られた 1 秒あたりのバイト数 |
io-ratio | I/O スレッドで I/O 処理に費やされた時間の割合 |
io-time-ns-avg | select 呼び出しごとの I/O の所要時間の平均値(単位: ナノ秒) |
io-wait-ratio | I/O スレッドで待機に費やされた時間の割合 |
io-wait-time-ns-avg | ソケットで読み取りまたは書き込みの準備が整うまでの I/O スレッドの待機時間の平均値(単位: ナノ秒) |
network-io-rate | すべての接続における 1 秒あたりのネットワーク操作(読み取りまたは書き込み)の平均回数 |
outgoing-byte-rate | 1 秒あたりの平均送信バイト数(すべてのサーバーに対する送信) |
request-rate | 1 秒あたりの平均送信リクエスト数 |
request-size-avg | ウィンドウにおけるすべてのリクエストの平均サイズ |
request-size-max | ウィンドウにおける送信リクエストの最大サイズ |
response-rate | 1 秒あたりに送受信された応答 |
select-rate | I/O レイヤーで新規 I/O の実行がチェックされた、1 秒あたりの回数 |
successful-authentication-rate | SASL または SSL を使用して正常に認証された接続 |