Kafka Connect Logging

Kafka Connect and other Confluent Platform components use the Java-based logging utility Apache Log4j to collect runtime data and record component events. The Kafka Connect Log4j properties file is located at etc/kafka/connect-log4j.properties in the Confluent Platform installation directory. The following table describes each log level.

Level   Description
OFF     Turns off logging.
FATAL   Severe errors that cause premature termination.
ERROR   Other runtime errors or unexpected conditions.
WARN    Runtime situations that are undesirable or unexpected, but not necessarily wrong.
INFO    Runtime events of interest at startup and shutdown.
DEBUG   Detailed diagnostic information about events.
TRACE   Detailed diagnostic information about everything.

By default, Connect writes INFO, WARN, ERROR, and FATAL information to standard output (stdout). When Connect starts, it logs the settings that it’s using, along with any WARN, ERROR, or FATAL messages that occur along the way, such as a missing configuration or a broken connector. To see every event and each step that Connect and a connector take from startup to shutdown, set the logging level to DEBUG or TRACE.
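
The way a level setting filters messages can be illustrated with a short Python sketch. It only models the ordering in the table above and is not how Log4j is implemented; the names LEVELS and is_logged are hypothetical.

```python
# Levels ordered from most to least severe, following the table above.
# OFF is only a threshold; no message is ever logged "at" OFF.
LEVELS = ["OFF", "FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"]


def is_logged(threshold: str, message_level: str) -> bool:
    """Return True if a message at message_level passes the given threshold."""
    if message_level == "OFF":
        raise ValueError("OFF is a threshold, not a message level")
    return LEVELS.index(message_level) <= LEVELS.index(threshold)


if __name__ == "__main__":
    # With the default INFO threshold, WARN passes and DEBUG is dropped.
    for level in LEVELS[1:]:
        print(level, is_logged("INFO", level))
```

With the default INFO threshold, only FATAL, ERROR, WARN, and INFO messages pass, which matches the default Connect output described above.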

View Connect Logs

Kafka Connect writes logs to stdout by default. The following sections provide commands that allow you to view the Connect log.

Docker

Enter the following command to tail the Connect log for Confluent Platform running on Docker:

docker logs -f kafka-connect

For information about how to set up logging levels with Docker, see Changing Docker log levels.

Confluent CLI

Enter the following command to display a snapshot of the log for Confluent Platform running locally:

confluent local services connect log

You can pipe the log through grep to view specific log information. For example, to see S3 connector messages, you could enter the following command:

confluent local services connect log | grep s3

When you run the standard log command, the connect.stdout path is shown in the displayed output. To tail the log and display new messages as they are appended, add the -f flag to the log command. For example:

confluent local services connect log -f

Example output:

[2020-07-07 15:04:29,333] DEBUG Progress event: HTTP_REQUEST_COMPLETED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,333] DEBUG Progress event: HTTP_RESPONSE_STARTED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,333] DEBUG Progress event: HTTP_RESPONSE_COMPLETED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,333] DEBUG Progress event: CLIENT_REQUEST_SUCCESS_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,334] DEBUG Progress event: REQUEST_BYTE_TRANSFER_EVENT, bytes: 406 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,334] DEBUG Progress event: TRANSFER_PART_COMPLETED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
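
When grep is too coarse, log lines like the ones above can be split into fields. The following Python sketch assumes the log was written with the pattern [%d] %p %m (%c:%L)%n; the function names are hypothetical, and lines that do not fit the pattern (such as stack trace continuation lines) are skipped.

```python
import re

# Matches lines written with the pattern "[%d] %p %m (%c:%L)%n".
# %L can be "?" when location information is unavailable.
LOG_LINE = re.compile(
    r"^\[(?P<timestamp>[^\]]+)\] (?P<level>[A-Z]+) (?P<message>.*) "
    r"\((?P<logger>[\w.$]+):(?P<line>\d+|\?)\)$"
)


def parse_log_line(line: str):
    """Return a dict of fields, or None for lines that do not match."""
    match = LOG_LINE.match(line.rstrip("\n"))
    return match.groupdict() if match else None


def filter_lines(lines, logger_prefix: str, level: str):
    """Yield parsed entries at the given level whose logger starts with logger_prefix."""
    for line in lines:
        entry = parse_log_line(line)
        if entry and entry["level"] == level and entry["logger"].startswith(logger_prefix):
            yield entry
```

For example, filter_lines(open("connect.stdout"), "io.confluent.connect.s3", "DEBUG") would yield only the S3 connector DEBUG entries, where connect.stdout stands for whatever path your log command reports.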

Scripted Confluent Platform startup

Connect log files are written to /var/log/confluent/connect when using Confluent Platform systemd Service Unit Files to run Confluent Platform. The systemd journal service writes the logs. Enter the following journalctl command to view log messages for Kafka Connect:

sudo journalctl -u confluent-connect

Enter the following command to tail the log:

sudo journalctl -f -u confluent-connect

Log4j Properties File

A default Connect Log4j template file is provided with Confluent Platform. This properties file is located in the Confluent Platform directory etc/kafka/connect-log4j.properties. The following shows an example of what this file contains:

log4j.rootLogger=INFO, stdout, connectAppender

# Send the logs to the console.
#
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

# Send the logs to a file, rolling the file at midnight local time. For example, the `File` option specifies the
# location of the log files (e.g. ${kafka.logs.dir}/connect.log), and at midnight local time the file is closed
# and copied in the same directory but with a filename that ends in the `DatePattern` option.
#
log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout

# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
# specific connector. Simply add this parameter to the log layout configuration below to include the contextual
# information.
#
connect.log.pattern=[%d] %p %m (%c:%L)%n
#connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n

log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}

log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.reflections=ERROR

You can add additional entries and change log levels by updating this configuration file.
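
Before editing the file, it can help to list the logger levels it currently sets. The following Python sketch reads only log4j.rootLogger and log4j.logger.* entries from properties text. It is a simplified reader, not a full Java properties parser (it ignores line continuations and the ":" separator), and the function name is hypothetical.

```python
def read_logger_levels(properties_text: str) -> dict:
    """Map logger names to levels from Log4j 1.x style properties text."""
    levels = {}
    for raw in properties_text.splitlines():
        line = raw.strip()
        # Skip blank lines, comments, and anything that is not key=value.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = (part.strip() for part in line.split("=", 1))
        # The value can be "LEVEL, appender1, appender2"; keep only the level.
        level = value.split(",")[0].strip()
        if key == "log4j.rootLogger":
            levels["root"] = level
        elif key.startswith("log4j.logger."):
            levels[key[len("log4j.logger."):]] = level
    return levels
```

Applied to the default file shown above, this would report root at INFO and the org.apache.zookeeper and org.reflections loggers at ERROR.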

Change Log Levels

The following sections provide information on adding or changing log levels to debug Connect and running connectors.

Use the Connect Log4j properties file

The basic Connect Log4j template provided at etc/kafka/connect-log4j.properties is likely insufficient to debug issues. The following example shows a Log4j template you can use to set the DEBUG level for consumers, producers, and connectors. This is preferred over simply enabling DEBUG on everything, since that makes the logs verbose and harder to follow. You can also enable TRACE for connector logging to see records that are processed.

In the Log4j properties file example below, DEBUG level is configured for Connect, the worker tasks, the Datagen source connector, and the Amazon S3 sink connector. Note that the lines that also send the logs to a separate file named connect.log are commented out for this example.

# root log level (if an override to a class or package is not specified,
# it will now log at this level).
log4j.rootLogger=INFO, stdout

# Append logs to console. If the customer is using different appenders,
# update the following lines accordingly. The "%X{connector.context}"
# fragment instructs Connect to include connector- and task-specific information
# on every log message and is now recommended.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

# Uncomment the following lines to also send the logs to a file,
# rolling the file at midnight local time. For example, the `File` option
# specifies the location of the log files (e.g. ${kafka.logs.dir}/connect.log),
# and at midnight local time the file is closed and copied in the same
# directory but with a filename that ends in the `DatePattern` option.
#
#log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
#log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log
#log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout

# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
# specific connector. Simply add this parameter to the log layout configuration below to include the contextual
# information.
#
connect.log.pattern=[%d] %p %m (%c:%L)%n
#connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n
log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}

# suppress noisy logs from dependencies
log4j.logger.org.reflections=ERROR
log4j.logger.org.eclipse.jetty=ERROR
log4j.logger.kafka=ERROR
log4j.logger.org.apache.kafka=ERROR
log4j.logger.org.apache.zookeeper=ERROR

# Uncomment the following line to debug consumers (very verbose, use carefully):
#log4j.logger.org.apache.kafka.clients.consumer=DEBUG

# Uncomment the following line to debug producers (very verbose, use carefully):
#log4j.logger.org.apache.kafka.clients.producer=DEBUG

# Uncomment the following line when enabling debug on source connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask=DEBUG

# Uncomment the following line when enabling debug on sink connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=DEBUG

# Uncomment the following line when the problem may be with Connect, SMTs, converters:
log4j.logger.org.apache.kafka.connect=DEBUG

# When one or more connectors are not behaving correctly, enable debug logging only
# for those connectors. Optionally enable TRACE logging to see records that are processed.

# Uncomment the following line to enable debug for the Datagen connector:
log4j.logger.io.confluent.kafka.connect.datagen=DEBUG

# Uncomment the following to enable debug for the JDBC connector:
#log4j.logger.io.confluent.connect.jdbc=DEBUG

# Uncomment the following to enable debug for the Elasticsearch connector:
#log4j.logger.io.confluent.connect.elasticsearch=DEBUG

# Uncomment the following to enable debug for the HDFS connector:
#log4j.logger.io.confluent.connect.storage=DEBUG
#log4j.logger.io.confluent.connect.hdfs=DEBUG

# Uncomment the following to enable debug for the S3 connector:
log4j.logger.io.confluent.connect.storage=DEBUG
log4j.logger.io.confluent.connect.s3=DEBUG

# Uncomment the following to enable debug for the GCS connector:
#log4j.logger.io.confluent.connect.storage=DEBUG
#log4j.logger.io.confluent.connect.gcs=DEBUG

# Uncomment the following to enable debug for the JMS connectors (and derivatives IBM MQ, ActiveMQ):
#log4j.logger.io.confluent.connect.jms=DEBUG
#log4j.logger.io.confluent.connect.ibm.mq=DEBUG
#log4j.logger.io.confluent.connect.activemq=DEBUG

# Add similar lines to enable debug for the specific connector(s):
#log4j.logger.<root package of the connector>=DEBUG
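
The template above sets org.apache.kafka to ERROR while setting org.apache.kafka.connect to DEBUG. This works because a logger without its own level inherits the level of its nearest configured ancestor in the dotted name hierarchy, falling back to the root logger. The following Python sketch models that lookup; the function name and the sample class names in the usage note are hypothetical.

```python
def effective_level(logger_name: str, configured: dict, root_level: str = "INFO") -> str:
    """Return the level a logger inherits from its nearest configured ancestor."""
    name = logger_name
    while name:
        if name in configured:
            return configured[name]
        # Drop the last dotted segment to move up to the parent logger.
        name = name.rpartition(".")[0]
    return root_level
```

With the overrides above, a class under org.apache.kafka.connect.runtime resolves to DEBUG, a class under org.apache.kafka.clients resolves to ERROR, and a class in an unrelated package such as com.example resolves to the root level, INFO.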

Use the Connect API

After Confluent Platform, Kafka Connect, and your connectors are running, you can check and change log levels using Connect API endpoints.

Note

Changes made through the API are not permanent. That is, any changes made using the API do not change properties in the connect-log4j.properties file. When the worker is restarted, logging reverts to the logging properties defined in the file. Also, the API only changes logging on the worker that’s accessed, not across an entire distributed Connect cluster.

The examples are based on a Connect Log4j properties file with the following configuration properties:

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

connect.log.pattern=[%d] %p %m (%c:%L)%n

log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}

# suppress noisy logs from dependencies
log4j.logger.org.reflections=ERROR
log4j.logger.org.eclipse.jetty=ERROR
log4j.logger.kafka=ERROR
log4j.logger.org.apache.kafka=ERROR
log4j.logger.org.apache.zookeeper=ERROR

# Uncomment the following line when enabling debug on source connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask=DEBUG

# Uncomment the following line when enabling debug on sink connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=DEBUG

# Uncomment the following line when the problem may be with Connect, SMTs, converters:
log4j.logger.org.apache.kafka.connect=DEBUG

# Uncomment the following line to enable debug for the Datagen connector:
log4j.logger.io.confluent.kafka.connect.datagen=DEBUG

# Uncomment the following lines to enable debug for the Amazon S3 connector:
log4j.logger.io.confluent.connect.storage=DEBUG
log4j.logger.io.confluent.connect.s3=DEBUG

Check log levels

Enter the following command to check the current log levels. Use jq to pretty-print the JSON output.

curl -Ss http://localhost:8083/admin/loggers | jq

Example output:

{
  "io.confluent.connect.s3": {
    "level": "DEBUG"
  },
  "io.confluent.connect.storage": {
    "level": "DEBUG"
  },
  "io.confluent.kafka.connect.datagen": {
    "level": "DEBUG"
  },
  "kafka": {
    "level": "ERROR"
  },
  "org.apache.kafka": {
    "level": "ERROR"
  },
  "org.apache.kafka.connect": {
    "level": "DEBUG"
  },
  "org.apache.kafka.connect.runtime.WorkerSinkTask": {
    "level": "DEBUG"
  },
  "org.apache.kafka.connect.runtime.WorkerSourceTask": {
    "level": "DEBUG"
  },
  "org.apache.zookeeper": {
    "level": "ERROR"
  },
  "org.eclipse.jetty": {
    "level": "ERROR"
  },
  "org.reflections": {
    "level": "ERROR"
  },
  "root": {
    "level": "INFO"
  }
}
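
If you prefer to inspect the response in a script rather than with jq, the same request can be made from Python's standard library. This is a minimal sketch that assumes a worker reachable at http://localhost:8083 with no authentication; the function names are hypothetical.

```python
import json
import urllib.request


def fetch_loggers(base_url: str = "http://localhost:8083") -> dict:
    """GET /admin/loggers from one worker and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/admin/loggers", timeout=10) as response:
        return json.load(response)


def loggers_at(loggers: dict, level: str) -> list:
    """Return the sorted logger names currently set to the given level."""
    return sorted(name for name, info in loggers.items() if info.get("level") == level)
```

For example, loggers_at(fetch_loggers(), "DEBUG") lists every logger that is still at DEBUG, which is a quick way to spot verbose settings left over from a debugging session.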

Get the log level for a specific logger

Enter the following command to check the log level for the WorkerSourceTask logger:

curl -Ss http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.WorkerSourceTask | jq

Example output:

{
  "level": "DEBUG"
}

Change the log level for a specific logger

Enter the following command to change the log level from DEBUG to TRACE for the WorkerSourceTask logger:

curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.WorkerSourceTask \
-d '{"level": "TRACE"}' | jq '.'

Example output:

[
  "org.apache.kafka.connect.runtime.WorkerSourceTask"
]

Note

This modifies the log level in the specific worker that receives this REST request. The request does not modify any other workers across the Connect cluster.
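
Because each request affects only one worker, changing a level across a distributed cluster means repeating the request for every worker. The following Python sketch shows one way to do that, assuming you already know each worker's REST URL and that no authentication is required; the function names and the worker URLs in the usage note are hypothetical.

```python
import json
import urllib.request


def build_level_request(worker_url: str, logger: str, level: str) -> urllib.request.Request:
    """Build (but do not send) the PUT request for one worker."""
    return urllib.request.Request(
        url=f"{worker_url.rstrip('/')}/admin/loggers/{logger}",
        data=json.dumps({"level": level}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def set_level_on_workers(worker_urls, logger: str, level: str) -> dict:
    """Send the request to every worker and collect the loggers each one changed."""
    changed = {}
    for worker_url in worker_urls:
        request = build_level_request(worker_url, logger, level)
        with urllib.request.urlopen(request, timeout=10) as response:
            changed[worker_url] = json.load(response)
    return changed
```

A call such as set_level_on_workers(["http://worker1:8083", "http://worker2:8083"], "org.apache.kafka.connect.runtime.WorkerSourceTask", "TRACE") would return one list of modified loggers per worker.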

Revert the log level

Enter the following command to change the log level back to DEBUG for the WorkerSourceTask logger:

curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.WorkerSourceTask \
-d '{"level": "DEBUG"}' | jq '.'

Example output:

[
  "org.apache.kafka.connect.runtime.WorkerSourceTask"
]

Note

Log levels set by the admin/loggers REST API do not persist when the worker restarts. Restarting a worker will undo all changes made by the REST commands.
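
Rather than relying on a restart to undo a change, a debugging script can restore the previous level itself. The following Python sketch wraps the raise-and-revert steps in a context manager. The set_level argument is a hypothetical callable that you supply, for example a thin wrapper around the PUT /admin/loggers request.

```python
from contextlib import contextmanager


@contextmanager
def temporary_level(set_level, logger: str, level: str, restore_to: str):
    """Raise verbosity for the duration of a with-block, then restore it.

    set_level is any callable that takes (logger, level).
    """
    set_level(logger, level)
    try:
        yield
    finally:
        # Runs even if the body raises, so TRACE is not left on by accident.
        set_level(logger, restore_to)
```

Used as "with temporary_level(put_level, logger, "TRACE", "DEBUG"):", the block reproduces the problem at TRACE and the logger returns to DEBUG afterward, where put_level stands for your own request function.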

Change the log level for a specific connector

Enter the following command to change the log level to TRACE for the Amazon S3 sink connector:

curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/io.confluent.connect.s3 \
-d '{"level": "TRACE"}' | jq '.'

Example output:

[
  "io.confluent.connect.s3",
  "io.confluent.connect.s3.S3SinkConnector",
  "io.confluent.connect.s3.S3SinkConnectorConfig",
  "io.confluent.connect.s3.S3SinkTask",
  "io.confluent.connect.s3.TopicPartitionWriter",
  "io.confluent.connect.s3.format.avro.AvroRecordWriterProvider",
  "io.confluent.connect.s3.storage.S3OutputStream",
  "io.confluent.connect.s3.storage.S3Storage",
  "io.confluent.connect.s3.util.Version"
]
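
The response lists the named logger together with every logger beneath it in the same namespace. The following Python sketch shows the matching rule this output suggests: a logger is affected if its name equals the target or starts with the target followed by a dot. The function name is hypothetical, and the rule is an illustration based on the example output rather than the worker's actual code.

```python
def loggers_in_namespace(known_loggers, namespace: str) -> list:
    """Return the sorted loggers that equal namespace or sit beneath it."""
    prefix = namespace + "."
    return sorted(
        name for name in known_loggers
        if name == namespace or name.startswith(prefix)
    )
```

Matching on the trailing dot keeps a sibling package with a similar name, such as a hypothetical io.confluent.connect.s3x, out of the result.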

Change the default listener port

KIP-495 introduced the /admin/loggers REST API endpoint that can be used to get and modify the log levels for any named loggers in the Connect worker. The admin.listeners property in the worker configuration controls where this endpoint is made available. The /admin/loggers endpoint uses the default REST API port 8083.

You can change the Connect admin.listeners property to bring up the admin/loggers endpoint on a separate port, a secure port, or disable the endpoint. You make these changes in the Connect worker configuration file.

  • To make the admin/loggers endpoint listen on a separate port (using example port 9093), add the following line to the worker configuration file:

    admin.listeners=http://localhost:9093
    
  • To set up TLS properties for a separate HTTPS listener, add the following lines to the worker configuration file:

    admin.listeners=https://localhost:9093
    admin.listeners.https.ssl.truststore.location=/path/to/truststore.jks
    admin.listeners.https.ssl.truststore.password=<truststore-password>
    admin.listeners.https.ssl.keystore.location=/path/to/keystore.jks
    admin.listeners.https.ssl.keystore.password=<keystore-password>
    
  • To disable the admin/loggers endpoint entirely, enter a blank string in the worker configuration file. For example:

    admin.listeners=
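
A script that calls the admin/loggers endpoint needs to know which of the three cases above applies. The following Python sketch resolves the base URL from worker properties that have already been loaded into a dict. It assumes that an unset admin.listeners means the endpoint shares the REST listener, that a blank value disables it, and that the value may be a comma-separated list from which the first entry is used; the function name is hypothetical.

```python
def admin_base_url(worker_props: dict, rest_url: str = "http://localhost:8083"):
    """Return the base URL serving /admin/loggers, or None if it is disabled."""
    if "admin.listeners" not in worker_props:
        # Unset: the endpoint is served on the regular REST listener.
        return rest_url
    listeners = [
        item.strip()
        for item in worker_props["admin.listeners"].split(",")
        if item.strip()
    ]
    if not listeners:
        # Blank value: the endpoint is disabled.
        return None
    return listeners[0]
```

For a worker configured with admin.listeners=https://localhost:9093, the function returns https://localhost:9093, and requests to /admin/loggers should then go to that address instead of port 8083.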
    

Use environment variables (Docker)

The logging levels for Docker are configured using environment variables. To enable DEBUG logging for a connector, use the following environment variable when starting your Confluent Platform Connect container. Specify the logger name without the log4j.logger. prefix, as in the Connect worker example that follows:

CONNECT_LOG4J_LOGGERS="io.confluent.<connector-name>=DEBUG"

To enable DEBUG log messages for the Connect worker, including all connectors, use the following environment variable when starting your Confluent Platform Connect container:

CONNECT_LOG4J_LOGGERS="org.apache.kafka.connect=DEBUG"

The environment variable can take a comma-separated list of key-value pairs. For example, the following variable enables DEBUG on a connector and the Connect framework:

CONNECT_LOG4J_LOGGERS="io.confluent.<connector-name>=DEBUG,org.apache.kafka.connect=DEBUG"
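
When the list of loggers grows, it can be easier to generate the value than to type it by hand. The following Python sketch joins logger and level pairs into the comma-separated form. It assumes bare logger names, as in the org.apache.kafka.connect example, and the function name is hypothetical.

```python
def connect_log4j_loggers(levels: dict) -> str:
    """Join logger=LEVEL pairs into one comma-separated value."""
    for name in levels:
        # A comma or equals sign in a name would corrupt the list format.
        if "," in name or "=" in name:
            raise ValueError(f"invalid logger name: {name!r}")
    return ",".join(f"{name}={level}" for name, level in levels.items())
```

The returned string can be passed to the container as the value of CONNECT_LOG4J_LOGGERS, for example with the -e option of docker run or in the environment section of a Compose file.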

For additional Docker logging information, see Configure Docker Logging.

Stack Trace

You can find the error trace for a task using the Connect status API endpoint. Enter the following command to get the errors for a failed connector.

curl localhost:8083/connectors/<connector-name>/status | jq

For example:

curl localhost:8083/connectors/http/status | jq

Example output:

{
  "name": "http",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "127.0.0.1:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.handleException(HttpWriterImpl.java:349)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:224)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.write(HttpWriterImpl.java:149)\n\tat io.confluent.connect.http.HttpSinkTask.put(HttpSinkTask.java:70)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)\n\t... 10 more\nCaused by: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.executeBatchRequest(HttpWriterImpl.java:287)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.executeRequestWithBackOff(HttpWriterImpl.java:234)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:222)\n\t... 13 more\n"
    }
  ],
  "type": "sink"
}

You can run the following command to make the output easier to read:

echo -e $(curl -s localhost:8083/connectors/<connector-name>/status | jq '.tasks[].trace')

For example:

echo -e $(curl -s localhost:8083/connectors/http/status | jq '.tasks[].trace')

Example output:

"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)
     at io.confluent.connect.http.writer.HttpWriterImpl.handleException(HttpWriterImpl.java:349)
     at io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:224)
     at io.confluent.connect.http.writer.HttpWriterImpl.write(HttpWriterImpl.java:149)
     at io.confluent.connect.http.HttpSinkTask.put(HttpSinkTask.java:70)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
     ... 10 more
Caused by: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)
     at io.confluent.connect.http.writer.HttpWriterImpl.executeBatchRequest(HttpWriterImpl.java:287)
     at io.confluent.connect.http.writer.HttpWriterImpl.executeRequestWithBackOff(HttpWriterImpl.java:234)
     at io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:222)
     ... 13 more
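
The same information can be pulled out of the status response in a script. Decoding the JSON already turns the \n and \t escapes into real line breaks and tabs, so no extra unescaping is needed. The following Python sketch works on an already decoded status response; the function names are hypothetical.

```python
def failed_task_traces(status: dict) -> dict:
    """Map task id to its stack trace for every task in the FAILED state."""
    return {
        task["id"]: task.get("trace", "")
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    }


def root_cause(trace: str) -> str:
    """Return the last 'Caused by:' line, or the first line if there is none."""
    lines = trace.splitlines()
    causes = [line for line in lines if line.startswith("Caused by:")]
    if causes:
        return causes[-1]
    return lines[0] if lines else ""
```

For the example above, root_cause returns the final "Caused by:" line, which contains the underlying "Connection refused" error, without the surrounding stack frames.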