.. _connect_http-connector:

|kconnect-long| HTTP Sink Connector
===================================

The |kconnect-long| HTTP Sink Connector integrates |ak| with an API via HTTP or
HTTPS. The connector consumes records from |ak| topic(s) and converts each
record value to a String before sending it in the request body to the
configured ``http.api.url``, which optionally can reference the record key
and/or topic name. The targeted API must support either a ``POST`` or ``PUT``
request.

The connector batches records up to the set ``batch.max.size`` before sending
the batched request to the API. Each record is converted to its String
representation and then separated with the ``batch.separator``.

The HTTP Sink Connector supports connecting to APIs using SSL along with Basic
Authentication, OAuth2, or a Proxy Authentication Server.

.. _http_connector_features:

Install HTTP Connector
----------------------

.. include:: ../includes/connector-install.rst

.. include:: ../includes/connector-install-hub.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-http:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-http:1.0.3

--------------------------
Install Connector Manually
--------------------------

`Download and extract the ZIP file `_ for your connector and then follow the
manual connector installation :ref:`instructions `.

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`connect-https-sink-connector-license-config` for license properties
and :ref:`http_sink_license-topic-configuration` for information about the
license topic.

.. _http_connector_quickstart:

Quick Start
-----------

This quick start uses the HTTP Sink Connector to consume records and send HTTP
requests to a demo HTTP service running locally without any authentication.
Additional examples can be found in the `Feature Descriptions and Examples `_
section below.

.. include:: ../../includes/install-cli-prereqs.rst

#. Before starting the connector, clone and run the `kafka-connect-http-demo `_
   app on your machine.

   .. codewithvars:: bash

      git clone https://github.com/confluentinc/kafka-connect-http-demo.git
      cd kafka-connect-http-demo
      mvn spring-boot:run -Dspring.profiles.active=simple-auth

#. Install the connector through the :ref:`confluent_hub_client`.

   .. codewithvars:: bash

      # run from your Confluent Platform installation directory
      confluent-hub install confluentinc/kafka-connect-http:latest

#. Start the Confluent Platform.

   .. codewithvars:: bash

      |confluent_start|

#. `Produce `_ test data to the ``http-messages`` topic in |ak|.

   .. codewithvars:: bash

      seq 10 | |confluent_produce| http-messages

#. Create a ``http-sink.json`` file with the following contents:

   .. codewithvars:: json

      {
        "name": "HttpSink",
        "config": {
          "topics": "http-messages",
          "tasks.max": "1",
          "connector.class": "io.confluent.connect.http.HttpSinkConnector",
          "http.api.url": "http://localhost:8080/api/messages",
          "value.converter": "org.apache.kafka.connect.storage.StringConverter",
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "confluent.topic.replication.factor": "1"
        }
      }

#. Load the HTTP Sink Connector.

   .. include:: ../../includes/confluent-local-consume-limit.rst

   .. codewithvars:: bash

      |confluent_load| httpsink|dash| -d http-sink.json

   .. important:: Don't use the :ref:`cli` in production environments.

#. Confirm that the connector is in a ``RUNNING`` state.

   .. codewithvars:: bash

      |confluent_status| httpsink

#. Confirm that the data was sent to the HTTP endpoint.

   .. codewithvars:: bash

      curl localhost:8080/api/messages

.. note:: Before running other examples, kill the demo app (``CTRL + C``) to
   avoid port conflicts.
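In addition to the quick start settings above, the batching behavior described
in the introduction is controlled by the ``batch.max.size`` and
``batch.separator`` properties. A sketch of these settings (the values shown
are illustrative, not defaults):

::

   # batch up to 10 records per HTTP request
   batch.max.size=10
   # join the String-converted records with this separator
   batch.separator=,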
Examples
--------

* :ref:`http_connector_authentication`
* :ref:`http_connector_message_types`
* :ref:`http_connector_header_forwarding`
* :ref:`http_connector_key_topic_substitution`
* :ref:`http_connector_regex`
* :ref:`http_connector_tombstone`

.. _http_connector_authentication:

--------------
Authentication
--------------

The HTTP Connector can run with SSL enabled or disabled and also supports
various authentication types like Basic Auth, OAuth2, and Proxy Server Auth.

.. _http_connector_basic_auth_example:

^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Basic Authentication Example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

#. Run the demo app with the ``basic-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=basic-auth

   .. note:: If the demo app is already running, you will need to kill that
      instance (``CTRL + C``) before running a new instance to avoid port
      conflicts.

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=HttpSinkBasicAuth
      topics=http-messages
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      auth.type=BASIC
      connection.user=admin
      connection.password=password

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

.. _http_connector_oauth2_example:

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
OAuth2 Authentication Example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. note:: The connector's OAuth2 configuration only allows for use of the
   Client Credentials grant type.

#. Run the demo app with the ``oauth2`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=oauth2

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=HttpSinkOAuth2
      topics=http-messages
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      auth.type=OAUTH2
      oauth2.token.url=http://localhost:8080/oauth/token
      oauth2.client.id=kc-client
      oauth2.client.secret=kc-secret

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

.. _http_connector_ssl_example:

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
SSL with Basic Authentication Example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

#. Run the demo app with the ``ssl-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=ssl-auth

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=SSLHttpSink
      topics=string-topic
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # http sink connector configs
      http.api.url=https://localhost:8443/api/messages

      # http sink connector SSL config
      ssl.enabled=true
      https.ssl.truststore.location=/path/to/http-sink-demo/src/main/resources/localhost-keystore.jks
      https.ssl.truststore.type=JKS
      https.ssl.truststore.password=changeit
      https.ssl.keystore.location=/path/to/http-sink-demo/src/main/resources/localhost-keystore.jks
      https.ssl.keystore.type=JKS
      https.ssl.keystore.password=changeit
      https.ssl.key.password=changeit
      https.ssl.protocol=TLSv1.2

      auth.type=BASIC
      connection.user=admin
      connection.password=password

   .. tip:: Don't forget to update the ``https.ssl.truststore.location`` and
      ``https.ssl.keystore.location`` with the path to your ``http-sink-demo``
      project.

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

.. _http_connector_proxy_auth_example:

^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Proxy Authentication Example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. note:: The proxy authentication example requires Mac OS X 10.6.8 or higher
   due to the proxy that is utilized.

#. Run the demo app with the ``simple-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=simple-auth

#. Install `Squidman Proxy `_.

#. In Squidman preferences/general, set the http port to ``3128``.

#. In Squidman preferences/template, add the following:

   ::

      auth_param basic program /usr/local/squid/libexec/basic_ncsa_auth /etc/squid/passwords
      auth_param basic realm proxy
      acl authenticated proxy_auth REQUIRED
      http_access allow authenticated

#. Create a credentials file for the proxy.

   .. codewithvars:: bash

      sudo mkdir /etc/squid
      sudo htpasswd -c /etc/squid/passwords proxyuser
      # set password to proxypassword

#. Open the Squidman application and select ``Start Squid``.

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=HttpSinkProxyAuth
      topics=http-messages
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      http.proxy.host=localhost
      http.proxy.port=3128
      http.proxy.user=proxyuser
      http.proxy.password=proxypassword

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

.. _http_connector_message_types:

------------------------
Key and Value Converters
------------------------

Similar to other connectors, the key and value converters can be configured to
fit the incoming |ak| topic's data. However, once the records have been
received by the connector, it will attempt to convert them to a String
regardless of their type. The String representation of each record is then put
into the request body before being sent to the downstream ``http.api.url`` via
a ``POST`` or ``PUT`` request.

.. _http_connector_avro_example:

^^^^^^^^^^^^^^^^^^^^^^
Avro Converter Example
^^^^^^^^^^^^^^^^^^^^^^

#. Run the demo app with the ``basic-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=basic-auth

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=AvroHttpSink
      topics=avro-topic
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=io.confluent.connect.avro.AvroConverter
      value.converter=io.confluent.connect.avro.AvroConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      auth.type=BASIC
      connection.user=admin
      connection.password=password

   .. note:: Publish Avro messages to ``avro-topic`` instead of the String
      messages shown in the :ref:`http_connector_quickstart`.

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

.. _http_connector_json_example:

^^^^^^^^^^^^^^^^^^^^^^
JSON Converter Example
^^^^^^^^^^^^^^^^^^^^^^

#. Run the demo app with the ``basic-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=basic-auth

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=JsonHttpSink
      topics=json-topic
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.JsonConverter
      value.converter.schemas.enable=false

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      auth.type=BASIC
      connection.user=admin
      connection.password=password

   .. note:: Publish JSON messages to ``json-topic`` instead of the String
      messages shown in the :ref:`http_connector_quickstart`.

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

.. _http_connector_header_forwarding:

-----------------
Header Forwarding
-----------------

The connector forwards any headers configured via the ``headers`` property.
Multiple headers are separated by ``|`` by default, but the separator is
configurable by setting ``header.separator``.

.. note:: Headers on the incoming |ak| records will not be forwarded.

.. _http_connector_header_forwarding_example:

^^^^^^^^^^^^^^^^^^^^^^^^^
Header Forwarding Example
^^^^^^^^^^^^^^^^^^^^^^^^^

#. Run the demo app with the ``basic-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=basic-auth

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=HttpSinkBasicAuth
      topics=http-messages
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      auth.type=BASIC
      connection.user=admin
      connection.password=password
      headers=Forward-Me:header_value|Another-Header:another_value

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

.. _http_connector_key_topic_substitution:

--------------------------
Key and Topic Substitution
--------------------------

By default, the record's value is the only piece of data forwarded to the API.
However, the record key and/or topic can be substituted into the
``http.api.url`` so that it can be sent to the API. The example below
illustrates how this can be done. Notice the structure of the
``http.api.url``.

.. _http_connector_key_topic_substitution_example:

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Key and Topic Substitution Example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

#. Run the demo app with the ``simple-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=simple-auth

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=KeyTopicSubstitution
      topics=key-val-messages
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # http sink connector configs
      auth.type=NONE
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1
      http.api.url=http://localhost:8080/api/messages/${topic}/${key}

#. Produce a set of messages with keys and values.

   .. include:: ../../includes/confluent-local-consume-limit.rst

   .. codewithvars:: bash

      |confluent_produce| key-val-messages|dash| --property parse.key=true --property key.separator=,
      > 1,value
      > 2,another-value

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

.. tip:: Run ``curl localhost:8080/api/messages | jq`` to see that the message
   keys and topics were saved.

.. _http_connector_regex:

------------------
Regex Replacements
------------------

The connector can be configured to match on ``regex.patterns`` and replace any
matches with the ``regex.replacements``. The regex pattern match and
replacement is done after the record has been converted into its string
representation. When using multiple regex patterns, the default separator is
``~``, but it can be configured via ``regex.separator``.

.. _http_connector_regex_example:

^^^^^^^^^^^^^^^^^^^^^^^^^
Regex Replacement Example
^^^^^^^^^^^^^^^^^^^^^^^^^

#. Run the demo app with the ``basic-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=basic-auth

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=RegexHttpSink
      topics=email-topic,non-email-topic
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      auth.type=BASIC
      connection.user=admin
      connection.password=password

      # regex to mask emails
      regex.patterns=^.+@.+$
      regex.replacements=********

#. Publish messages to the configured topics. Emails should be redacted to
   ``********`` before being sent to the demo app.

   .. codewithvars:: bash

      |confluent_produce| email-topic
      > example@domain.com
      > another@email.com

      |confluent_produce| non-email-topic
      > not an email
      > another normal string

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

.. _http_connector_tombstone:

-----------------
Tombstone Records
-----------------

A record that has a non-null key and a null value is referred to as a
tombstone in |ak|. These records are handled specially by the HTTP Sink
connector. By default, tombstone records are ignored, but this behavior can be
configured with the ``behavior.on.null.values`` property. The other two
configuration options are:

* ``fail``: If a tombstone record is received, the connector task is killed
  immediately.
* ``delete``: The connector attempts to send a ``DELETE`` request to the
  configured API. If key substitution is being used (for example,
  ``localhost:8080/api/messages/${key}``), a ``DELETE`` request is sent to the
  configured URL with the key injected into the ``${key}`` placeholder. If key
  substitution is not configured, the record key is appended to the end of the
  URI and a ``DELETE`` is sent to the formatted URL.
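The two ``delete`` cases described above can be sketched in Python.
``build_delete_url`` is a hypothetical helper used only for illustration; it
is not part of the connector's API:

```python
def build_delete_url(http_api_url: str, key: str) -> str:
    """Sketch of how a DELETE URL could be formed for a tombstone record."""
    if "${key}" in http_api_url:
        # Key substitution configured: inject the record key into the placeholder.
        return http_api_url.replace("${key}", key)
    # No substitution configured: append the record key to the end of the URI.
    return http_api_url.rstrip("/") + "/" + key

print(build_delete_url("http://localhost:8080/api/messages/${key}", "12"))
# -> http://localhost:8080/api/messages/12
print(build_delete_url("http://localhost:8080/api/messages", "25"))
# -> http://localhost:8080/api/messages/25
```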
^^^^^^^^^^^^^^^^^^
Delete URL Example
^^^^^^^^^^^^^^^^^^

::

   # EXAMPLE - KEY SUBSTITUTION
   http.api.url=http://localhost:8080/api/messages/${key}
   behavior.on.null.values=delete

   # SinkRecord with key = 12, value = "mark@email.com"
   # DELETE sent to http://localhost:8080/api/messages/12

   # EXAMPLE - KEY APPENDED TO END
   http.api.url=http://localhost:8080/api/messages
   behavior.on.null.values=delete

   # SinkRecord with key = 25, value = "jane@email.com"
   # DELETE sent to http://localhost:8080/api/messages/25

.. _http_connector_tombstone_example:

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Delete Behavior on Null Values Example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

#. Run the demo app with the ``basic-auth`` Spring profile.

   .. codewithvars:: bash

      mvn spring-boot:run -Dspring.profiles.active=basic-auth

#. Create a ``http-sink.properties`` file with the following contents:

   ::

      name=DeleteNullHttpSink
      topics=http-messages
      tasks.max=1
      connector.class=io.confluent.connect.http.HttpSinkConnector

      # key/val converters
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter

      # licensing for local single-node Kafka cluster
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

      # http sink connector configs
      http.api.url=http://localhost:8080/api/messages
      auth.type=BASIC
      connection.user=admin
      connection.password=password
      behavior.on.null.values=delete

#. Publish messages to the topic that have keys and values.

   .. include:: ../../includes/confluent-local-consume-limit.rst

   .. codewithvars:: bash

      |confluent_produce| http-messages|dash| --property parse.key=true --property key.separator=,
      > 1,message-value
      > 2,another-message

#. Run and validate the connector as described in the
   :ref:`http_connector_quickstart`.

   .. tip:: Check for messages in the demo API with this command:
      ``curl http://localhost:8080/api/messages -H 'Authorization: Basic YWRtaW46cGFzc3dvcmQ=' | jq``

#. Publish messages to the topic that have keys with null values (tombstones).

   .. note:: This cannot be done with :litwithvars:`|confluent_produce|`, but
      there is an API in the demo app to send tombstones.

   ::

      curl -X POST \
        'localhost:8080/api/tombstone?topic=http-messages&key=1' \
        -H 'Authorization: Basic YWRtaW46cGFzc3dvcmQ='

#. Validate that the demo app deleted the messages.

   ::

      curl http://localhost:8080/api/messages \
        -H 'Authorization: Basic YWRtaW46cGFzc3dvcmQ=' | jq

Additional Documentation
------------------------

.. toctree::
   :titlesonly:

   connector_config
   changelog