.. _connect_userguide_rest:

|kconnect-long| REST Interface
------------------------------

Since |kconnect-long| is intended to be run as a service, it also supports a REST API for managing connectors. By default this service runs on port ``8083``. When executed in distributed mode, the REST API is the primary interface to the cluster. You can make requests to any cluster member; the REST API automatically forwards requests if required.

Although you can use standalone mode by submitting a connector on the command line, it also runs the REST interface. This is useful for getting status information, adding and removing connectors without stopping the process, and more.

Currently the top-level resources are ``connector`` and ``connector-plugins``. The sub-resources for ``connector`` list configuration settings and tasks, and the sub-resource for ``connector-plugins`` provides configuration validation and recommendation.

Note that if you try to modify, update, or delete a resource under ``connector``, which may require the request to be forwarded to the leader, |kconnect| will return status code ``409`` while a worker group rebalance is in process, because the leader may change during the rebalance.

.. note:: For common activities that you can do using the REST API and curl, see :ref:`connect_managing_rest_examples`.

Content Types
~~~~~~~~~~~~~

Currently the REST API only supports ``application/json`` as both the request and response entity content type. Your requests *should* specify the expected content type of the response via the HTTP ``Accept`` header::

   Accept: application/json

and *should* specify the content type of the request entity (if one is included) via the ``Content-Type`` header::

   Content-Type: application/json

Status and Errors
~~~~~~~~~~~~~~~~~

The REST API returns standards-compliant HTTP status codes. Clients *should* check the HTTP status, especially before attempting to parse and use response entities. Currently the API does not use redirects (status codes in the 300 range), but these codes are reserved for future use, so clients should handle them.

When possible, all endpoints use a standard error message format for all errors (status codes in the 400 or 500 range). For example, a request entity that omits a required field may generate the following response:

.. sourcecode:: http

   HTTP/1.1 422 Unprocessable Entity
   Content-Type: application/json

   {
       "error_code": 422,
       "message": "config may not be empty"
   }

|kconnect| Cluster
~~~~~~~~~~~~~~~~~~

.. http:get:: /

   Top-level (root) request that gets the version of the |kconnect| worker that serves the REST request, the git commit ID of the source code, and the |ak| cluster ID that the worker is connected to.

   :>json string version: |kconnect| worker version
   :>json string commit: git commit ID
   :>json string kafka_cluster_id: |ak| cluster ID

   **Example request**:

   .. sourcecode:: http

      GET / HTTP/1.1
      Host: connect.example.com
      Accept: application/json

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 200 OK
      Content-Type: application/json

      {
          "version": "5.5.0",
          "commit": "e5741b90cde98052",
          "kafka_cluster_id": "I4ZmrWqfT2e-upky_4fdPA"
      }

Connectors
~~~~~~~~~~

.. http:get:: /connectors

   Get a list of active connectors.

   :>json array connectors: List of connector names

   **Example request**:

   .. sourcecode:: http

      GET /connectors HTTP/1.1
      Host: connect.example.com
      Accept: application/json

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 200 OK
      Content-Type: application/json

      ["my-jdbc-source", "my-hdfs-sink"]
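As a quick illustration of the conventions above (JSON content types, the standard error format, and the ``409`` returned while a rebalance is in process), the following is a minimal Python sketch of a client helper. The worker URL, retry policy, and the ``connect_request`` function name are assumptions for the example, not part of the API:

.. sourcecode:: python

   import time

   import requests  # third-party HTTP client, assumed available

   CONNECT_URL = "http://connect.example.com:8083"  # assumed worker address
   HEADERS = {"Accept": "application/json", "Content-Type": "application/json"}

   def connect_request(method, path, retries=5, backoff_s=2, **kwargs):
       """Send a request to the Connect REST API, retrying on 409 (rebalance)."""
       for _ in range(retries):
           resp = requests.request(method, CONNECT_URL + path,
                                   headers=HEADERS, **kwargs)
           if resp.status_code == 409:
               time.sleep(backoff_s)  # rebalance in process; wait and retry
               continue
           if resp.status_code >= 400:
               # Standard error format: {"error_code": ..., "message": ...}
               err = resp.json()
               raise RuntimeError(f"{err['error_code']}: {err['message']}")
           return resp
       raise RuntimeError("rebalance did not settle after retries")

   # Usage: list the active connectors.
   print(connect_request("GET", "/connectors").json())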
.. http:post:: /connectors

   Create a new connector, returning the current connector info if successful. Returns ``409 (Conflict)`` if a rebalance is in process.

   :<json string name: Name of the connector to create
   :<json map config: Configuration parameters for the connector
   :>json string name: Name of the created connector
   :>json map config: Configuration parameters for the connector
   :>json array tasks: List of active tasks generated by the connector
   :>json string tasks[i].connector: The name of the connector the task belongs to
   :>json int tasks[i].task: Task ID within the connector

   **Example request**:

   .. sourcecode:: http

      POST /connectors HTTP/1.1
      Host: connect.example.com
      Content-Type: application/json
      Accept: application/json

      {
          "name": "hdfs-sink-connector",
          "config": {
              "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
              "tasks.max": "10",
              "topics": "test-topic",
              "hdfs.url": "hdfs://fakehost:9000",
              "hadoop.conf.dir": "/opt/hadoop/conf",
              "hadoop.home": "/opt/hadoop",
              "flush.size": "100",
              "rotate.interval.ms": "1000"
          }
      }

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 201 Created
      Content-Type: application/json

      {
          "name": "hdfs-sink-connector",
          "config": {
              "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
              "tasks.max": "10",
              "topics": "test-topic",
              "hdfs.url": "hdfs://fakehost:9000",
              "hadoop.conf.dir": "/opt/hadoop/conf",
              "hadoop.home": "/opt/hadoop",
              "flush.size": "100",
              "rotate.interval.ms": "1000"
          },
          "tasks": [
              { "connector": "hdfs-sink-connector", "task": 1 },
              { "connector": "hdfs-sink-connector", "task": 2 },
              { "connector": "hdfs-sink-connector", "task": 3 }
          ]
      }

.. http:get:: /connectors/(string:name)

   Get information about the connector.

   :>json string name: Name of the connector
   :>json map config: Configuration parameters for the connector
   :>json array tasks: List of active tasks generated by the connector
   :>json string tasks[i].connector: The name of the connector the task belongs to
   :>json int tasks[i].task: Task ID within the connector

   **Example request**:

   .. sourcecode:: http

      GET /connectors/hdfs-sink-connector HTTP/1.1
      Host: connect.example.com
      Accept: application/json

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 200 OK
      Content-Type: application/json

      {
          "name": "hdfs-sink-connector",
          "config": {
              "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
              "tasks.max": "10",
              "topics": "test-topic",
              "hdfs.url": "hdfs://fakehost:9000",
              "hadoop.conf.dir": "/opt/hadoop/conf",
              "hadoop.home": "/opt/hadoop",
              "flush.size": "100",
              "rotate.interval.ms": "1000"
          },
          "tasks": [
              { "connector": "hdfs-sink-connector", "task": 1 },
              { "connector": "hdfs-sink-connector", "task": 2 },
              { "connector": "hdfs-sink-connector", "task": 3 }
          ]
      }

.. http:get:: /connectors/(string:name)/config

   Get the configuration for the connector.

   :>json map config: Configuration parameters for the connector

   **Example request**:

   .. sourcecode:: http

      GET /connectors/hdfs-sink-connector/config HTTP/1.1
      Host: connect.example.com
      Accept: application/json

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 200 OK
      Content-Type: application/json

      {
          "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
          "tasks.max": "10",
          "topics": "test-topic",
          "hdfs.url": "hdfs://fakehost:9000",
          "hadoop.conf.dir": "/opt/hadoop/conf",
          "hadoop.home": "/opt/hadoop",
          "flush.size": "100",
          "rotate.interval.ms": "1000"
      }
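As a sketch of how a client might drive the endpoints above, the following Python snippet creates the connector from the POST example and then reads its configuration back. It reuses the hypothetical ``connect_request`` helper sketched earlier; the connector name and settings come from the examples above:

.. sourcecode:: python

   # Create the connector from the POST example (hypothetical helper reused).
   new_connector = {
       "name": "hdfs-sink-connector",
       "config": {
           "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
           "tasks.max": "10",
           "topics": "test-topic",
           "hdfs.url": "hdfs://fakehost:9000",
           "flush.size": "100",
       },
   }
   created = connect_request("POST", "/connectors", json=new_connector).json()
   print(created["tasks"])  # task assignments generated by the connector

   # Read the configuration back.
   config = connect_request("GET", "/connectors/hdfs-sink-connector/config").json()
   assert config["topics"] == "test-topic"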
.. http:put:: /connectors/(string:name)/config

   Create a new connector using the given configuration, or update the configuration for an existing connector. Returns information about the connector after the change has been made. Returns ``409 (Conflict)`` if a rebalance is in process.

   :<json map config: Configuration parameters for the connector
   :>json string name: Name of the created connector
   :>json map config: Configuration parameters for the connector
   :>json array tasks: List of active tasks generated by the connector
   :>json string tasks[i].connector: The name of the connector the task belongs to
   :>json int tasks[i].task: Task ID within the connector

   **Example request**:

   .. sourcecode:: http

      PUT /connectors/hdfs-sink-connector/config HTTP/1.1
      Host: connect.example.com
      Accept: application/json

      {
          "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
          "tasks.max": "10",
          "topics": "test-topic",
          "hdfs.url": "hdfs://fakehost:9000",
          "hadoop.conf.dir": "/opt/hadoop/conf",
          "hadoop.home": "/opt/hadoop",
          "flush.size": "100",
          "rotate.interval.ms": "1000"
      }

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 201 Created
      Content-Type: application/json

      {
          "name": "hdfs-sink-connector",
          "config": {
              "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
              "tasks.max": "10",
              "topics": "test-topic",
              "hdfs.url": "hdfs://fakehost:9000",
              "hadoop.conf.dir": "/opt/hadoop/conf",
              "hadoop.home": "/opt/hadoop",
              "flush.size": "100",
              "rotate.interval.ms": "1000"
          },
          "tasks": [
              { "connector": "hdfs-sink-connector", "task": 1 },
              { "connector": "hdfs-sink-connector", "task": 2 },
              { "connector": "hdfs-sink-connector", "task": 3 }
          ]
      }

   Note that in this example the return status indicates that the connector was ``Created``. In the case of a configuration update the status would have been ``200 OK``.

.. http:get:: /connectors/(string:name)/status

   Get the current status of the connector, including whether it is running, failed, or paused, which worker it is assigned to, error information if it has failed, and the state of all its tasks.

   :>json string name: The name of the connector
   :>json map connector: The map containing connector status
   :>json map tasks[i]: The map containing the task status

   **Example request**:

   .. sourcecode:: http

      GET /connectors/hdfs-sink-connector/status HTTP/1.1
      Host: connect.example.com

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 200 OK

      {
          "name": "hdfs-sink-connector",
          "connector": {
              "state": "RUNNING",
              "worker_id": "fakehost:8083"
          },
          "tasks": [
              {
                  "id": 0,
                  "state": "RUNNING",
                  "worker_id": "fakehost:8083"
              },
              {
                  "id": 1,
                  "state": "FAILED",
                  "worker_id": "fakehost:8083",
                  "trace": "org.apache.kafka.common.errors.RecordTooLargeException\n"
              }
          ]
      }

.. http:post:: /connectors/(string:name)/restart

   Restart the connector. Returns ``409 (Conflict)`` if a rebalance is in process.

   **Example request**:

   .. sourcecode:: http

      POST /connectors/hdfs-sink-connector/restart HTTP/1.1
      Host: connect.example.com

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 200 OK

   .. important:: No tasks are restarted as a result of a call to this endpoint. To restart tasks, see :ref:`restart task <rest-api-task-restart>`.

.. http:put:: /connectors/(string:name)/pause

   Pause the connector and its tasks, which stops message processing until the connector is resumed. This call is asynchronous and the tasks will not all transition to the ``PAUSED`` state at the same time.

   **Example request**:

   .. sourcecode:: http

      PUT /connectors/hdfs-sink-connector/pause HTTP/1.1
      Host: connect.example.com

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 202 Accepted
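Because pausing (and resuming, below) is asynchronous, a client that needs to know when the transition has completed can poll the status endpoint until every task reports the expected state. A minimal Python sketch, again reusing the hypothetical ``connect_request`` helper; the timeout and poll interval are assumptions:

.. sourcecode:: python

   import time

   def wait_for_task_state(name, expected, timeout_s=60):
       """Poll /connectors/(name)/status until all tasks report `expected`."""
       deadline = time.time() + timeout_s
       while time.time() < deadline:
           status = connect_request("GET", f"/connectors/{name}/status").json()
           states = [task["state"] for task in status["tasks"]]
           if states and all(state == expected for state in states):
               return status
           time.sleep(1)  # poll interval; tasks transition independently
       raise TimeoutError(f"tasks of {name} did not reach {expected}")

   # Pause the connector, then wait for its tasks to settle.
   connect_request("PUT", "/connectors/hdfs-sink-connector/pause")
   wait_for_task_state("hdfs-sink-connector", "PAUSED")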
.. http:put:: /connectors/(string:name)/resume

   Resume a paused connector, or do nothing if the connector is not paused. This call is asynchronous and the tasks will not all transition to the ``RUNNING`` state at the same time.

   **Example request**:

   .. sourcecode:: http

      PUT /connectors/hdfs-sink-connector/resume HTTP/1.1
      Host: connect.example.com

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 202 Accepted

.. http:delete:: /connectors/(string:name)/

   Delete a connector, halting all tasks and deleting its configuration. Returns ``409 (Conflict)`` if a rebalance is in process.

   **Example request**:

   .. sourcecode:: http

      DELETE /connectors/hdfs-sink-connector HTTP/1.1
      Host: connect.example.com

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 204 No Content

Tasks
~~~~~

.. http:get:: /connectors/(string:name)/tasks

   Get a list of tasks currently running for the connector.

   :>json array tasks: List of active task configs that have been created by the connector
   :>json string tasks[i].id: The ID of the task
   :>json string tasks[i].id.connector: The name of the connector the task belongs to
   :>json int tasks[i].id.task: Task ID within the connector
   :>json map tasks[i].config: Configuration parameters for the task

   **Example request**:

   .. sourcecode:: http

      GET /connectors/hdfs-sink-connector/tasks HTTP/1.1
      Host: connect.example.com

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 200 OK

      [
          {
              "task.class": "io.confluent.connect.hdfs.HdfsSinkTask",
              "topics": "test-topic",
              "hdfs.url": "hdfs://fakehost:9000",
              "hadoop.conf.dir": "/opt/hadoop/conf",
              "hadoop.home": "/opt/hadoop",
              "flush.size": "100",
              "rotate.interval.ms": "1000"
          },
          {
              "task.class": "io.confluent.connect.hdfs.HdfsSinkTask",
              "topics": "test-topic",
              "hdfs.url": "hdfs://fakehost:9000",
              "hadoop.conf.dir": "/opt/hadoop/conf",
              "hadoop.home": "/opt/hadoop",
              "flush.size": "100",
              "rotate.interval.ms": "1000"
          }
      ]

.. http:get:: /connectors/(string:name)/tasks/(int:taskid)/status

   Get a task's status.

   **Example request**:

   .. sourcecode:: http

      GET /connectors/hdfs-sink-connector/tasks/1/status HTTP/1.1
      Host: connect.example.com

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 200 OK

      {"state":"RUNNING","id":1,"worker_id":"192.168.86.101:8083"}

.. _rest-api-task-restart:

.. http:post:: /connectors/(string:name)/tasks/(int:taskid)/restart

   Restart an individual task.

   **Example request**:

   .. sourcecode:: http

      POST /connectors/hdfs-sink-connector/tasks/1/restart HTTP/1.1
      Host: connect.example.com

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 200 OK

Connector Plugins
~~~~~~~~~~~~~~~~~

.. http:get:: /connector-plugins/

   Return a list of connector plugins installed in the |kconnect-long| cluster. Note that the API only checks for connectors on the worker that handles the request, which means it is possible to see inconsistent results, especially during a rolling upgrade if you add new connector jars.

   :>json string class: The connector class name

   **Example request**:

   .. sourcecode:: http

      GET /connector-plugins/ HTTP/1.1
      Host: connect.example.com

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 200 OK

      [
          {
              "class": "io.confluent.connect.hdfs.HdfsSinkConnector"
          },
          {
              "class": "io.confluent.connect.jdbc.JdbcSourceConnector"
          }
      ]
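Because the plugin listing reflects only the worker that handles the request, a client that wants to confirm a plugin is installed on every worker can query each one directly. A small Python sketch; the worker URLs are assumptions for the example and would come from your deployment:

.. sourcecode:: python

   import requests  # third-party HTTP client, assumed available

   # Assumed worker addresses for the example.
   WORKERS = ["http://worker1:8083", "http://worker2:8083"]
   PLUGIN = "io.confluent.connect.hdfs.HdfsSinkConnector"

   for worker in WORKERS:
       plugins = requests.get(f"{worker}/connector-plugins/").json()
       installed = any(p["class"] == PLUGIN for p in plugins)
       print(f"{worker}: {'present' if installed else 'MISSING'}")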
.. http:put:: /connector-plugins/(string:name)/config/validate

   Validate the provided configuration values against the configuration definition. This API performs per-config validation, and returns suggested values and error messages during validation.

   :>json string name: The class name of the connector plugin
   :>json int error_count: The total number of errors encountered during configuration validation
   :>json array groups: The list of groups used in configuration definitions
   :>json map configs[i].definition: The definition for a config in the connector plugin, which includes the name, type, importance, and so on
   :>json map configs[i].value: The current value for a config, which includes the name, value, recommended values, and so on

   **Example request**:

   .. sourcecode:: http

      PUT /connector-plugins/FileStreamSinkConnector/config/validate/ HTTP/1.1
      Host: connect.example.com
      Accept: application/json

      {
          "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
          "tasks.max": "1",
          "topics": "test-topic"
      }

   **Example response**:

   .. sourcecode:: http

      HTTP/1.1 200 OK

      {
          "name": "FileStreamSinkConnector",
          "error_count": 1,
          "groups": [
              "Common"
          ],
          "configs": [
              {
                  "definition": {
                      "name": "topics",
                      "type": "LIST",
                      "required": false,
                      "default_value": "",
                      "importance": "HIGH",
                      "documentation": "",
                      "group": "Common",
                      "width": "LONG",
                      "display_name": "Topics",
                      "dependents": [],
                      "order": 4
                  },
                  "value": {
                      "name": "topics",
                      "value": "test-topic",
                      "recommended_values": [],
                      "errors": [],
                      "visible": true
                  }
              },
              {
                  "definition": {
                      "name": "file",
                      "type": "STRING",
                      "required": true,
                      "default_value": "",
                      "importance": "HIGH",
                      "documentation": "Destination filename.",
                      "group": null,
                      "width": "NONE",
                      "display_name": "file",
                      "dependents": [],
                      "order": -1
                  },
                  "value": {
                      "name": "file",
                      "value": null,
                      "recommended_values": [],
                      "errors": [
                          "Missing required configuration \"file\" which has no default value."
                      ],
                      "visible": true
                  }
              },
              {
                  "definition": {
                      "name": "name",
                      "type": "STRING",
                      "required": true,
                      "default_value": "",
                      "importance": "HIGH",
                      "documentation": "Globally unique name to use for this connector.",
                      "group": "Common",
                      "width": "MEDIUM",
                      "display_name": "Connector name",
                      "dependents": [],
                      "order": 1
                  },
                  "value": {
                      "name": "name",
                      "value": "test",
                      "recommended_values": [],
                      "errors": [],
                      "visible": true
                  }
              },
              {
                  "definition": {
                      "name": "tasks.max",
                      "type": "INT",
                      "required": false,
                      "default_value": "1",
                      "importance": "HIGH",
                      "documentation": "Maximum number of tasks to use for this connector.",
                      "group": "Common",
                      "width": "SHORT",
                      "display_name": "Tasks max",
                      "dependents": [],
                      "order": 3
                  },
                  "value": {
                      "name": "tasks.max",
                      "value": "1",
                      "recommended_values": [],
                      "errors": [],
                      "visible": true
                  }
              },
              {
                  "definition": {
                      "name": "connector.class",
                      "type": "STRING",
                      "required": true,
                      "default_value": "",
                      "importance": "HIGH",
                      "documentation": "Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name, or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter",
                      "group": "Common",
                      "width": "LONG",
                      "display_name": "Connector class",
                      "dependents": [],
                      "order": 2
                  },
                  "value": {
                      "name": "connector.class",
                      "value": "org.apache.kafka.connect.file.FileStreamSinkConnector",
                      "recommended_values": [],
                      "errors": [],
                      "visible": true
                  }
              }
          ]
      }
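To fail fast on bad configurations, a client can validate a candidate config before creating the connector and surface the per-field errors. A minimal Python sketch, reusing the hypothetical ``connect_request`` helper from the earlier example:

.. sourcecode:: python

   candidate = {
       "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
       "tasks.max": "1",
       "topics": "test-topic",
   }

   result = connect_request(
       "PUT",
       "/connector-plugins/FileStreamSinkConnector/config/validate",
       json=candidate,
   ).json()

   if result["error_count"] > 0:
       # Print every field that failed validation, with its messages.
       for cfg in result["configs"]:
           for error in cfg["value"]["errors"]:
               print(f"{cfg['value']['name']}: {error}")
   else:
       print("configuration is valid")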