Kafka Connect REST Interface for Confluent Platform

Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors. By default, this service runs on port 8083. When executed in distributed mode, the REST API is the primary interface to the cluster. You can make requests to any cluster member—the REST API forwards requests if required.

Although you can use the standalone mode by submitting a connector on the command line, it also runs the REST interface. This is useful for getting status information, adding and removing connectors without stopping the process, and so forth.

You can learn more about the REST API in the Kafka Connect Rest API module of the free Kafka Connect 101 course.

Currently the top level resources are connector and connector-plugins. The sub-resources for connector lists configuration settings and tasks. The sub-resource for connector-plugins provides configuration validation and recommendation.

Note that if you try to modify, update or delete a resource under connector which may require the request to be forwarded to the leader, Connect will return HTTP 409 while the worker group rebalance is in process as the leader may change during rebalance.

Tip

For common activities that you can do using the REST API and curl, see Common REST examples.

Content Types

Currently the REST API only supports application/json as both the request and response entity content type. Your requests should specify the expected content type of the response using the HTTP Accept header:

Accept: application/json

Your requests should also specify the content type of the request entity (if one is included) using the Content-Type header:

Content-Type: application/json

Log levels

You can check log levels and change log levels using Connect API endpoints. For details, see Changing log levels using the Connect API.

Status and Errors

The REST API will return standards-compliant HTTP status. Clients should check the HTTP status, especially before attempting to parse and use response entities. Currently the API does not use redirects (statuses in the 300 range), but the use of these codes is reserved for future use so clients should handle them.

When possible, all endpoints will use a standard error message format for all errors (HTTP 400 or HTTP 500 range). For example, a request entity that omits a required field may generate the following response:

HTTP/1.1 422 Unprocessable Entity
Content-Type: application/json

{
    "error_code": 422,
    "message": "config may not be empty"
}

Connect Cluster

GET /

Top-level (root) request that gets the version of the Connect worker that serves the REST request, the git commit ID of the source code, and the Kafka cluster ID that the worker is connected to.

Response JSON Object:
 
  • version (string) – Connect worker version
  • commit ID (string) – git commit ID
  • cluster ID (string) – Kafka cluster ID

Example request:

GET / HTTP/1.1
Host: connect.example.com
Accept: application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/json

{
  "version":"5.5.0",
  "commit":"e5741b90cde98052",
  "kafka_cluster_id":"I4ZmrWqfT2e-upky_4fdPA"
}

Connectors

GET /connectors

Get a list of active connectors

Response JSON Object:
 
  • connectors (array) – List of connector names

Example request:

GET /connectors HTTP/1.1
Host: connect.example.com
Accept: application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/json

["my-jdbc-source", "my-hdfs-sink"]

Query parameters:

Name Data type Required / Optional Description
?expand=status Map Optional Retrieves additional state information for each of the connectors returned in the API call. The endpoint also returns the status of each of the connectors and its tasks as shown in the ?expand=status example below.
?expand=info Map Optional Returns metadata of each of the connectors such as the configuration, task information, and type of connector as in ?expand=info example below.

?expand=status example

 {
        "FileStreamSinkConnectorConnector_0": {
            "status": {
            "name": "FileStreamSinkConnectorConnector_0",
            "connector": {
                "state": "RUNNING",
                "worker_id": "10.0.0.162:8083"
            },
            "tasks": [
                {
                "id": 0,
                "state": "RUNNING",
                "worker_id": "10.0.0.162:8083"
                }
            ],
            "type": "sink"
            }
        },
        "DatagenConnectorConnector_0": {
            "status": {
            "name": "DatagenConnectorConnector_0",
            "connector": {
                "state": "RUNNING",
                "worker_id": "10.0.0.162:8083"
            },
            "tasks": [
                {
                "id": 0,
                "state": "RUNNING",
                "worker_id": "10.0.0.162:8083"
                }
            ],
            "type": "source"
            }
        }
}

?expand=info example

{
     "FileStreamSinkConnectorConnector_0": {
           "info": {
           "name": "FileStreamSinkConnectorConnector_0",
           "config": {
               "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
               "file": "/Users/smogili/file.txt",
               "tasks.max": "1",
               "topics": "datagen",
               "name": "FileStreamSinkConnectorConnector_0"
           },
           "tasks": [
               {
               "connector": "FileStreamSinkConnectorConnector_0",
               "task": 0
               }
           ],
           "type": "sink"
           }
       },
       "DatagenConnectorConnector_0": {
           "info": {
           "name": "DatagenConnectorConnector_0",
           "config": {
               "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
               "quickstart": "clickstream",
               "tasks.max": "1",
               "name": "DatagenConnectorConnector_0",
               "kafka.topic": "datagen"
           },
           "tasks": [
               {
               "connector": "DatagenConnectorConnector_0",
               "task": 0
               }
           ],
           "type": "source"
           }
       }
  }

Users can also combine the status and info expands by appending both to the endpoint (for example, http://localhost:8083/connectors?expand=status&expand=info). This will return the metadata for the connectors and the current status of the connector and its tasks as shown in the following example:

Note

Without using ?expand=status and/or ?expand=info, the connector’s endpoint will only return a list of connector names that are launched.

{
     "FileStreamSinkConnectorConnector_0": {
         "info": {
         "name": "FileStreamSinkConnectorConnector_0",
         "config": {
             "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
             "file": "/Users/smogili/file.txt",
             "tasks.max": "1",
             "topics": "datagen",
             "name": "FileStreamSinkConnectorConnector_0"
         },
         "tasks": [
             {
             "connector": "FileStreamSinkConnectorConnector_0",
             "task": 0
             }
         ],
         "type": "sink"
         },
         "status": {
         "name": "FileStreamSinkConnectorConnector_0",
         "connector": {
             "state": "RUNNING",
             "worker_id": "10.0.0.162:8083"
         },
         "tasks": [
             {
             "id": 0,
             "state": "RUNNING",
             "worker_id": "10.0.0.162:8083"
             }
         ],
         "type": "sink"
         }
     },
     "DatagenConnectorConnector_0": {
         "info": {
         "name": "DatagenConnectorConnector_0",
         "config": {
             "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
             "quickstart": "clickstream",
             "tasks.max": "1",
             "name": "DatagenConnectorConnector_0",
             "kafka.topic": "datagen"
         },
         "tasks": [
             {
             "connector": "DatagenConnectorConnector_0",
             "task": 0
             }
         ],
         "type": "source"
         },
         "status": {
         "name": "DatagenConnectorConnector_0",
         "connector": {
             "state": "RUNNING",
             "worker_id": "10.0.0.162:8083"
         },
         "tasks": [
             {
             "id": 0,
             "state": "RUNNING",
             "worker_id": "10.0.0.162:8083"
             }
         ],
         "type": "source"
         }
     }
     }
POST /connectors

Create a new connector, returning the current connector info if successful. Return 409 (Conflict) if rebalance is in process, or if the connector already exists.

Request JSON Object:
 
  • name (string) – Name of the connector to create
  • config (map) – Configuration parameters for the connector. All values should be strings.
Response JSON Object:
 
  • name (string) – Name of the created connector
  • config (map) – Configuration parameters for the connector
  • tasks (array) – List of active tasks generated by the connector
  • tasks[i].connector (string) – The name of the connector the task belongs to
  • tasks[i].task (int) – Task ID within the connector

Example request:

POST /connectors HTTP/1.1
Host: connect.example.com
Content-Type: application/json
Accept: application/json

{
    "name": "hdfs-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "10",
        "topics": "test-topic",
        "hdfs.url": "hdfs://fakehost:9000",
        "hadoop.conf.dir": "/opt/hadoop/conf",
        "hadoop.home": "/opt/hadoop",
        "flush.size": "100",
        "rotate.interval.ms": "1000"
    }
}

Example response:

HTTP/1.1 201 Created
Content-Type: application/json

{
    "name": "hdfs-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "10",
        "topics": "test-topic",
        "hdfs.url": "hdfs://fakehost:9000",
        "hadoop.conf.dir": "/opt/hadoop/conf",
        "hadoop.home": "/opt/hadoop",
        "flush.size": "100",
        "rotate.interval.ms": "1000"
    },
    "tasks": [
        { "connector": "hdfs-sink-connector", "task": 1 },
        { "connector": "hdfs-sink-connector", "task": 2 },
        { "connector": "hdfs-sink-connector", "task": 3 }
    ]
}
GET /connectors/(string: name)

Get information about the connector.

Response JSON Object:
 
  • name (string) – Name of the created connector
  • config (map) – Configuration parameters for the connector
  • tasks (array) – List of active tasks generated by the connector
  • tasks[i].connector (string) – The name of the connector the task belongs to
  • tasks[i].task (int) – Task ID within the connector

Example request:

GET /connectors/hdfs-sink-connector HTTP/1.1
Host: connect.example.com
Accept: application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/json

{
    "name": "hdfs-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "10",
        "topics": "test-topic",
        "hdfs.url": "hdfs://fakehost:9000",
        "hadoop.conf.dir": "/opt/hadoop/conf",
        "hadoop.home": "/opt/hadoop",
        "flush.size": "100",
        "rotate.interval.ms": "1000"
    },
    "tasks": [
        { "connector": "hdfs-sink-connector", "task": 1 },
        { "connector": "hdfs-sink-connector", "task": 2 },
        { "connector": "hdfs-sink-connector", "task": 3 }
    ]
}
GET /connectors/(string: name)/config

Get the configuration for the connector.

Response JSON Object:
 
  • config (map) – Configuration parameters for the connector

Example request:

GET /connectors/hdfs-sink-connector/config HTTP/1.1
Host: connect.example.com
Accept: application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/json

{
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "hdfs.url": "hdfs://fakehost:9000",
    "hadoop.conf.dir": "/opt/hadoop/conf",
    "hadoop.home": "/opt/hadoop",
    "flush.size": "100",
    "rotate.interval.ms": "1000"
}
PUT /connectors/(string: name)/config

Create a new connector using the given configuration, or update the configuration for an existing connector. Returns information about the connector after the change has been made. Return 409 (Conflict) if rebalance is in process.

Note

The payload is not wrapped in {"config": {}} as in the POST request. The config is directly provided.

Request JSON Object:
 
  • config (map) – Configuration parameters for the connector. All values should be strings.
Response JSON Object:
 
  • name (string) – Name of the created connector
  • config (map) – Configuration parameters for the connector
  • tasks (array) – List of active tasks generated by the connector
  • tasks[i].connector (string) – The name of the connector the task belongs to
  • tasks[i].task (int) – Task ID within the connector

Example request:

PUT /connectors/hdfs-sink-connector/config HTTP/1.1
Host: connect.example.com
Accept: application/json

{
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "hdfs.url": "hdfs://fakehost:9000",
    "hadoop.conf.dir": "/opt/hadoop/conf",
    "hadoop.home": "/opt/hadoop",
    "flush.size": "100",
    "rotate.interval.ms": "1000"
}

Example response:

HTTP/1.1 201 Created
Content-Type: application/json

{
    "name": "hdfs-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "10",
        "topics": "test-topic",
        "hdfs.url": "hdfs://fakehost:9000",
        "hadoop.conf.dir": "/opt/hadoop/conf",
        "hadoop.home": "/opt/hadoop",
        "flush.size": "100",
        "rotate.interval.ms": "1000"
    },
    "tasks": [
        { "connector": "hdfs-sink-connector", "task": 1 },
        { "connector": "hdfs-sink-connector", "task": 2 },
        { "connector": "hdfs-sink-connector", "task": 3 }
    ]
}

Note that in this example the return status indicates that the connector was Created. In the case of a configuration update the status would have been 200 OK.

GET /connectors/(string: name)/status

Gets the current status of the connector, including:

  • Whether it is running or restarting, or if it has failed or paused
  • Which worker it is assigned to
  • Error information if it has failed
  • The state of all its tasks
Response JSON Object:
 
  • name (string) – The name of the connector
  • connector (map) – The map containing connector status
  • tasks[i] (map) – The map containing the task status

Example request:

GET /connectors/hdfs-sink-connector/status HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 200 OK

{
    "name": "hdfs-sink-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "fakehost:8083"
    },
    "tasks":
    [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "fakehost:8083"
        },
        {
            "id": 1,
            "state": "FAILED",
            "worker_id": "fakehost:8083",
            "trace": "org.apache.kafka.common.errors.RecordTooLargeException\n"
        }
    ]
}
POST /connectors/(string: name)/restart

Restart the connector. You may use the following query parameters to restart any combination of the Connector and/or Task instances for the connector.

Example request:

POST /connectors/hdfs-sink-connector/restart HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 200 OK

Query parameters:

Name Data type Required / Optional Default value Description
?includeTasks=<true|false> Boolean Optional False Specifies whether to restart the connector instance and task instances (includeTasks=true`) or just the connector instance (includeTasks=false)
?onlyFailed=<true|false> Boolean Optional False Specifies whether to restart just the instances with a FAILED status (onlyFailed=true) or all instances (onlyFailed=false)

The following responses will be outputted accordingly:

  • 200 OK: When the named connector exists and the server has successfully stopped and begun restarting only the Connector object (for example, includeTasks=false and onlyFailed=false). No response body will be returned.
  • 202 ACCEPTED: When the named connector exists and the server has successfully and durably recorded the request to stop and begin restarting at least one failed or running Connector object and Task instances (for example, includeTasks=true or onlyFailed=true). A response body will be returned, and it is similar to the GET /connector/{connectorName}/status response except that the state field is set to RESTARTING for all instances that will eventually be restarted.
  • 204 NO CONTENT: When the operation succeeded, but there is no content in the response.
  • 404 NOT FOUND: When the named connector does not exist.
  • 409 CONFLICT: When a rebalance is needed, forthcoming, or underway while restarting any of the connector and/or task objects; the reason may mention that the Connect cluster’s leader is not known, or that the worker assigned the connector can’t be found.
  • 500 INTERNAL SERVER ERROR: When the request timed out (takes more than 90 seconds), which means the request could not be durably recorded, perhaps because the worker or cluster are shutting down or because the worker receiving the request has temporarily lost contact with the Kafka cluster.

Example request:

POST /connectors/my-connector/restart?includeTasks=true&onlyFailed=true
Host: connect.example.com

Example response:

HTTP/1.1 202 ACCEPTED
{
      "name": "my-connector",
      "connector": {
          "state": "RUNNING",
          "worker_id": "fakehost1:8083"
      },
      "tasks":
      [
          {
              "id": 0,
              "state": "RUNNING",
              "worker_id": "fakehost2:8083"
          },
          {
              "id": 1,
              "state": "RESTARTING",
              "worker_id": "fakehost3:8083"
          },
          {
              "id": 2,
              "state": "RESTARTING",
              "worker_id": "fakehost1:8083"
          }
      ]
}

Important

The Connector instance and task 0 were not restarted, since they were RUNNING when this call was made. The user can monitor the progress of the restart with subsequent calls to the GET /connector/{connectorName}/status method.

PUT /connectors/(string: name)/pause

Pause the connector and its tasks, which stops message processing until the connector is resumed. This call asynchronous and the tasks will not transition to PAUSED state at the same time.

Example request:

PUT /connectors/hdfs-sink-connector/pause HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 202 Accepted
PUT /connectors/(string: name)/resume

Resume a paused connector or do nothing if the connector is not paused. This call asynchronous and the tasks will not transition to RUNNING state at the same time.

Example request:

PUT /connectors/hdfs-sink-connector/resume HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 202 Accepted
PUT /connectors/(string: name)/stop

Stops the connector but does not delete the connector. All tasks for the connector are shut down completely. When you resume a stopped connector, the connector starts on the assigned worker.

Example request:

PUT /connectors/hdfs-sink-connector/stop HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 202 Accepted
DELETE /connectors/(string: name)/

Delete a connector, halting all tasks and deleting its configuration. Return 409 (Conflict) if rebalance is in process.

Example request:

DELETE /connectors/hdfs-sink-connector HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 204 No Content

Tasks

GET /connectors/(string: name)/tasks

Get a list of tasks currently running for the connector.

Response JSON Object:
 
  • tasks (array) – List of active task configs that have been created by the connector
  • tasks[i].id (string) – The ID of task
  • tasks[i].id.connector (string) – The name of the connector the task belongs to
  • tasks[i].id.task (int) – Task ID within the connector
  • tasks[i].config (map) – Configuration parameters for the task

Example request:

GET /connectors/hdfs-sink-connector/tasks HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 200 OK

[
  {
    "id": {
        "connector": "hdfs-sink-connector",
        "task": 0
    },
    "config": {
        "task.class": "io.confluent.connect.hdfs.HdfsSinkTask",
        "topics": "test-topic",
        "hdfs.url": "hdfs://fakehost:9000",
        "hadoop.conf.dir": "/opt/hadoop/conf",
        "hadoop.home": "/opt/hadoop",
        "flush.size": "100",
        "rotate.interval.ms": "1000"
    }
  }
]
GET /connectors/(string: name)/tasks/(int: taskid)/status

Get a task’s status.

Example request:

GET /connectors/hdfs-sink-connector/tasks/1/status HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 200 OK

{"state":"RUNNING","id":1,"worker_id":"192.168.86.101:8083"}
POST /connectors/(string: name)/tasks/(int: taskid)/restart

Restart an individual task.

Example request:

POST /connectors/hdfs-sink-connector/tasks/1/restart HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 200 OK

Topics

GET /connectors/(string: name)/topics

Returns a list of connector topic names. There is no defined order in which the topics are returned and consecutive calls may return the same topic names but in different order. This request is independent of whether a connector is running, and will return an empty set of topics, both for connectors that don’t have active topics as well as non-existent connectors.

Response JSON Object:
 
  • topics (array) – The set of topic names the connector has been using since its creation or since the last time its set of active topics was reset.

Example request:

GET /connectors/hdfs-sink-connector/topics HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 200 OK

{
  "hdfs-sink-connector": {
    "topics": [
      "test-topic-1",
      "test-topic-2",
      "test-topic-3",
    ]
  }
}
PUT /connectors/(string: name)/topics/reset

Resets the set of topic names that the connector has been using since its creation or since the last time its set of active topics was reset.

Example request:

PUT /connectors/hdfs-sink-connector/topics/reset HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 200 OK

Offsets

GET /connectors/{connector}/offsets

Get the current offsets for a connector. Note that the connector must exist.

Source offsets response

{
  "offsets": [
      {
      "partition": {
          // Connector-defined source partition
      },
      "offset": {
          // Connector-defined source offset
      }
      }
  ]
}

Sink offsets response

{
  "offsets": [
      {
      "partition": {
          "kafka_topic": // Kafka topic
          "kafka_partition": // Kafka partition
      },
      "offset": {
          "kafka_offset": // Kafka offset
      }
      }
  ]
}
PATCH /connectors/{connector}/offsets

Alter the offsets for a connector. Note that the connector must exist and be in the stopped state. To stop the connector, use the PUT /connectors/{connector}/stop endpoint.

Source offsets request

{
  "offsets": [
      {
      "partition": {
          // Connector-defined source partition
      },
      "offset": {
          // Connector-defined source offset
      }
      }
  ]
}

Sink offsets request

{
    "offsets": [
        {
        "partition": {
            "kafka_topic": // Kafka topic
            "kafka_partition": // Kafka partition
        },
        "offset": {
            "kafka_offset": // Kafka offset
        }
        }
    ]
}

Sink offsets request (patch request)

For sink or source connector, the offset field may be null , which will reset the offset for that specific partition. For example, to reset the offset for partition 3 of a topic my-topic read by a sink connector:

{
  "offsets": [
      {
      "partition": {
          "kafka_topic": "T"
          "kafka_partition": 3
      },
      "offset": null
      }
  ]
}
DELETE /connectors/{connector}/offsets

Reset the offsets for a connector. Note that the connector must exist, and must be in the stopped state. To stop the connector, use the PUT /connectors/{connector}/stop endpoint.

PUT /connectors/{connector}/stop

Stop the connector and shut down its tasks, but do not delete it. Note that the connector must exist.

Alter / reset offset responses

For source and sink connectors, there are three possible cases for request responses when altering or resetting offsets:

  • Case 1: If the connector has implemented alterOffsets and everything has succeeded, the HTTP response status is 200 and the response body is similar to:

    {
       "message": "The offsets for this connector have been <reset | altered> successfully"
    }
    
  • Case 2: If the connector has not implemented alterOffsets, but everything has succeeded, the HTTP response status is 200 and the response body is similar to:

    {
      "message": "The framework-managed offsets for this connector have been  <reset | altered> successfully. However, if this connector manages offsets externally, they will need to be manually <altered|reset> in the system that the connector uses."
    }
    
  • Case 3: If anything fails, including:

    • Consumer group deletion for sink connectors
    • Zombie fencing for exactly-once source connectors
    • Invoking alterOffsets on a connector

    You receive a standard HTTP 500 response similar to the following:

    {
        "error_code": 500,
        "message": "Exception message here"
    }
    

Connector plugins

GET /connector-plugins/

Return a list of connector plugins and (optionally) transforms installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you might see inconsistent results (especially during a rolling upgrade if you add new connector jars).

Response JSON Object:
 
  • class (string) – The connector class name
  • type (string) – The type of connector, sink or source
  • version (string) – The version of the connector plugin installed on the worker

Example request:

GET /connector-plugins/ HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 200 OK

[
    {
        "class": "io.confluent.connect.hdfs.HdfsSinkConnector"
        "type": "sink",
        "version": "10.2.1"
    },
    {
        "class": "io.confluent.connect.jdbc.JdbcSourceConnector"
        "type": "source",
        "version": "10.6.4"
    }
]

Query parameters:

Name Data type Required / Optional Default value Description
?connectorsOnly=<true|false> Boolean Optional True Specifies whether to list connector plugins and transforms (connectorsOnly=false) or just the connector plugins (connectorsOnly=true).
PUT /connector-plugins/(string: name)/config/validate

Validate the provided configuration values against the configuration definition. This API performs per config validation, returns suggested values and error messages during validation.

Request JSON Object:
 
  • config (map) – Configuration parameters for the connector. All values should be strings.
Response JSON Object:
 
  • name (string) – The class name of the connector plugin
  • error_count (int) – The total number of errors encountered during configuration validation
  • groups (array) – The list of groups used in configuration definitions
  • configs[i].definition (map) – The definition for a configuration parameter in the connector plugin, which includes the name, type, importance, and so forth
  • configs[i].value (map) – The current value for a configuration parameter, which includes the name, value, recommended values, and so forth

Example request:

PUT /connector-plugins/FileStreamSinkConnector/config/validate/ HTTP/1.1
Host: connect.example.com
Accept: application/json

{
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "test-topic"
}

Example response:

HTTP/1.1 200 OK

{
    "name": "FileStreamSinkConnector",
    "error_count": 1,
    "groups": [
        "Common"
    ],
    "configs": [
        {
            "definition": {
                "name": "topics",
                "type": "LIST",
                "required": false,
                "default_value": "",
                "importance": "HIGH",
                "documentation": "",
                "group": "Common",
                "width": "LONG",
                "display_name": "Topics",
                "dependents": [],
                "order": 4
        },
            "value": {
                "name": "topics",
                "value": "test-topic",
                "recommended_values": [],
                "errors": [],
                "visible": true
            }
        },
        {
            "definition": {
                "name": "file",
                "type": "STRING",
                "required": true,
                "default_value": "",
                "importance": "HIGH",
                "documentation": "Destination filename.",
                "group": null,
                "width": "NONE",
                "display_name": "file",
                "dependents": [],
                "order": -1
            },
            "value": {
                "name": "file",
                "value": null,
                "recommended_values": [],
                "errors": [
                    "Missing required configuration \"file\" which has no default value."
                ],
                "visible": true
            }
        },
        {
            "definition": {
                "name": "name",
                "type": "STRING",
                "required": true,
                "default_value": "",
                "importance": "HIGH",
                "documentation": "Globally unique name to use for this connector.",
                "group": "Common",
                "width": "MEDIUM",
                "display_name": "Connector name",
                "dependents": [],
                "order": 1
            },
            "value": {
                "name": "name",
                "value": "test",
                "recommended_values": [],
                "errors": [],
                "visible": true
            }
        },
        {
            "definition": {
                "name": "tasks.max",
                "type": "INT",
                "required": false,
                "default_value": "1",
                "importance": "HIGH",
                "documentation": "Maximum number of tasks to use for this connector.",
                "group": "Common",
                "width": "SHORT",
                "display_name": "Tasks max",
                "dependents": [],
                "order": 3
        },
            "value": {
                "name": "tasks.max",
                "value": "1",
                "recommended_values": [],
                "errors": [],
                "visible": true
            }
        },
        {
            "definition": {
                "name": "connector.class",
                "type": "STRING",
                "required": true,
                "default_value": "",
                "importance": "HIGH",
                "documentation": "Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name,  or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter",
                "group": "Common",
                "width": "LONG",
                "display_name": "Connector class",
                "dependents": [],
                "order": 2
            },
            "value": {
                "name": "connector.class",
                "value": "org.apache.kafka.connect.file.FileStreamSinkConnector",
                "recommended_values": [],
                "errors": [],
                "visible": true
            }
        }
    ]
}