Kafka Connect HTTP Sink Connector¶
The Kafka Connect HTTP Sink Connector integrates Kafka with an API via HTTP or HTTPS. The connector consumes records from Kafka topic(s) and converts each record value to a String before sending it in the request body to the configured http.api.url, which optionally can reference the record key and/or topic name. The targeted API must support either a POST or PUT request.
The connector batches records up to the set batch.max.size before sending the batched request to the API. Each record is converted to its String representation and then separated with the batch.separator.
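As a minimal sketch, the batching behavior might be tuned with properties like the following (the values shown are placeholders, not defaults):
batch.max.size=10
batch.separator=,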
The HTTP Sink Connector supports connecting to APIs using SSL along with Basic Authentication, OAuth2, or a Proxy Authentication Server.
Install HTTP Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
Install the connector using Confluent Hub¶
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run this command to install the latest (latest) connector version.
The connector must be installed on every machine where Connect will be run.
confluent-hub install confluentinc/kafka-connect-http:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-http:1.0.3
Install Connector Manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
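As a hedged sketch, a subscriber typically supplies the license key alongside the license-topic connection settings in the connector configuration (see the linked license documentation for the authoritative property names; the value below is a placeholder):
confluent.license=<license-key>
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1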
Quick Start¶
This quick start uses the HTTP Sink Connector to consume records and send HTTP requests to a demo HTTP service running locally without any authentication.
Additional examples can be found in the Feature Descriptions and Examples section below.
- Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
Before starting the connector, clone and run the kafka-connect-http-demo app on your machine.
git clone https://github.com/confluentinc/kafka-connect-http-demo.git
cd kafka-connect-http-demo
mvn spring-boot:run -Dspring.profiles.active=simple-auth
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory
confluent-hub install confluentinc/kafka-connect-http:latest
Start the Confluent Platform.
confluent start
Produce test data to the http-messages topic in Kafka.
seq 10 | confluent produce http-messages
Create a http-sink.json file with the following contents:
{
  "name": "HttpSink",
  "config": {
    "topics": "http-messages",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
    "http.api.url": "http://localhost:8080/api/messages",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Load the HTTP Sink Connector.
confluent load httpsink -d http-sink.json
Important
Don’t use the Confluent CLI in production environments.
Confirm that the connector is in a RUNNING state.
confluent status httpsink
Confirm that the data was sent to the HTTP endpoint.
curl localhost:8080/api/messages
Note
Before running other examples, kill the demo app (CTRL + C) to avoid port conflicts.
Examples¶
- Authentication
- Key and Value Converters
- Header Forwarding
- Key and Topic Substitution
- Regex Replacements
- Tombstone Records
Authentication¶
The HTTP Connector can run with SSL enabled/disabled and also supports various authentication types like Basic Auth, OAuth2, and Proxy Server Auth.
Basic Authentication Example¶
Run the demo app with the basic-auth Spring profile.
mvn spring-boot:run -Dspring.profiles.active=basic-auth
Note
If the demo app is already running, you will need to kill that instance (CTRL + C) before running a new instance to avoid port conflicts.
Create a http-sink.properties file with the following contents:
name=HttpSinkBasicAuth
topics=http-messages
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=BASIC
connection.user=admin
connection.password=password
Run and validate the connector as described in the Quick Start.
OAuth2 Authentication Example¶
Note
The connector’s OAuth2 configuration only allows for use of the Client Credentials grant type.
Run the demo app with the oauth2 Spring profile.
mvn spring-boot:run -Dspring.profiles.active=oauth2
Create a http-sink.properties file with the following contents:
name=HttpSinkOAuth2
topics=http-messages
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=OAUTH2
oauth2.token.url=http://localhost:8080/oauth/token
oauth2.client.id=kc-client
oauth2.client.secret=kc-secret
Run and validate the connector as described in the Quick Start.
SSL with Basic Authentication Example¶
Run the demo app with the ssl-auth Spring profile.
mvn spring-boot:run -Dspring.profiles.active=ssl-auth
Create a http-sink.properties file with the following contents:
name=SSLHttpSink
topics=string-topic
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# http sink connector configs
http.api.url=https://localhost:8443/api/messages

# http sink connector SSL config
ssl.enabled=true
https.ssl.truststore.location=/path/to/http-sink-demo/src/main/resources/localhost-keystore.jks
https.ssl.truststore.type=JKS
https.ssl.truststore.password=changeit
https.ssl.keystore.location=/path/to/http-sink-demo/src/main/resources/localhost-keystore.jks
https.ssl.keystore.type=JKS
https.ssl.keystore.password=changeit
https.ssl.key.password=changeit
https.ssl.protocol=TLSv1.2

auth.type=BASIC
connection.user=admin
connection.password=password
Tip
Don’t forget to update the https.ssl.truststore.location and https.ssl.keystore.location with the path to your http-sink-demo project.
Run and validate the connector as described in the Quick Start.
Proxy Authentication Example¶
Note
The proxy authentication example requires Mac OS X 10.6.8 or later because of the proxy application used.
Run the demo app with the simple-auth Spring profile.
mvn spring-boot:run -Dspring.profiles.active=simple-auth
Install Squidman Proxy.
In Squidman preferences/general, set the HTTP port to 3128.
In Squidman preferences/template, add the following:
auth_param basic program /usr/local/squid/libexec/basic_ncsa_auth /etc/squid/passwords
auth_param basic realm proxy
acl authenticated proxy_auth REQUIRED
http_access allow authenticated
Create a credentials file for the proxy.
sudo mkdir /etc/squid
sudo htpasswd -c /etc/squid/passwords proxyuser
# set password to proxypassword
Open the Squidman application and select Start Squid.
Create a http-sink.properties file with the following contents:
name=HttpSinkProxyAuth
topics=http-messages
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
http.proxy.host=localhost
http.proxy.port=3128
http.proxy.user=proxyuser
http.proxy.password=proxypassword
Run and validate the connector as described in the Quick Start.
Key and Value Converters¶
Similar to other connectors, the key and value converters can be configured to fit the incoming Kafka topic’s data. However, once the records have been received by the connector, it will attempt to convert them to a String regardless of their type. The String representation of each record is then put into the request body before being sent to the downstream http.api.url via a POST or PUT request.
Avro Converter Example¶
Run the demo app with the basic-auth Spring profile.
mvn spring-boot:run -Dspring.profiles.active=basic-auth
Create a http-sink.properties file with the following contents:
name=AvroHttpSink
topics=avro-topic
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=BASIC
connection.user=admin
connection.password=password
Note
Publish Avro messages to avro-topic instead of the String messages shown in the Quick Start.
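A hedged sketch for producing Avro test records with kafka-avro-console-producer follows; the schema and values are illustrative only. Note that the Avro converter typically also requires key.converter.schema.registry.url and value.converter.schema.registry.url (for example, http://localhost:8081) to be set in the connector configuration.
kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic avro-topic \
  --property value.schema='{"type":"record","name":"message","fields":[{"name":"text","type":"string"}]}'
# then type records such as:
{"text": "hello"}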
Run and validate the connector as described in the Quick Start.
JSON Converter Example¶
Run the demo app with the basic-auth Spring profile.
mvn spring-boot:run -Dspring.profiles.active=basic-auth
Create a http-sink.properties file with the following contents:
name=JsonHttpSink
topics=json-topic
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=BASIC
connection.user=admin
connection.password=password
Note
Publish JSON messages to json-topic instead of the String messages shown in the Quick Start.
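As a hedged sketch, JSON test records can be produced with the console producer; the values below are illustrative only.
confluent produce json-topic
{"id": 1, "message": "hello"}
{"id": 2, "message": "world"}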
Run and validate the connector as described in the Quick Start.
Header Forwarding¶
The connector forwards any headers configured via the headers property. Multiple headers are separated by | by default, but this is configurable by setting header.separator.
Note
Headers on the incoming Kafka records will not be forwarded.
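As a minimal sketch, two headers could be forwarded with a custom separator like this (the header names, values, and separator are illustrative only):
header.separator=;
headers=Forward-Me:header_value;Another-Header:another_value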
Header Forwarding Example¶
Run the demo app with the basic-auth Spring profile.
mvn spring-boot:run -Dspring.profiles.active=basic-auth
Create a http-sink.properties file with the following contents:
name=HttpSinkBasicAuth
topics=http-messages
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=BASIC
connection.user=admin
connection.password=password
headers=Forward-Me:header_value|Another-Header:another_value
Run and validate the connector as described in the Quick Start.
Key and Topic Substitution¶
The record’s value is the only piece of data forwarded to the API by default. However, the record key and/or topic can be substituted into the http.api.url so that it can be sent to the API.
The example below illustrates how this can be done. Notice the structure of the http.api.url.
Key and Topic Substitution Example¶
Run the demo app with the simple-auth Spring profile.
mvn spring-boot:run -Dspring.profiles.active=simple-auth
Create a http-sink.properties file with the following contents:
name=KeyTopicSubstitution
topics=key-val-messages
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# http sink connector configs
auth.type=NONE
http.api.url=http://localhost:8080/api/messages/${topic}/${key}
Produce a set of messages with keys and values.
confluent produce key-val-messages --property parse.key=true --property key.separator=,
> 1,value
> 2,another-value
Run and validate the connector as described in the Quick Start.
Tip
Run curl localhost:8080/api/messages | jq to see that the message keys and topics were saved.
Regex Replacements¶
The connector can be configured to match on regex.patterns and replace any matches with the regex.replacements. The regex pattern matching and replacement is done after the record has been converted into its String representation.
When using multiple regex patterns, the default separator is ~ but it can be configured via regex.separator.
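As a hedged sketch, multiple pattern/replacement pairs might be listed using the regex.separator (the patterns and replacements below are illustrative only, with pattern i mapped to replacement i):
regex.separator=~
regex.patterns=^.+@.+$~[0-9]{3}-[0-9]{2}-[0-9]{4}
regex.replacements=********~###-##-####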
Regex Replacement Example¶
Run the demo app with the basic-auth Spring profile.
mvn spring-boot:run -Dspring.profiles.active=basic-auth
Create a http-sink.properties file with the following contents:
name=RegexHttpSink
topics=email-topic,non-email-topic
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=BASIC
connection.user=admin
connection.password=password

# regex to mask emails
regex.patterns=^.+@.+$
regex.replacements=********
Publish messages to the configured topics. Emails should be redacted with ******** before being sent to the demo app.
confluent produce email-topic
> example@domain.com
> another@email.com

confluent produce non-email-topic
> not an email
> another normal string
Run and validate the connector as described in the Quick Start.
Tombstone Records¶
A record that has a non-null key and a null value is referred to as a tombstone in Kafka. These records are handled specially by the HTTP Sink connector.
By default, tombstone records are ignored, but this behavior can be configured with the behavior.on.null.values property.
The other two configuration options are:
- fail: If a tombstone record is received, the connector task is killed immediately.
- delete: The connector attempts to send a DELETE request to the configured API.
If key substitution is being used (for example, localhost:8080/api/messages/${key}), a DELETE request is sent to the configured URL with the key injected into the ${key} placeholder.
If key substitution is not configured, the record key is appended to the end of the URI and a DELETE is sent to the formatted URL.
Delete URL Example¶
# EXAMPLE - KEY SUBSTITUTION
http.api.url=http://localhost:8080/api/messages/${key}
behavior.on.null.values=delete
# SinkRecord with key = 12, value = "mark@email.com"
# DELETE sent to http://localhost:8080/api/messages/12
# EXAMPLE - KEY APPENDED TO END
http.api.url=http://localhost:8080/api/messages
behavior.on.null.values=delete
# SinkRecord with key = 25, value = "jane@email.com"
# DELETE sent to http://localhost:8080/api/messages/25
Delete Behavior on Null Values Example¶
Run the demo app with the basic-auth Spring profile.
mvn spring-boot:run -Dspring.profiles.active=basic-auth
Create a http-sink.properties file with the following contents:
name=DeleteNullHttpSink
topics=http-messages
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=BASIC
connection.user=admin
connection.password=password
behavior.on.null.values=delete
Publish messages to the topic that have keys and values.
confluent produce http-messages --property parse.key=true --property key.separator=,
> 1,message-value
> 2,another-message
Run and validate the connector as described in the Quick Start.
Tip
Check for messages in the demo API with this command:
curl http://localhost:8080/api/messages -H 'Authorization: Basic YWRtaW46cGFzc3dvcmQ=' | jq
Publish messages to the topic that have keys with null values (tombstones).
Note
This cannot be done with confluent produce, but there is an API in the demo app to send tombstones.
curl -X POST \
  'localhost:8080/api/tombstone?topic=http-messages&key=1' \
  -H 'Authorization: Basic YWRtaW46cGFzc3dvcmQ='
Validate that the demo app deleted the messages.
curl http://localhost:8080/api/messages \
  -H 'Authorization: Basic YWRtaW46cGFzc3dvcmQ=' | jq