Advanced Debugging for MQTT Source and Sink Connector for Confluent Platform
Trace Logging
Both the sink and source connectors emit trace logs that show in greater detail which records pass through them. To enable these logs, add the following lines to your log4j.properties file and restart the Connect worker:
log4j.logger.io.confluent.connect.mqtt.MqttSourceTask=TRACE
log4j.logger.io.confluent.connect.mqtt.MqttSinkTask=TRACE
For Confluent packages, the default log4j properties file for Connect is etc/kafka/connect-log4j.properties. After you add the lines above, the file resembles the following:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR
log4j.logger.org.reflections=ERROR
log4j.logger.io.confluent.connect.mqtt.MqttSourceTask=TRACE
log4j.logger.io.confluent.connect.mqtt.MqttSinkTask=TRACE
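Before restarting the worker, it can help to confirm that both loggers are actually set in the file. The following is a minimal sketch, not part of the connector: the function names are hypothetical, and it assumes the file uses plain key=value lines as in the listing above.

```python
# Logger names copied from the configuration lines above.
REQUIRED_LOGGERS = (
    "log4j.logger.io.confluent.connect.mqtt.MqttSourceTask",
    "log4j.logger.io.confluent.connect.mqtt.MqttSinkTask",
)


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # or ! comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_trace_loggers(text):
    """Return the required logger keys that are absent or not set to TRACE."""
    props = parse_properties(text)
    missing = []
    for name in REQUIRED_LOGGERS:
        # The level is the first comma-separated token of the value.
        level = props.get(name, "").split(",")[0].strip().upper()
        if level != "TRACE":
            missing.append(name)
    return missing
```

To use it, read the contents of etc/kafka/connect-log4j.properties (or whichever file your worker loads) and pass the text to missing_trace_loggers; an empty list means both loggers are set to TRACE.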
Mosquitto CLI tools
Mosquitto ships with CLI tools that make it easier to debug broker issues. These include the mosquitto_pub and mosquitto_sub utilities, which publish to and subscribe to topics on an MQTT broker.
To publish a message to the topic my-mqtt-topic on the broker running at localhost:1881, use the command:
mosquitto_pub -h localhost -p 1881 -t my-mqtt-topic -m "sample-msg-1"
To run the same command against a broker that requires a username and password, try:
mosquitto_pub -h localhost -p 1881 -t my-mqtt-topic -m "sample-msg-1" -u username -P password
To subscribe to all messages published to a topic, use the mosquitto_sub command (shown here with the username and password both set to test):
mosquitto_sub -h localhost -p 1881 -t my-mqtt-topic -u test -P test
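When these commands are run repeatedly from a script, it can be convenient to assemble the argument list in one place. The sketch below is illustrative only: mosquitto_command is a hypothetical helper, and it uses only the flags shown in the commands above (-h, -p, -t, -m, -u, -P).

```python
def mosquitto_command(tool, host, port, topic,
                      message=None, username=None, password=None):
    """Build an argument list for mosquitto_pub or mosquitto_sub."""
    if tool not in ("mosquitto_pub", "mosquitto_sub"):
        raise ValueError(f"unsupported tool: {tool}")
    argv = [tool, "-h", host, "-p", str(port), "-t", topic]
    if tool == "mosquitto_pub":
        # Publishing needs a payload; subscribing does not.
        if message is None:
            raise ValueError("mosquitto_pub requires a message")
        argv += ["-m", message]
    if username is not None:
        argv += ["-u", username]
        if password is not None:
            argv += ["-P", password]
    return argv
```

The returned list can be passed to subprocess.run on a machine where the Mosquitto tools are installed. Passing a list rather than a single string avoids shell quoting problems with message payloads.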
Unable to Connect to the MQTT Broker
To check if the tasks are unable to connect to the MQTT broker, look at the Connect logs for the following exception:
org.apache.kafka.connect.errors.ConnectException: Unable to connect to server (32103) - java.net.ConnectException: Connection refused (Connection refused)
at io.confluent.connect.mqtt.MqttSourceTask.start(MqttSourceTask.java:59)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:164)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: Unable to connect to server (32103) - java.net.ConnectException: Connection refused (Connection refused)
at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:94)
at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:701)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
... 3 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:84)
... 8 more
To debug this, look up the connector's configuration and run mosquitto_sub against the broker specified in the mqtt.server.uri property. If the endpoint is incorrect or the broker is inaccessible, running mosquitto_sub -h 127.0.0.1 -p 1881 -t my-mqtt-topic prints the following error message:
Error: Connection refused
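If the Mosquitto tools are not installed on the Connect host, a plain TCP check can confirm the same thing from that machine. This is a rough sketch under stated assumptions: the function names are hypothetical, the mqtt.server.uri value is assumed to be a single URI of the form tcp://host:port, and 1883 (the conventional unencrypted MQTT port) is used only when the URI omits a port. It tests reachability only and does not speak the MQTT protocol.

```python
import socket
from urllib.parse import urlsplit


def broker_address(server_uri, default_port=1883):
    """Extract (host, port) from a URI such as tcp://localhost:1881."""
    parts = urlsplit(server_uri)
    if not parts.hostname:
        raise ValueError(f"no host found in URI: {server_uri!r}")
    return parts.hostname, parts.port or default_port


def can_connect(server_uri, timeout=3.0):
    """Return True if a TCP connection to the broker address succeeds."""
    host, port = broker_address(server_uri)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False
```

For example, can_connect("tcp://127.0.0.1:1881") returning False from the worker host points to a wrong endpoint, a stopped broker, or a network rule blocking the port, rather than a connector problem.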
Invalid Username/Password
To check if tasks are unable to connect to the MQTT broker due to authentication issues, look at the Connect logs for the following exception:
[2018-10-22 14:11:16,839] ERROR WorkerSourceTask{id=mqtt-anon-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Not authorized to connect (5)
at io.confluent.connect.mqtt.MqttSourceTask.start(MqttSourceTask.java:59)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:164)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: Not authorized to connect (5)
at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:28)
at org.eclipse.paho.client.mqttv3.internal.ClientState.notifyReceivedAck(ClientState.java:988)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:145)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
... 3 more
You can verify the credentials by running mosquitto_sub -h 127.0.0.1 -p 32771 -t my-mqtt-topic -u username -P password, which prints the following message if the broker rejects them:
Connection Refused: not authorised.
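The two failure modes described above can be told apart by the message on the top-level ConnectException line. The sketch below scans Connect log text for those two messages; classify_connect_failures and the category labels are hypothetical names, and the patterns are taken verbatim from the stack traces shown above, so other connector or Paho versions may word them differently.

```python
import re

# Message fragments copied from the stack traces shown above.
FAILURE_PATTERNS = {
    "broker_unreachable": re.compile(r"Unable to connect to server \(32103\)"),
    "not_authorized": re.compile(r"Not authorized to connect \(5\)"),
}

EXCEPTION_PREFIX = "org.apache.kafka.connect.errors.ConnectException:"


def classify_connect_failures(log_text):
    """Count known MQTT connection failures in Connect log text.

    Only top-level ConnectException lines are counted, so the matching
    "Caused by:" lines in the same stack trace are not double counted.
    """
    counts = {}
    for line in log_text.splitlines():
        stripped = line.strip()
        if not stripped.startswith(EXCEPTION_PREFIX):
            continue
        for label, pattern in FAILURE_PATTERNS.items():
            if pattern.search(stripped):
                counts[label] = counts.get(label, 0) + 1
    return counts
```

A result containing broker_unreachable suggests checking the endpoint with mosquitto_sub as described under Unable to Connect to the MQTT Broker, while not_authorized suggests checking the credentials.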