Connect REST Interface
Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors.
By default this service runs on port 8083. When executed in distributed mode, the REST API will be the primary interface to the cluster. You can make requests to any cluster member; the REST API automatically forwards requests if required.
Although you can use the standalone mode just by submitting a connector on the command line, it also runs the REST interface. This is useful for getting status information, adding and removing connectors without stopping the process, and more.
Currently the top-level resources are connectors and connector-plugins. The sub-resources of connectors list configuration settings and tasks, and the sub-resource of connector-plugins provides configuration validation and recommendation.
Note that if you try to modify, update, or delete a resource under connectors, which may require the request to be forwarded to the leader, Connect returns status code 409 while a worker group rebalance is in progress, because the leader may change during the rebalance.
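One way a client might cope with the 409 responses described above is to retry after a short delay. The following is a minimal sketch, not part of Connect itself: send_with_retry, its parameters, and the delay values are illustrative assumptions, and send stands for any function you supply that performs one REST request and returns the HTTP status code.

```python
import time


def send_with_retry(send, max_attempts=5, delay_seconds=1.0, sleep=time.sleep):
    """Call send() until it returns a status other than 409 (Conflict).

    send is a caller-supplied (hypothetical) function that performs one
    REST request and returns the HTTP status code as an int.
    """
    status = None
    for attempt in range(max_attempts):
        status = send()
        if status != 409:
            return status
        # A rebalance may still be in progress; wait before retrying,
        # but do not sleep after the final attempt.
        if attempt < max_attempts - 1:
            sleep(delay_seconds)
    return status
```

Keep in mind that POST /connectors also returns 409 when the connector already exists, in which case retrying will not help.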
Note
For common activities that you can do using the REST API and curl, see Common REST examples.
Content Types
Currently the REST API only supports application/json as both the request and response entity content type. Your requests should specify the expected content type of the response via the HTTP Accept header:
Accept: application/json
and should specify the content type of the request entity (if one is included) via the Content-Type header:
Content-Type: application/json
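As an illustration of the two headers, the sketch below builds (but does not send) a request with Python's standard urllib. The helper name build_request and the base URL in the usage note are assumptions for the example; only the header names and values come from the text above.

```python
import json
import urllib.request


def build_request(base_url, path, method="GET", body=None):
    """Build (but do not send) a request that carries the JSON headers."""
    headers = {"Accept": "application/json"}
    data = None
    if body is not None:
        # Content-Type is only needed when a request entity is included.
        headers["Content-Type"] = "application/json"
        data = json.dumps(body).encode("utf-8")
    return urllib.request.Request(
        base_url.rstrip("/") + path, data=data, headers=headers, method=method
    )
```

For example, build_request("http://localhost:8083", "/connectors") yields a GET request that could then be passed to urllib.request.urlopen.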
Log levels
You can check log levels and change log levels using Connect API endpoints. For details, see Changing log levels using the Connect API.
Status and Errors
The REST API returns standards-compliant HTTP status codes. Clients should check the HTTP status, especially before attempting to parse and use response entities. Currently the API does not use redirects (statuses in the 300 range), but these codes are reserved for future use, so clients should handle them.
When possible, all endpoints will use a standard error message format for all errors (status codes in the 400 or 500 range). For example, a request entity that omits a required field may generate the following response:
HTTP/1.1 422 Unprocessable Entity
Content-Type: application/json
{
"error_code": 422,
"message": "config may not be empty"
}
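A client could apply this advice by checking the status before decoding the entity. The sketch below is one possible shape, assuming the status code and body have already been read from the HTTP response; ConnectError and parse_response are hypothetical names, and only the error_code and message fields come from the error format shown above.

```python
import json


class ConnectError(Exception):
    """Hypothetical exception carrying the fields of the standard error format."""

    def __init__(self, status, error_code, message):
        super().__init__(f"HTTP {status}: {message}")
        self.status = status
        self.error_code = error_code
        self.message = message


def parse_response(status, body):
    """Return the decoded entity for 2xx statuses; raise ConnectError otherwise."""
    if 200 <= status < 300:
        return json.loads(body) if body else None
    if status >= 400:
        try:
            payload = json.loads(body)
        except (TypeError, ValueError):
            payload = {}
        if not isinstance(payload, dict):
            # The body did not follow the standard error format.
            payload = {}
        raise ConnectError(
            status, payload.get("error_code", status), payload.get("message", "")
        )
    # Statuses outside 2xx/4xx/5xx (for example 3xx) are not currently used.
    raise ConnectError(status, status, "unexpected status")
```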
Connect Cluster
GET /
Top-level (root) request that gets the version of the Connect worker that serves the REST request, the git commit ID of the source code, and the Kafka cluster ID that the worker is connected to.
Response JSON Object:
- version (string) – Connect worker version
- commit (string) – git commit ID
- kafka_cluster_id (string) – Kafka cluster ID
Example request:
GET / HTTP/1.1
Host: connect.example.com
Accept: application/json
Example response:
HTTP/1.1 200 OK
Content-Type: application/json

{
  "version": "5.5.0",
  "commit": "e5741b90cde98052",
  "kafka_cluster_id": "I4ZmrWqfT2e-upky_4fdPA"
}
Connectors
GET /connectors
Get a list of active connectors.
Response JSON Object:
- connectors (array) – List of connector names
Example request:
GET /connectors HTTP/1.1
Host: connect.example.com
Accept: application/json
Example response:
HTTP/1.1 200 OK
Content-Type: application/json

["my-jdbc-source", "my-hdfs-sink"]
Query parameters:
- ?expand=status (map, optional) – Retrieves additional state information for each of the connectors returned in the API call. The endpoint also returns the status of each of the connectors and its tasks, as shown in the ?expand=status example below.
- ?expand=info (map, optional) – Returns metadata for each of the connectors, such as the configuration, task information, and type of connector, as shown in the ?expand=info example below.
?expand=status example:
{
  "FileStreamSinkConnectorConnector_0": {
    "status": {
      "name": "FileStreamSinkConnectorConnector_0",
      "connector": { "state": "RUNNING", "worker_id": "10.0.0.162:8083" },
      "tasks": [
        { "id": 0, "state": "RUNNING", "worker_id": "10.0.0.162:8083" }
      ],
      "type": "sink"
    }
  },
  "DatagenConnectorConnector_0": {
    "status": {
      "name": "DatagenConnectorConnector_0",
      "connector": { "state": "RUNNING", "worker_id": "10.0.0.162:8083" },
      "tasks": [
        { "id": 0, "state": "RUNNING", "worker_id": "10.0.0.162:8083" }
      ],
      "type": "source"
    }
  }
}
?expand=info example:
{
  "FileStreamSinkConnectorConnector_0": {
    "info": {
      "name": "FileStreamSinkConnectorConnector_0",
      "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "file": "/Users/smogili/file.txt",
        "tasks.max": "1",
        "topics": "datagen",
        "name": "FileStreamSinkConnectorConnector_0"
      },
      "tasks": [
        { "connector": "FileStreamSinkConnectorConnector_0", "task": 0 }
      ],
      "type": "sink"
    }
  },
  "DatagenConnectorConnector_0": {
    "info": {
      "name": "DatagenConnectorConnector_0",
      "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "quickstart": "clickstream",
        "tasks.max": "1",
        "name": "DatagenConnectorConnector_0",
        "kafka.topic": "datagen"
      },
      "tasks": [
        { "connector": "DatagenConnectorConnector_0", "task": 0 }
      ],
      "type": "source"
    }
  }
}
Users can also combine the status and info expands by appending both to the endpoint (for example, http://localhost:8083/connectors?expand=status&expand=info). This returns the metadata for the connectors and the current status of each connector and its tasks, as shown in the following example:
{
  "FileStreamSinkConnectorConnector_0": {
    "info": {
      "name": "FileStreamSinkConnectorConnector_0",
      "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "file": "/Users/smogili/file.txt",
        "tasks.max": "1",
        "topics": "datagen",
        "name": "FileStreamSinkConnectorConnector_0"
      },
      "tasks": [
        { "connector": "FileStreamSinkConnectorConnector_0", "task": 0 }
      ],
      "type": "sink"
    },
    "status": {
      "name": "FileStreamSinkConnectorConnector_0",
      "connector": { "state": "RUNNING", "worker_id": "10.0.0.162:8083" },
      "tasks": [
        { "id": 0, "state": "RUNNING", "worker_id": "10.0.0.162:8083" }
      ],
      "type": "sink"
    }
  },
  "DatagenConnectorConnector_0": {
    "info": {
      "name": "DatagenConnectorConnector_0",
      "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "quickstart": "clickstream",
        "tasks.max": "1",
        "name": "DatagenConnectorConnector_0",
        "kafka.topic": "datagen"
      },
      "tasks": [
        { "connector": "DatagenConnectorConnector_0", "task": 0 }
      ],
      "type": "source"
    },
    "status": {
      "name": "DatagenConnectorConnector_0",
      "connector": { "state": "RUNNING", "worker_id": "10.0.0.162:8083" },
      "tasks": [
        { "id": 0, "state": "RUNNING", "worker_id": "10.0.0.162:8083" }
      ],
      "type": "source"
    }
  }
}
Note
Without ?expand=status and/or ?expand=info, the connectors endpoint returns only a list of the names of the connectors that have been launched.
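To make the expand parameters concrete, here is a small sketch that builds the query string and condenses an ?expand=status response into connector and task states. The function names are hypothetical, and the code assumes the response has the shape shown in the examples for this endpoint.

```python
from urllib.parse import urlencode


def connectors_url(base_url, expand=()):
    """Build the /connectors URL, repeating expand= once per requested expansion."""
    query = urlencode([("expand", value) for value in expand])
    url = base_url.rstrip("/") + "/connectors"
    return url + ("?" + query if query else "")


def connector_states(expanded):
    """Map each connector name to (connector state, list of task states).

    expanded is the decoded JSON of a response that includes ?expand=status.
    """
    summary = {}
    for name, entry in expanded.items():
        status = entry["status"]
        summary[name] = (
            status["connector"]["state"],
            [task["state"] for task in status["tasks"]],
        )
    return summary
```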
POST /connectors
Create a new connector, returning the current connector info if successful. Returns 409 (Conflict) if a rebalance is in progress or if the connector already exists.
Request JSON Object:
- name (string) – Name of the connector to create
- config (map) – Configuration parameters for the connector. All values should be strings.
Response JSON Object:
- name (string) – Name of the created connector
- config (map) – Configuration parameters for the connector.
- tasks (array) – List of active tasks generated by the connector
- tasks[i].connector (string) – The name of the connector the task belongs to
- tasks[i].task (int) – Task ID within the connector.
Example request:
POST /connectors HTTP/1.1
Host: connect.example.com
Content-Type: application/json
Accept: application/json

{
  "name": "hdfs-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "hdfs.url": "hdfs://fakehost:9000",
    "hadoop.conf.dir": "/opt/hadoop/conf",
    "hadoop.home": "/opt/hadoop",
    "flush.size": "100",
    "rotate.interval.ms": "1000"
  }
}
Example response:
HTTP/1.1 201 Created
Content-Type: application/json

{
  "name": "hdfs-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "hdfs.url": "hdfs://fakehost:9000",
    "hadoop.conf.dir": "/opt/hadoop/conf",
    "hadoop.home": "/opt/hadoop",
    "flush.size": "100",
    "rotate.interval.ms": "1000"
  },
  "tasks": [
    { "connector": "hdfs-sink-connector", "task": 1 },
    { "connector": "hdfs-sink-connector", "task": 2 },
    { "connector": "hdfs-sink-connector", "task": 3 }
  ]
}
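Because all configuration values should be strings, a client that keeps settings as native numbers may want to convert them before posting. The sketch below shows one way to build the request body; create_connector_body is a hypothetical helper, and converting with str() is an assumption about how your values should be rendered.

```python
import json


def create_connector_body(name, config):
    """Return the JSON body for POST /connectors with every config value as a string."""
    # str() is an assumed conversion; adjust it if your values need another format.
    string_config = {key: str(value) for key, value in config.items()}
    return json.dumps({"name": name, "config": string_config})
```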
GET /connectors/(string: name)
Get information about the connector.
Response JSON Object:
- name (string) – Name of the connector
- config (map) – Configuration parameters for the connector.
- tasks (array) – List of active tasks generated by the connector
- tasks[i].connector (string) – The name of the connector the task belongs to
- tasks[i].task (int) – Task ID within the connector.
Example request:
GET /connectors/hdfs-sink-connector HTTP/1.1
Host: connect.example.com
Accept: application/json
Example response:
HTTP/1.1 200 OK
Content-Type: application/json

{
  "name": "hdfs-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "hdfs.url": "hdfs://fakehost:9000",
    "hadoop.conf.dir": "/opt/hadoop/conf",
    "hadoop.home": "/opt/hadoop",
    "flush.size": "100",
    "rotate.interval.ms": "1000"
  },
  "tasks": [
    { "connector": "hdfs-sink-connector", "task": 1 },
    { "connector": "hdfs-sink-connector", "task": 2 },
    { "connector": "hdfs-sink-connector", "task": 3 }
  ]
}
GET /connectors/(string: name)/config
Get the configuration for the connector.
Response JSON Object:
- config (map) – Configuration parameters for the connector.
Example request:
GET /connectors/hdfs-sink-connector/config HTTP/1.1
Host: connect.example.com
Accept: application/json
Example response:
HTTP/1.1 200 OK
Content-Type: application/json

{
  "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
  "tasks.max": "10",
  "topics": "test-topic",
  "hdfs.url": "hdfs://fakehost:9000",
  "hadoop.conf.dir": "/opt/hadoop/conf",
  "hadoop.home": "/opt/hadoop",
  "flush.size": "100",
  "rotate.interval.ms": "1000"
}
PUT /connectors/(string: name)/config
Create a new connector using the given configuration, or update the configuration for an existing connector. Returns information about the connector after the change has been made. Returns 409 (Conflict) if a rebalance is in progress.
Note
The payload is not wrapped in {"config": {}} as in the POST request. The config is provided directly.
Request JSON Object:
- config (map) – Configuration parameters for the connector. All values should be strings.
Response JSON Object:
- name (string) – Name of the created connector
- config (map) – Configuration parameters for the connector.
- tasks (array) – List of active tasks generated by the connector
- tasks[i].connector (string) – The name of the connector the task belongs to
- tasks[i].task (int) – Task ID within the connector.
Example request:
PUT /connectors/hdfs-sink-connector/config HTTP/1.1
Host: connect.example.com
Content-Type: application/json
Accept: application/json

{
  "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
  "tasks.max": "10",
  "topics": "test-topic",
  "hdfs.url": "hdfs://fakehost:9000",
  "hadoop.conf.dir": "/opt/hadoop/conf",
  "hadoop.home": "/opt/hadoop",
  "flush.size": "100",
  "rotate.interval.ms": "1000"
}
Example response:
HTTP/1.1 201 Created
Content-Type: application/json

{
  "name": "hdfs-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "hdfs.url": "hdfs://fakehost:9000",
    "hadoop.conf.dir": "/opt/hadoop/conf",
    "hadoop.home": "/opt/hadoop",
    "flush.size": "100",
    "rotate.interval.ms": "1000"
  },
  "tasks": [
    { "connector": "hdfs-sink-connector", "task": 1 },
    { "connector": "hdfs-sink-connector", "task": 2 },
    { "connector": "hdfs-sink-connector", "task": 3 }
  ]
}
Note that in this example the return status indicates that the connector was Created. In the case of a configuration update, the status would have been 200 OK.
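The difference between the PUT payload and the POST payload, and between the 201 and 200 outcomes, can be sketched as follows. Both function names are hypothetical, and percent-encoding the connector name in the path is an assumption made for safety rather than something this reference specifies.

```python
import json
from urllib.parse import quote


def put_config_request(name, config):
    """Return (method, path, body) for PUT /connectors/(name)/config."""
    path = "/connectors/" + quote(name, safe="") + "/config"
    # Unlike POST /connectors, the body is the config map itself, unwrapped.
    return "PUT", path, json.dumps(config)


def put_config_outcome(status):
    """Translate the status codes described for this endpoint into a short label."""
    labels = {201: "created", 200: "updated", 409: "rebalance in progress"}
    return labels.get(status, "unexpected")
```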
GET /connectors/(string: name)/status
Get current status of the connector, including whether it is running, failed or paused, which worker it is assigned to, error information if it has failed, and the state of all its tasks.
Response JSON Object:
- name (string) – The name of the connector.
- connector (map) – The map containing connector status.
- tasks[i] (map) – The map containing the task status.
Example request:
GET /connectors/hdfs-sink-connector/status HTTP/1.1
Host: connect.example.com
Example response:
HTTP/1.1 200 OK

{
  "name": "hdfs-sink-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "fakehost:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "fakehost:8083"
    },
    {
      "id": 1,
      "state": "FAILED",
      "worker_id": "fakehost:8083",
      "trace": "org.apache.kafka.common.errors.RecordTooLargeException\n"
    }
  ]
}
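A status response like the one above can be scanned for failed tasks. The following sketch assumes the decoded JSON has the shape shown in the example; failed_tasks is a hypothetical helper name.

```python
def failed_tasks(status):
    """Return (task id, first line of trace) for every task whose state is FAILED."""
    failures = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace", "")
            # Keep only the first line, which usually names the exception.
            first_line = trace.splitlines()[0] if trace else ""
            failures.append((task["id"], first_line))
    return failures
```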
POST /connectors/(string: name)/restart
Restart the connector. Returns 409 (Conflict) if a rebalance is in progress.
Example request:
POST /connectors/hdfs-sink-connector/restart HTTP/1.1
Host: connect.example.com
Example response:
HTTP/1.1 200 OK
Important
No tasks are restarted as a result of a call to this endpoint. To restart tasks, see restart task.
PUT /connectors/(string: name)/pause
Pause the connector and its tasks, which stops message processing until the connector is resumed. This call is asynchronous, and the tasks will not all transition to the PAUSED state at the same time.
Example request:
PUT /connectors/hdfs-sink-connector/pause HTTP/1.1
Host: connect.example.com
Example response:
HTTP/1.1 202 Accepted
PUT /connectors/(string: name)/resume
Resume a paused connector, or do nothing if the connector is not paused. This call is asynchronous, and the tasks will not all transition to the RUNNING state at the same time.
Example request:
PUT /connectors/hdfs-sink-connector/resume HTTP/1.1
Host: connect.example.com
Example response:
HTTP/1.1 202 Accepted
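Since pause and resume are asynchronous, a client that needs to know when the transition has finished can poll the status endpoint. The sketch below is one possible approach, not a Connect feature: wait_for_state and its timeout and interval defaults are assumptions, and fetch_status stands for a function you supply that returns the decoded response of GET /connectors/(string: name)/status.

```python
import time


def wait_for_state(fetch_status, target, timeout_seconds=30.0, interval_seconds=1.0,
                   sleep=time.sleep, clock=time.monotonic):
    """Poll until the connector and all of its tasks report target, or time out.

    Returns True if every state matched target, False if the timeout elapsed.
    """
    deadline = clock() + timeout_seconds
    while True:
        status = fetch_status()
        states = [status["connector"]["state"]]
        states += [task["state"] for task in status["tasks"]]
        if all(state == target for state in states):
            return True
        if clock() >= deadline:
            return False
        sleep(interval_seconds)
```

A task that is in the FAILED state will not reach the target state on its own, so in that case the call simply returns False once the timeout elapses.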
DELETE /connectors/(string: name)/
Delete a connector, halting all tasks and deleting its configuration. Returns 409 (Conflict) if a rebalance is in progress.
Example request:
DELETE /connectors/hdfs-sink-connector HTTP/1.1
Host: connect.example.com
Example response:
HTTP/1.1 204 No Content
Tasks
GET /connectors/(string: name)/tasks
Get a list of tasks currently running for the connector.
Response JSON Object:
- tasks (array) – List of active task configs that have been created by the connector
- tasks[i].id (map) – The ID of the task
- tasks[i].id.connector (string) – The name of the connector the task belongs to
- tasks[i].id.task (int) – Task ID within the connector.
- tasks[i].config (map) – Configuration parameters for the task
Example request:
GET /connectors/hdfs-sink-connector/tasks HTTP/1.1
Host: connect.example.com
Example response:
HTTP/1.1 200 OK

[
  {
    "id": {
      "connector": "hdfs-sink-connector",
      "task": 0
    },
    "config": {
      "task.class": "io.confluent.connect.hdfs.HdfsSinkTask",
      "topics": "test-topic",
      "hdfs.url": "hdfs://fakehost:9000",
      "hadoop.conf.dir": "/opt/hadoop/conf",
      "hadoop.home": "/opt/hadoop",
      "flush.size": "100",
      "rotate.interval.ms": "1000"
    }
  }
]
GET /connectors/(string: name)/tasks/(int: taskid)/status
Get a task’s status.
Example request:
GET /connectors/hdfs-sink-connector/tasks/1/status HTTP/1.1
Host: connect.example.com
Example response:
HTTP/1.1 200 OK

{"state":"RUNNING","id":1,"worker_id":"192.168.86.101:8083"}
POST /connectors/(string: name)/tasks/(int: taskid)/restart
Restart an individual task.
Example request:
POST /connectors/hdfs-sink-connector/tasks/1/restart HTTP/1.1
Host: connect.example.com
Example response:
HTTP/1.1 200 OK
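Because restarting a connector does not restart its tasks, a client may want to restart failed tasks individually. The sketch below only computes the paths to POST to, given a decoded connector status response; task_restart_paths is a hypothetical helper, and percent-encoding the connector name is an assumption.

```python
from urllib.parse import quote


def task_restart_paths(status):
    """List the POST paths that would restart each FAILED task in a status response."""
    name = quote(status["name"], safe="")
    return [
        f"/connectors/{name}/tasks/{task['id']}/restart"
        for task in status["tasks"]
        if task["state"] == "FAILED"
    ]
```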
Topics
GET /connectors/(string: name)/topics
Returns a list of connector topic names. There is no defined order in which the topics are returned, and consecutive calls may return the same topic names in a different order. This request is independent of whether a connector is running, and returns an empty set of topics both for connectors that don’t have active topics and for non-existent connectors.
Response JSON Object:
- topics (array) – The set of topic names the connector has been using since its creation or since the last time its set of active topics was reset.
Example request:
GET /connectors/hdfs-sink-connector/topics HTTP/1.1
Host: connect.example.com
Example response:
HTTP/1.1 200 OK

{
  "hdfs-sink-connector": {
    "topics": [
      "test-topic-1",
      "test-topic-2",
      "test-topic-3"
    ]
  }
}
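Since the order of the returned topics is not defined, a client that compares results across calls may want to normalize them first. A minimal sketch, assuming the response is keyed by connector name as in the example above; active_topics is a hypothetical helper.

```python
import json


def active_topics(body, connector):
    """Return the connector's active topics as a sorted list (empty if none)."""
    payload = json.loads(body)
    topics = payload.get(connector, {}).get("topics", [])
    # Sorting gives a stable order regardless of how the server listed them.
    return sorted(set(topics))
```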
PUT /connectors/(string: name)/topics/reset
Resets the set of topic names that the connector has been using since its creation or since the last time its set of active topics was reset.
Example request:
PUT /connectors/hdfs-sink-connector/topics/reset HTTP/1.1
Host: connect.example.com
Example response:
HTTP/1.1 200 OK
Connector Plugins
GET /connector-plugins/
Return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means it is possible to see inconsistent results, especially during a rolling upgrade if you add new connector jars.
Response JSON Object:
- class (string) – The connector class name.
Example request:
GET /connector-plugins/ HTTP/1.1
Host: connect.example.com
Example response:
HTTP/1.1 200 OK

[
  { "class": "io.confluent.connect.hdfs.HdfsSinkConnector" },
  { "class": "io.confluent.connect.jdbc.JdbcSourceConnector" }
]
PUT /connector-plugins/(string: name)/config/validate
Validate the provided configuration values against the configuration definition. This API performs per-config validation and returns suggested values and error messages.
Request JSON Object:
- config (map) – Configuration parameters for the connector. All values should be strings.
Response JSON Object:
- name (string) – The class name of the connector plugin.
- error_count (int) – The total number of errors encountered during configuration validation.
- groups (array) – The list of groups used in configuration definitions.
- configs[i].definition (map) – The definition for a config in the connector plugin, which includes the name, type, importance, etc.
- configs[i].value (map) – The current value for a config, which includes the name, value, recommended values, etc.
Example request:
PUT /connector-plugins/FileStreamSinkConnector/config/validate/ HTTP/1.1
Host: connect.example.com
Content-Type: application/json
Accept: application/json

{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
  "tasks.max": "1",
  "topics": "test-topic"
}
Example response:
HTTP/1.1 200 OK

{
  "name": "FileStreamSinkConnector",
  "error_count": 1,
  "groups": [ "Common" ],
  "configs": [
    {
      "definition": {
        "name": "topics",
        "type": "LIST",
        "required": false,
        "default_value": "",
        "importance": "HIGH",
        "documentation": "",
        "group": "Common",
        "width": "LONG",
        "display_name": "Topics",
        "dependents": [],
        "order": 4
      },
      "value": {
        "name": "topics",
        "value": "test-topic",
        "recommended_values": [],
        "errors": [],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "file",
        "type": "STRING",
        "required": true,
        "default_value": "",
        "importance": "HIGH",
        "documentation": "Destination filename.",
        "group": null,
        "width": "NONE",
        "display_name": "file",
        "dependents": [],
        "order": -1
      },
      "value": {
        "name": "file",
        "value": null,
        "recommended_values": [],
        "errors": [
          "Missing required configuration \"file\" which has no default value."
        ],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "name",
        "type": "STRING",
        "required": true,
        "default_value": "",
        "importance": "HIGH",
        "documentation": "Globally unique name to use for this connector.",
        "group": "Common",
        "width": "MEDIUM",
        "display_name": "Connector name",
        "dependents": [],
        "order": 1
      },
      "value": {
        "name": "name",
        "value": "test",
        "recommended_values": [],
        "errors": [],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "tasks.max",
        "type": "INT",
        "required": false,
        "default_value": "1",
        "importance": "HIGH",
        "documentation": "Maximum number of tasks to use for this connector.",
        "group": "Common",
        "width": "SHORT",
        "display_name": "Tasks max",
        "dependents": [],
        "order": 3
      },
      "value": {
        "name": "tasks.max",
        "value": "1",
        "recommended_values": [],
        "errors": [],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "connector.class",
        "type": "STRING",
        "required": true,
        "default_value": "",
        "importance": "HIGH",
        "documentation": "Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name, or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter",
        "group": "Common",
        "width": "LONG",
        "display_name": "Connector class",
        "dependents": [],
        "order": 2
      },
      "value": {
        "name": "connector.class",
        "value": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "recommended_values": [],
        "errors": [],
        "visible": true
      }
    }
  ]
}
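A validation response can be reduced to just the problems it reports. The sketch below assumes the decoded JSON has the shape shown in the example response for this endpoint; validation_errors is a hypothetical helper name.

```python
def validation_errors(result):
    """Map each config name to its error messages, skipping configs without errors."""
    errors = {}
    for entry in result.get("configs", []):
        value = entry["value"]
        if value["errors"]:
            errors[value["name"]] = list(value["errors"])
    return errors
```

Applied to the example response above, this would report a single entry for the file config, which is consistent with its error_count of 1.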