Monitor Kafka Connect and Connectors¶
You can manage and monitor Connect, connectors, and clients using JMX and the REST interface.
Use the Connect REST interface¶
Kafka Connect’s REST API enables administration of the cluster. This includes APIs to view the configuration of connectors and the status of their tasks, as well as to alter their current behavior (for example, changing configuration and restarting tasks).
Once a Kafka Connect cluster is up and running, you can monitor and modify it. This section describes some common management tasks you can do when using the REST API.
Since Kafka Connect is intended to be run as a service, it also supports a
REST API for managing connectors. By default, this service runs on port 8083.
When executed in distributed mode, the REST API will be the primary interface to
the cluster. You can make requests to any cluster member; the REST API
automatically forwards requests if required.
Standalone mode also runs the REST interface, even though you typically submit a connector on the command line. This is useful for getting status information, adding and removing connectors without stopping the process, and more.
Connector and task status¶
You can use the REST API to view the current status of a connector and its tasks, including the ID of the worker to which each was assigned.
Connectors and their tasks publish status updates to a shared topic (configured with
status.storage.topic) which all workers in the cluster monitor. Because the workers consume
this topic asynchronously, there is typically a short delay before a state change is visible
through the status API. The following states are possible for a connector or one of its tasks:

UNASSIGNED: The connector/task has not yet been assigned to a worker.
RUNNING: The connector/task is running.
PAUSED: The connector/task has been administratively paused.
FAILED: The connector/task has failed (usually by raising an exception, which is reported in the status output).
In most cases, connector and task states will match, though they may differ for short periods
when changes are occurring or when tasks have failed. For example, when a connector is first
started, there may be a noticeable delay before the connector and its tasks have all transitioned to
the RUNNING state. States will also diverge when tasks fail, since Connect does not automatically
restart failed tasks.
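Because Connect does not restart failed tasks on its own, external tooling often polls the status endpoint and restarts anything in the FAILED state. The following is a minimal sketch in Python using only the standard library; the worker address is an assumption matching the examples in this document, and the status-parsing helper is hypothetical:

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed worker address

def failed_task_ids(status):
    """Return the IDs of tasks in the FAILED state from a /status response."""
    return [t["id"] for t in status.get("tasks", []) if t["state"] == "FAILED"]

def restart_failed_tasks(connector):
    """Fetch a connector's status and restart any failed tasks."""
    with urllib.request.urlopen(f"{CONNECT_URL}/connectors/{connector}/status") as resp:
        status = json.load(resp)
    for task_id in failed_task_ids(status):
        req = urllib.request.Request(
            f"{CONNECT_URL}/connectors/{connector}/tasks/{task_id}/restart",
            method="POST",
        )
        urllib.request.urlopen(req)

# Sample payload in the shape returned by the /status endpoint:
sample = {
    "name": "s3-connector",
    "connector": {"state": "RUNNING", "worker_id": "192.168.86.66:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "192.168.86.66:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "192.168.86.66:8083"},
    ],
}
print(failed_task_ids(sample))  # [1]
```

A cron job or sidecar could call restart_failed_tasks periodically; note that restarting a task does not address the underlying cause of the failure.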
It’s sometimes useful to temporarily stop the message processing of a connector. For example,
if the remote system is undergoing maintenance, it would be preferable for source connectors to
stop polling it for new data instead of filling logs with exception spam. For this use case,
Connect offers a pause/resume API. While a source connector is paused, Connect stops polling it
for additional records. While a sink connector is paused, Connect stops pushing new messages to
it. The pause state is persistent, so even if you restart the cluster, the connector will not begin
message processing again until it has been resumed. Note that there may be a delay before all
of a connector’s tasks have transitioned to the PAUSED state, since it may take time for them to
finish whatever processing they were in the middle of when paused. Additionally, failed tasks
will not transition to the PAUSED state until they have been restarted.
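Because the transition to PAUSED is asynchronous, automation that pauses a connector usually polls the status endpoint until every task reports the expected state. A sketch of that check in Python, using only the standard library; the worker address, timeout, and polling interval are all assumptions:

```python
import json
import time
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed worker address

def all_tasks_in_state(status, state):
    """True when the connector and every task report the given state."""
    tasks = status.get("tasks", [])
    return (status.get("connector", {}).get("state") == state
            and all(t["state"] == state for t in tasks))

def wait_until_paused(connector, timeout_s=60, poll_s=2):
    """Poll /status until all tasks are PAUSED.

    Returns False on timeout; remember that failed tasks never reach
    PAUSED until they are restarted.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        url = f"{CONNECT_URL}/connectors/{connector}/status"
        with urllib.request.urlopen(url) as resp:
            if all_tasks_in_state(json.load(resp), "PAUSED"):
                return True
        time.sleep(poll_s)
    return False
```

The same helper works for waiting on RUNNING after a resume, by passing "RUNNING" as the target state.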
Common REST examples¶
Below are a few common activities performed over the REST API. These
examples use a worker running on localhost with default configurations
and a connector named s3-connector. The utility jq is used in the
examples to format the responses, but it is not required. The examples
are intentionally simple. For advanced use of the REST API, see the
Kafka Connect REST Interface.
Get worker cluster ID, version, and git source code commit ID:
curl localhost:8083/ | jq
Example output:
{
  "version": "5.5.0-ce",
  "commit": "2bedb2c6980ba7a8",
  "kafka_cluster_id": "wxOOb2vrTPCLJl28PI18KA"
}
List the connector plugins available on a worker:
curl localhost:8083/connector-plugins | jq
Example output:
[
  {
    "class": "io.confluent.connect.activemq.ActiveMQSourceConnector",
    "type": "source",
    "version": "5.5.0"
  },
  {
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",
    "version": "5.5.0"
  },
  {
    "class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
    "type": "source",
    "version": "5.5.0"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "5.5.0"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "type": "source",
    "version": "5.5.0"
  },
  {
    "class": "io.confluent.connect.jms.JmsSourceConnector",
    "type": "source",
    "version": "5.5.0"
  },
  ... omitted
List active connectors on a worker:
curl localhost:8083/connectors
Example output:
["s3-connector"]
Restart a connector (there is no output if the command is successful):
curl -X POST localhost:8083/connectors/s3-connector/restart
Note
Restarting a connector does not restart the tasks.
Get connector tasks:
curl localhost:8083/connectors/s3-connector/tasks | jq
Example output:
[
  {
    "id": {
      "connector": "s3-connector",
      "task": 0
    },
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
      "task.class": "io.confluent.connect.s3.S3SinkTask",
      "flush.size": "1000",
      "topics": "passengers",
      "name": "s3-connector",
      "aws.access.key.id": "omitted",
      "rotate.interval.ms": "-3",
      "aws.secret.access.key": "omitted",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "s3.bucket.name": "s3-test-bucket"
    }
  }
]
Restart a task (there is no output if the command is successful):
curl -X POST localhost:8083/connectors/s3-connector/tasks/0/restart
Pause a connector (there is no output if the command is successful):
curl -X PUT localhost:8083/connectors/s3-connector/pause
Tip
This command is useful if the system the connector interacts with is required to be taken out of service temporarily for maintenance.
Resume a paused connector (there is no output if the command is successful):
curl -X PUT localhost:8083/connectors/s3-connector/resume
Update the connector configuration:
curl -X PUT -H "Content-Type: application/json" --data '{"connector.class":"io.confluent.connect.s3.S3SinkConnector","format.class":"io.confluent.connect.s3.format.bytearray.ByteArrayFormat","flush.size":"1000","s3.bucket.name":"s3-test-bucket","storage.class":"io.confluent.connect.s3.storage.S3Storage","tasks.max":"2","topics":"passengers","name":"s3-connector"}' localhost:8083/connectors/s3-connector/config | jq
The example command updates tasks.max from 1 to 2; the response returns the updated configuration and task list:
{
  "name": "s3-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "flush.size": "1000",
    "s3.bucket.name": "s3-test-bucket",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "tasks.max": "2",
    "topics": "passengers",
    "name": "s3-connector"
  },
  "tasks": [
    { "connector": "s3-connector", "task": 0 },
    { "connector": "s3-connector", "task": 1 }
  ],
  "type": "sink"
}
Get connector status:
curl localhost:8083/connectors/s3-connector/status | jq
Example output:
{
  "name": "s3-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.86.66:8083"
  },
  "tasks": [
    { "id": 0, "state": "RUNNING", "worker_id": "192.168.86.66:8083" },
    { "id": 1, "state": "RUNNING", "worker_id": "192.168.86.66:8083" }
  ],
  "type": "sink"
}
Get the connector configuration, tasks, and type of connector:
curl localhost:8083/connectors/s3-connector | jq
Example output:
{
  "name": "s3-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "flush.size": "1000",
    "tasks.max": "2",
    "topics": "passengers",
    "name": "s3-connector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "s3-test-bucket"
  },
  "tasks": [
    { "connector": "s3-connector", "task": 0 },
    { "connector": "s3-connector", "task": 1 }
  ],
  "type": "sink"
}
To get the connector configuration only, use the following command:
curl localhost:8083/connectors/s3-connector/config | jq
Delete a connector (there is no output if the command is successful):
curl -X DELETE localhost:8083/connectors/s3-connector
Use JMX to Monitor Connect¶
Connect reports a variety of metrics through Java Management Extensions (JMX). Connect can also be configured to report metrics to additional pluggable stats reporters using the metric.reporters configuration option. The following sections provide MBeans, metrics, and descriptions.
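As a sketch of how a pluggable reporter is wired in, the worker properties below add a reporter class alongside the always-enabled JMX reporter. The class name here is hypothetical; a real reporter must implement the org.apache.kafka.common.metrics.MetricsReporter interface:

```properties
# connect-distributed.properties (fragment)
# Comma-separated list of MetricsReporter implementations.
# JMX reporting stays enabled regardless of this setting;
# com.example.MyMetricsReporter is a hypothetical addition.
metric.reporters=com.example.MyMetricsReporter
```

The reporter class (and its dependencies) must be on the worker's classpath for the worker to start.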
Connector metrics¶
MBean: kafka.connect:type=connector-metrics,connector="{connector}"
Metric | Explanation |
---|---|
connector-type | Connector type: source or sink |
connector-class | Connector class name |
connector-version | Connector class version (as reported by the connector) |
status | Connector status: running, paused, or stopped |
Common task metrics¶
MBean: kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
Metric | Explanation |
---|---|
status | Current task status: unassigned, running, paused, failed, or destroyed |
pause-ratio | Fraction of time the task has spent in a paused state |
running-ratio | Fraction of time the task has spent in the running state |
offset-commit-success-percentage | Average percentage of the task’s offset commit attempts that succeeded |
offset-commit-failure-percentage | Average percentage of the task’s offset commit attempts that failed or had an error |
offset-commit-max-time-ms | Maximum time in milliseconds taken by the task to commit offsets |
offset-commit-avg-time-ms | Average time in milliseconds taken by the task to commit offsets |
batch-size-max | Maximum size of the batches processed by the connector |
batch-size-avg | Average size of the batches processed by the connector |
Worker metrics¶
MBean: kafka.connect:type=connect-worker-metrics
Metric | Explanation |
---|---|
task-count | Number of tasks that have run in this worker |
connector-count | The number of connectors that have run in this worker |
connector-startup-attempts-total | Total number of connector startups that this worker has attempted |
connector-startup-success-total | Total number of connector starts that succeeded |
connector-startup-success-percentage | Average percentage of the worker’s connector starts that succeeded |
connector-startup-failure-total | Total number of connector starts that failed |
connector-startup-failure-percentage | Average percentage of the worker’s connector starts that failed |
task-startup-attempts-total | Total number of task startups that the worker has attempted |
task-startup-success-total | Total number of task starts that succeeded |
task-startup-success-percentage | Average percentage of the worker’s task starts that succeeded |
task-startup-failure-total | Total number of task starts that failed |
task-startup-failure-percentage | Average percentage of the worker’s task starts that failed |
Worker rebalance metrics¶
MBean: kafka.connect:type=connect-worker-rebalance-metrics
Metric | Explanation |
---|---|
leader-name | Name of the group leader |
epoch | Epoch or generation number of the worker |
completed-rebalances-total | Total number of rebalances completed by the worker |
rebalancing | Whether the worker is currently rebalancing |
rebalance-max-time-ms | Maximum time the worker spent rebalancing (in milliseconds) |
rebalance-avg-time-ms | Average time the worker spent rebalancing (in milliseconds) |
time-since-last-rebalance-ms | Time since the most recent worker rebalance (in milliseconds) |
Source task metrics¶
MBean: kafka.connect:type=source-task-metrics,connector=([-.\w]+),task=([\d]+)
Metric | Explanation |
---|---|
source-record-write-total | Number of records output from the transformations and written to Kafka for the task belonging to the named source connector in the worker (since the task was last restarted) |
source-record-write-rate | After transformations are applied, this is the average per-second number of records output from the transformations and written to Kafka for the task belonging to the named source connector in the worker (excludes any records filtered out by the transformations) |
source-record-poll-total | Before transformations are applied, this is the number of records produced or polled by the task belonging to the named source connector in the worker (since the task was last restarted) |
source-record-poll-rate | Before transformations are applied, this is the average per-second number of records produced or polled by the task belonging to the named source connector in the worker |
source-record-active-count-max | Maximum number of records polled by the task but not yet completely written to Kafka |
source-record-active-count-avg | Average number of records polled by the task but not yet completely written to Kafka |
source-record-active-count | Most recent number of records polled by the task but not yet completely written to Kafka |
poll-batch-max-time-ms | Maximum time in milliseconds taken by this task to poll for a batch of source records |
poll-batch-avg-time-ms | Average time in milliseconds taken by this task to poll for a batch of source records |
Sink task metrics¶
MBean: kafka.connect:type=sink-task-metrics,connector=([-.\w]+),task=([\d]+)
Metric | Explanation |
---|---|
sink-record-read-rate | Before transformations are applied, this is the average per-second number of records read from Kafka for the task belonging to the named sink connector in the worker |
sink-record-read-total | Before transformations are applied, this is the total number of records produced or polled by the task belonging to the named sink connector in the worker (since the task was last restarted) |
sink-record-send-rate | After transformations are applied, this is the average per-second number of records output from the transformations and sent to the task belonging to the named sink connector in the worker (excludes any records filtered out by the transformations) |
sink-record-send-total | Total number of records output from the transformations and sent to the task belonging to the named sink connector in the worker (since the task was last restarted) |
sink-record-active-count | Most recent number of records read from Kafka but not yet completely committed, flushed, or acknowledged by the sink task |
sink-record-active-count-max | Maximum number of records read from Kafka, but that have not yet completely been committed, flushed, or acknowledged by the sink task |
sink-record-active-count-avg | Average number of records read from Kafka, but that have not yet completely been committed, flushed, or acknowledged by the sink task |
partition-count | Number of topic partitions assigned to the task and which belong to the named sink connector in the worker |
offset-commit-seq-no | Current sequence number for offset commits |
offset-commit-completion-rate | Average per-second number of offset commit completions that have completed successfully |
offset-commit-completion-total | Total number of offset commit completions that were completed successfully |
offset-commit-skip-rate | Average per-second number of offset commit completions that were received too late and skipped, or ignored |
offset-commit-skip-total | Total number of offset commit completions that were received too late and skipped, or ignored |
put-batch-max-time-ms | Maximum time in milliseconds taken by this task to put a batch of sink records |
put-batch-avg-time-ms | Average time in milliseconds taken by this task to put a batch of sink records |
Client metrics¶
MBean: kafka.connect:type=connect-metrics,client-id=([-.\w]+)
Metric | Explanation |
---|---|
connection-close-rate | Connections closed per second in the window |
connection-count | Current number of active connections |
connection-creation-rate | New connections established per second in the window |
failed-authentication-rate | Connections that failed authentication |
incoming-byte-rate | Bytes per second read off all sockets |
io-ratio | Fraction of time the I/O thread spent doing I/O |
io-time-ns-avg | Average length of time for I/O per select call in nanoseconds |
io-wait-ratio | Fraction of time the I/O thread spent waiting |
io-wait-time-ns-avg | Average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds |
network-io-rate | Average number of network operations (reads or writes) on all connections per second |
outgoing-byte-rate | Average number of outgoing bytes sent per second to all servers |
request-rate | Average number of requests sent per second |
request-size-avg | Average size of all requests in the window |
request-size-max | Maximum size of any request sent in the window |
response-rate | Responses received and sent per second |
select-rate | Number of times the I/O layer checked for new I/O to perform per second |
successful-authentication-rate | Connections that were successfully authenticated using SASL or SSL |
Task error metrics¶
MBean: kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
Metric | Explanation |
---|---|
deadletterqueue-produce-failures | The number of failed writes to the dead letter queue. |
deadletterqueue-produce-requests | The number of attempted writes to the dead letter queue. |
last-error-timestamp | The epoch timestamp when this task last encountered an error. |
total-errors-logged | The number of errors that were logged. |
total-record-errors | The number of record processing errors in this task. |
total-record-failures | The number of record processing failures in this task. |
total-records-skipped | The number of records skipped due to errors. |
total-retries | The number of operations retried. |