Monitor Kafka Connect and Connectors

You can manage and monitor Connect, connectors, and clients using JMX and the REST interface.

Use the Connect REST interface

Kafka Connect’s REST API enables administration of the cluster. This includes APIs to view the configuration of connectors and the status of their tasks, as well as to alter their current behavior (for example, changing configuration and restarting tasks).

Once a Kafka Connect cluster is up and running, you can monitor and modify it. This section describes some common management tasks you can perform using the REST API.

Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors. By default this service runs on port 8083. When executed in distributed mode, the REST API will be the primary interface to the cluster. You can make requests to any cluster member; the REST API automatically forwards requests if required.

Standalone mode also runs the REST interface, even though you typically submit connectors on the command line. The REST interface is useful for getting status information, adding and removing connectors without stopping the process, and more.
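
For example, assuming a worker running on localhost with the default port, you can confirm that the REST interface is reachable by querying its root endpoint:

    curl -s localhost:8083/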

Connector and task status

You can use the REST API to view the current status of a connector and its tasks, including the ID of the worker to which each was assigned.

Connectors and their tasks publish status updates to a shared topic (configured with status.storage.topic) which all workers in the cluster monitor. Because the workers consume this topic asynchronously, there is typically a short delay before a state change is visible through the status API. The following states are possible for a connector or one of its tasks (an example of inspecting the status topic directly follows the list):

  • UNASSIGNED: The connector/task has not yet been assigned to a worker.

  • RUNNING: The connector/task is running.

  • PAUSED: The connector/task has been administratively paused.

  • FAILED: The connector/task has failed (usually by raising an exception, which is reported in the status output).
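
Because status updates flow through a regular Kafka topic, you can also inspect them directly with a console consumer. A minimal sketch, assuming a broker on localhost:9092 and the commonly used topic name connect-status (check status.storage.topic in your worker configuration):

    ./bin/kafka-console-consumer --bootstrap-server localhost:9092 \
      --topic connect-status --from-beginning \
      --property print.key=true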

In most cases, connector and task states will match, though they may be different for short periods of time when changes are occurring or if tasks have failed. For example, when a connector is first started, there may be a noticeable delay before the connector and its tasks have all transitioned to the RUNNING state. States will also diverge when tasks fail since Connect does not automatically restart failed tasks.
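
Because Connect does not restart failed tasks for you, a common pattern is to query the status endpoint and restart any tasks that report FAILED. A minimal sketch, assuming the s3-connector example used throughout this section and the jq utility:

    # Find the IDs of FAILED tasks and restart each one.
    for id in $(curl -s localhost:8083/connectors/s3-connector/status | \
                jq -r '.tasks[] | select(.state=="FAILED") | .id'); do
      curl -s -X POST "localhost:8083/connectors/s3-connector/tasks/${id}/restart"
    done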

It’s sometimes useful to temporarily stop the message processing of a connector. For example, if the remote system is undergoing maintenance, it is preferable for source connectors to stop polling it for new data instead of filling the logs with exceptions. For this use case, Connect offers a pause/resume API. While a source connector is paused, Connect stops polling it for additional records. While a sink connector is paused, Connect stops pushing new messages to it. The pause state is persistent, so even if you restart the cluster, the connector will not begin message processing again until it has been resumed. Note that there may be a delay before all of a connector’s tasks have transitioned to the PAUSED state, since it may take time for them to finish whatever processing they were in the middle of when the pause was requested. Additionally, failed tasks will not transition to the PAUSED state until they have been restarted.
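
A typical maintenance window might look like the following sketch (again assuming the s3-connector example and a worker on localhost):

    # Stop message processing before maintenance begins.
    curl -X PUT localhost:8083/connectors/s3-connector/pause

    # Verify that the connector and its tasks have reached the PAUSED state.
    curl -s localhost:8083/connectors/s3-connector/status | jq '.connector.state, .tasks[].state'

    # Resume processing once maintenance is complete.
    curl -X PUT localhost:8083/connectors/s3-connector/resume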

Common REST examples

Below are a few common activities that are done over the REST API. These examples are shown using a worker running on localhost with default configurations and a connector named s3-connector. The utility jq is used in the examples to format the response, but this is not required. The examples are intentionally simple. For advanced use of the REST API, see the Kafka Connect REST Interface.

  • Get worker cluster ID, version, and git source code commit ID:

    curl localhost:8083/ | jq
    

    Example output:

    {
      "version": "5.5.0-ce",
      "commit": "2bedb2c6980ba7a8",
      "kafka_cluster_id": "wxOOb2vrTPCLJl28PI18KA"
    }
    
  • List the connector plugins available on a worker:

    curl localhost:8083/connector-plugins | jq
    

    Example output:

    [
      {
        "class": "io.confluent.connect.activemq.ActiveMQSourceConnector",
        "type": "source",
        "version": "5.5.0"
      },
      {
        "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type": "sink",
        "version": "5.5.0"
      },
      {
        "class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
        "type": "source",
        "version": "5.5.0"
      },
      {
        "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "type": "sink",
        "version": "5.5.0"
      },
      {
        "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "type": "source",
        "version": "5.5.0"
      },
      {
        "class": "io.confluent.connect.jms.JmsSourceConnector",
        "type": "source",
        "version": "5.5.0"
      },
    
      ... omitted
    ]
    
  • List active connectors on a worker:

    curl localhost:8083/connectors
    

    Example output:

    ["s3-connector"]
    
  • Restart a connector (there is no output if the command is successful):

    curl -X POST localhost:8083/connectors/s3-connector/restart
    

    Note

    Restarting a connector does not restart the tasks.

  • Get connector tasks:

    curl localhost:8083/connectors/s3-connector/tasks | jq
    

    Example output:

    [
      {
        "id": {
          "connector": "s3-connector",
          "task": 0
        },
        "config": {
          "connector.class": "io.confluent.connect.s3.S3SinkConnector",
          "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
          "task.class": "io.confluent.connect.s3.S3SinkTask",
          "flush.size": "1000",
          "topics": "passengers",
          "name": "s3-connector",
          "aws.access.key.id": "omitted",
          "rotate.interval.ms": "-3",
          "aws.secret.access.key": "omitted",
          "storage.class": "io.confluent.connect.s3.storage.S3Storage",
          "s3.bucket.name": "s3-test-bucket"
        }
      }
    ]
    
  • Restart a task (there is no output if the command is successful):

    curl -X POST localhost:8083/connectors/s3-connector/tasks/0/restart
    
  • Pause a connector (there is no output if the command is successful):

    curl -X PUT localhost:8083/connectors/s3-connector/pause
    

    Tip

This command is useful if the system the connector interacts with must be taken out of service temporarily for maintenance.

  • Resume a paused connector (there is no output if the command is successful):

    curl -X PUT localhost:8083/connectors/s3-connector/resume
    
  • Update the connector configuration:

curl -X PUT -H "Content-Type: application/json" --data '{
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
        "flush.size": "1000",
        "s3.bucket.name": "s3-test-bucket",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "tasks.max": "2",
        "topics": "passengers",
        "name": "s3-connector"
      }' localhost:8083/connectors/s3-connector/config | jq
    

The example command updates tasks.max from 1 to 2. The response echoes the new configuration and the expanded task list:

    {
      "name": "s3-connector",
      "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
        "flush.size": "1000",
        "s3.bucket.name": "s3-test-bucket",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "tasks.max": "2",
        "topics": "passengers",
        "name": "s3-connector"
      },
      "tasks": [
        {
          "connector": "s3-connector",
          "task": 0
        },
        {
          "connector": "s3-connector",
          "task": 1
        }
      ],
      "type": "sink"
    }
    
  • Get connector status:

    curl localhost:8083/connectors/s3-connector/status | jq
    

    Example output:

    {
       "name": "s3-connector",
       "connector": {
          "state": "RUNNING",
          "worker_id": "192.168.86.66:8083"
       },
       "tasks": [
       {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "192.168.86.66:8083"
       },
       {
          "id": 1,
          "state": "RUNNING",
          "worker_id": "192.168.86.66:8083"
       }
       ],
       "type": "sink"
    }
    
  • Get the connector configuration, tasks, and type of connector:

    curl localhost:8083/connectors/s3-connector | jq
    

    Example output:

    {
       "name": "s3-connector",
       "config": {
          "connector.class": "io.confluent.connect.s3.S3SinkConnector",
          "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
          "flush.size": "1000",
          "tasks.max": "2",
          "topics": "passengers",
          "name": "s3-connector",
          "storage.class": "io.confluent.connect.s3.storage.S3Storage",
          "s3.bucket.name": "s3-test-bucket"
       },
       "tasks": [
       {
          "connector": "s3-connector",
          "task": 0
       },
       {
          "connector": "s3-connector",
          "task": 1
       }
       ],
       "type": "sink"
    }
    

    To get the connector configuration only, use the following command:

    curl localhost:8083/connectors/s3-connector/config | jq
    
  • Delete a connector (there is no output if the command is successful):

    curl -X DELETE localhost:8083/connectors/s3-connector
    

Use JMX to Monitor Connect

Connect reports a variety of metrics through Java Management Extensions (JMX). Connect can also be configured to report metrics through additional pluggable stats reporters using the metric.reporters configuration option. The following sections list the available MBeans and their metrics, with a description of each.
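
For example, one way to inspect these MBeans from the command line is the JmxTool class that ships with Kafka. The following sketch assumes Connect was started with JMX enabled on port 9999 (the Kafka startup scripts honor the JMX_PORT environment variable) and reuses the s3-connector example from above:

    # Start the worker with a JMX listener on port 9999.
    JMX_PORT=9999 ./bin/connect-distributed ./etc/kafka/connect-distributed.properties

    # Dump the connector-metrics MBean for the example connector.
    ./bin/kafka-run-class kafka.tools.JmxTool \
      --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
      --object-name 'kafka.connect:type=connector-metrics,connector="s3-connector"'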

Connector metrics

MBean: kafka.connect:type=connector-metrics,connector="{connector}"

  • connector-type: Connector type (source or sink)

  • connector-class: Connector class name

  • connector-version: Connector class version (as reported by the connector)

  • status: Connector status (running, paused, or stopped)

Common task metrics

MBean: kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"

  • status: Current task status (unassigned, running, paused, failed, or destroyed)

  • pause-ratio: Fraction of time the task has spent in a paused state

  • running-ratio: Fraction of time the task has spent in the running state

  • offset-commit-success-percentage: Average percentage of the task’s offset commit attempts that succeeded

  • offset-commit-failure-percentage: Average percentage of the task’s offset commit attempts that failed or had an error

  • offset-commit-max-time-ms: Maximum time in milliseconds taken by the task to commit offsets

  • offset-commit-avg-time-ms: Average time in milliseconds taken by the task to commit offsets

  • batch-size-max: Maximum size of the batches processed by the connector

  • batch-size-avg: Average size of the batches processed by the connector

Worker metrics

MBean: kafka.connect:type=connect-worker-metrics

  • task-count: Number of tasks that have run in this worker

  • connector-count: Number of connectors that have run in this worker

  • connector-startup-attempts-total: Total number of connector startups that this worker has attempted

  • connector-startup-success-total: Total number of connector starts that succeeded

  • connector-startup-success-percentage: Average percentage of the worker’s connector starts that succeeded

  • connector-startup-failure-total: Total number of connector starts that failed

  • connector-startup-failure-percentage: Average percentage of the worker’s connector starts that failed

  • task-startup-attempts-total: Total number of task startups that the worker has attempted

  • task-startup-success-total: Total number of task starts that succeeded

  • task-startup-success-percentage: Average percentage of the worker’s task starts that succeeded

  • task-startup-failure-total: Total number of task starts that failed

  • task-startup-failure-percentage: Average percentage of the worker’s task starts that failed

Worker rebalance metrics

MBean: kafka.connect:type=connect-worker-rebalance-metrics

  • leader-name: Name of the group leader

  • epoch: Epoch or generation number of the worker

  • completed-rebalances-total: Total number of rebalances completed by the worker

  • rebalancing: Whether the worker is currently rebalancing

  • rebalance-max-time-ms: Maximum time the worker spent rebalancing (in milliseconds)

  • rebalance-avg-time-ms: Average time the worker spent rebalancing (in milliseconds)

  • time-since-last-rebalance-ms: Time since the most recent worker rebalance (in milliseconds)

Source task metrics

MBean: kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"

  • source-record-write-total: Number of records output from the transformations and written to Kafka for the task belonging to the named source connector in the worker (since the task was last restarted)

  • source-record-write-rate: After transformations are applied, the average per-second number of records output from the transformations and written to Kafka for the task belonging to the named source connector in the worker (excludes any records filtered out by the transformations)

  • source-record-poll-total: Before transformations are applied, the number of records produced or polled by the task belonging to the named source connector in the worker (since the task was last restarted)

  • source-record-poll-rate: Before transformations are applied, the average per-second number of records produced or polled by the task belonging to the named source connector in the worker

  • source-record-active-count-max: Maximum number of records polled by the task but not yet completely written to Kafka

  • source-record-active-count-avg: Average number of records polled by the task but not yet completely written to Kafka

  • source-record-active-count: Most recent number of records polled by the task but not yet completely written to Kafka

  • poll-batch-max-time-ms: Maximum time in milliseconds taken by this task to poll for a batch of source records

  • poll-batch-avg-time-ms: Average time in milliseconds taken by this task to poll for a batch of source records

Sink task metrics

MBean: kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"

  • sink-record-read-rate: Before transformations are applied, the average per-second number of records read from Kafka for the task belonging to the named sink connector in the worker

  • sink-record-read-total: Before transformations are applied, the total number of records read from Kafka by the task belonging to the named sink connector in the worker (since the task was last restarted)

  • sink-record-send-rate: After transformations are applied, the average per-second number of records output from the transformations and sent to the task belonging to the named sink connector in the worker (excludes any records filtered out by the transformations)

  • sink-record-send-total: Total number of records output from the transformations and sent to the task belonging to the named sink connector in the worker (since the task was last restarted)

  • sink-record-active-count: Most recent number of records read from Kafka but not yet completely committed, flushed, or acknowledged by the sink task

  • sink-record-active-count-max: Maximum number of records read from Kafka but not yet completely committed, flushed, or acknowledged by the sink task

  • sink-record-active-count-avg: Average number of records read from Kafka but not yet completely committed, flushed, or acknowledged by the sink task

  • partition-count: Number of topic partitions assigned to the task and belonging to the named sink connector in the worker

  • offset-commit-seq-no: Current sequence number for offset commits

  • offset-commit-completion-rate: Average per-second number of offset commits that completed successfully

  • offset-commit-completion-total: Total number of offset commits that completed successfully

  • offset-commit-skip-rate: Average per-second number of offset commits that were received too late and skipped or ignored

  • offset-commit-skip-total: Total number of offset commits that were received too late and skipped or ignored

  • put-batch-max-time-ms: Maximum time in milliseconds taken by this task to put a batch of sink records

  • put-batch-avg-time-ms: Average time in milliseconds taken by this task to put a batch of sink records

Client metrics

MBean: kafka.connect:type=connect-metrics,client-id="{client-id}"

  • connection-close-rate: Connections closed per second in the window

  • connection-count: Current number of active connections

  • connection-creation-rate: New connections established per second in the window

  • failed-authentication-rate: Connections per second that failed authentication

  • incoming-byte-rate: Bytes per second read off all sockets

  • io-ratio: Fraction of time the I/O thread spent doing I/O

  • io-time-ns-avg: Average length of time for I/O per select call in nanoseconds

  • io-wait-ratio: Fraction of time the I/O thread spent waiting

  • io-wait-time-ns-avg: Average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds

  • network-io-rate: Average number of network operations (reads or writes) on all connections per second

  • outgoing-byte-rate: Average number of outgoing bytes sent per second to all servers

  • request-rate: Average number of requests sent per second

  • request-size-avg: Average size of all requests in the window

  • request-size-max: Maximum size of any request sent in the window

  • response-rate: Responses received and sent per second

  • select-rate: Number of times the I/O layer checked for new I/O to perform per second

  • successful-authentication-rate: Connections per second that were successfully authenticated using SASL or SSL

Task error metrics

MBean: kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"

  • deadletterqueue-produce-failures: Number of failed writes to the dead letter queue

  • deadletterqueue-produce-requests: Number of attempted writes to the dead letter queue

  • last-error-timestamp: Epoch timestamp when this task last encountered an error

  • total-errors-logged: Number of errors that were logged

  • total-record-errors: Number of record processing errors in this task

  • total-record-failures: Number of record processing failures in this task

  • total-records-skipped: Number of records skipped due to errors

  • total-retries: Number of operations retried
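
Note that the dead letter queue metrics only report activity for sink connectors that have error handling enabled. A minimal sketch of enabling a dead letter queue on this page’s s3-connector example (the topic name dlq-s3-connector is an assumption; see the connector error-handling documentation for the full set of errors.* options):

    curl -X PUT -H "Content-Type: application/json" --data '{
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
        "flush.size": "1000",
        "s3.bucket.name": "s3-test-bucket",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "tasks.max": "2",
        "topics": "passengers",
        "name": "s3-connector",
        "errors.tolerance": "all",
        "errors.deadletterqueue.topic.name": "dlq-s3-connector",
        "errors.deadletterqueue.context.headers.enable": "true"
      }' localhost:8083/connectors/s3-connector/config | jq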