Kafka Connect とコネクターのモニタリング

JMX および REST インターフェイスを使用して、Connect、コネクター、およびクライアントを管理およびモニタリングすることができます。

REST インターフェイスの使用

Kafka Connect の REST API を使用して、クラスターを管理することができます。これには、コネクターの構成とそのタスクのステータスを表示する API や、コネクターの現在の動作(構成の変更やタスクの再起動など)を変更する API が含まれます。

Kafka Connect クラスターが起動したら、モニタリングしたり変更したりすることができます。このセクションでは、REST API を使用して行われるいくつかの一般的な管理タスクについて説明します。

Kafka Connect は、サービスとして実行できるように設計されており、コネクターを管理するための REST API もサポートされています。デフォルトでは、このサービスはポート 8083 で実行されます。分散モードで実行する場合、REST API はクラスターに対するプライマリインターフェイスになります。リクエストは、どのクラスターメンバーに対しても行うことができます。REST API は必要に応じてリクエストを自動的に転送します。

コマンドラインでコネクターを送信するだけでスタンドアロンモードを使用することができますが、それによって REST インターフェイスも実行されます。これは、処理を停止せずにステータス情報を取得したり、コネクターを追加したり削除したりするのに便利です。

コネクターとタスクのステータス

REST API を使用して、コネクターとそのタスクの現在のステータスや、各コネクターが割り当てられているワーカーの ID を表示することができます。

コネクターとそのタスクは、クラスター内のすべてのワーカーがモニタリングする共有トピックに対してステータスの更新をパブリッシュします(この共有トピックは status.storage.topic で構成されます)。ワーカーはこのトピックを非同期で消費するため、通常は、ステータス API でステートの変更を確認できるまでに短い遅延が発生します。コネクターまたはそのタスクの 1 つは、次のいずれかのステートになります。

  • UNASSIGNED: コネクター/タスクは、まだワーカーに割り当てられていません。
  • RUNNING: コネクター/タスクは実行中です。
  • PAUSED: コネクター/タスクは、管理のために一時停止中です。
  • FAILED: コネクター/タスクが失敗しました(通常は、例外の発生によるもので、これはステータス出力で報告されます)。

ほとんどの場合、コネクターとタスクのステートは一致しますが、変更が生じていたりタスクが失敗したりした場合などは、一時的に不一致の状態になる可能性があります。たとえば、コネクターが最初に起動されてからコネクターとそのタスクがすべて RUNNING ステートに遷移するまでには、大幅な遅延が発生することがあります。Connect では失敗したタスクが自動的に再起動されないため、タスクが失敗したときにもステートの不一致が生じます。

コネクターのメッセージ処理を一時的に停止することが役立つ場合があります。たとえば、リモートシステムがメンテナンス中の場合は、大量の例外でログが埋まることを避けるため、ソースコネクターでは新しいデータを確認するためのポーリングを停止することが望ましいでしょう。このユースケースに対応できるよう、Connect では一時停止/再開の API が提供されています。ソースコネクターが一時停止されている間、Connect は追加レコードの取得のためのポーリングを停止します。シンクコネクターが一時停止されている間、Connect はシンクコネクターへの新しいメッセージのプッシュを停止します。一時停止ステートは永続的であるため、クラスターを再起動しても、タスクが再開されるまでコネクターはメッセージ処理を再開しません。タスクが一時停止されたときに実行中だった処理が完了するまでに時間がかかる場合があるため、すべてのコネクターのタスクが PAUSED ステートに移行するまでに遅延が生じる場合があることに注意してください。また、失敗したタスクは再起動されるまで PAUSED ステートには遷移しません。

一般的な REST の例

以下に、REST API を介して行われるいくつかの一般的なアクティビティを紹介します。以下に示す例では、デフォルトの構成を使用して localhost 上で実行されているワーカーと s3-connector という名前のコネクターを使用しています。この例では、応答のフォーマットにユーティリティー jq が使用されていますが、これは必須ではありません。ここでは、例を意図的にシンプルにしています。REST API の高度な使い方については、 Kafka Connect REST インターフェイス を参照してください。

  • ワーカークラスター ID、バージョン、および git ソースコードのコミット ID を取得します。

    curl localhost:8083/ | jq
    

    出力例:

    {
      "version": "5.5.0-ce",
      "commit": "2bedb2c6980ba7a8",
      "kafka_cluster_id": "wxOOb2vrTPCLJl28PI18KA"
    }
    
  • ワーカーで使用可能なコネクタープラグインのリストを表示します。

    curl localhost:8083/connector-plugins | jq
    

    出力例:

    [
      {
        "class": "io.confluent.connect.activemq.ActiveMQSourceConnector",
        "type": "source",
        "version": "5.5.0"
      },
      {
        "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type": "sink",
        "version": "5.5.0"
      },
      {
        "class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
        "type": "source",
        "version": "5.5.0"
      },
      {
        "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "type": "sink",
        "version": "5.5.0"
      },
      {
        "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "type": "source",
        "version": "5.5.0"
      },
      {
        "class": "io.confluent.connect.jms.JmsSourceConnector",
        "type": "source",
        "version": "5.5.0"
      },
    
      ... omitted
    
  • ワーカーのアクティブなコネクターのリストを表示します。

    curl localhost:8083/connectors
    

    出力例:

    ["s3-connector"]
    
  • コネクターを再起動します(コマンドが成功しても出力は生成されません)。

    curl -X POST localhost:8083/connectors/s3-connector/restart
    

    注釈

    コネクターを再起動してもタスクは再開されません。

  • コネクターのタスクを取得します。

    curl localhost:8083/connectors/s3-connector/tasks | jq
    

    出力例:

    [
      {
        "id": {
          "connector": "s3-connector",
          "task": 0
        },
        "config": {
          "connector.class": "io.confluent.connect.s3.S3SinkConnector",
          "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
          "task.class": "io.confluent.connect.s3.S3SinkTask",
          "flush.size": "1000",
          "topics": "passengers",
          "name": "s3-connector",
          "aws.access.key.id": "omitted",
          "rotate.interval.ms": "-3",
          "aws.secret.access.key": "omitted",
          "storage.class": "io.confluent.connect.s3.storage.S3Storage",
          "s3.bucket.name": "s3-test-bucket"
        }
      }
    ]
    
  • タスクを再起動します(コマンドが成功しても何も出力されません)。

    curl -X POST localhost:8083/connectors/s3-connector/tasks/0/restart
    
  • コネクターを一時停止します(コマンドが成功しても何も出力されません)。

    curl -X PUT localhost:8083/connectors/s3-connector/pause
    

    ちなみに

    コネクターとやり取りするシステムのサービスを、メンテナンスのために一時的に停止する必要がある場合に、このコマンドが役立ちます。

  • 一時停止されているコネクターを再開します(コマンドが成功しても何も出力されません)。

    curl -X PUT localhost:8083/connectors/s3-connector/resume
    
  • コネクターの構成を更新します。

    curl -X PUT -H "Content-Type: application/json" --data '{"connector.class":"io.confluent.connect.s3.S3SinkConnector","format.class":"io.confluent.connect.s3.format.bytearray.ByteArrayFormat","flush.size":"1000","s3.bucket.name":"s3-test-bucket","storage.class":"io.confluent.connect.s3.storage.S3Storage","tasks.max":"2","topics":"passengers","name":"s3-connector"}' localhost:8083/connectors/s3-connector/config | jq
    

    コマンド例では、タスクを 1 から 2 に更新しています。

    {
      "name": "s3-connector",
      "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
        "flush.size": "1000",
        "s3.bucket.name": "s3-test-bucket",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "tasks.max": "2",
        "topics": "passengers",
        "name": "s3-connector"
      },
      "tasks": [
        {
          "connector": "s3-connector",
          "task": 0
        },
        {
          "connector": "s3-connector",
          "task": 1
        }
      ],
      "type": "sink"
    }
    
  • コネクターのステータスを取得します。

    curl localhost:8083/connectors/s3-connector/status | jq
    

    出力例:

    {
       "name": "s3-connector",
       "connector": {
          "state": "RUNNING",
          "worker_id": "192.168.86.66:8083"
       },
       "tasks": [
       {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "192.168.86.66:8083"
       },
       {
          "id": 1,
          "state": "RUNNING",
          "worker_id": "192.168.86.66:8083"
       }
       ],
       "type": "sink"
    }
    
  • コネクターの構成、タスク、およびコネクターのタイプを取得します。

    curl localhost:8083/connectors/s3-connector | jq
    

    出力例:

    {
       "name": "s3-connector",
       "config": {
          "connector.class": "io.confluent.connect.s3.S3SinkConnector",
          "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
          "flush.size": "1000",
          "tasks.max": "2",
          "topics": "passengers",
          "name": "s3-connector",
          "storage.class": "io.confluent.connect.s3.storage.S3Storage",
          "s3.bucket.name": "s3-test-bucket"
       },
       "tasks": [
       {
          "connector": "s3-connector",
          "task": 0
       },
       {
          "connector": "s3-connector",
          "task": 1
       }
       ],
       "type": "sink"
    }
    

    コネクターの構成のみを取得するには、以下のコマンドを使用します。

    curl localhost:8083/connectors/s3-connector/config | jq
    
  • コネクターを削除します(コマンドが成功しても何も出力されません)。

    curl -X DELETE localhost:8083/connectors/s3-connector
    

JMX を使用した Connect のモニタリング

Connect では、JMX(Java Management Extensions)を通じてさまざまなメトリクスが報告されます。metric.reporters 構成オプションで、プラグ可能な統計情報レポーターを追加して、統計情報が報告されるように Connect を構成することができます。以下のセクションでは MBeans、メトリクス、および説明を示します。

コネクターのメトリクス

MBean: kafka.connect:type=connector-metrics,connector="{connector}"

メトリック 説明
connector-class コネクタークラスの名前
connector-total-task-count The number of tasks of the connector
connector-type コネクターのタイプ: ソースまたはシンク
connector-version コネクタークラスのバージョン(コネクターによって報告される)
status コネクターのステータス: running、paused、または stopped

共通タスクのメトリクス

MBean: kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"

メトリック 説明
batch-size-avg コネクターで処理されるバッチの平均サイズ
batch-size-max コネクターで処理されるバッチの最大サイズ
offset-commit-avg-time-ms タスクがオフセットをコミットするためにかかった平均時間(単位: ミリ秒)
offset-commit-max-time-ms タスクがオフセットをコミットする際にかかった最大時間(単位: ミリ秒)
offset-commit-failure-percentage タスクによるオフセットのコミットの試行のうち、失敗またはエラーになった試行の平均比率(パーセンテージ)
offset-commit-success-percentage タスクによるオフセットコミットの試行のうち、成功した試行の平均比率(パーセンテージ)
pause-ratio タスクが paused ステートで経過した時間の割合
running-ratio タスクが running ステートで経過した時間の割合
status 現在のタスクのステータス: unassigned、running、paused、failed、または destroyed

ワーカーのメトリクス

MBean: kafka.connect:type=connect-worker-metrics

メトリック 説明
connector-count このワーカーで実行されたコネクターの数
connector-startup-attempts-total このワーカーがコネクターの起動を試行した回数の合計
connector-startup-failure-percentage ワーカーのコネクターの起動回数のうち、失敗した回数の平均比率(パーセンテージ)
connector-startup-failure-total コネクターの起動に失敗した回数の合計
connector-startup-success-percentage ワーカーのコネクターの起動回数のうち、成功した回数の平均比率(パーセンテージ)
connector-startup-success-total コネクターが正常に起動した回数の合計
task-count このワーカーで実行されたタスクの数
task-startup-attempts-total ワーカーがタスクの起動を試行した回数の合計
task-startup-failure-percentage ワーカーのタスクの起動回数のうち、失敗した回数の平均比率(パーセンテージ)
task-startup-failure-total タスクの起動に失敗した回数の合計
task-startup-success-percentage ワーカーのタスクの起動回数のうち、成功した回数の平均比率(パーセンテージ)
task-startup-success-total タスクの起動に成功した回数の合計

Per-connector worker metrics

MBean: kafka.connect:type=connect-worker-metrics,connector="{connector}"

メトリック 説明
connector-destroyed-task-count The number of destroyed tasks of the connector on the worker.
connector-failed-task-count The number of failed tasks of the connector on the worker.
connector-paused-task-count The number of paused tasks of the connector on the worker.
connector-restarting-task-count The number of restarting tasks of the connector on the worker.
connector-running-task-count The number of running tasks of the connector on the worker.
connector-total-task-count The number of tasks of the connector on the worker.
connector-unassigned-task-count The number of unassigned tasks of the connector on the worker.

ワーカーのバランス調整のメトリクス

MBean: kafka.connect:type=connect-worker-rebalance-metrics

メトリック 説明
completed-rebalances-total ワーカーによって実行されたバランス調整の合計数
connect-protocol The Connect protocol used by this cluster
epoch ワーカーのエポックまたは世代番号
leader-name グループリーダーの名前
rebalance-avg-time-ms ワーカーがバランス調整に費やした平均時間(単位: ミリ秒)
rebalance-max-time-ms ワーカーがバランス調整に費やした最大時間(単位: ミリ秒)
rebalancing ワーカーが現在バランス調整を行っているかどうか
time-since-last-rebalance-ms 直近でワーカーのバランス調整が行われてから経過した時間(単位: ミリ秒)

ソースタスクのメトリクス

MBean: kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"

メトリック 説明
poll-batch-avg-time-ms このタスクでソースレコードのバッチをポーリングする際にかかった平均時間(単位: ミリ秒)
poll-batch-max-time-ms このタスクでソースレコードのバッチをポーリングする際にかかった最大時間(単位: ミリ秒)
source-record-active-count Most recent number of records produced by the task but not yet completely written to Kafka
source-record-active-count-avg Average number of records produced by the task but not yet completely written to Kafka
source-record-active-count-max Maximum number of records produced by the task but not yet completely written to Kafka
source-record-poll-rate 変換の適用前に、ワーカー内の名前付きソースコネクターに属するタスクによって生成またはポーリングされたレコードの 1 秒あたりの平均件数
source-record-poll-total (タスクが最後に再開されて以降)変換の適用前に、ワーカー内の名前付きソースコネクターに属するタスクによって生成またはポーリングされたレコードの件数
source-record-write-rate 変換の適用後に、変換によって出力され、ワーカー内の名前付きソースコネクターに属するタスクに対して Kafka に書き込まれたレコードの 1 秒あたりの平均件数(変換時のフィルターで除外されたレコードは除きます)
source-record-write-total (タスクが最後に再開されて以降)変換によって出力され、ワーカー内の名前付きソースコネクターに属するタスクに対して Kafka に書き込まれたレコードの件数

シンクタスクのメトリクス

MBean: kafka.connect:type=sink-task-metrics,connector=([-.w]+),task=([d]+)

メトリック 説明
offset-commit-completion-rate 正常に完了したオフセットのコミットの 1 秒あたりの平均完了件数
offset-commit-completion-total 正常に完了したオフセットのコミットの合計完了件数
offset-commit-seq-no オフセットコミットの現在のシーケンス番号
offset-commit-skip-rate 受信が遅すぎてスキップまたは無視されたオフセットコミットの 1 秒あたりの平均完了件数
offset-commit-skip-total 受信が遅すぎてスキップまたは無視されたオフセットコミットの合計完了件数
partition-count ワーカー内の名前付きシンクコネクターに属する、タスクに割り当てられたトピックパーティションの数
put-batch-avg-time-ms The average time taken by this task to put a batch of sinks records
put-batch-max-time-ms このタスクでシンクレコードのバッチを配置する際にかかった最大時間(単位: ミリ秒)
sink-record-active-count シンクタスクが Kafka から読み取り、まだコミット、フラッシュ、または確認応答を完了していないレコードの最新の件数
sink-record-active-count-avg シンクタスクが Kafka から読み取り、まだコミット、フラッシュ、または確認応答を完了していないレコードの平均件数
sink-record-active-count-max シンクタスクが Kafka から読み取り、まだコミット、フラッシュ、または確認応答を完了していないレコードの最大件数
sink-record-lag-max The maximum lag in terms of number of records that the sink task is behind the consumer's position for any topic partitions
sink-record-read-rate 変換の適用前に、ワーカー内の名前付きシンクコネクターに属するタスクに対して Kafka から読み取られたレコードの 1 秒あたりの平均件数
sink-record-read-total Before transformations are applied, this is the total number of records produced or polled by the task belonging to the named source connector in the worker (since the task was last restarted)
sink-record-send-rate 変換の適用後に、変換によって出力され、ワーカー内の名前付きシンクコネクターに属するタスクに送信されたレコードの 1 秒あたりの平均件数(変換時のフィルターで除外されたレコードは除きます)
sink-record-send-total Total number of records output from the transformations and sent to the task belonging to the named source connector in the worker (since the task was last restarted)

Task error metrics

MBean: kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"

メトリック 説明
deadletterqueue-produce-failures The number of failed writes to the dead letter queue.
deadletterqueue-produce-requests The number of attempted writes to the dead letter queue.
last-error-timestamp The epoch timestamp when this task last encountered an error.
total-errors-logged The number of errors that were logged.
total-record-errors The number of record processing errors in this task.
total-record-failures The number of record processing failures in this task.
total-records-skipped The number of records skipped due to errors.
total-retries The number of operations retried.

クライアントのメトリクス

MBean: kafka.connect:type=connect-metrics,client-id=

メトリック 説明
connection-close-rate ウィンドウにおける、1 秒あたりの終了された接続の数
connection-count 現在アクティブな接続の数
connection-creation-rate ウィンドウにおける、1 秒あたりの新規接続の確立数
failed-authentication-rate 認証に失敗した接続
incoming-byte-rate すべてのソケットから読み取られた 1 秒あたりのバイト数
io-ratio I/O スレッドで I/O 処理に費やされた時間の割合
io-time-ns-avg select 呼び出しごとの I/O の所要時間の平均値(単位: ナノ秒)
io-wait-ratio I/O スレッドで待機に費やされた時間の割合
io-wait-time-ns-avg ソケットで読み取りまたは書き込みの準備が整うまでの I/O スレッドの待機時間の平均値(単位: ナノ秒)
network-io-rate すべての接続における 1 秒あたりのネットワーク操作(読み取りまたは書き込み)の平均回数
outgoing-byte-rate 1 秒あたりの平均送信バイト数(すべてのサーバーに対する送信)
request-rate 1 秒あたりの平均送信リクエスト数
request-size-avg ウィンドウにおけるすべてのリクエストの平均サイズ
request-size-max ウィンドウにおける送信リクエストの最大サイズ
response-rate 1 秒あたりに送受信された応答
select-rate I/O レイヤーで新規 I/O の実行がチェックされた、1 秒あたりの回数
successful-authentication-rate SASL または SSL を使用して正常に認証された接続