MQTT Connectors Advanced Debugging

Trace Logging

Both the sink and source connectors have trace logs which show in greater detail what records are passing through them. To enable these logs, add the following lines to your log4j.properties and restart the Connect worker:

log4j.logger.io.confluent.connect.mqtt.MqttSourceTask=TRACE
log4j.logger.io.confluent.connect.mqtt.MqttSinkTask=TRACE

For Confluent packages, the default log4j properties file for Connect resides at etc/kafka/connect-log4j.properties, and adding the above lines makes the file resemble the following:

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR
log4j.logger.org.reflections=ERROR

log4j.logger.io.confluent.connect.mqtt.MqttSourceTask=TRACE
log4j.logger.io.confluent.connect.mqtt.MqttSinkTask=TRACE

Mosquitto CLI tools

Mosquitto comes with CLI tools that make debugging broker issues relatively easier. This includes the mosquitto_sub and mosquitto_pub utility to publish and subscribe to MQTT brokers.

To publish a message to the broker running on localhost:1881 to the topic my-mqtt-topic, use the command:

mosquitto_pub -h localhost -p 1881 -t my-mqtt-topic -m "sample-msg-1"

To run the same command against a broker that requires a username and password, try:

mosquitto_pub -h localhost -p 1881 -t my-mqtt-topic -m "sample-msg-1" -u username -P password

To subscribe to all changes on a topic, use the mosquitto_sub command:

mosquitto_sub -h localhost -p 1881 -t my-mqtt-topic -u test -P test

Unable to Connect to the MQTT Broker

To check if the tasks are unable to connect to the MQTT broker, look at the Connect logs for the following exception:

org.apache.kafka.connect.errors.ConnectException: Unable to connect to server (32103) - java.net.ConnectException: Connection refused (Connection refused)
    at io.confluent.connect.mqtt.MqttSourceTask.start(MqttSourceTask.java:59)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:164)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: Unable to connect to server (32103) - java.net.ConnectException: Connection refused (Connection refused)
    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:94)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:701)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
   ... 3 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:84)
    ... 8 more

To debug this, try to find the configs of the connector, and run mosquitto_sub against the broker (specified in the mqtt.server.uri property). If the endpoint is incorrect or the broker is inaccessible, we should see the following error message on running mosquitto_sub -h 127.0.0.1 -p 1881 -t my-mqtt-topic.

Error: Connection refused

Invalid Username/Password

To check if tasks are unable to connect to the MQTT broker due to authentication issues, look at the Connect logs for the following exception:

[2018-10-22 14:11:16,839] ERROR WorkerSourceTask{id=mqtt-anon-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Not authorized to connect (5)
    at io.confluent.connect.mqtt.MqttSourceTask.start(MqttSourceTask.java:59)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:164)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: Not authorized to connect (5)
    at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:28)
    at org.eclipse.paho.client.mqttv3.internal.ClientState.notifyReceivedAck(ClientState.java:988)
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:145)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    ... 3 more

You can verify the credentials using mosquitto_sub -h 127.0.0.1 -p 32771 -t my-mqtt-topic -u username -P password, which returns the following message on error:

Connection Refused: not authorised.