Monitoring Connectors¶
Kafka Connect’s REST API enables administration of the cluster. This includes APIs to view the configuration of connectors and the status of their tasks, as well as to alter their current behavior (e.g. changing configuration and restarting tasks).
Once a Kafka Connect cluster is up and running, you can monitor and modify it. In this section, we go over a few common management tasks done via the REST API.
Using the REST Interface¶
Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors.
By default this service runs on port 8083
. When executed in distributed mode, the REST API will be the primary
interface to the cluster. You can make requests to any cluster member; the REST API automatically forwards requests if
required.
Although you can use the standalone mode just by submitting a connector on the command line, it also runs the REST interface. This is useful for getting status information, adding and removing connectors without stopping the process, and more.
Connector and Task Status¶
You can use the REST API to view the current status of a connector and its tasks, including the ID of the worker to which each was assigned.
Connectors and their tasks publish status updates to a shared topic (configured with
status.storage.topic
) which all workers in the cluster monitor. Because the workers consume
this topic asynchronously, there is typically a short delay before a state change is visible
through the status API. The following states are possible for a connector or one of its tasks:
UNASSIGNED
: The connector/task has not yet been assigned to a worker.RUNNING
: The connector/task is running.PAUSED
: The connector/task has been administratively paused.FAILED:
The connector/task has failed (usually by raising an exception, which is reported in the status output).
In most cases, connector and task states will match, though they may be different for short periods
of time when changes are occurring or if tasks have failed. For example, when a connector is first
started, there may be a noticeable delay before the connector and its tasks have all transitioned to
the RUNNING
state. States will also diverge when tasks fail since Connect does not automatically
restart failed tasks.
It’s sometimes useful to temporarily stop the message processing of a connector. For example,
if the remote system is undergoing maintenance, it would be preferable for source connectors to
stop polling it for new data instead of filling logs with exception spam. For this use case,
Connect offers a pause/resume API. While a source connector is paused, Connect will stop polling it
for additional records. While a sink connector is paused, Connect will stop pushing new messages to
it. The pause state is persistent, so even if you restart the cluster, the connector will not begin
message processing again until the task has been resumed. Note that there may be a delay before all
of a connector’s tasks have transitioned to the PAUSED
state since it may take time for them to
finish whatever processing they were in the middle of when being paused. Additionally, failed tasks
will not transition to the PAUSED
state until they have been restarted.
Common REST Examples¶
Below are a few common activities that are done over the REST API. These
examples are shown using a worker running on localhost with default
configurations and a connector named s3-connector
. The utility jq
is
used in the examples to format the response, but this is not required. The
examples are intentionally simple. For advanced use of the REST API, see the
Kafka Connect REST Interface.
Get worker cluster ID, version, and git source code commit ID:
curl localhost:8083/ | jq
Example output:
{ "version": "5.5.0-ce", "commit": "2bedb2c6980ba7a8", "kafka_cluster_id": "wxOOb2vrTPCLJl28PI18KA" }
List the connector plugins available on a worker:
curl localhost:8083/connector-plugins | jq
Example output:
[ { "class": "io.confluent.connect.activemq.ActiveMQSourceConnector", "type": "source", "version": "5.5.0" }, { "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "type": "sink", "version": "5.5.0" }, { "class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector", "type": "source", "version": "5.5.0" }, { "class": "io.confluent.connect.jdbc.JdbcSinkConnector", "type": "sink", "version": "5.5.0" }, { "class": "io.confluent.connect.jdbc.JdbcSourceConnector", "type": "source", "version": "5.5.0" }, { "class": "io.confluent.connect.jms.JmsSourceConnector", "type": "source", "version": "5.5.0" }, ... omitted
List active connectors on a worker:
curl localhost:8083/connectors
Example output:
["s3-connector"]
Restart a connector (there is no output if the command is successful):
curl -X POST localhost:8083/connectors/s3-connector/restart
Note
Restarting a connector does not restart the tasks.
Get connector tasks:
curl localhost:8083/connectors/s3-connector/tasks | jq
Example output:
[ { "id": { "connector": "s3-connector", "task": 0 }, "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat", "task.class": "io.confluent.connect.s3.S3SinkTask", "flush.size": "1000", "topics": "passengers", "name": "s3-connector", "aws.access.key.id": "omitted", "rotate.interval.ms": "-3", "aws.secret.access.key": "omitted", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "s3.bucket.name": "s3-test-bucket" } } ]
Restart a task (there is no output if the command is successful):
curl -X POST localhost:8083/connectors/s3-connector/tasks/0/restart
Pause a connector (there is no output if the command is successful):
curl -X PUT localhost:8083/connectors/s3-connector/pause
Tip
This command is useful if the system the connector interacts with is required to be taken out of service temporarily for maintenance.
Resume a paused connector (there is no output if the command is successful):
curl -X PUT localhost:8083/connectors/s3-connector/resume
Update the connector configuration:
curl -X PUT -H "Content-Type: application/json" --data '{"connector.class":"io.confluent.connect.s3.S3SinkConnector","format.class":"io.confluent.connect.s3.format.bytearray.ByteArrayFormat","flush.size":"1000","s3.bucket.name":"s3-test-bucket","storage.class":"io.confluent.connect.s3.storage.S3Storage","tasks.max":"2","topics":"passengers","name":"s3-connector"}' localhost:8083/connectors/s3-connector/config | jq
The example command updates tasks from 1 to 2:
{ "name": "s3-connector", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat", "flush.size": "1000", "s3.bucket.name": "s3-test-bucket", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "tasks.max": "2", "topics": "passengers", "name": "s3-connector" }, "tasks": [ { "connector": "s3-connector", "task": 0 }, { "connector": "s3-connector", "task": 1 } ], "type": "sink" }
Get connector status:
curl localhost:8083/connectors/s3-connector/status | jq
Example output:
{ "name": "s3-connector", "connector": { "state": "RUNNING", "worker_id": "192.168.86.66:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "192.168.86.66:8083" }, { "id": 1, "state": "RUNNING", "worker_id": "192.168.86.66:8083" } ], "type": "sink" }
Get the connector configuration, tasks, and type of connector:
curl localhost:8083/connectors/s3-connector | jq
Example output:
{ "name": "s3-connector", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat", "flush.size": "1000", "tasks.max": "2", "topics": "passengers", "name": "s3-connector", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "s3.bucket.name": "s3-test-bucket" }, "tasks": [ { "connector": "s3-connector", "task": 0 }, { "connector": "s3-connector", "task": 1 } ], "type": "sink" }
To get the connector configuration only, use the following command:
curl localhost:8083/connectors/s3-connector/config | jq
Delete a connector (there is no output if the command is successful):
curl -X DELETE localhost:8083/connectors/s3-connector