Kafka Connect Logging¶
Kafka Connect and other Confluent Platform components use the Java-based logging utility
Apache Log4j to collect runtime data
and record component events. The following table describes each log level. The
Kafka Connect Log4j properties file is located in the Confluent Platform installation
directory path etc/kafka/connect-log4j.properties
.
Level | Description |
---|---|
OFF | Turns off logging. |
FATAL | Severe errors that cause premature termination. |
ERROR | Other runtime errors or unexpected conditions. |
WARN | Runtime situations that are undesirable or unexpected, but not necessarily wrong. |
INFO | Runtime events of interest at startup and shutdown. |
DEBUG | Detailed diagnostic information about events. |
TRACE | Detailed diagnostic information about everything. |
By default, Connect writes INFO
, WARN
, ERROR
, and FATAL
information to standard output (stdout). When Connect starts, it writes the
settings that it’s using and any WARN
and ERROR
(or FATAL
) messages
along the way. This could be a missing configuration, a broken connector, and so
on. If you want to see all events, and each step Connect and a connector
take from startup to shutdown, you can set DEBUG
or TRACE
logging
levels.
View Connect Logs¶
Kafka Connect writes logs to stdout
by default. The following sections
provide commands that allow you to view the Connect log.
Docker¶
Enter the following command to tail the Connect log for Confluent Platform running on Docker:
docker logs -f kafka-connect
For information about how to set up logging levels with Docker, see Changing Docker log levels.
Confluent CLI¶
Enter the following command to display a snapshot of the log for Confluent Platform running locally:
confluent local services connect log
You can pipe the log through grep
to view specific log information. For
example, to see S3 connector messages, you could enter the following command:
confluent local services connect log | grep s3
When you run the standard log command, the connect.stdout
path is shown in
the displayed output. You can use this path in the log command to tail the log
and display any new messages that are appended to the log. For example:
confluent local services connect log -f
Example output:
[2020-07-07 15:04:29,333] DEBUG Progress event: HTTP_REQUEST_COMPLETED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,333] DEBUG Progress event: HTTP_RESPONSE_STARTED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,333] DEBUG Progress event: HTTP_RESPONSE_COMPLETED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,333] DEBUG Progress event: CLIENT_REQUEST_SUCCESS_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,334] DEBUG Progress event: REQUEST_BYTE_TRANSFER_EVENT, bytes: 406 (io.confluent.connect.s3.storage.S3OutputStream:286)
[2020-07-07 15:04:29,334] DEBUG Progress event: TRANSFER_PART_COMPLETED_EVENT, bytes: 0 (io.confluent.connect.s3.storage.S3OutputStream:286)
For the command reference, see confluent local services connect log.
Scripted Confluent Platform startup¶
Connect log files are written to /var/log/confluent/connect
when using
Confluent Platform systemd Service Unit Files
to run Confluent Platform. The systemd
journal service writes the logs. Enter the
following journalctl
commands to view log messages for Kafka Connect:
sudo journalctl -u confluent-connect
Enter the following command to tail the log:
sudo journalctl -f -u confluent-connect
Log4j Properties File¶
A default Connect Log4j template file is provided with Confluent Platform. This properties
file is located in the Confluent Platform directory etc/kafka/connect-log4j.properties
.
The following shows an example of what this file contains:
log4j.rootLogger=INFO, stdout
# Send the logs to the console.
#
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# Send the logs to a file, rolling the file at midnight local time. For example, the `File` option specifies the
# location of the log files (e.g. ${kafka.logs.dir}/connect.log), and at midnight local time the file is closed
# and copied in the same directory but with a filename that ends in the `DatePattern` option.
#
log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout
# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
# specific connector. Simply add this parameter to the log layout configuration below to include the contextual
# information.
#
connect.log.pattern=[%d] %p %m (%c:%L)%n
#connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n
log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.reflections=ERROR
You can add additional entries and change log levels by updating this configuration file.
Change Log Levels¶
The following sections provide information on adding or changing log levels to debug Connect and running connectors.
Use the Connect Log4j properties file¶
The basic Connect log4j template provided at
etc/kafka/connect-log4j.properties
is likely insufficient to debug issues.
The following example shows a Log4j template you use to set DEBUG level for
consumers, producers, and connectors. This is preferred over simply enabling
DEBUG on everything, since that makes the logs verbose and harder to follow. You
can also enable TRACE for connector logging to see records that are
processed.
In the Log4j properties file example below, DEBUG level is configured for
Connect, the worker tasks, the Datagen source connector, and the Amazon S3
sink connector. Note that the lines that also send the logs to separate file named connect.log
are commented out for this example.
# root log level (if an override to a class or package is not specified,
# it will now log at this level).
log4j.rootLogger=INFO, stdout
# Append logs to console. If the customer is using different appenders,
# update the following lines accordingly. The "%X{connector.context}"
# fragment instructs Connect to include connector- and task-specific information
# on every log message and is now recommended.log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# Uncomment the following lines to also send the logs to a file,
# rolling the file at midnight local time. For example, the `File` option
# specifies the location of the log files (e.g. ${kafka.logs.dir}/connect.log),
# and at midnight local time the file is closed and copied in the same
# directory but with a filename that ends in the `DatePattern` option.
#
#log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
#log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log
#log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout
# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
# specific connector. Simply add this parameter to the log layout configuration below to include the contextual
# information.
#
connect.log.pattern=[%d] %p %m (%c:%L)%n
#connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n
log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}
# suppress noisy logs from dependencies
log4j.logger.org.reflections=ERROR
log4j.logger.org.eclipse.jetty=ERROR
log4j.logger.kafka=ERROR
log4j.logger.org.apache.kafka=ERROR
log4j.logger.org.apache.zookeeper=ERROR
# Uncomment the following line to debug consumers (very verbose, use carefully):
#log4j.logger.org.apache.kafka.clients.consumer=DEBUG
# Uncomment the following line to debug producers (very verbose, use carefully):
#log4j.logger.org.apache.kafka.clients.producer=DEBUG
# Uncomment the following line when enabling debug on source connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask=DEBUG
# Uncomment the following line when enabling debug on sink connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=DEBUG
# Uncomment the following line when the problem may be with Connect, SMTs, converters:
log4j.logger.org.apache.kafka.connect=DEBUG
# When one or more connectors are not behaving correctly, enable debug logging only
# for those connectors. Optionally enable TRACE logging to see records that are processed.
# Uncomment the following line to enable debug for the Datagen connector:
log4j.logger.io.confluent.kafka.connect.datagen=DEBUG
# Uncomment the following to enable debug for the JDBC connector:
#log4j.logger.io.confluent.connect.jdbc=DEBUG
# Uncomment the following to enable debug for the Elasticsearch connector:
#log4j.logger.io.confluent.connect.elasticsearch=DEBUG
# Uncomment the following to enable debug for the for the HDFS connector:
#log4j.logger.io.confluent.connect.storage=DEBUG
#log4j.logger.io.confluent.connect.hdfs=DEBUG
# Uncomment the following to enable debug for the for the S3 connector:
log4j.logger.io.confluent.connect.storage=DEBUG
log4j.logger.io.confluent.connect.s3=DEBUG
# Uncomment the following to enable debug for the for the GCS connector:
#log4j.logger.io.confluent.connect.storage=DEBUG
#log4j.logger.io.confluent.connect.gcs=DEBUG
# Uncomment the following to enable debug for the JMS connectors (and derivatives IBM MQ, ActiveMq):
#log4j.logger.io.confluent.connect.jms=DEBUG
#log4j.logger.io.confluent.connect.ibm.mq=DEBUG
#log4j.logger.io.confluent.connect.activemq=DEBUG
# Add similar lines to enable debug for the specific connector(s):
#log4j.logger.<root package of the connector>=DEBUG
Use the Connect API¶
After you have Confluent Platform and Kafka Connect and your connectors running, you can check the log levels and change log levels using Connect API endpoints.
Note
Changes made through the API are not permanent. That is, any changes made
using the API do not change properties in the connect-log4j.properties
file. When the worker is restarted, logging reverts back to using the logging
properties defined in the file. Also, the API only changes logging on the
worker that’s accessed, not across an entire distributed Connect cluster.
The examples are based on a Connect Log4j properties file with the following configuration properties:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
connect.log.pattern=[%d] %p %m (%c:%L)%n
log4j.appender.stdout.layout.ConversionPattern=${connect.log.pattern}
log4j.appender.connectAppender.layout.ConversionPattern=${connect.log.pattern}
# suppress noisy logs from dependencies
log4j.logger.org.reflections=ERROR
log4j.logger.org.eclipse.jetty=ERROR
log4j.logger.kafka=ERROR
log4j.logger.org.apache.kafka=ERROR
log4j.logger.org.apache.zookeeper=ERROR
# Uncomment the following line when enabling debug on source connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask=DEBUG
# Uncomment the following line when enabling debug on sink connectors:
log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=DEBUG
# Uncomment the following line when the problem may be with Connect, SMTs, converters:
log4j.logger.org.apache.kafka.connect=DEBUG
# Uncomment the following line to enable debug for the Datagen connector:
log4j.logger.io.confluent.kafka.connect.datagen=DEBUG
# Uncomment the following lines to enable debug for the Amazon S3 connector:
log4j.logger.io.confluent.connect.storage=DEBUG
log4j.logger.io.confluent.connect.s3=DEBUG
Check log levels¶
Enter the following command to check the current log levels. Use jq
to print
the output as JSON.
curl -Ss http://localhost:8083/admin/loggers | jq
Example output:
{
"io.confluent.connect.s3": {
"level": "DEBUG"
},
"io.confluent.connect.storage": {
"level": "DEBUG"
},
"io.confluent.kafka.connect.datagen": {
"level": "DEBUG"
},
"kafka": {
"level": "ERROR"
},
"org.apache.kafka": {
"level": "ERROR"
},
"org.apache.kafka.connect": {
"level": "DEBUG"
},
"org.apache.kafka.connect.runtime.WorkerSinkTask": {
"level": "DEBUG"
},
"org.apache.kafka.connect.runtime.WorkerSourceTask": {
"level": "DEBUG"
},
"org.apache.zookeeper": {
"level": "ERROR"
},
"org.eclipse.jetty": {
"level": "ERROR"
},
"org.reflections": {
"level": "ERROR"
},
"root": {
"level": "INFO"
}
}
Get the log level for a specific logger¶
Enter the following command to check the log level for the WorkerSourceTask
logger:
curl -Ss http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.WorkerSourceTask | jq
Example output:
{
"level": "DEBUG"
}
Change the log level for a specific logger¶
Enter the following command to change the log level from DEBUG to TRACE
for the WorkerSourceTask
logger:
curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.WorkerSourceTask \
-d '{"level": "TRACE"}' | jq '.'
Example output:
[
"org.apache.kafka.connect.runtime.WorkerSourceTask"
]
Note
This modifies the log level in the specific worker that receives this REST request. The request does not modify any other workers across the Connect cluster.
Revert the log level¶
Enter the following command to change the log level back to DEBUG for the
WorkerSourceTask
logger:
curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.WorkerSourceTask \
-d '{"level": "DEBUG"}' | jq '.'
Example output:
[
"org.apache.kafka.connect.runtime.WorkerSourceTask"
]
Note
Log levels set by the admin/loggers
REST API do not persist when the worker restarts. Restarting a worker will undo all changes made by the REST commands.
Change the log level for a specific connector¶
Enter the following command to change the log level to TRACE for the Amazon S3 sink connector:
curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/admin/loggers/io.confluent.connect.s3 \
-d '{"level": "TRACE"}' | jq '.'
Example output:
[
"io.confluent.connect.s3",
"io.confluent.connect.s3.S3SinkConnector",
"io.confluent.connect.s3.S3SinkConnectorConfig",
"io.confluent.connect.s3.S3SinkTask",
"io.confluent.connect.s3.TopicPartitionWriter",
"io.confluent.connect.s3.format.avro.AvroRecordWriterProvider",
"io.confluent.connect.s3.storage.S3OutputStream",
"io.confluent.connect.s3.storage.S3Storage",
"io.confluent.connect.s3.util.Version"
]
Change the default listener port¶
KIP-495
introduced the /admin/loggers
REST API endpoint that can be used to get and
modify the log levels for any named loggers in the Connect worker. The
admin.listeners
property in the worker configuration controls where this
endpoint is made available. The /admin/loggers
endpoint uses the default
REST API port 8083
.
You can change the Connect admin.listeners
property to bring up the
admin/loggers
endpoint on a separate port, a secure port, or disable the
endpoint. You make these changes in the Connect worker configuration file.
To make the admin/loggers endpoint listen on a separate port (using example port
9093
), add the following line to the worker configuration file:admin.listeners=http://localhost:9093
To set up SSL properties for a separate HTTPS listener, add the following lines to the worker configuration file:
admin.listeners=https://localhost:9093 admin.listeners.https.ssl.truststore.location=/path/to/truststore.jks admin.listeners.https.ssl.truststore.password=<truststore-password> admin.listeners.https.ssl.keystore.location=/path/to/keystore.jks admin.listeners.https.ssl.keystore.password=<keystore-password>
To disable the admin/loggers endpoint entirely, enter a blank string in the worker configuration file. For example:
admin.listeners=
Use environment variables (Docker)¶
The logging levels for Docker are configured using environment variables. To enable DEBUG logging for a connector, use the following environment variables when starting your Confluent Platform Connect container:
CONNECT_LOG4J_LOGGERS="log4j.logger.io.confluent.<connector-name>=DEBUG"
To enable DEBUG log messages for the Connect worker, including all connectors, use the following environment variables when starting your Confluent Platform Connect container:
CONNECT_LOG4J_LOGGERS="org.apache.kafka.connect=DEBUG"
The environment variable can take a comma-separated list of key-value pairs. For example, the following variables enable DEBUG on a connector and the Connect framework:
CONNECT_LOG4J_LOGGERS="log4j.logger.io.confluent.<connector-name>=DEBUG,org.apache.kafka.connect=DEBUG"
For additional Docker logging information, see Configure Docker Logging.
Stack Trace¶
You can find the error trace for a task using the Connect status API endpoint. Enter the following command to get the errors for a failed connector.
curl localhost:8083/connectors/<connector-name>/status | jq
For example:
curl localhost:8083/connectors/http/status | jq
Example output:
{
"name": "http",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "127.0.0.1:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.handleException(HttpWriterImpl.java:349)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:224)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.write(HttpWriterImpl.java:149)\n\tat io.confluent.connect.http.HttpSinkTask.put(HttpSinkTask.java:70)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)\n\t... 10 more\nCaused by: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.executeBatchRequest(HttpWriterImpl.java:287)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.executeRequestWithBackOff(HttpWriterImpl.java:234)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:222)\n\t... 13 more\n"
}
],
"type": "sink"
}
You can run the following command to make the output easier to read:
echo -e $(curl localhost:8083/connectors/<connector-name>/status | jq .tasks[].trace)
For example:
echo -e $(curl localhost:8083/connectors/http/status | jq .tasks[].trace)
Example output:
"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)
at io.confluent.connect.http.writer.HttpWriterImpl.handleException(HttpWriterImpl.java:349)
at io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:224)
at io.confluent.connect.http.writer.HttpWriterImpl.write(HttpWriterImpl.java:149)
at io.confluent.connect.http.HttpSinkTask.put(HttpSinkTask.java:70)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
... 10 more
Caused by: Error while processing HTTP request with Url : http://localhost:8080/api/messages, Payload : one, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : org.apache.http.conn.HttpHostConnectException: Connect to localhost:8080 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused (Connection refused)
at io.confluent.connect.http.writer.HttpWriterImpl.executeBatchRequest(HttpWriterImpl.java:287)
at io.confluent.connect.http.writer.HttpWriterImpl.executeRequestWithBackOff(HttpWriterImpl.java:234)
at io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:222)
... 13 more