Important
You are viewing documentation for an older version of Confluent Platform.
Example: Configure MQTT Sink Connector for Eclipse Mosquitto Broker
The MQTT sink connector is used to publish records in an Apache Kafka® topic to MQTT brokers. This example demonstrates how to configure an MQTT sink connector for the Eclipse Mosquitto broker.
Set Up Mosquitto Broker and Clients
In this step, an Eclipse Mosquitto broker is set up by using Docker images.
Install the Mosquitto utilities for your operating system.
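Create a config file named mosquitto.conf for the broker with the following contents. The listener line makes the broker accept connections on port 1881, the port used throughout this example; without it, Mosquitto listens on its default port, 1883.
persistence false
log_dest stdout
listener 1881
allow_anonymous true
connection_messages true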
Tip
To start a broker that requires clients to authenticate using a username and password, change allow_anonymous to false and add password_file /etc/mosquitto/passwd. Use mosquitto_passwd -c password username to store passwords in this file.
Start the Docker container.
docker run --name mosquitto -p 1881:1881 -v `pwd`/mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
If the broker requires password authentication, start the container with the password file mounted instead.
docker run --name mosquitto -p 1881:1881 -v `pwd`/mosquitto.conf:/mosquitto/config/mosquitto.conf -v `pwd`/password:/etc/mosquitto/passwd eclipse-mosquitto
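The anonymous and password-protected setups above differ in only two config lines. As a minimal Python sketch of that difference (the helper name `render_mosquitto_conf` and its parameters are hypothetical and not part of Mosquitto), the file contents could be generated like this:

```python
from typing import Optional


def render_mosquitto_conf(password_file: Optional[str] = None,
                          listener_port: Optional[int] = None) -> str:
    """Return mosquitto.conf contents for this example (hypothetical helper).

    password_file: path as seen inside the container. When given, anonymous
        access is switched off and credentials are checked against the file.
    listener_port: optional explicit listener. Mosquitto listens on 1883
        when no listener is configured.
    """
    lines = [
        "persistence false",
        "log_dest stdout",
        "connection_messages true",
    ]
    if listener_port is not None:
        lines.append(f"listener {listener_port}")
    if password_file is None:
        lines.append("allow_anonymous true")
    else:
        lines.append("allow_anonymous false")
        lines.append(f"password_file {password_file}")
    return "\n".join(lines) + "\n"


# Anonymous broker on the port used by the docker commands in this example.
print(render_mosquitto_conf(listener_port=1881), end="")
```

Writing the returned string to mosquitto.conf gives a file that can be mounted into the container with either docker run command. Passing password_file="/etc/mosquitto/passwd" produces the authenticated variant.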
Verify that the broker is running by publishing a message to it.
mosquitto_pub -h localhost -p 1881 -t my-mqtt-topic -m "sample-msg-1"
Tip
If a username and password are needed to connect to the Mosquitto broker, use the -u and -P arguments of mosquitto_pub to pass in the correct credentials.
To subscribe to all messages on a Mosquitto topic, use the mosquitto_sub command.
mosquitto_sub -h localhost -p 1881 -t my-mqtt-topic
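When the publish check is scripted, the optional credentials are easy to get wrong. The following Python sketch assembles the same mosquitto_pub invocation and adds -u and -P only when credentials are supplied. The function name `mosquitto_pub_args` is a hypothetical helper, and the defaults simply mirror the values used in this example.

```python
import shlex
from typing import List, Optional


def mosquitto_pub_args(topic: str, message: str, host: str = "localhost",
                       port: int = 1881, username: Optional[str] = None,
                       password: Optional[str] = None) -> List[str]:
    """Build the mosquitto_pub argument list (hypothetical helper)."""
    args = ["mosquitto_pub", "-h", host, "-p", str(port),
            "-t", topic, "-m", message]
    # -u and -P are only needed when the broker rejects anonymous clients.
    if username is not None:
        args += ["-u", username]
    if password is not None:
        args += ["-P", password]
    return args


cmd = mosquitto_pub_args("my-mqtt-topic", "sample-msg-1")
# Print the command in a form that can be pasted into a shell.
print(shlex.join(cmd))
```

With the Mosquitto utilities installed, the list can be passed to subprocess.run(cmd, check=True) to publish the message.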
Install the MQTT Connector Plugin
- Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
Navigate to your Confluent Platform installation directory and run this command to install the latest version of the MQTT connector.
confluent-hub install confluentinc/kafka-connect-mqtt:latest
Tip
To install a specific version of the connector, replace latest with the version number (for example, 1.1.0-preview) in the command above.
Restart Connect to pick up the new plugin.
confluent local stop connect && confluent local start connect
Check if the MQTT plugin has been installed correctly and picked up by the plugin loader:
curl -sS localhost:8083/connector-plugins | jq -c '.[] | select( .class | contains("Mqtt") )'
This command should print the new MQTT connector plugins available to the worker.
{"class": "io.confluent.connect.mqtt.MqttSinkConnector","type": "sink","version": "1.1.0-preview"}
{"class": "io.confluent.connect.mqtt.MqttSourceConnector","type": "source","version": "1.1.0-preview"}
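Where jq is not available, the same check can be sketched in Python with only the standard library. The function names below are hypothetical, and the worker address is assumed to be localhost:8083 as in the curl command. The FileStreamSinkConnector entry in the sample data is illustrative and only there to show the filter dropping non-MQTT plugins.

```python
import json
from typing import Dict, List
from urllib.request import urlopen


def fetch_plugins(base_url: str = "http://localhost:8083") -> List[Dict[str, str]]:
    """GET /connector-plugins from the Connect worker's REST API."""
    with urlopen(f"{base_url}/connector-plugins") as response:
        return json.load(response)


def mqtt_plugins(plugins: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Keep plugins whose class name contains "Mqtt", like the jq filter."""
    return [p for p in plugins if "Mqtt" in p.get("class", "")]


# Sample data shaped like the worker's response (illustrative values).
sample = [
    {"class": "io.confluent.connect.mqtt.MqttSinkConnector",
     "type": "sink", "version": "1.1.0-preview"},
    {"class": "io.confluent.connect.mqtt.MqttSourceConnector",
     "type": "source", "version": "1.1.0-preview"},
    {"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
     "type": "sink", "version": "illustrative"},
]

for plugin in mqtt_plugins(sample):
    print(json.dumps(plugin))
```

Against a running worker, mqtt_plugins(fetch_plugins()) applies the same filter to the live response.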
Configure Sink Connector
- Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
Create a configuration file named mqtt-sink-config.json with the following contents.
{
  "name": "mqtt-sink",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSinkConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://127.0.0.1:1881",
    "topics": "kafka-topic-name",
    "mqtt.qos": "2",
    "mqtt.username": " Omit if MQTT broker supports anonymous mode ",
    "mqtt.password": " Omit if MQTT broker supports anonymous mode ",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.license": " Omit to enable trial mode ",
    "confluent.topic.replication.factor": "1"
  }
}
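Several entries in this file are meant to be left out entirely rather than filled with placeholder text. As a hedged sketch, the Python helper below (`build_mqtt_sink_config` is a hypothetical name, not part of the connector) builds the same definition and includes the credential and license keys only when values are supplied.

```python
import json
from typing import Any, Dict, Optional


def build_mqtt_sink_config(
    kafka_topics: str,
    mqtt_server_uri: str = "tcp://127.0.0.1:1881",
    bootstrap_servers: str = "kafka:9092",
    username: Optional[str] = None,
    password: Optional[str] = None,
    license_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Build the connector definition from this example (hypothetical helper)."""
    config = {
        "connector.class": "io.confluent.connect.mqtt.MqttSinkConnector",
        "tasks.max": "1",
        "mqtt.server.uri": mqtt_server_uri,
        "topics": kafka_topics,
        "mqtt.qos": "2",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "confluent.topic.bootstrap.servers": bootstrap_servers,
        "confluent.topic.replication.factor": "1",
    }
    # Credentials are only needed when the broker rejects anonymous clients.
    if username is not None:
        config["mqtt.username"] = username
    if password is not None:
        config["mqtt.password"] = password
    # Leaving the license key out runs the connector in trial mode.
    if license_key is not None:
        config["confluent.license"] = license_key
    return {"name": "mqtt-sink", "config": config}


# Anonymous broker, trial license: the optional keys are simply absent.
print(json.dumps(build_mqtt_sink_config("kafka-topic-name"), indent=2))
```

Redirecting the printed JSON to mqtt-sink-config.json yields a file in the shape shown above, minus the keys that do not apply.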
Run this command to start the MQTT sink connector using the Confluent CLI confluent local load command.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local load mqtt-sink -- -d mqtt-sink-config.json
Your output should resemble:
{
  "name": "mqtt-sink",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSinkConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://127.0.0.1:1881",
    "topics": "kafka-topic-name",
    "mqtt.qos": "2",
    "mqtt.username": " ##### ",
    "mqtt.password": " [hidden] ",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.license": " [hidden] ",
    "confluent.topic.replication.factor": "1"
  },
  "tasks": [],
  "type": null
}
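The output above shows an empty tasks list, so it does not confirm that the connector is actually running. One way to check is the status endpoint of the Kafka Connect REST API (GET /connectors/mqtt-sink/status). The Python sketch below assumes the worker is reachable at localhost:8083. The helper names are hypothetical, and the sample responses are illustrative rather than captured output.

```python
import json
from typing import Any, Dict
from urllib.request import urlopen


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> Dict[str, Any]:
    """GET /connectors/<name>/status from the Connect worker's REST API."""
    with urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


def is_running(status: Dict[str, Any]) -> bool:
    """True when the connector and at least one task report RUNNING,
    and no task reports any other state."""
    tasks = status.get("tasks", [])
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks_ok = len(tasks) > 0 and all(t.get("state") == "RUNNING" for t in tasks)
    return connector_ok and tasks_ok


# Illustrative responses in the shape returned by the status endpoint.
starting = {"name": "mqtt-sink", "connector": {"state": "RUNNING"}, "tasks": []}
running = {"name": "mqtt-sink", "connector": {"state": "RUNNING"},
           "tasks": [{"id": 0, "state": "RUNNING"}]}

print(is_running(starting))
print(is_running(running))
```

Against a live worker, is_running(fetch_status("mqtt-sink")) can be polled until it returns True. A task in the FAILED state makes it return False.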