Troubleshoot Self-Managed Kafka Connect for Confluent Platform¶
Use this page to troubleshoot self-managed Kafka Connect. For the procedures outlined in this page, the assumption is that your Connect worker and source connector are running, but Kafka Connect is not ingesting any data.
Get the connector and task status¶
One way to troubleshoot self-managed Kafka Connect is to retrieve the connector and task status. If Kafka Connect is not ingesting any data, you can check whether one or more of the connector’s tasks has failed. You can use the Connect API to gather more details:
Get the status of the connector instance:
curl -s "http://localhost:8083/connectors/jdbc-sink/status" | \
  jq '.connector.state'
You should see output similar to:
"RUNNING"
Get the status of the connector’s tasks:
curl -s "http://localhost:8083/connectors/jdbc-sink/status" | \
  jq '.tasks[0].state'
If one of the connector’s tasks has failed, you should see output similar to:
"FAILED"
Use the following curl command to request a stack trace for the task and identify what caused the failure, piping the result through jq. Note that this command requests the stack trace for the first element in the tasks array.
curl -s "http://localhost:8083/connectors/jdbc-sink/status" | \
  jq '.tasks[0].trace' | \
  sed 's/\\n/\n/g; s/\\t/\t/g'
You should see output similar to:
"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/demo
    at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:59)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:64)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
Caused by: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/demo
    at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:702)
    at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:189)
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:247)
    at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:80)
    at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:52)
    ... 13 more"
As you read through the previous stack trace, you should notice the following issues:
- A Connect exception
- A driver is missing
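The status payload returned by these calls can also be checked from a script. The following is a minimal Python sketch, using an illustrative copy of the JSON shape shown above (the values are examples, not live output), that collects the ids of any failed tasks:

```python
import json

# Sample payload shaped like the Connect REST API's
# GET /connectors/<name>/status response (illustrative values only).
status = json.loads("""
{
  "name": "jdbc-sink",
  "connector": {"state": "RUNNING", "worker_id": "localhost:8083"},
  "tasks": [
    {"id": 0, "state": "FAILED", "worker_id": "localhost:8083",
     "trace": "org.apache.kafka.connect.errors.ConnectException: ..."},
    {"id": 1, "state": "RUNNING", "worker_id": "localhost:8083"}
  ]
}
""")

# Collect the ids of tasks that have failed so they can be inspected
# (and, once the root cause is fixed, restarted).
failed = [t["id"] for t in status["tasks"] if t["state"] == "FAILED"]
print(failed)  # -> [0]
```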
Use Kafka Connect Log4j logging¶
In addition to the stack trace, it is recommended that you read the log. The Log4j properties file controls what is logged, including the log message layout and where log files are stored:
/etc/kafka/connect-log4j.properties (default location)
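For reference, a typical connect-log4j.properties looks roughly like the following illustrative fragment; the exact appender names and file paths vary by version and installation, so check the file that ships with yours:

```properties
# Illustrative fragment only -- property names follow Log4j 1.x conventions.
log4j.rootLogger=INFO, stdout, connectAppender

# Console output; %X{connector.context} prints the connector context
# added by KIP-449 (e.g. [jdbc-sink|task-0]).
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n

# File appender controlling where log files are stored.
log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.connectAppender.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
```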
The following are different ways to access the log, depending on how you are running Connect:
Confluent CLI
If you are running the Confluent CLI locally, use the following command:
confluent local services connect log
Docker
If you are using Docker, pass the container (or Compose service) name to docker logs or docker-compose logs:
docker-compose logs kafka-connect
If you are running vanilla Kafka Connect using Apache Kafka®, you can read the log files with cat or tail (the location varies by installation). For example:
cat /var/log/kafka/connect.log
In Apache Kafka 2.3, connector contexts were added to logging with KIP-449, and they make the diagnostic process a lot easier.
How to identify the cause of the problem¶
In the following Connect worker log example, the initial ERROR line could be a symptom of a problem, but is not the cause of the problem itself. When you see this, look further down the Connect worker log (or stack trace) for the root cause.
[2022-07-19 23:57:28,600] ERROR [jdbc-sink|task-0] WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:207)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/demo
    at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:59)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:64)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
Caused by: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/demo
    at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:702)
    at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:189)
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:247)
    at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:80)
    at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:52)
    ... 13 more
If you look further in the log for exceptions, you can identify the source of the problem. In this case, the two Caused by: lines are potential causes of the problem. Once you have identified the source of the problem, you’ll need to do some research. Research might lead you to find that the JDBC connector documentation indicates that you must install the MySQL JDBC driver on the Connect worker machine when a MySQL database is part of the pipeline.
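In a chained Java stack trace, the root cause is generally the last Caused by: entry. The following minimal Python sketch (using an abridged copy of the trace above) pulls it out of a log excerpt:

```python
# The *last* "Caused by:" entry in a Java stack trace is the root cause.
# Abridged excerpt of the Connect worker log shown above.
trace = """\
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/demo
    at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:59)
Caused by: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/demo
    at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:702)
"""

# Scan for "Caused by:" lines; the last one is the root cause.
causes = [line for line in trace.splitlines() if line.startswith("Caused by:")]
root_cause = causes[-1] if causes else trace.splitlines()[0]
print(root_cause)
```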
In the end, if your research isn’t fruitful, you can post your problem in the Confluent Community Forum.
Configure the log dynamically¶
Dynamic log configuration allows you to change the level of logging detail without having to restart the Connect worker.
For example, you might want to log a particular connector at TRACE level to troubleshoot it, but setting everything to TRACE would be overwhelming. With dynamic log configuration, you can target the specific logger of interest at runtime through the REST API, without restarting Kafka Connect.
List the current logger configuration:
curl -s http://localhost:8083/admin/loggers/ | jq
{
  "org.apache.kafka.connect.runtime.rest": {
    "level": "WARN"
  },
  "org.reflections": {
    "level": "ERROR"
  },
  "root": {
    "level": "INFO"
  }
}
Modify the logger configuration:
curl -s -X PUT -H "Content-Type:application/json" \
  http://localhost:8083/admin/loggers/io.confluent.connect.jdbc \
  -d '{"level": "TRACE"}'
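The same request can be issued from a script. The following Python sketch (the helper name is ours, not part of any Confluent API) builds the URL and body for the dynamic-logging endpoint; actually sending the PUT is left to your HTTP client of choice:

```python
import json

def logger_update(host, logger, level):
    """Build the URL and JSON body for Kafka Connect's dynamic-logging
    endpoint (PUT /admin/loggers/<logger>). Helper name is illustrative."""
    url = f"http://{host}/admin/loggers/{logger}"
    body = json.dumps({"level": level})
    return url, body

url, body = logger_update("localhost:8083", "io.confluent.connect.jdbc", "TRACE")
print(url)
print(body)
```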