HTTP Sink Connector for Confluent Platform

The Kafka Connect HTTP Sink connector integrates Apache Kafka® with an API via HTTP or HTTPS.

The connector consumes records from Kafka topic(s) and converts each record value to a String, or to JSON when request.body.format=json, before sending it in the request body to the configured http.api.url. The URL can optionally reference the record key and/or topic name. The target API must support either POST or PUT requests.

The connector batches records up to the configured batch.max.size before sending the batched request to the API. Each record is converted to its String representation, or to its JSON representation when request.body.format=json, and the converted records are then joined with the batch.separator.
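The batching described above can be pictured with a short Python sketch. It is an illustration of the documented behavior, not the connector's implementation: the function name build_request_bodies is hypothetical, and the "," separator in the usage line is an assumed value chosen for the example.

```python
from typing import Iterable, List


def build_request_bodies(values: Iterable[str], batch_max_size: int,
                         batch_separator: str) -> List[str]:
    """Group record values into request bodies of at most batch_max_size records.

    Illustrative only: each record is turned into a string and the records of
    one batch are joined with the separator.
    """
    if batch_max_size < 1:
        raise ValueError("batch_max_size must be at least 1")
    bodies: List[str] = []
    batch: List[str] = []
    for value in values:
        batch.append(str(value))
        if len(batch) == batch_max_size:
            bodies.append(batch_separator.join(batch))
            batch = []
    if batch:  # flush the final, partially filled batch
        bodies.append(batch_separator.join(batch))
    return bodies
```

Under these assumptions, build_request_bodies(["1", "2", "3"], 2, ",") yields two request bodies: "1,2" and "3".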

The HTTP Sink connector supports connecting to APIs using SSL along with Basic Authentication, OAuth2, or a Proxy Authentication Server.

Features

The HTTP Sink connector for Confluent Platform includes the following features:

At least once delivery

This connector guarantees that records from the Kafka topic are delivered at least once.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The HTTP Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve throughput when the subscribed topics have multiple partitions.

Install the HTTP Sink Connector

You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.

Prerequisites

Important

You must install the connector on every machine where Connect will run.

  • An installation of the Confluent Hub Client.

    Note

    This is installed by default with Confluent Enterprise.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent-hub install confluentinc/kafka-connect-http:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent-hub install confluentinc/kafka-connect-http:1.0.3
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues Confluent enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see HTTP Sink Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Quick Start

This quick start uses the HTTP Sink connector to consume records and send HTTP requests to a demo HTTP service that runs locally without any authentication.

Prerequisites
  1. Before starting the connector, clone and run the kafka-connect-http-demo app on your machine.

    git clone https://github.com/confluentinc/kafka-connect-http-demo.git
    cd kafka-connect-http-demo
    mvn spring-boot:run -Dspring.profiles.active=simple-auth
    
  2. Install the connector through the Confluent Hub Client, and then start Confluent Platform.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services start
    
  3. Produce test data to the http-messages topic in Kafka using the Confluent CLI confluent local services kafka produce command.

    seq 10 | confluent local services kafka produce http-messages
    
  4. Create a http-sink.json file with the following contents:

    {
      "name": "HttpSink",
      "config": {
        "topics": "http-messages",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.http.HttpSinkConnector",
        "http.api.url": "http://localhost:8080/api/messages",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "reporter.bootstrap.servers": "localhost:9092",
        "reporter.result.topic.name": "success-responses",
        "reporter.result.topic.replication.factor": "1",
        "reporter.error.topic.name":"error-responses",
        "reporter.error.topic.replication.factor":"1"
      }
    }
    
  5. Load the HTTP Sink connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load HttpSink --config http-sink.json
    

    Important

    Don’t use the confluent local commands in production environments.

  6. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status HttpSink
    
  7. Confirm that the data was sent to the HTTP endpoint.

    curl localhost:8080/api/messages
    

Note

Before running other examples, kill the demo app (CTRL + C) to avoid port conflicts.
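Because the confluent local commands are not meant for production, a connector configuration such as http-sink.json is commonly submitted through the Kafka Connect REST API instead. The Python sketch below only builds that request. It assumes a Connect worker whose REST interface listens on http://localhost:8083, and the helper name build_create_request is hypothetical.

```python
import json
import urllib.request


def build_create_request(connect_url: str, name: str,
                         config: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    payload = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

To use it, load http-sink.json with json.load, pass its name and config values to the helper, and hand the result to urllib.request.urlopen while a worker is running.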

Examples

Authentication

The HTTP Sink connector can run with SSL enabled or disabled and supports several authentication types: Basic Auth, OAuth2, and Proxy Server Auth.

Basic authentication example

  1. Run the demo app with the basic-auth Spring profile.

    mvn spring-boot:run -Dspring.profiles.active=basic-auth
    

    Note

    If the demo app is already running, you will need to kill that instance (CTRL + C) before running a new instance to avoid port conflicts.

  2. Create a http-sink.properties file with the following contents:

    name=HttpSinkBasicAuth
    topics=http-messages
    tasks.max=1
    connector.class=io.confluent.connect.http.HttpSinkConnector
    # key/val converters
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    # licensing for local single-node Kafka cluster
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    # connect reporter required bootstrap server
    reporter.bootstrap.servers=localhost:9092
    reporter.result.topic.name=success-responses
    reporter.result.topic.replication.factor=1
    reporter.error.topic.name=error-responses
    reporter.error.topic.replication.factor=1
    # http sink connector configs
    http.api.url=http://localhost:8080/api/messages
    auth.type=BASIC
    connection.user=admin
    connection.password=password
    

    Note

    For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

  3. Run and validate the connector as described in the Quick Start.
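HTTP Basic authentication sends the user name and password as a base64-encoded user:password pair in the Authorization header. The Python sketch below shows how the demo credentials (admin and password) map to the header value that the curl commands later in this document pass to the demo app. The function name basic_auth_header is hypothetical and the sketch is independent of the connector's own code.

```python
import base64


def basic_auth_header(user: str, password: str) -> str:
    """Return the HTTP Basic Authorization header value for the credentials."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"
```

Calling basic_auth_header("admin", "password") returns "Basic YWRtaW46cGFzc3dvcmQ=".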

OAuth2 authentication example

Note

The connector’s OAuth2 configuration only allows for use of the Client Credentials grant type.

  1. Run the demo app with the oauth2 Spring profile.

    mvn spring-boot:run -Dspring.profiles.active=oauth2
    
  2. Create a http-sink.properties file with the following contents:

    name=HttpSinkOAuth2
    topics=http-messages
    tasks.max=1
    connector.class=io.confluent.connect.http.HttpSinkConnector
    # key/val converters
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    # licensing for local single-node Kafka cluster
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    # connect reporter required bootstrap server
    reporter.bootstrap.servers=localhost:9092
    reporter.result.topic.name=success-responses
    reporter.result.topic.replication.factor=1
    reporter.error.topic.name=error-responses
    reporter.error.topic.replication.factor=1
    # http sink connector configs
    http.api.url=http://localhost:8080/api/messages
    auth.type=OAUTH2
    oauth2.token.url=http://localhost:8080/oauth/token
    oauth2.client.id=kc-client
    oauth2.client.secret=kc-secret
    

    Note

    For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

  3. Run and validate the connector as described in the Quick Start.
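For orientation, a Client Credentials token request is a form-encoded POST to the token URL with grant_type=client_credentials. The Python sketch below builds such a request for the demo values. It is not the connector's implementation: the helper name build_token_request is hypothetical, and sending the client ID and secret in the form body is an assumption, since a token server may instead expect them in a Basic Authorization header.

```python
import urllib.parse
import urllib.request


def build_token_request(token_url: str, client_id: str,
                        client_secret: str) -> urllib.request.Request:
    """Build (but do not send) a Client Credentials token request.

    Assumption: credentials travel in the form body rather than in an
    Authorization header.
    """
    form = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }).encode("ascii")
    return urllib.request.Request(
        url=token_url,
        data=form,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
```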

SSL with basic authentication example

  1. Run the demo app with the ssl-auth Spring profile.

    mvn spring-boot:run -Dspring.profiles.active=ssl-auth
    
  2. Create a http-sink.properties file with the following contents:

    name=SSLHttpSink
    topics=string-topic
    tasks.max=1
    connector.class=io.confluent.connect.http.HttpSinkConnector
    # key/val converters
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    # licensing for local single-node Kafka cluster
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    # connect reporter required bootstrap server
    reporter.bootstrap.servers=localhost:9092
    reporter.result.topic.name=success-responses
    reporter.result.topic.replication.factor=1
    reporter.error.topic.name=error-responses
    reporter.error.topic.replication.factor=1
    # http sink connector configs
    http.api.url=https://localhost:8443/api/messages
    # http sink connector SSL config
    ssl.enabled=true
    https.ssl.truststore.location=/path/to/http-sink-demo/src/main/resources/localhost-keystore.jks
    https.ssl.truststore.type=JKS
    https.ssl.truststore.password=changeit
    https.ssl.keystore.location=/path/to/http-sink-demo/src/main/resources/localhost-keystore.jks
    https.ssl.keystore.type=JKS
    https.ssl.keystore.password=changeit
    https.ssl.key.password=changeit
    https.ssl.protocol=TLSv1.2
    
    auth.type=BASIC
    connection.user=admin
    connection.password=password
    

    Tip

    Don’t forget to update the https.ssl.truststore.location and https.ssl.keystore.location with the path to your http-sink-demo project.

  3. Run and validate the connector as described in the Quick Start.

Proxy authentication example

Note

The proxy authentication example requires Mac OS X 10.6.8 or later because of the proxy it uses (SquidMan).

  1. Run the demo app with the simple-auth Spring profile.

    mvn spring-boot:run -Dspring.profiles.active=simple-auth
    
  2. Install SquidMan Proxy.

  3. In SquidMan, navigate to the Preferences > General tab, and set the HTTP port to 3128.

  4. In SquidMan, navigate to the Preferences > Template tab, and add the following criteria:

    auth_param basic program /usr/local/squid/libexec/basic_ncsa_auth /etc/squid/passwords
    auth_param basic realm proxy
    acl authenticated proxy_auth REQUIRED
    http_access allow authenticated
    
  5. Create a credentials file for the proxy.

    sudo mkdir /etc/squid
    sudo htpasswd -c /etc/squid/passwords proxyuser
    # set password to proxypassword
    
  6. Open the SquidMan application and select Start Squid.

  7. Create a http-sink.properties file with the following contents:

    name=HttpSinkProxyAuth
    topics=http-messages
    tasks.max=1
    connector.class=io.confluent.connect.http.HttpSinkConnector
    # key/val converters
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    # licensing for local single-node Kafka cluster
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    # connect reporter required bootstrap server
    reporter.bootstrap.servers=localhost:9092
    reporter.result.topic.name=success-responses
    reporter.result.topic.replication.factor=1
    reporter.error.topic.name=error-responses
    reporter.error.topic.replication.factor=1
    # http sink connector configs
    http.api.url=http://localhost:8080/api/messages
    http.proxy.host=localhost
    http.proxy.port=3128
    http.proxy.user=proxyuser
    http.proxy.password=proxypassword
    
  8. Run and validate the connector as described in the Quick Start.

Key and value converters

As with other connectors, you can configure the key and value converters to match the data in the incoming Kafka topic. However, once the connector receives the records, it attempts to convert them to a String regardless of their type. The String representation of each record is then placed in the request body and sent to the downstream http.api.url in a POST or PUT request.

Avro converter example

  1. Run the demo app with the basic-auth Spring profile.

    mvn spring-boot:run -Dspring.profiles.active=basic-auth
    
  2. Create a http-sink.properties file with the following contents:

    name=AvroHttpSink
    topics=avro-topic
    tasks.max=1
    connector.class=io.confluent.connect.http.HttpSinkConnector
    # key/val converters
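    # AvroConverter needs a Schema Registry URL; localhost:8081 is an assumed local default
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081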
    # licensing for local single-node Kafka cluster
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    # http sink connector configs
    http.api.url=http://localhost:8080/api/messages
    auth.type=BASIC
    connection.user=admin
    connection.password=password
    

    Note

    Publish Avro messages to avro-topic instead of the String messages shown in the Quick Start.

  3. Run and validate the connector as described in the Quick Start.

JSON converter example

  1. Run the demo app with the basic-auth Spring profile.

    mvn spring-boot:run -Dspring.profiles.active=basic-auth
    
  2. Create a http-sink.properties file with the following contents:

    name=JsonHttpSink
    topics=json-topic
    tasks.max=1
    connector.class=io.confluent.connect.http.HttpSinkConnector
    # key/val converters
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    # licensing for local single-node Kafka cluster
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    # http sink connector configs
    http.api.url=http://localhost:8080/api/messages
    auth.type=BASIC
    connection.user=admin
    connection.password=password
    

    Note

    Publish JSON messages to json-topic instead of the String messages shown in the Quick Start.

  3. Run and validate the connector as described in the Quick Start.

Header forwarding

The HTTP Sink connector includes the following configurations for adding headers to all requests:

  • headers
  • header.separator

The connector will forward any headers you configure using the headers property. You can use | to separate multiple headers, which is configurable by setting the header.separator property. To view more details about each of these configuration properties, see HTTP Sink Connector Configuration Properties.

The HTTP Sink connector also reads the headers of incoming records and appends them as-is to the outbound requests. Record headers are applied after the configured headers described previously, so they take precedence when both define the same header.
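The following Python sketch illustrates how a headers value could be split and how record headers could override configured ones. It is a reading of the behavior described above, not the connector's code. The function names are hypothetical, and splitting each entry on the first colon and trimming whitespace are assumptions.

```python
from typing import Dict


def parse_static_headers(headers: str, header_separator: str = "|") -> Dict[str, str]:
    """Split a `headers` property value into a name -> value mapping."""
    parsed: Dict[str, str] = {}
    for entry in headers.split(header_separator):
        if not entry.strip():
            continue  # tolerate empty segments such as a trailing separator
        name, _, value = entry.partition(":")  # split on the first colon only
        parsed[name.strip()] = value.strip()
    return parsed


def merge_headers(static_headers: Dict[str, str],
                  record_headers: Dict[str, str]) -> Dict[str, str]:
    """Apply record headers after the configured ones so they take precedence."""
    merged = dict(static_headers)
    merged.update(record_headers)
    return merged
```

With the value Forward-Me:header_value|Another-Header:another_value, the first function returns two entries; a record header named Forward-Me would then replace the configured value in the merged result.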

Header forwarding example

  1. Run the demo app with the basic-auth Spring profile.

    mvn spring-boot:run -Dspring.profiles.active=basic-auth
    
  2. Create a http-sink.properties file with the following contents:

    name=HttpSinkBasicAuth
    topics=http-messages
    tasks.max=1
    connector.class=io.confluent.connect.http.HttpSinkConnector
    # key/val converters
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    # licensing for local single-node Kafka cluster
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    # connect reporter required bootstrap server
    reporter.bootstrap.servers=localhost:9092
    reporter.result.topic.name=success-responses
    reporter.result.topic.replication.factor=1
    reporter.error.topic.name=error-responses
    reporter.error.topic.replication.factor=1
    # http sink connector configs
    http.api.url=http://localhost:8080/api/messages
    auth.type=BASIC
    connection.user=admin
    connection.password=password
    headers=Forward-Me:header_value|Another-Header:another_value
    
  3. Run and validate the connector as described in the Quick Start.

Key and topic substitution

The record’s value is the only piece of data forwarded to the API by default. However, the record key and/or topic can be substituted into the http.api.url so that it can be sent to the API.

The example below illustrates how this can be done. Notice the structure of the http.api.url.

Key and topic substitution example

  1. Run the demo app with the simple-auth Spring profile.

    mvn spring-boot:run -Dspring.profiles.active=simple-auth
    
  2. Create a http-sink.properties file with the following contents:

    name=KeyTopicSubstitution
    topics=key-val-messages
    tasks.max=1
    connector.class=io.confluent.connect.http.HttpSinkConnector
    # key/val converters
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    # licensing for local single-node Kafka cluster
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    # connect reporter required bootstrap server
    reporter.bootstrap.servers=localhost:9092
    reporter.result.topic.name=success-responses
    reporter.result.topic.replication.factor=1
    reporter.error.topic.name=error-responses
    reporter.error.topic.replication.factor=1
    # http sink connector configs
    auth.type=NONE
    http.api.url=http://localhost:8080/api/messages/${topic}/${key}
    
  3. Produce a set of messages with keys and values.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services kafka produce key-val-messages --property parse.key=true --property key.separator=,
    
    > 1,value
    > 2,another-value
    
  4. Run and validate the connector as described in the Quick Start.

Tip

Run curl localhost:8080/api/messages | jq to see that the message keys and topics were saved.
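The placeholder substitution in http.api.url can be sketched in Python as follows. This is illustrative only: the function name resolve_api_url is hypothetical, and the sketch performs no URL encoding of the key or topic, because this document does not say whether the connector encodes them.

```python
def resolve_api_url(api_url: str, topic: str, key: str) -> str:
    """Replace the ${topic} and ${key} placeholders in an http.api.url template."""
    return api_url.replace("${topic}", topic).replace("${key}", key)
```

For a record with key 1 on a topic named key-val-messages, the template http://localhost:8080/api/messages/${topic}/${key} resolves to http://localhost:8080/api/messages/key-val-messages/1.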

Regex replacements

The connector can be configured to match on regex.patterns and replace any matches with the regex.replacements. The regex pattern match and replacement is done after the record has been converted into its string representation.

To use multiple regex patterns, separate them with ~ (the default separator) or configure a different separator with regex.separator.
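A rough Python sketch of this replacement step is shown below. It rests on several assumptions: patterns and replacements are paired one-to-one in order, replacements are treated as literal text, and Python's re module stands in for the Java regex engine that the connector runs on, so syntax details may differ. The function name apply_regex_replacements is hypothetical.

```python
import re
from typing import List


def apply_regex_replacements(body: str, regex_patterns: str, regex_replacements: str,
                             regex_separator: str = "~") -> str:
    """Apply each pattern/replacement pair, in order, to a record's string form."""
    patterns: List[str] = regex_patterns.split(regex_separator)
    replacements: List[str] = regex_replacements.split(regex_separator)
    if len(patterns) != len(replacements):
        raise ValueError("each pattern needs exactly one replacement")
    for pattern, replacement in zip(patterns, replacements):
        # A callable replacement keeps the text literal (no backreference parsing).
        body = re.sub(pattern, lambda _match, text=replacement: text, body)
    return body
```

With the email-masking pattern ^.+@.+$ and the replacement ********, the string example@domain.com becomes ********, while a string without an @ sign is left unchanged.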

Regex replacement example

  1. Run the demo app with the basic-auth Spring profile.

    mvn spring-boot:run -Dspring.profiles.active=basic-auth
    
  2. Create a http-sink.properties file with the following contents:

    name=RegexHttpSink
    topics=email-topic,non-email-topic
    tasks.max=1
    connector.class=io.confluent.connect.http.HttpSinkConnector
    # key/val converters
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    # licensing for local single-node Kafka cluster
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    # connect reporter required bootstrap server
    reporter.bootstrap.servers=localhost:9092
    reporter.result.topic.name=success-responses
    reporter.result.topic.replication.factor=1
    reporter.error.topic.name=error-responses
    reporter.error.topic.replication.factor=1
    # http sink connector configs
    http.api.url=http://localhost:8080/api/messages
    auth.type=BASIC
    connection.user=admin
    connection.password=password
    # regex to mask emails
    regex.patterns=^.+@.+$
    regex.replacements=********
    
  3. Publish messages to the topics that are configured. Emails should be redacted with ******** before being sent to the demo app.

    confluent local services kafka produce email-topic
    > example@domain.com
    > another@email.com
    
    confluent local services kafka produce non-email-topic
    > not an email
    > another normal string
    
  4. Run and validate the connector as described in the Quick Start.

Note

Regex replacement is not supported when the request.body.format configuration is set to json.

Tombstone records

A record that has a non-null key and a null value is referred to as a tombstone in Kafka. These records are handled specially by the HTTP Sink connector.

By default, tombstone records are ignored but this behavior can be configured with the behavior.on.null.values property.

The other two configuration options are:

  • fail: If a tombstone record is received, the connector task is killed immediately.
  • delete: The connector attempts to send a DELETE request to the configured API.

If key substitution is being used (for example, localhost:8080/api/messages/${key}), a DELETE request is sent to the configured URL with the key injected into the ${key} placeholder. If key substitution is not configured, the record key is appended to the end of the URI and a DELETE is sent to the formatted URL.

Delete URL example

# EXAMPLE - KEY SUBSTITUTION

http.api.url=http://localhost:8080/api/messages/${key}
behavior.on.null.values=delete

# SinkRecord with key = 12, value = null (tombstone)
# DELETE sent to http://localhost:8080/api/messages/12


# EXAMPLE - KEY APPENDED TO END

http.api.url=http://localhost:8080/api/messages
behavior.on.null.values=delete

# SinkRecord with key = 25, value = null (tombstone)
# DELETE sent to http://localhost:8080/api/messages/25
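The three behavior.on.null.values options and the two URL forms can be summarized in a small Python sketch. It mirrors the description in this section and is not the connector's implementation; the function name handle_null_value is hypothetical.

```python
from typing import Optional


def handle_null_value(api_url: str, key: str, behavior: str = "ignore") -> Optional[str]:
    """Return the DELETE URL for a tombstone, or None when the record is skipped."""
    if behavior == "ignore":
        return None
    if behavior == "fail":
        raise RuntimeError("tombstone record received and behavior.on.null.values=fail")
    if behavior == "delete":
        if "${key}" in api_url:
            return api_url.replace("${key}", key)  # key substitution
        return api_url.rstrip("/") + "/" + key  # key appended to the end
    raise ValueError(f"unknown behavior.on.null.values: {behavior}")
```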

Delete behavior on null values example

  1. Run the demo app with the basic-auth Spring profile.

    mvn spring-boot:run -Dspring.profiles.active=basic-auth
    
  2. Create a http-sink.properties file with the following contents:

    name=DeleteNullHttpSink
    topics=http-messages
    tasks.max=1
    connector.class=io.confluent.connect.http.HttpSinkConnector
    # key/val converters
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    # licensing for local single-node Kafka cluster
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    # connect reporter required bootstrap server
    reporter.bootstrap.servers=localhost:9092
    reporter.result.topic.name=success-responses
    reporter.result.topic.replication.factor=1
    reporter.error.topic.name=error-responses
    reporter.error.topic.replication.factor=1
    # http sink connector configs
    http.api.url=http://localhost:8080/api/messages
    auth.type=BASIC
    connection.user=admin
    connection.password=password
    behavior.on.null.values=delete
    
  3. Publish messages to the topic that have keys and values.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services kafka produce http-messages --property parse.key=true --property key.separator=,
    > 1,message-value
    > 2,another-message
    
  4. Run and validate the connector as described in the Quick Start.

    Tip

    Check for messages in the demo API with this command: curl http://localhost:8080/api/messages -H 'Authorization: Basic YWRtaW46cGFzc3dvcmQ=' | jq

  5. Publish messages to the topic that have keys with null values (tombstones).

    Note

    This cannot be done with confluent local services kafka produce but there is an API in the demo app to send tombstones.

    curl -X POST \
      'localhost:8080/api/tombstone?topic=http-messages&key=1' \
      -H 'Authorization: Basic YWRtaW46cGFzc3dvcmQ='
    
  6. Validate that the demo app deleted the messages.

    curl http://localhost:8080/api/messages \
      -H 'Authorization: Basic YWRtaW46cGFzc3dvcmQ=' | jq
    

Retries

If a request fails, the connector can retry the operation. Configure this behavior with the max.retries and retry.backoff.ms parameters.

Retries example

  1. Run the demo app with the basic-auth Spring profile.

    mvn spring-boot:run -Dspring.profiles.active=basic-auth
    
  2. Create a http-sink.properties file with the following contents:

    name=RetriesExample
    topics=http-messages
    tasks.max=1
    connector.class=io.confluent.connect.http.HttpSinkConnector
    # key/val converters
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    # licensing for local single-node Kafka cluster
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    # connect reporter required bootstrap server
    reporter.bootstrap.servers=localhost:9092
    reporter.result.topic.name=success-responses
    reporter.result.topic.replication.factor=1
    reporter.error.topic.name=error-responses
    reporter.error.topic.replication.factor=1
    # http sink connector configs
    http.api.url=http://localhost:8080/api/messages
    auth.type=BASIC
    connection.user=admin
    connection.password=password
    behavior.on.null.values=delete
    # retry configurations
    max.retries=20
    retry.backoff.ms=5000
    
  3. Publish messages to the topic that have keys and values.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services kafka produce http-messages --property parse.key=true --property key.separator=,
    > 1,message-value
    > 2,another-message
    
  4. Stop the demo app.

  5. Run and validate the connector as described in the Quick Start.

  6. The connector retries a maximum of 20 times with an initial backoff duration of 5000 ms and stops retrying as soon as an HTTP operation succeeds. In this example the demo app is stopped, so the connector retries 20 times and the connector task then fails.

  7. The default value for max.retries is 10 and for retry.backoff.ms is 3000ms.
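The retry behavior can be approximated by the Python sketch below. It is a simplified model, not the connector's implementation. The function name send_with_retries is hypothetical. The sketch assumes that max.retries counts the retries made after the first attempt and that the wait between attempts stays constant, whereas the connector may increase the backoff after each failure.

```python
import time
from typing import Callable


def send_with_retries(send: Callable[[], bool], max_retries: int, retry_backoff_ms: int,
                      sleep: Callable[[float], None] = time.sleep) -> int:
    """Call send() until it succeeds and return the number of attempts made.

    Assumptions: max_retries counts retries after the first attempt, and the
    wait between attempts is constant.
    """
    attempts = 0
    while True:
        attempts += 1
        if send():
            return attempts
        if attempts > max_retries:
            raise RuntimeError(f"giving up after {attempts} attempts")
        sleep(retry_backoff_ms / 1000.0)  # time.sleep expects seconds
```

Passing a custom sleep function, as the parameter allows, makes it possible to exercise the loop without actually waiting.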

Reporter

The connector can be configured to capture the success and failure responses from HTTP operations by setting the reporter parameters.

Reporter example

  1. Run the demo app with the basic-auth Spring profile.

    mvn spring-boot:run -Dspring.profiles.active=basic-auth
    
  2. Create a http-sink.properties file with the following contents:

    name=ReporterExample
    topics=http-messages
    tasks.max=1
    connector.class=io.confluent.connect.http.HttpSinkConnector
    # key/val converters
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    # licensing for local single-node Kafka cluster
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    # http sink connector configs
    http.api.url=http://localhost:8080/api/messages
    auth.type=BASIC
    connection.user=admin
    connection.password=password
    behavior.on.null.values=delete
    # reporter configurations
    reporter.bootstrap.servers=localhost:9092
    reporter.result.topic.name=success-responses
    reporter.result.topic.replication.factor=1
    reporter.error.topic.name=error-responses
    reporter.error.topic.replication.factor=1
    
    # admin and producer client settings for a secured cluster (for example, Confluent Cloud)
    reporter.admin.bootstrap.servers=<host>.confluent.cloud:9092
    reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
    required username="<username>" password="<password>";
    reporter.admin.security.protocol=SASL_SSL
    reporter.admin.sasl.mechanism=PLAIN
    
    reporter.producer.bootstrap.servers=<host>.confluent.cloud:9092
    reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
    required username="<username>" password="<password>";
    reporter.producer.security.protocol=SASL_SSL
    reporter.producer.sasl.mechanism=PLAIN
    

    Note

    For additional information about Connect Reporter for secure environments, see Kafka Connect Reporter.

  3. Publish messages to the topic that have keys and values.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services kafka produce http-messages --property parse.key=true --property key.separator=,
    > 1,message-value
    > 2,another-message
    
  4. Run and validate the connector as described in the Quick Start.

  5. Consume the records from the success-responses and error-responses topics to see the HTTP operation responses.

    kafkacat -C -b localhost:9092 -t success-responses -J |jq
    
    {
      "topic": "success-responses",
      "partition": 0,
      "offset": 0,
      "tstype": "create",
      "ts": 1581579911854,
      "headers": [
        "input_record_offset",
        "0",
        "input_record_timestamp",
        "1581488456476",
        "input_record_partition",
        "0",
        "input_record_topic",
        "http-connect"
      ],
      "key": null,
      "payload": "{\"id\":1,\"message\":\"1,message-value\"}"
    }
    {
      "topic": "success-responses",
      "partition": 0,
      "offset": 1,
      "tstype": "create",
      "ts": 1581579911854,
      "headers": [
        "input_record_offset",
        "1",
        "input_record_timestamp",
        "1581488456476",
        "input_record_partition",
        "0",
        "input_record_topic",
        "http-connect"
      ],
      "key": null,
      "payload": "{\"id\":2,\"message\":\"2,message-value\"}"
    }
    

    In case of retryable errors (that is, errors with a 5xx status code), a response like the one shown below is included in the error-responses topic.

    kafkacat -C -b localhost:9092 -t error-responses -J |jq
    
    {
      "topic": "error-responses",
      "partition": 0,
      "offset": 0,
      "tstype": "create",
      "ts": 1581579911854,
      "headers": [
        "input_record_offset",
        "0",
        "input_record_timestamp",
        "1581579931450",
        "input_record_partition",
        "0",
        "input_record_topic",
        "http-messages"
      ],
      "key": null,
      "payload": "Retry time lapsed, unable to process HTTP request. Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : 6,test, Status code : 500, Reason Phrase : , Response Content : {\"timestamp\":\"2020-02-11T10:44:41.574+0000\",\"status\":500,\"error\":\"Internal Server Error\",\"message\":\"Unresolved compilation problem: \\n\\tlog cannot be resolved\\n\",\"path\":\"/api/messages\"}, "
    }
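In the kafkacat JSON output above, the headers field is a flat list that alternates header names and values. The Python sketch below pairs them into a dictionary, which makes it easier to trace a reporter record back to its input record. The function name reporter_headers is hypothetical, and the sketch assumes one JSON object per input line, as kafkacat -J emits before jq pretty-prints it.

```python
import json
from typing import Dict


def reporter_headers(kafkacat_json_line: str) -> Dict[str, str]:
    """Pair up the flat [name, value, name, value, ...] header list."""
    record = json.loads(kafkacat_json_line)
    flat = record.get("headers", [])
    return dict(zip(flat[0::2], flat[1::2]))
```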