HTTP Sink Connector for Confluent Platform¶
The Kafka Connect HTTP Sink connector integrates Apache Kafka® with an API using HTTP or HTTPS. The connector consumes records from Kafka topic(s) and converts each record value to a String (or to JSON with request.body.format=json) before sending it in the request body to the configured http.api.url, which optionally can reference the record key and/or topic name. You can also use fields from the Kafka record. The targeted API must support either a POST, PATCH, or PUT request.
The connector batches records up to the set batch.max.size before sending the batched request to the API. Each record is converted to its String representation (or its JSON representation with request.body.format=json) and then separated with the batch.separator.
Note
The HTTP Sink connector does not batch requests for messages that contain different Kafka header values.
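For reference, the following is a minimal sketch of the batching-related properties described above. The values shown are illustrative, not necessarily the connector defaults.

# hypothetical excerpt from a connector .properties file; values are illustrative
http.api.url=http://localhost:8080/api/messages
request.body.format=json
batch.max.size=100
batch.separator=,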
The HTTP Sink connector supports connecting to APIs using SSL along with Basic Authentication, OAuth2, or a Proxy Authentication Server.
Features¶
The HTTP Sink connector for Confluent Platform includes the following features:
At least once delivery¶
This connector guarantees that records from the Kafka topic are delivered at least once.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
Multiple tasks¶
The HTTP Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance because the consumed topic partitions are distributed across the tasks.
Template parameters¶
The HTTP Sink connector forwards the message (record) value to the HTTP API. You can add parameters to have the connector construct a unique HTTP API URL containing the record key and topic name. For example, you enter http://eshost1:9200/api/messages/${topic}/${key} to have the HTTP API URL contain the topic name and record key.

In addition to the ${topic} and ${key} parameters, you can also refer to fields from the Kafka record. As shown in the following example, you may want the connector to construct a URL that uses the Order ID and Customer ID.

The following example shows the Avro format the producer uses to generate records in the Apache Kafka® topic order:
{
  "name": "MyClass",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "customerId",
      "type": "int"
    },
    {
      "name": "order",
      "type": {
        "name": "order",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": "int"
          },
          {
            "name": "amount",
            "type": "int"
          }
        ]
      }
    }
  ]
}
To send the Order ID and Customer ID, you would use the following URL in the HTTP API URL (http.api.url) configuration property:

"http.api.url" : "http://eshost1:9200/api/messages/order/${order.id}/customer/${customerId}/"
Assuming the data in the Kafka topic contains the following values:

{
  "customerId": 123,
  "order": {
    "id": 1,
    "amount": 12345
  }
}
The connector constructs the following URL:
http://eshost1:9200/api/messages/order/1/customer/123/
Note
- The maximum depth for added parameters is 10. For example, connector validation fails if you use the following URL: https://eshost1:9200/api/messages/order/${a.b.c.d.e.f.g.h.i.j.k}.
- When you add parameters to the HTTP API URL, each record can result in a unique URL. For this reason, batching is disabled when using additional URL parameters.
- The connector throws a runtime exception if fields referred to in the HTTP API URL do not exist in the Kafka record.
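To tie the pieces together, the following is a hypothetical connector configuration sketch that uses the field-substitution URL from the example above. The connector name, the Schema Registry address, and the local licensing and reporter values are assumptions for a local setup and are not part of the example above.

# hypothetical sketch; the name, Schema Registry address, and local addresses are assumed
name=OrderHttpSink
topics=order
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# Avro record values require the Avro converter and a Schema Registry (address assumed)
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# field substitution: each record can produce a unique URL, so batching is disabled
http.api.url=http://eshost1:9200/api/messages/order/${order.id}/customer/${customerId}/

# licensing and reporter settings for a local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1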
Install the HTTP Sink Connector¶
You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.
Prerequisites¶
Important
You must install the connector on every machine where Connect will run.
An installation of the Confluent Hub Client.

Note

This is installed by default with Confluent Enterprise.

An installation of the latest (latest) connector version.

To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent-hub install confluentinc/kafka-connect-http:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent-hub install confluentinc/kafka-connect-http:1.0.3
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Properties.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.
Quick Start¶
This quick start uses the HTTP Sink connector to consume records and send HTTP requests to a demo HTTP service running locally without any authentication.

Prerequisites:
- Confluent Platform
- Confluent CLI (requires separate installation)

Before starting the connector, clone and run the kafka-connect-http-demo app on your machine.

git clone https://github.com/confluentinc/kafka-connect-http-demo.git
cd kafka-connect-http-demo
mvn spring-boot:run -Dspring.profiles.active=simple-auth
Install the connector through the Confluent Hub Client.

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

Start Confluent Platform.

confluent local services start
Produce test data to the http-messages topic in Kafka using the Confluent CLI confluent local services kafka produce command.

seq 10 | confluent local services kafka produce http-messages
Create a http-sink.json file with the following contents:

{
  "name": "HttpSink",
  "config": {
    "topics": "http-messages",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
    "http.api.url": "http://localhost:8080/api/messages",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "reporter.bootstrap.servers": "localhost:9092",
    "reporter.result.topic.name": "success-responses",
    "reporter.result.topic.replication.factor": "1",
    "reporter.error.topic.name": "error-responses",
    "reporter.error.topic.replication.factor": "1"
  }
}
Load the HTTP Sink connector.

Caution

You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local services connect connector load HttpSink --config http-sink.json
Important
Don’t use the confluent local commands in production environments.
Confirm that the connector is in a RUNNING state.

confluent local services connect connector status HttpSink

Confirm that the data was sent to the HTTP endpoint.

curl localhost:8080/api/messages

Note

Before running other examples, kill the demo app (CTRL + C) to avoid port conflicts.
Examples¶
- Authentication
- Key and value converters
- Header forwarding
- Key and topic substitution
- Regex replacements
- Tombstone records
- Retries
- Reporter
Authentication¶
The HTTP Sink connector can run with SSL enabled/disabled and also supports various authentication types like Basic Auth, OAuth2, and Proxy Server Auth.
Basic authentication example¶
Run the demo app with the basic-auth Spring profile.

mvn spring-boot:run -Dspring.profiles.active=basic-auth

Note

If the demo app is already running, you will need to kill that instance (CTRL + C) before running a new instance to avoid port conflicts.

Create a http-sink.properties file with the following contents:

name=HttpSinkBasicAuth
topics=http-messages
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# connect reporter required bootstrap server
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=BASIC
connection.user=admin
connection.password=password
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
Run and validate the connector as described in the Quick Start.
OAuth2 authentication example¶
Note
The connector’s OAuth2 configuration only allows for use of the Client Credentials grant type.
Run the demo app with the oauth2 Spring profile.

mvn spring-boot:run -Dspring.profiles.active=oauth2

Create a http-sink.properties file with the following contents:

name=HttpSinkOAuth2
topics=http-messages
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# connect reporter required bootstrap server
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=OAUTH2
oauth2.token.url=http://localhost:8080/oauth/token
oauth2.client.id=kc-client
oauth2.client.secret=kc-secret
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
Run and validate the connector as described in the Quick Start.
SSL with basic authentication example¶
Run the demo app with the ssl-auth Spring profile.

mvn spring-boot:run -Dspring.profiles.active=ssl-auth

Create a http-sink.properties file with the following contents:

name=SSLHttpSink
topics=string-topic
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# connect reporter required bootstrap server
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1

# http sink connector configs
http.api.url=https://localhost:8443/api/messages

# http sink connector SSL config
ssl.enabled=true
https.ssl.truststore.location=/path/to/http-sink-demo/src/main/resources/localhost-keystore.jks
https.ssl.truststore.type=JKS
https.ssl.truststore.password=changeit
https.ssl.keystore.location=/path/to/http-sink-demo/src/main/resources/localhost-keystore.jks
https.ssl.keystore.type=JKS
https.ssl.keystore.password=changeit
https.ssl.key.password=changeit
https.ssl.protocol=TLSv1.2
auth.type=BASIC
connection.user=admin
connection.password=password

Tip

Don't forget to update the https.ssl.truststore.location and https.ssl.keystore.location with the path to your http-sink-demo project.

Run and validate the connector as described in the Quick Start.
Proxy authentication example¶
Note
The proxy authentication example requires Mac OS X 10.6.8 or later because of the proxy (SquidMan) that is used.
Run the demo app with the simple-auth Spring profile.

mvn spring-boot:run -Dspring.profiles.active=simple-auth

Install Squidman Proxy.

In SquidMan, navigate to the Preferences > General tab, and set the HTTP port to 3128.

In SquidMan, navigate to the Preferences > Template tab, and add the following criteria:

auth_param basic program /usr/local/squid/libexec/basic_ncsa_auth /etc/squid/passwords
auth_param basic realm proxy
acl authenticated proxy_auth REQUIRED
http_access allow authenticated

Create a credentials file for the proxy.

sudo mkdir /etc/squid
sudo htpasswd -c /etc/squid/passwords proxyuser
# set password to proxypassword

Open the SquidMan application and select Start Squid.

Create a http-sink.properties file with the following contents:

name=HttpSinkProxyAuth
topics=http-messages
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# connect reporter required bootstrap server
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
http.proxy.host=localhost
http.proxy.port=3128
http.proxy.user=proxyuser
http.proxy.password=proxypassword
Run and validate the connector as described in the Quick Start.
Key and value converters¶
Similar to other connectors, the key and value converters can be configured to fit the incoming Kafka topic’s data. However, once the records have been received by the connector, it will attempt to convert them to a String regardless of their type. The String representation of each record is then put into the request body before being sent to the downstream http.api.url via a POST or PUT request.
Avro converter example¶
Run the demo app with the basic-auth Spring profile.

mvn spring-boot:run -Dspring.profiles.active=basic-auth

Create a http-sink.properties file with the following contents:

name=AvroHttpSink
topics=avro-topic
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=BASIC
connection.user=admin
connection.password=password
Note
Publish Avro messages to the avro-topic topic instead of the String messages shown in the Quick Start.
Run and validate the connector as described in the Quick Start.
JSON converter example¶
Run the demo app with the basic-auth Spring profile.

mvn spring-boot:run -Dspring.profiles.active=basic-auth

Create a http-sink.properties file with the following contents:

name=JsonHttpSink
topics=json-topic
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=BASIC
connection.user=admin
connection.password=password
Note
Publish JSON messages to the json-topic topic instead of the String messages shown in the Quick Start.
Run and validate the connector as described in the Quick Start.
Header forwarding¶
The HTTP Sink connector includes the following configurations for adding headers to all requests:

- headers
- header.separator

The connector will forward any headers you configure using the headers property. You can use | to separate multiple headers; this separator is configurable with the header.separator property. To view more details about each of these configuration properties, see Configuration Properties.

The HTTP Sink connector also reads the record headers of incoming records and appends them as-is to the outbound requests. These record headers are applied after the headers configured with the headers property, and they should take precedence.
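As an illustration, the following sketch sets two static headers and a custom separator. The header names, values, and the ; separator are arbitrary choices for this example, not defaults.

# hypothetical excerpt; header names, values, and separator are illustrative
headers=Content-Language:en;Cache-Control:no-cache
header.separator=;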
Header forwarding example¶
Run the demo app with the basic-auth Spring profile.

mvn spring-boot:run -Dspring.profiles.active=basic-auth

Create a http-sink.properties file with the following contents:

name=HttpSinkBasicAuth
topics=http-messages
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# connect reporter required bootstrap server
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=BASIC
connection.user=admin
connection.password=password
headers=Forward-Me:header_value|Another-Header:another_value
Run and validate the connector as described in the Quick Start.
Key and topic substitution¶
The record’s value is the only piece of data forwarded to the API by default. However, the record key and/or topic can be substituted into the http.api.url so that it can be sent to the API. The example below illustrates how this can be done. Notice the structure of the http.api.url.
Key and topic substitution example¶
Run the demo app with the simple-auth Spring profile.

mvn spring-boot:run -Dspring.profiles.active=simple-auth

Create a http-sink.properties file with the following contents:

name=KeyTopicSubstitution
topics=key-val-messages
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# connect reporter required bootstrap server
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1

# http sink connector configs
auth.type=NONE
http.api.url=http://localhost:8080/api/messages/${topic}/${key}
Produce a set of messages with keys and values.

Caution

You must include a double dash (--) between the topic name and your flag. For more information, see this post.

confluent local services kafka produce key-val-messages --property parse.key=true --property key.separator=,
> 1,value
> 2,another-value

Run and validate the connector as described in the Quick Start.

Tip

Run curl localhost:8080/api/messages | jq to see that the message key and topic were saved.
Regex replacements¶
The connector can be configured to match on regex.patterns and replace any matches with the regex.replacements. The regex pattern match and replacement is done after the record has been converted into its string representation.

When using multiple regex patterns, the default separator is ~, but it can be configured with regex.separator.
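For example, the following sketch masks email addresses and social-security-style numbers using two patterns and two replacements with the default ~ separator. The patterns and replacement strings are purely illustrative, and each pattern is presumably paired with the replacement at the same position in the list.

# hypothetical excerpt; patterns and replacements are illustrative
regex.patterns=^.+@.+$~[0-9]{3}-[0-9]{2}-[0-9]{4}
regex.replacements=********~###-##-####
regex.separator=~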
Regex replacement example¶
Run the demo app with the basic-auth Spring profile.

mvn spring-boot:run -Dspring.profiles.active=basic-auth

Create a http-sink.properties file with the following contents:

name=RegexHttpSink
topics=email-topic,non-email-topic
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# connect reporter required bootstrap server
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=BASIC
connection.user=admin
connection.password=password

# regex to mask emails
regex.patterns=^.+@.+$
regex.replacements=********
Publish messages to the topics that are configured. Emails should be redacted with ******** before being sent to the demo app.

confluent local services kafka produce email-topic
> example@domain.com
> another@email.com

confluent local services kafka produce non-email-topic
> not an email
> another normal string
Run and validate the connector as described in the Quick Start.
Note
Regex replacement is not supported when the request.body.format configuration property is set to JSON.
Tombstone records¶
A record that has a non-null key and a null value is referred to as a tombstone in Kafka. These records are handled specially by the HTTP Sink connector.
By default, tombstone records are ignored, but this behavior can be configured with the behavior.on.null.values property.
The other two configuration options are:

- fail: If a tombstone record is received, the connector task is killed immediately.
- delete: The connector attempts to send a DELETE request to the configured API.
If key substitution is being used (for example, localhost:8080/api/messages/${key}), a DELETE request is sent to the configured URL with the key injected into the ${key} placeholder. If key substitution is not configured, the record key is appended to the end of the URI and a DELETE is sent to the formatted URL.
Delete URL example¶
# EXAMPLE - KEY SUBSTITUTION
http.api.url=http://localhost:8080/api/messages/${key}
behavior.on.null.values=delete
# SinkRecord with key = 12, value = "mark@email.com"
# DELETE sent to http://localhost:8080/api/messages/12
# EXAMPLE - KEY APPENDED TO END
http.api.url=http://localhost:8080/api/messages
behavior.on.null.values=delete
# SinkRecord with key = 25, value = "jane@email.com"
# DELETE sent to http://localhost:8080/api/messages/25
Delete behavior on null values example¶
Run the demo app with the basic-auth Spring profile.

mvn spring-boot:run -Dspring.profiles.active=basic-auth

Create a http-sink.properties file with the following contents:

name=DeleteNullHttpSink
topics=http-messages
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# connect reporter required bootstrap server
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=BASIC
connection.user=admin
connection.password=password
behavior.on.null.values=delete
Publish messages with keys and values to the topic.

Caution

You must include a double dash (--) between the topic name and your flag. For more information, see this post.

confluent local services kafka produce http-messages --property parse.key=true --property key.separator=,
> 1,message-value
> 2,another-message
Run and validate the connector as described in the Quick Start.
Tip
Check for messages in the demo API with this command:
curl http://localhost:8080/api/messages -H 'Authorization: Basic YWRtaW46cGFzc3dvcmQ=' | jq
Publish tombstone messages to the topic (messages that have keys with null values).

Note

This cannot be done with confluent local services kafka produce, but the demo app provides an API for sending tombstones.

curl -X POST \
  'localhost:8080/api/tombstone?topic=http-messages&key=1' \
  -H 'Authorization: Basic YWRtaW46cGFzc3dvcmQ='
Validate that the demo app deleted the messages.
curl http://localhost:8080/api/messages \
  -H 'Authorization: Basic YWRtaW46cGFzc3dvcmQ=' | jq
Retries¶
In case of failures, the connector can be configured to retry the operations by configuring the max.retries and retry.backoff.ms parameters.
Retries example¶
Run the demo app with the basic-auth Spring profile.

mvn spring-boot:run -Dspring.profiles.active=basic-auth

Create a http-sink.properties file with the following contents:

name=RetriesExample
topics=http-messages
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# connect reporter required bootstrap server
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=BASIC
connection.user=admin
connection.password=password
behavior.on.null.values=delete

# retry configurations
max.retries=20
retry.backoff.ms=5000
Publish messages with keys and values to the topic.

Caution

You must include a double dash (--) between the topic name and your flag. For more information, see this post.

confluent local services kafka produce http-messages --property parse.key=true --property key.separator=,
> 1,message-value
> 2,another-message
Stop the demo app.
Run and validate the connector as described in the Quick Start.
The connector retries up to 20 times, with an initial backoff duration of 5000 ms. If the HTTP operation succeeds, the retries stop. In this example, because the demo app has been stopped, the connector retries 20 times and then the connector task fails.

The default value for max.retries is 10, and the default for retry.backoff.ms is 3000 ms.
Reporter¶
The connector can be configured to capture the success and failure responses from HTTP operations by configuring the Reporter parameters.
Reporter example¶
Run the demo app with the basic-auth Spring profile.

mvn spring-boot:run -Dspring.profiles.active=basic-auth

Create a http-sink.properties file with the following contents:

name=ReporterExample
topics=http-messages
tasks.max=1
connector.class=io.confluent.connect.http.HttpSinkConnector

# key/val converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# licensing for local single-node Kafka cluster
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# http sink connector configs
http.api.url=http://localhost:8080/api/messages
auth.type=BASIC
connection.user=admin
connection.password=password
behavior.on.null.values=delete

# reporter configurations
reporter.bootstrap.servers=localhost:9092
reporter.result.topic.name=success-responses
reporter.result.topic.replication.factor=1
reporter.error.topic.name=error-responses
reporter.error.topic.replication.factor=1
reporter.admin.bootstrap.servers=<host>.confluent.cloud:9092
reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
reporter.admin.security.protocol=SASL_SSL
reporter.admin.sasl.mechanism=PLAIN
reporter.producer.bootstrap.servers=<host>.confluent.cloud:9092
reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
reporter.producer.security.protocol=SASL_SSL
reporter.producer.sasl.mechanism=PLAIN
Note
For additional information about Connect Reporter for secure environments, see Kafka Connect Reporter.
Publish messages with keys and values to the topic.

Caution

You must include a double dash (--) between the topic name and your flag. For more information, see this post.

confluent local services kafka produce http-messages --property parse.key=true --property key.separator=,
> 1,message-value
> 2,another-message
Run and validate the connector as described in the Quick Start.
Consume the records from the success-responses and error-responses topics to see the HTTP operation responses.

kafkacat -C -b localhost:9092 -t success-responses -J | jq

{
  "topic": "success-responses",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1581579911854,
  "headers": [ "input_record_offset", "0", "input_record_timestamp", "1581488456476", "input_record_partition", "0", "input_record_topic", "http-connect" ],
  "key": null,
  "payload": "{\"id\":1,\"message\":\"1,message-value\"}"
}
{
  "topic": "success-responses",
  "partition": 0,
  "offset": 1,
  "tstype": "create",
  "ts": 1581579911854,
  "headers": [ "input_record_offset", "1", "input_record_timestamp", "1581488456476", "input_record_partition", "0", "input_record_topic", "http-connect" ],
  "key": null,
  "payload": "{\"id\":2,\"message\":\"2,message-value\"}"
}
In case of retryable errors (that is, errors with a 5xx status code), a response like the one shown below is included in the error-responses topic.
kafkacat -C -b localhost:9092 -t error-responses -J | jq

{
  "topic": "error-responses",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1581579911854,
  "headers": [ "input_record_offset", "0", "input_record_timestamp", "1581579931450", "input_record_partition", "0", "input_record_topic", "http-messages" ],
  "key": null,
  "payload": "Retry time lapsed, unable to process HTTP request. Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : 6,test, Status code : 500, Reason Phrase : , Response Content : {\"timestamp\":\"2020-02-11T10:44:41.574+0000\",\"status\":500,\"error\":\"Internal Server Error\",\"message\":\"Unresolved compilation problem: \\n\\tlog cannot be resolved\\n\",\"path\":\"/api/messages\"}, "
}
Suggested Reading¶
Blog post: Webify Event Streams Using the Kafka Connect HTTP Sink connector