.. _connect_http-connector:

HTTP Sink Connector for |cp|
============================

The |kconnect-long| HTTP Sink Connector integrates |ak-tm| with an API via HTTP or
HTTPS. The connector consumes records from |ak| topic(s) and converts each record
value to a string, or to JSON with ``request.body.format=json``, before sending it
in the request body to the configured ``http.api.url``. The URL can optionally
reference the record key and/or topic name. The targeted API must support either a
``POST`` or ``PUT`` request.

The connector batches up to ``batch.max.size`` records before sending the batched
request to the API. Each record is converted to its string representation, or its
JSON representation with ``request.body.format=json``, and the records are then
separated with the ``batch.separator``.

The HTTP Sink Connector supports connecting to APIs using SSL along with Basic
Authentication, OAuth2, or a Proxy Authentication Server.

Install the HTTP Connector
--------------------------

.. include:: ../includes/connector-install.rst

.. include:: ../includes/connector-install-hub.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-http:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-http:1.0.3

------------------------------
Install the connector manually
------------------------------

`Download and extract the ZIP file `__ for your connector and then follow the
manual connector installation :ref:`instructions `.

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`connect-https-sink-connector-license-config` for license properties and
:ref:`http_sink_license-topic-configuration` for information about the license
topic.

Configuration Properties
------------------------

For a complete list of configuration properties for this connector, see
:ref:`http_sink_connector_config`.

.. include:: ../includes/connect-to-cloud-note.rst
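As a quick illustration of the batching options described in the overview, the
following is a minimal sketch of the related properties (the endpoint URL and the
chosen values are placeholders; the property names are from this connector's
configuration):

::

   # send records as JSON, in batches of up to 10, joined by commas
   http.api.url=http://localhost:8080/api/messages
   request.body.format=json
   batch.max.size=10
   batch.separator=,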
.. _http_connector_quickstart:

Quick Start
-----------

This quick start uses the HTTP Sink Connector to consume records and send HTTP
requests to a demo HTTP service running locally without any authentication.

.. include:: ../../includes/install-cli-prereqs.rst

#. Before starting the connector, clone and run the `kafka-connect-http-demo
   <https://github.com/confluentinc/kafka-connect-http-demo>`_ app on your
   machine.

   .. codewithvars:: bash

      git clone https://github.com/confluentinc/kafka-connect-http-demo.git
      cd kafka-connect-http-demo
      mvn spring-boot:run -Dspring.profiles.active=simple-auth

#. Install the connector through the :ref:`confluent_hub_client`.

   .. include:: ../../includes/cli-new.rst

   .. codewithvars:: bash

      |confluent_start|

#. Produce test data to the ``http-messages`` topic in |ak| using the
   |confluent-cli| :ref:`confluent_local_produce` command.

   .. codewithvars:: bash

      seq 10 | |confluent_produce| http-messages

#. Create a ``http-sink.json`` file with the following contents:

   .. codewithvars:: json

      {
        "name": "HttpSink",
        "config": {
          "topics": "http-messages",
          "tasks.max": "1",
          "connector.class": "io.confluent.connect.http.HttpSinkConnector",
          "http.api.url": "http://localhost:8080/api/messages",
          "value.converter": "org.apache.kafka.connect.storage.StringConverter",
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "confluent.topic.replication.factor": "1",
          "reporter.bootstrap.servers": "localhost:9092",
          "reporter.result.topic.name": "success-responses",
          "reporter.result.topic.replication.factor": "1",
          "reporter.error.topic.name": "error-responses",
          "reporter.error.topic.replication.factor": "1"
        }
      }

#. Load the HTTP Sink Connector.

   .. include:: ../../includes/confluent-local-consume-limit.rst

   .. codewithvars:: bash

      |confluent_load| HttpSink|dash| -d http-sink.json

   .. important:: Don't use the :ref:`confluent_local` commands in production
      environments.

#. Confirm that the connector is in a ``RUNNING`` state.

   .. codewithvars:: bash

      |confluent_status| HttpSink

#. Confirm that the data was sent to the HTTP endpoint.

   .. codewithvars:: bash

      curl localhost:8080/api/messages

.. note:: Before running other examples, kill the demo app (``CTRL + C``) to
   avoid port conflicts.

Examples
--------

* :ref:`http_connector_authentication`
* :ref:`http_connector_header_forwarding`
* :ref:`http_connector_key_topic_substitution`
* :ref:`http_connector_regex`
* :ref:`http_connector_tombstone`
* :ref:`http_connector_retries`
* :ref:`http_connector_reporter`

.. _http_connector_authentication:

--------------
Authentication
--------------

The HTTP Connector can run with SSL enabled or disabled and supports several
authentication types: Basic Auth, OAuth2, and Proxy Server Auth.

.. _http_connector_basic_auth_example:

^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Basic Authentication Example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

#. Run the demo app with the ``basic-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=basic-auth

   .. note:: If the demo app is already running, you will need to kill that
      instance (``CTRL + C``) before running a new instance to avoid port
      conflicts.

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=HttpSinkBasicAuth
      topics=http-messages
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # connect reporter required bootstrap server
      reporter.bootstrap.servers=localhost:9092
      reporter.result.topic.name=success-responses
      reporter.result.topic.replication.factor=1
      reporter.error.topic.name=error-responses
      reporter.error.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      auth.type=BASIC
      connection.user=admin
      connection.password=password

   .. note:: For details about using this connector with |kconnect-long|
      Reporter, see :ref:`userguide-connect-reporter`.

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.
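To sanity-check the endpoint credentials outside of |kconnect|, you can hit the
demo API directly (a minimal sketch; ``admin``/``password`` are the demo
profile's credentials used in the configuration above):

.. codewithvars:: bash

   # expects HTTP 200 and the stored messages when the credentials are accepted
   curl -u admin:password localhost:8080/api/messages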
.. _http_connector_oauth2_example:

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
OAuth2 Authentication Example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. note:: The connector's OAuth2 configuration only allows for use of the Client
   Credentials grant type.

#. Run the demo app with the ``oauth2`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=oauth2

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=HttpSinkOAuth2
      topics=http-messages
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # connect reporter required bootstrap server
      reporter.bootstrap.servers=localhost:9092
      reporter.result.topic.name=success-responses
      reporter.result.topic.replication.factor=1
      reporter.error.topic.name=error-responses
      reporter.error.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      auth.type=OAUTH2
      oauth2.token.url=http://localhost:8080/oauth/token
      oauth2.client.id=kc-client
      oauth2.client.secret=kc-secret

   .. note:: For details about using this connector with |kconnect-long|
      Reporter, see :ref:`userguide-connect-reporter`.

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

.. _http_connector_ssl_example:

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
SSL with Basic Authentication Example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

#. Run the demo app with the ``ssl-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=ssl-auth

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=SSLHttpSink
      topics=string-topic
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # connect reporter required bootstrap server
      reporter.bootstrap.servers=localhost:9092
      reporter.result.topic.name=success-responses
      reporter.result.topic.replication.factor=1
      reporter.error.topic.name=error-responses
      reporter.error.topic.replication.factor=1

      # http sink connector configs
      http.api.url=https://localhost:8443/api/messages

      # http sink connector SSL config
      ssl.enabled=true
      https.ssl.truststore.location=/path/to/http-sink-demo/src/main/resources/localhost-keystore.jks
      https.ssl.truststore.type=JKS
      https.ssl.truststore.password=changeit
      https.ssl.keystore.location=/path/to/http-sink-demo/src/main/resources/localhost-keystore.jks
      https.ssl.keystore.type=JKS
      https.ssl.keystore.password=changeit
      https.ssl.key.password=changeit
      https.ssl.protocol=TLSv1.2
      auth.type=BASIC
      connection.user=admin
      connection.password=password

   .. tip:: Don't forget to update ``https.ssl.truststore.location`` and
      ``https.ssl.keystore.location`` with the path to your ``http-sink-demo``
      project.

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.
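If the connector fails the TLS handshake, it can help to confirm the keystore
contents and password first (a minimal sketch using the JDK's ``keytool``; the
path assumes the demo project layout referenced in the tip above):

.. codewithvars:: bash

   # lists the certificate entries in the demo keystore
   keytool -list \
     -keystore /path/to/http-sink-demo/src/main/resources/localhost-keystore.jks \
     -storepass changeit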
.. _http_connector_proxy_auth_example:

^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Proxy Authentication Example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. note:: The proxy authentication example requires Mac OS X 10.6.8 or later
   because of the proxy that is used.

#. Run the demo app with the ``simple-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=simple-auth

#. Install `Squidman Proxy `_.

#. In Squidman preferences/general, set the HTTP port to ``3128``.

#. In Squidman preferences/template, add the following:

   ::

      auth_param basic program /usr/local/squid/libexec/basic_ncsa_auth /etc/squid/passwords
      auth_param basic realm proxy
      acl authenticated proxy_auth REQUIRED
      http_access allow authenticated

#. Create a credentials file for the proxy.

   .. codewithvars:: bash

      sudo mkdir /etc/squid
      sudo htpasswd -c /etc/squid/passwords proxyuser
      # set password to proxypassword

#. Open the Squidman application and select ``Start Squid``.

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=HttpSinkProxyAuth
      topics=http-messages
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # connect reporter required bootstrap server
      reporter.bootstrap.servers=localhost:9092
      reporter.result.topic.name=success-responses
      reporter.result.topic.replication.factor=1
      reporter.error.topic.name=error-responses
      reporter.error.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      http.proxy.host=localhost
      http.proxy.port=3128
      http.proxy.user=proxyuser
      http.proxy.password=proxypassword

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

.. _http_connector_header_forwarding:

-----------------
Header Forwarding
-----------------

The connector forwards any headers configured via the ``headers`` property.
Multiple headers are separated with ``|`` by default; the separator is
configurable via ``header.separator``.

.. note:: Headers on the incoming |ak| records are not forwarded.

.. _http_connector_header_forwarding_example:

^^^^^^^^^^^^^^^^^^^^^^^^^
Header Forwarding Example
^^^^^^^^^^^^^^^^^^^^^^^^^

#. Run the demo app with the ``basic-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=basic-auth

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=HttpSinkBasicAuth
      topics=http-messages
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # connect reporter required bootstrap server
      reporter.bootstrap.servers=localhost:9092
      reporter.result.topic.name=success-responses
      reporter.result.topic.replication.factor=1
      reporter.error.topic.name=error-responses
      reporter.error.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      auth.type=BASIC
      connection.user=admin
      connection.password=password
      headers=Forward-Me:header_value|Another-Header:another_value

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.
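If a header value itself contains the ``|`` character, switch to a different
separator with ``header.separator`` (a minimal sketch; the header names and
values are illustrative):

::

   headers=X-App-Id:demo;X-Env:dev
   header.separator=;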
.. _http_connector_key_topic_substitution:

--------------------------
Key and Topic Substitution
--------------------------

By default, the record value is the only piece of data forwarded to the API.
However, the record key and/or topic can be substituted into the
``http.api.url`` so that they are also sent to the API. The example below
illustrates how this can be done. Note the structure of the ``http.api.url``.

.. _http_connector_key_topic_substitution_example:

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Key and Topic Substitution Example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

#. Run the demo app with the ``simple-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=simple-auth

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=KeyTopicSubstitution
      topics=key-val-messages
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # connect reporter required bootstrap server
      reporter.bootstrap.servers=localhost:9092
      reporter.result.topic.name=success-responses
      reporter.result.topic.replication.factor=1
      reporter.error.topic.name=error-responses
      reporter.error.topic.replication.factor=1

      # http sink connector configs
      auth.type=NONE
      http.api.url=http://localhost:8080/api/messages/${topic}/${key}

#. Produce a set of messages with keys and values to the ``key-val-messages``
   topic.

   .. include:: ../../includes/confluent-local-consume-limit.rst

   .. codewithvars:: bash

      |confluent_produce| key-val-messages|dash| --property parse.key=true --property key.separator=,
      > 1,value
      > 2,another-value

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

   .. tip:: Run ``curl localhost:8080/api/messages | jq`` to see that the
      message keys and topics were saved.
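With the configuration above, each record's topic and key are substituted into
the URL before the request is sent. For example (a sketch of the resulting
request targets, given the messages produced in the previous step):

::

   # record key=1 from topic key-val-messages
   POST http://localhost:8080/api/messages/key-val-messages/1

   # record key=2 from topic key-val-messages
   POST http://localhost:8080/api/messages/key-val-messages/2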
.. _http_connector_regex:

------------------
Regex Replacements
------------------

The connector can be configured to match on ``regex.patterns`` and replace any
matches with the ``regex.replacements``. The regex pattern matching and
replacement is done after the record has been converted into its string
representation. When using multiple regex patterns, the default separator is
``~``, but it can be configured via ``regex.separator``.

.. _http_connector_regex_example:

^^^^^^^^^^^^^^^^^^^^^^^^^
Regex Replacement Example
^^^^^^^^^^^^^^^^^^^^^^^^^

#. Run the demo app with the ``basic-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=basic-auth

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=RegexHttpSink
      topics=email-topic,non-email-topic
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # connect reporter required bootstrap server
      reporter.bootstrap.servers=localhost:9092
      reporter.result.topic.name=success-responses
      reporter.result.topic.replication.factor=1
      reporter.error.topic.name=error-responses
      reporter.error.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      auth.type=BASIC
      connection.user=admin
      connection.password=password

      # regex to mask emails
      regex.patterns=^.+@.+$
      regex.replacements=********

#. Publish messages to the configured topics. Emails should be redacted to
   ``********`` before being sent to the demo app.

   .. codewithvars:: bash

      |confluent_produce| email-topic
      > example@domain.com
      > another@email.com

      |confluent_produce| non-email-topic
      > not an email
      > another normal string

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

.. note:: Regex replacement is not supported when the ``request.body.format``
   configuration is set to ``JSON``.

.. _http_connector_tombstone:

-----------------
Tombstone Records
-----------------

A record that has a non-null key and a null value is referred to as a tombstone
in |ak|. The HTTP Sink connector handles these records specially. By default,
tombstone records are ignored, but this behavior can be configured with the
``behavior.on.null.values`` property. The other two configuration options are:

* ``fail``: If a tombstone record is received, the connector task is killed
  immediately.
* ``delete``: The connector attempts to send a ``DELETE`` request to the
  configured API. If key substitution is being used (for example,
  ``localhost:8080/api/messages/${key}``), a ``DELETE`` request is sent to the
  configured URL with the key injected into the ``${key}`` placeholder. If key
  substitution is not configured, the record key is appended to the end of the
  URI and a ``DELETE`` is sent to the formatted URL.

^^^^^^^^^^^^^^^^^^
Delete URL Example
^^^^^^^^^^^^^^^^^^

::

   # EXAMPLE - KEY SUBSTITUTION
   http.api.url=http://localhost:8080/api/messages/${key}
   behavior.on.null.values=delete

   # SinkRecord with key = 12, value = "mark@email.com"
   # DELETE sent to http://localhost:8080/api/messages/12

   # EXAMPLE - KEY APPENDED TO END
   http.api.url=http://localhost:8080/api/messages
   behavior.on.null.values=delete

   # SinkRecord with key = 25, value = "jane@email.com"
   # DELETE sent to http://localhost:8080/api/messages/25
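You can reproduce the second case by hand to confirm what the demo API receives
(a minimal sketch mirroring the ``DELETE`` shown in the comments above):

.. codewithvars:: bash

   # same request the connector sends for a tombstone with key 25
   curl -X DELETE http://localhost:8080/api/messages/25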
.. _http_connector_tombstone_example:

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Delete Behavior on Null Values Example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

#. Run the demo app with the ``basic-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=basic-auth

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=DeleteNullHttpSink
      topics=http-messages
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # connect reporter required bootstrap server
      reporter.bootstrap.servers=localhost:9092
      reporter.result.topic.name=success-responses
      reporter.result.topic.replication.factor=1
      reporter.error.topic.name=error-responses
      reporter.error.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      auth.type=BASIC
      connection.user=admin
      connection.password=password
      behavior.on.null.values=delete

#. Publish messages with keys and values to the topic.

   .. include:: ../../includes/confluent-local-consume-limit.rst

   .. codewithvars:: bash

      |confluent_produce| http-messages|dash| --property parse.key=true --property key.separator=,
      > 1,message-value
      > 2,another-message

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

   .. tip:: Check for messages in the demo API with this command:
      ``curl http://localhost:8080/api/messages -H 'Authorization: Basic YWRtaW46cGFzc3dvcmQ=' | jq``

#. Publish tombstone messages (keys with null values) to the topic.

   .. note:: This cannot be done with :litwithvars:`|confluent_produce|`, but
      the demo app provides an API for sending tombstones.

   ::

      curl -X POST \
        'localhost:8080/api/tombstone?topic=http-messages&key=1' \
        -H 'Authorization: Basic YWRtaW46cGFzc3dvcmQ='

#. Validate that the demo app deleted the messages.

   ::

      curl http://localhost:8080/api/messages \
        -H 'Authorization: Basic YWRtaW46cGFzc3dvcmQ=' | jq

.. _http_connector_retries:

-------
Retries
-------

In case of failures, the connector can be configured to retry operations by
setting the ``max.retries`` and ``retry.backoff.ms`` parameters. The default
value for ``max.retries`` is 10, and the default for ``retry.backoff.ms`` is
3000 ms.

^^^^^^^^^^^^^^^
Retries Example
^^^^^^^^^^^^^^^

#. Run the demo app with the ``basic-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=basic-auth

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=RetriesExample
      topics=http-messages
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # connect reporter required bootstrap server
      reporter.bootstrap.servers=localhost:9092
      reporter.result.topic.name=success-responses
      reporter.result.topic.replication.factor=1
      reporter.error.topic.name=error-responses
      reporter.error.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      auth.type=BASIC
      connection.user=admin
      connection.password=password
      behavior.on.null.values=delete

      # retry configurations
      max.retries=20
      retry.backoff.ms=5000

#. Publish messages with keys and values to the topic.

   .. include:: ../../includes/confluent-local-consume-limit.rst

   .. codewithvars:: bash

      |confluent_produce| http-messages|dash| --property parse.key=true --property key.separator=,
      > 1,message-value
      > 2,another-message

#. Stop the demo app.

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

   With the configuration above, the connector retries each failed HTTP
   operation up to 20 times, with an initial backoff of 5000 ms between
   attempts. If an HTTP operation succeeds, retrying stops. Because the demo
   app is stopped in this example, all 20 retries are exhausted and the
   connector task fails.
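As a rough worked example (assuming the backoff stays at its configured value
between consecutive attempts; the connector documents it only as the *initial*
backoff), the failing task above stays alive for at least:

::

   20 retries x 5000 ms backoff = 100 seconds

before the task is failed, not counting the time spent on the HTTP attempts
themselves.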
.. _http_connector_reporter:

--------
Reporter
--------

The connector can be configured to capture the success and failure responses
from HTTP operations by configuring Reporter parameters.

.. _http_connector_reporter_example:

^^^^^^^^^^^^^^^^
Reporter Example
^^^^^^^^^^^^^^^^

#. Run the demo app with the ``basic-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=basic-auth

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=ReporterExample
      topics=http-messages
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      auth.type=BASIC
      connection.user=admin
      connection.password=password
      behavior.on.null.values=delete

      # reporter configurations
      reporter.bootstrap.servers=localhost:9092
      reporter.result.topic.name=success-responses
      reporter.result.topic.replication.factor=1
      reporter.error.topic.name=error-responses
      reporter.error.topic.replication.factor=1

      # reporter admin and producer overrides for a secured cluster
      # (<...> values are placeholders)
      reporter.admin.bootstrap.servers=<cluster>.confluent.cloud:9092
      reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
        required username="<username>" password="<password>";
      reporter.admin.security.protocol=SASL_SSL
      reporter.admin.sasl.mechanism=PLAIN
      reporter.producer.bootstrap.servers=<cluster>.confluent.cloud:9092
      reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
        required username="<username>" password="<password>";
      reporter.producer.security.protocol=SASL_SSL
      reporter.producer.sasl.mechanism=PLAIN

   .. note:: For additional information about |kconnect| Reporter for secure
      environments, see :ref:`Kafka Connect Reporter <userguide-connect-reporter>`.

#. Publish messages with keys and values to the topic.

   .. include:: ../../includes/confluent-local-consume-limit.rst

   .. codewithvars:: bash

      |confluent_produce| http-messages|dash| --property parse.key=true --property key.separator=,
      > 1,message-value
      > 2,another-message

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

#. Consume records from the ``success-responses`` and ``error-responses``
   topics to see the HTTP operation responses.

   .. codewithvars:: bash

      kafkacat -C -b localhost:9092 -t success-responses -J | jq

   .. codewithvars:: json

      {
        "topic": "success-responses",
        "partition": 0,
        "offset": 0,
        "tstype": "create",
        "ts": 1581579911854,
        "headers": [
          "input_record_offset", "0",
          "input_record_timestamp", "1581488456476",
          "input_record_partition", "0",
          "input_record_topic", "http-connect"
        ],
        "key": null,
        "payload": "{\"id\":1,\"message\":\"1,message-value\"}"
      }
      {
        "topic": "success-responses",
        "partition": 0,
        "offset": 1,
        "tstype": "create",
        "ts": 1581579911854,
        "headers": [
          "input_record_offset", "1",
          "input_record_timestamp", "1581488456476",
          "input_record_partition", "0",
          "input_record_topic", "http-connect"
        ],
        "key": null,
        "payload": "{\"id\":2,\"message\":\"2,message-value\"}"
      }

   In case of retryable errors (that is, errors with a 5xx status code), a
   response like the one shown below is written to the ``error-responses``
   topic.

   .. codewithvars:: bash

      kafkacat -C -b localhost:9092 -t error-responses -J | jq

   .. codewithvars:: json

      {
        "topic": "error-responses",
        "partition": 0,
        "offset": 0,
        "tstype": "create",
        "ts": 1581579911854,
        "headers": [
          "input_record_offset", "0",
          "input_record_timestamp", "1581579931450",
          "input_record_partition", "0",
          "input_record_topic", "http-messages"
        ],
        "key": null,
        "payload": "Retry time lapsed, unable to process HTTP request. Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : 6,test, Status code : 500, Reason Phrase : , Response Content : {\"timestamp\":\"2020-02-11T10:44:41.574+0000\",\"status\":500,\"error\":\"Internal Server Error\",\"message\":\"Unresolved compilation problem: \\n\\tlog cannot be resolved\\n\",\"path\":\"/api/messages\"}, "
      }
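To pull just the response bodies out of the envelope, filter the same output
down to the ``payload`` field (a minimal sketch using the ``kafkacat`` JSON
envelope shown above):

.. codewithvars:: bash

   kafkacat -C -b localhost:9092 -t success-responses -J | jq -r .payload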
Suggested Reading
-----------------

Blog post: `Webify Event Streams Using the Kafka Connect HTTP Sink Connector `__

Additional Documentation
------------------------

.. toctree::
   :titlesonly:

   connector_config
   changelog