Kafka Connect Logging

Kafka Connect and other Confluent Platform components use the Java-based logging utility Apache Log4j to collect runtime data and record component events. The following table describes each log level. The Kafka Connect Log4j properties file is located at etc/kafka/connect-log4j.properties in the Confluent Platform installation directory.

Level    Description
OFF      Turns off logging.
FATAL    Severe errors that cause premature termination.
ERROR    Other runtime errors or unexpected conditions.
WARN     Runtime situations that are undesirable or unexpected, but not necessarily wrong.
INFO     Runtime events of interest at startup and shutdown.
DEBUG    Detailed diagnostic information about events.
TRACE    Detailed diagnostic information about everything.

By default, Connect writes INFO, WARN, ERROR, and FATAL information to standard output (stdout). When Connect starts, it writes the settings that it's using and any WARN, ERROR, or FATAL messages along the way, such as a missing configuration or a broken connector. If you want to see all events, and each step Connect and a connector take from startup to shutdown, set the DEBUG or TRACE logging level.
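For example, a minimal sketch of raising the root level in etc/kafka/connect-log4j.properties (assuming the default stdout appender from the template shown later in this page):

# Raise the root level from INFO to DEBUG; every logger without an explicit
# override now logs at DEBUG, which is verbose but shows each step.
log4j.rootLogger=DEBUG, stdout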

View Connect Logs

Kafka Connect writes logs to stdout by default. The following sections provide commands that allow you to view the Connect log.

Docker

Enter the following command to tail the Connect log for Confluent Platform running on Docker:

docker logs -f kafka-connect
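
The standard Docker CLI flags also apply here. For example, to limit how much history is shown (kafka-connect is the container name from the command above):

# Show only the last 100 lines, then continue to follow new output
docker logs --tail 100 -f kafka-connect

# Show only output from the last 10 minutes
docker logs --since 10m kafka-connect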

For information about how to set up logging levels with Docker, see Changing Docker log levels.

Confluent CLI

Enter the following command to display a snapshot of the log for Confluent Platform running locally:

confluent local services connect log

You can pipe the log through grep to view specific log information. For example, to see S3 connector messages, you could enter the following command:

confluent local services connect log | grep s3
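
Because the log is plain text, any shell filter works. For example, this illustrative pattern shows only warnings and errors:

confluent local services connect log | grep -iE "WARN|ERROR"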

When you run the standard log command, the path to the connect.stdout file is shown in the displayed output. To tail the log and display any new messages that are appended to it, add the -f flag. For example:

confluent local services connect log -f

Example output:

[2020-07-07 15:04:29,333] DEBUG Progress event: HTTP_REQUEST_COMPLETED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,333] DEBUG Progress event: HTTP_RESPONSE_STARTED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,333] DEBUG Progress event: HTTP_RESPONSE_COMPLETED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,333] DEBUG Progress event: CLIENT_REQUEST_SUCCESS_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,334] DEBUG Progress event: REQUEST_BYTE_TRANSFER_EVENT, bytes: 406 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,334] DEBUG Progress event: TRANSFER_PART_COMPLETED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)

Scripted Confluent Platform startup

Connect log files are written to /var/log/confluent/connect when you use the Confluent Platform systemd Service Unit Files to run Confluent Platform. The systemd journal service writes the logs. Enter the following journalctl command to view log messages for Kafka Connect:

sudo journalctl -u confluent-connect

Enter the following command to tail the log:

sudo journalctl -f -u confluent-connect
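
Standard journalctl filters also apply. For example, to restrict output to a time window or a fixed number of lines:

# Show Connect messages from the last hour
sudo journalctl -u confluent-connect --since "1 hour ago"

# Show only the most recent 200 lines
sudo journalctl -u confluent-connect -n 200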

Log4j Properties File

A default Connect Log4j template file is provided with Confluent Platform. This properties file is located at etc/kafka/connect-log4j.properties in the Confluent Platform directory. The following shows an example of what this file contains:

log4j.rootLogger=INFO, stdout, connectAppender

# Send the logs to the console.
#
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

# Send the logs to a file, rolling the file at midnight local time. The `File` option specifies the
# location of the log files (e.g. ${kafka.logs.dir}/connect.log), and at midnight local time the file is closed
# and copied in the same directory but with a filename that ends in the `DatePattern` option.
#
log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout

# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
# specific connector. Simply add this parameter to the log layout configuration below to include the contextual
# information.
#
connect.log.pattern=[%d] %p %m (%c:%L)%n
#connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n

log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}

log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.reflections=ERROR

You can add additional entries and change log levels by updating this configuration file.
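For example, a minimal sketch of an added entry that raises the log level for a single connector package (the JDBC package name matches the examples later in this page):

# Enable debug logging for the JDBC connector only
log4j.logger.io.confluent.connect.jdbc=DEBUG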

Change Log Levels

The following sections provide information on adding or changing log levels to debug Connect and running connectors.

Use the Connect Log4j properties file

The basic Connect Log4j template provided at etc/kafka/connect-log4j.properties is likely insufficient to debug issues. The following example shows a Log4j template you can use to set the DEBUG level for consumers, producers, and connectors. This is preferable to simply enabling DEBUG on everything, which makes the logs verbose and harder to follow. You can also enable TRACE for connector logging to see the records that are processed.

In the Log4j properties file example below, the DEBUG level is configured for Connect, the worker tasks, the Datagen source connector, and the Amazon S3 sink connector. Note that the lines that would also send the logs to a separate file named connect.log are commented out for this example.

# root log level (if an override to a class or package is not specified,
# it will now log at this level).
log4j.rootLogger=INFO, stdout

# Append logs to console. If you are using different appenders,
# update the following lines accordingly. The "%X{connector.context}"
# fragment instructs Connect to include connector- and task-specific information
# on every log message and is recommended.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

# Uncomment the following lines to also send the logs to a file,
# rolling the file at midnight local time. For example, the `File` option
# specifies the location of the log files (e.g. ${kafka.logs.dir}/connect.log),
# and at midnight local time the file is closed and copied in the same
# directory but with a filename that ends in the `DatePattern` option.
#
#log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
#log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log
#log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout

# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
# specific connector. Simply add this parameter to the log layout configuration below to include the contextual
# information.
#
connect.log.pattern=[%d] %p %m (%c:%L)%n
#connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n
log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
#log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}

# suppress noisy logs from dependencies
log4j.logger.org.reflections=ERROR
log4j.logger.org.eclipse.jetty=ERROR
log4j.logger.kafka=ERROR
log4j.logger.org.apache.kafka=ERROR
log4j.logger.org.apache.zookeeper=ERROR

# Uncomment the following line to debug consumers (very verbose, use carefully):
#log4j.logger.org.apache.kafka.clients.consumer=DEBUG

# Uncomment the following line to debug producers (very verbose, use carefully):
#log4j.logger.org.apache.kafka.clients.producer=DEBUG

# Uncomment the following line when enabling debug on source connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask=DEBUG

# Uncomment the following line when enabling debug on sink connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=DEBUG

# Uncomment the following line when the problem may be with Connect, SMTs, converters:
log4j.logger.org.apache.kafka.connect=DEBUG

# When one or more connectors are not behaving correctly, enable debug logging only
# for those connectors. Optionally enable TRACE logging to see records that are processed.

# Uncomment the following line to enable debug for the Datagen connector:
log4j.logger.io.confluent.kafka.connect.datagen=DEBUG

# Uncomment the following to enable debug for the JDBC connector:
#log4j.logger.io.confluent.connect.jdbc=DEBUG

# Uncomment the following to enable debug for the Elasticsearch connector:
#log4j.logger.io.confluent.connect.elasticsearch=DEBUG

# Uncomment the following to enable debug for the HDFS connector:
#log4j.logger.io.confluent.connect.storage=DEBUG
#log4j.logger.io.confluent.connect.hdfs=DEBUG

# Uncomment the following to enable debug for the S3 connector:
log4j.logger.io.confluent.connect.storage=DEBUG
log4j.logger.io.confluent.connect.s3=DEBUG

# Uncomment the following to enable debug for the GCS connector:
#log4j.logger.io.confluent.connect.storage=DEBUG
#log4j.logger.io.confluent.connect.gcs=DEBUG

# Uncomment the following to enable debug for the JMS connectors (and derivatives IBM MQ and ActiveMQ):
#log4j.logger.io.confluent.connect.jms=DEBUG
#log4j.logger.io.confluent.connect.ibm.mq=DEBUG
#log4j.logger.io.confluent.connect.activemq=DEBUG

# Add similar lines to enable debug for the specific connector(s):
#log4j.logger.<root package of the connector>=DEBUG
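
The worker reads this properties file at startup, so changes made here typically require a worker restart to take effect. To change log levels on a running worker without a restart, use the Connect API described in the next section.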

Use the Connect API

After you have Confluent Platform, Kafka Connect, and your connectors running, you can check and change log levels using the Connect API endpoints.

Log level changes made through the API behave differently based on the scope query parameter:

  • Worker scope (default): Changes are temporary and affect only the worker receiving the request. When the worker restarts, logging configuration reverts to what is defined in the configuration file.

  • Cluster scope (scope=cluster): In distributed mode, changes are broadcast to all workers in the cluster and persisted to the configuration topic (config.storage.topic). Changes apply to workers currently running and to any workers that start or rejoin the cluster later.

In both cases, changes do not modify the Log4j configuration file. These API calls work regardless of the underlying configuration file format.

The examples are based on a Connect Log4j properties file with the following configuration properties:

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

connect.log.pattern=[%d] %p %m (%c:%L)%n

log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}

# suppress noisy logs from dependencies
log4j.logger.org.reflections=ERROR
log4j.logger.org.eclipse.jetty=ERROR
log4j.logger.kafka=ERROR
log4j.logger.org.apache.kafka=ERROR
log4j.logger.org.apache.zookeeper=ERROR

# Uncomment the following line when enabling debug on source connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask=DEBUG

# Uncomment the following line when enabling debug on sink connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=DEBUG

# Uncomment the following line when the problem may be with Connect, SMTs, converters:
log4j.logger.org.apache.kafka.connect=DEBUG

# Uncomment the following line to enable debug for the Datagen connector:
log4j.logger.io.confluent.kafka.connect.datagen=DEBUG

# Uncomment the following lines to enable debug for the Amazon S3 connector:
log4j.logger.io.confluent.connect.storage=DEBUG
log4j.logger.io.confluent.connect.s3=DEBUG

Check the currently set log levels

Enter the following command to list all loggers with explicitly configured levels (including the root logger). Use jq for formatted JSON output.

curl -Ss http://localhost:8083/admin/loggers | jq

Example output:

{
  "io.confluent.connect.s3": {
    "level": "DEBUG",
    "last_modified": 1700000000000
  },
  "io.confluent.connect.storage": {
    "level": "DEBUG",
    "last_modified": 1700000000000
  },
  "io.confluent.kafka.connect.datagen": {
    "level": "DEBUG",
    "last_modified": null
  },
  "kafka": {
    "level": "ERROR",
    "last_modified": null
  },
  "org.apache.kafka": {
    "level": "ERROR",
    "last_modified": null
  },
  "org.apache.kafka.connect": {
    "level": "DEBUG",
    "last_modified": null
  },
  "org.apache.kafka.connect.runtime.WorkerSinkTask": {
    "level": "DEBUG",
    "last_modified": 1700150000000
  },
  "org.apache.kafka.connect.runtime.WorkerSourceTask": {
    "level": "DEBUG",
    "last_modified": null
  },
  "org.eclipse.jetty": {
    "level": "ERROR",
    "last_modified": null
  },
  "org.reflections": {
    "level": "ERROR",
    "last_modified": null
  },
  "root": {
    "level": "INFO",
    "last_modified": null
  }
}

The last_modified field shows the Unix timestamp (in milliseconds since January 1, 1970) when the worker last applied a change to each logger. It is null for loggers that have not been modified since worker startup.
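
Because the response is a flat JSON object keyed by logger name, you can filter it with jq. For example, the following illustrative filter lists only the loggers currently set to DEBUG:

curl -Ss http://localhost:8083/admin/loggers | jq -r 'to_entries[] | select(.value.level == "DEBUG") | .key'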

Get the log level for a specific logger

Enter the following command to check the current log level for the WorkerSourceTask logger:

curl -Ss http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.WorkerSourceTask | jq

Example output, showing the log level and when it was last modified:

{
  "level": "DEBUG",
  "last_modified": 1700000000000
}

Change the log level for a specific logger

Enter the following command to change the log level from DEBUG to TRACE for the WorkerSourceTask logger:

curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.WorkerSourceTask \
-d '{"level": "TRACE"}' | jq '.'

Example output, showing the modified logger and the new timestamp:

{
  "org.apache.kafka.connect.runtime.WorkerSourceTask": {
    "level": "TRACE",
    "last_modified": 1700250000000
  }
}

Note

This modifies the log level in the specific worker that receives this REST request. The request does not modify any other workers across the Connect cluster. For persistent cluster-wide changes in distributed mode, use the scope=cluster query parameter as described in Change log levels cluster-wide.

Revert the log level

Enter the following command to change the log level back to DEBUG for the WorkerSourceTask logger:

curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.WorkerSourceTask \
-d '{"level": "DEBUG"}' | jq '.'

Example output:

{
  "org.apache.kafka.connect.runtime.WorkerSourceTask": {
    "level": "DEBUG",
    "last_modified": 1700260000000
  }
}

Change the log level for a specific connector

Enter the following command to change the log level to TRACE for the Amazon S3 sink connector:

curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/io.confluent.connect.s3 \
-d '{"level": "TRACE"}' | jq '.'

Example output:

{
  "io.confluent.connect.s3": {
    "level": "TRACE",
    "last_modified": 1700270000000
  }
}
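
Each PUT request targets a single logger name, so related loggers, such as io.confluent.connect.storage used alongside the S3 connector in the examples above, need separate requests. A small shell loop is one way to set them together (a sketch, assuming the same worker address):

for logger in io.confluent.connect.s3 io.confluent.connect.storage; do
  curl -s -X PUT -H "Content-Type:application/json" \
    "http://localhost:8083/admin/loggers/$logger" \
    -d '{"level": "TRACE"}' | jq '.'
done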

Change log levels cluster-wide

Enter the following command to change the log level for all workers in a distributed mode cluster. Use the scope=cluster query parameter to broadcast the change to all workers.

curl -s -i -X PUT -H "Content-Type:application/json" \
"http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.WorkerSinkTask?scope=cluster" \
-d '{"level": "TRACE"}'

Example output:

HTTP/1.1 204 No Content

When you use scope=cluster, the API returns 204 No Content with an empty response body instead of the standard JSON response. The log level adjustment is written to the configuration topic (config.storage.topic) and asynchronously propagated to all workers in the cluster. This includes:

  • Workers currently running in the cluster

  • Workers that start after the change is made

  • Workers that rejoin the cluster after a temporary disconnection

Note

Cluster-wide log level changes may take some time to propagate to all workers, as each worker reads from the configuration topic independently. You can verify propagation by checking the last_modified timestamp using GET /admin/loggers on individual workers. The scope=cluster parameter is only meaningful in distributed mode; in standalone mode, it returns 204 No Content but only affects the single worker instance.

To verify that the cluster-wide change has been applied, check the log levels on any worker:

curl -Ss http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.WorkerSinkTask | jq

Example output:

{
  "level": "TRACE",
  "last_modified": 1700280000000
}

The last_modified timestamp helps you verify when each worker applied the cluster-wide change. All workers should eventually show similar timestamps after the propagation completes.
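
One way to confirm propagation is to query each worker directly. The hostnames below are placeholders for your own worker addresses:

for host in connect-1:8083 connect-2:8083 connect-3:8083; do
  echo "== $host =="
  curl -Ss "http://$host/admin/loggers/org.apache.kafka.connect.runtime.WorkerSinkTask" | jq '.'
done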

Change the default listener port

KIP-495 introduced the /admin/loggers REST API endpoint, which you can use to get and modify the log levels for any named logger in the Connect worker. The admin.listeners property in the worker configuration controls where this endpoint is made available. By default, the /admin/loggers endpoint uses the REST API port 8083.

You can change the Connect admin.listeners property to bring up the admin/loggers endpoint on a separate port, serve it over HTTPS, or disable the endpoint entirely. You make these changes in the Connect worker configuration file (a verification example follows the list below).

  • To make the admin/loggers endpoint listen on a separate port (using example port 9093), add the following line to the worker configuration file:

    admin.listeners=http://localhost:9093
    
  • To set up TLS properties for a separate HTTPS listener, add the following lines to the worker configuration file:

    admin.listeners=https://localhost:9093
    admin.listeners.https.ssl.truststore.location=/path/to/truststore.jks
    admin.listeners.https.ssl.truststore.password=<truststore-password>
    admin.listeners.https.ssl.keystore.location=/path/to/keystore.jks
    admin.listeners.https.ssl.keystore.password=<keystore-password>
    
  • To disable the admin/loggers endpoint entirely, set the property to an empty string in the worker configuration file. For example:

    admin.listeners=
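
After changing admin.listeners, restart the worker and confirm where the endpoint is served. For example, with the separate-port configuration above:

curl -Ss http://localhost:9093/admin/loggers | jq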
    

Use environment variables (Docker)

The logging levels for Docker are configured using environment variables. Specify logger names without the log4j.logger. prefix. To enable DEBUG logging for a connector, use the following environment variable when starting your Confluent Platform Connect container:

CONNECT_LOG4J_LOGGERS="io.confluent.<connector-name>=DEBUG"

To enable DEBUG log messages for the Connect worker, including all connectors, use the following environment variable when starting your Confluent Platform Connect container:

CONNECT_LOG4J_LOGGERS="org.apache.kafka.connect=DEBUG"

The environment variable can take a comma-separated list of key-value pairs. For example, the following setting enables DEBUG on a connector and the Connect framework:

CONNECT_LOG4J_LOGGERS="io.confluent.<connector-name>=DEBUG,org.apache.kafka.connect=DEBUG"
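
For example, an illustrative docker run invocation that passes the variable (the image tag is an example, and the other CONNECT_* settings a worker requires are omitted here):

docker run -d --name kafka-connect \
  -e CONNECT_LOG4J_LOGGERS="org.apache.kafka.connect=DEBUG,org.reflections=ERROR" \
  confluentinc/cp-kafka-connect:latest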

For additional Docker logging information, see Configure Kafka Logs for Docker in Confluent Platform.

Stack Trace

You can find the error trace for a task using the Connect status API endpoint. Enter the following command to get the errors for a failed connector.

curl localhost:8083/connectors/<connector-name>/status | jq

For example:

curl localhost:8083/connectors/http/status | jq

Example output:

{
  "name": "http",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "127.0.0.1:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.handleException(HttpWriterImpl.java:349)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:224)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.write(HttpWriterImpl.java:149)\n\tat io.confluent.connect.http.HttpSinkTask.put(HttpSinkTask.java:70)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)\n\t... 10 more\nCaused by: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.executeBatchRequest(HttpWriterImpl.java:287)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.executeRequestWithBackOff(HttpWriterImpl.java:234)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:222)\n\t... 13 more\n"
    }
  ],
  "type": "sink"
}

You can run the following command to make the output easier to read:

echo -e $(curl localhost:8083/connectors/<connector-name>/status | jq .tasks[].trace)

For example:

echo -e $(curl localhost:8083/connectors/http/status | jq .tasks[].trace)

Example output:

"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)
     at io.confluent.connect.http.writer.HttpWriterImpl.handleException(HttpWriterImpl.java:349)
     at io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:224)
     at io.confluent.connect.http.writer.HttpWriterImpl.write(HttpWriterImpl.java:149)
     at io.confluent.connect.http.HttpSinkTask.put(HttpSinkTask.java:70)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
     ... 10 more
Caused by: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)
     at io.confluent.connect.http.writer.HttpWriterImpl.executeBatchRequest(HttpWriterImpl.java:287)
     at io.confluent.connect.http.writer.HttpWriterImpl.executeRequestWithBackOff(HttpWriterImpl.java:234)
     at io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:222)
     ... 13 more
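
Alternatively, because the trace value is a JSON string with escaped newlines, jq's raw output mode (-r) prints them as real newlines, so the following produces the same readable result without echo -e:

curl -s localhost:8083/connectors/http/status | jq -r '.tasks[].trace'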