Troubleshoot Self-Managed Kafka Connect for Confluent Platform

Use this page to troubleshoot self-managed Kafka Connect. For the procedures outlined in this page, the assumption is that your Connect worker and source connector are running, but Kafka Connect is not ingesting any data.

Get the connector and task status

One way to troubleshoot self-managed Kafka Connect is to retrieve the connector and task status. If Kafka Connect is not ingesting any data, you can check whether one or more of the connector’s tasks has failed. You can use the Connect API to gather more details:

  1. Get the status of the connector instance:

    curl -s "http://localhost:8083/connectors/idbc-sink/status" | \ jq '.connector.state'
    

    You should see output similar to:

    "RUNNING"
    
  2. Get the status of the connector’s tasks:

    curl -s "http://localhost:8083/connectors/jdbc-sink/status" | \ jq '.tasks[0l.state'
    

    If one of the connector’s tasks has failed, you should see output similar to:

    "FAILED"
    
  3. Use the following curl command to request a stack trace for the task and identify what caused the failure. The output is piped through jq to extract the trace and through sed to turn the escaped newlines and tabs into real ones. Note that the command requests the stack trace for the first element in the tasks array.

    $curl -s "http://localhost: 8083/connectors/jdbc-sink/status" | jq '.tasks[0].trace' | sed 's/\In/In/g; s/\lt/It/g'
    

    You should see output similar to:

    "org.apache.kafka.connect.errors. ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
       at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkersinkTask.java:618)
       at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
       at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
       at org.apache.kafka. connect. runtime.WorkerSinkTask. execute(WorkerSinkTask. java: 204)
       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
       at org.apache.kaka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call (Executors. java:515)
       at java.base/java.util.concurrent.FutureTask. run (FutureTask. java: 264)
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor•java:1128)
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor. java:628)
       at java.base/java.lang.Thread.run (Thread. java :829)
    Caused by: org.apache.kaka.connect.errors.ConnectException: java.sql.SOLException: No suitable driver found for jdbc:mysql://localhost/demo
       at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider•java:59)
       at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:64)
       at 10.confluent.connect.jabc.sink.JabcsinkTask.putJabcsinkTask.java:84)
       at org.apache.Katka.connect.runtime.WorkersinkTask.deliverMessagesworkersinkTask.java:584)
    Caused by: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/demo
       at java.sql/java.sql.DriverManager.getConnection(DriverManager•java:702)
       at java.sql/java.sql.DriverManager.getConnection(DriverManager•java:189)
       at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:247)
       at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:80)
       at io.confluent.connect.jdbc.util.CachedConnectionProvider-getConnection(CachedConnectionProvider•java:52)
       ...13 more
    "
    

    As you read through the previous stack trace, you should notice the following issues:

    • A ConnectException was thrown
    • A driver is missing (No suitable driver found)
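
The per-task checks above can also be rolled into a single sweep over every task. A minimal sketch, assuming jq is installed; the sample JSON below is a hypothetical capture of what `curl -s "http://localhost:8083/connectors/jdbc-sink/status"` might return, so you can see the jq filter work without a live worker:

```shell
# Hypothetical capture of GET /connectors/jdbc-sink/status:
#   status=$(curl -s "http://localhost:8083/connectors/jdbc-sink/status")
status='{"name":"jdbc-sink","connector":{"state":"RUNNING"},"tasks":[{"id":0,"state":"FAILED"}]}'

# Print one line per task: connector name, task id, and state.
echo "$status" | jq -r '.name as $name | .tasks[] | "\($name) task-\(.id): \(.state)"'
```

Piped against a live worker instead of the sample variable, the same filter flags every failed task at a glance.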

Use Kafka Connect Log4j logging

In addition to the stack trace, it is recommended that you read the log. The Log4j properties file controls what is logged, including the log message layout and where log files are stored:

/etc/kafka/connect-log4j.properties (default location)

The following are different ways to access the log, depending on how you are running Connect:

  • Confluent CLI

    If you are running the Confluent CLI locally, use the following command:

    confluent local services connect log
    
  • Docker

    If you are using Docker, use docker logs with the name of the container (or, with Docker Compose, the service name):

    docker-compose logs kafka-connect
    
  • If you are running vanilla Kafka Connect using Apache Kafka®, you can read the log files with cat or tail (the location varies by installation). For example:

    cat /var/log/kafka/connect.log
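
However you access it, the log is usually long, so filtering for ERROR entries narrows it quickly. A sketch using grep on two hypothetical log lines of the shape the Connect worker emits (on a real install you would grep the log file itself):

```shell
# Two hypothetical Connect worker log lines:
log='[2022-07-19 23:57:27,100] INFO [jdbc-sink|task-0] Initializing task (org.apache.kafka.connect.runtime.WorkerSinkTask:310)
[2022-07-19 23:57:28,600] ERROR [jdbc-sink|task-0] Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:207)'

# Keep only the ERROR lines, e.g. on a real install:
#   grep ERROR /var/log/kafka/connect.log
echo "$log" | grep ERROR
```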
    

In Apache Kafka 2.3, connector contexts were added to logging with KIP-449, and they make the diagnostic process a lot easier.
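
The connector context is an MDC value, so it only shows up if the Log4j layout includes it. A hedged example of what the pattern line in connect-log4j.properties can look like (the appender name is an assumption; check your own file):

```
# %X{connector.context} expands to e.g. "[jdbc-sink|task-0] " (the KIP-449 MDC key)
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
```

With this pattern in place, every log line names the connector and task that produced it, which is what makes the `[jdbc-sink|task-0]` prefixes in the example below possible.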

How to identify the cause of the problem

In the following Connect worker log example, the initial ERROR line could be a symptom of a problem, but is not the cause of the problem itself. When you see this, look further down the Connect worker log (or stack trace) for the cause.

 [2022-07-19 23:57:28,600] ERROR [jdbc-sink|task-0] WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:207)
 org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
     at java.base/java.lang.Thread.run(Thread.java:829)
 Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/demo
     at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:59)
     at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:64)
     at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
 Caused by: java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost/demo
     at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:702)
     at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:189)
     at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:247)
     at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:80)
     at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:52)
     ... 13 more

If you look further in the log for exceptions, you can identify the source of the problem. In this case, you should notice that the two Caused by: lines are the potential causes of the problem. Once you have identified the source of the problem, you'll need to do some research. Research might lead you to find that the JDBC connector documentation indicates that you must install the MySQL JDBC driver on the Connect worker machine when a MySQL database is part of the pipeline.

In the end, if your research isn’t fruitful, you can post your problem in the Confluent Community Forum.

Configure the log dynamically

Dynamic log configuration allows you to change the level of logging detail without having to restart the Connect worker.

For example, perhaps there is a particular connector that you'd like to log at TRACE level to troubleshoot. Setting everything to TRACE would be overwhelming; with dynamic log configuration, you can instead target the specific logger of interest at runtime over REST, without restarting Kafka Connect.

  1. List the current logger configuration:

    curl -s http://localhost:8083/admin/loggers/ | jq
    {
      "org.apache.kafka.connect.runtime.rest":{
        "level": "WARN"
      },
      "org.reflections": {
        "level": "ERROR"
      },
      "root": {
        "level": "INFO"
      }
    }
    
  2. Modify the logger configuration:

    curl -s -X PUT -H "Content-Type:application/json" \
        http://localhost:8083/admin/loggers/io.confluent.connect.jdbc \
        -d '{"level": "TRACE"}'