MQTT Source Connector for Confluent Platform¶
The Kafka Connect MQTT connector is used to integrate with existing MQTT servers. The connector is compatible with Confluent Platform version 4.1 and later; earlier versions do not work with this connector. The Kafka Connect MQTT Source connector connects to an MQTT broker and subscribes to the specified topics. SSL is supported. For information on how to create SSL keys and certificates, see Creating SSL Keys and Certificates. For the relevant configuration properties, see the MQTT Source Connector configuration reference.
Note
MQTT client version: The connector uses the Eclipse Paho mqttv3 client.
Features¶
At least once delivery¶
For at-least-once semantics, some restrictions apply. To support this feature,
you must run the connector with mqtt.clean.session.enabled set to false.
Under this mode, the connector will assign each task a unique client ID, which
will be used for a persistent connection.
The topics are distributed across tasks. Confluent recommends that you do not change the topic assignments; that is, do not alter the task count or change the topic list once the connector is up and running. Doing so can lead to data loss, because the connector restarts with a different topic-to-task assignment and therefore different client IDs.
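As a rough pre-flight check for the restriction above, the following sketch inspects a connector configuration before it is submitted. The helper name `at_least_once_issues` is hypothetical. The QoS check is an assumption drawn from general MQTT semantics (QoS 0 means "at most once"), not something this page states, and missing keys are flagged rather than guessing at connector defaults.

```python
def at_least_once_issues(config):
    """Return a list of problems found; an empty list means none were found.

    `config` is a dict of connector properties (string keys and values).
    """
    issues = []

    # Stated in the section above: clean sessions must be disabled.
    clean = config.get("mqtt.clean.session.enabled")
    if clean is None or str(clean).strip().lower() != "false":
        issues.append("mqtt.clean.session.enabled must be set to false")

    # Assumption (general MQTT semantics, not from this page):
    # QoS 0 is "at most once", so require an explicit QoS of 1 or 2.
    qos = config.get("mqtt.qos")
    if qos is None or str(qos).strip() not in ("1", "2"):
        issues.append("mqtt.qos should be set explicitly to 1 or 2")

    return issues


if __name__ == "__main__":
    example = {"mqtt.clean.session.enabled": "false", "mqtt.qos": "1"}
    print(at_least_once_issues(example))  # prints []
```

A check like this could run in a deployment script before the configuration is posted to Connect.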
Multiple tasks¶
The MQTT Source connector supports running one or more tasks. You can specify
the number of tasks in the tasks.max configuration parameter. Because topics
are distributed across tasks, this can improve throughput when the connector
subscribes to multiple topics.
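To see why the topic-to-task mapping is sensitive to tasks.max, the sketch below splits a topic list round-robin across tasks. The round-robin scheme, the `assign_topics` helper, and the topic names are all hypothetical; this page does not describe the connector's actual assignment logic, which may differ.

```python
def assign_topics(topics, task_count):
    """Hypothetical round-robin split of MQTT topics across tasks."""
    if task_count < 1:
        raise ValueError("task_count must be at least 1")
    # Never create more groups than there are topics.
    groups = [[] for _ in range(min(task_count, len(topics)))]
    for index, topic in enumerate(topics):
        groups[index % len(groups)].append(topic)
    return groups


if __name__ == "__main__":
    topics = ["sensors/a", "sensors/b", "sensors/c", "sensors/d"]
    print(assign_topics(topics, 2))
    # [['sensors/a', 'sensors/c'], ['sensors/b', 'sensors/d']]
    print(assign_topics(topics, 3))
    # [['sensors/a', 'sensors/d'], ['sensors/b'], ['sensors/c']]
```

In this illustration, moving from two tasks to three changes which task owns `sensors/c` and `sensors/d`. That kind of reshuffling is what the recommendation against changing the task count on a running connector is meant to avoid.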
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.
Configuration Properties¶
For a complete list of configuration properties for this source connector, see Configuration Reference for MQTT Source Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Install the MQTT Source Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- Java 8+. Note that Java 8 is deprecated in versions 7.2 and later of Confluent Platform. For more details, view Java compatibility with Confluent Platform by version.
- Confluent CLI
- An installation of the latest connector version.
Install the connector using the Confluent CLI¶
To install the latest connector version, navigate to your Confluent Platform
installation directory and run the following command:
confluent connect plugin install confluentinc/kafka-connect-mqtt:latest
You can install a specific version by replacing latest with a version number,
as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-mqtt:1.7.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Usage Notes¶
If you need to change the topic name, see the RegexRouter transformation, which can change the topic name before the record is written to Apache Kafka®.
The output of this connector is an envelope with all of the properties of the incoming message. The body of the message is written to the value as bytes. The key is the topic the message was written to.
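The following sketch illustrates the kind of renaming RegexRouter performs. It is a Python emulation, not the transformation itself. The transform alias `renameTopic`, the regex, and the replacement are hypothetical values chosen for illustration. The emulation assumes that the regex must match the whole topic name and that the replacement uses Java-style `$1` group references, which the helper converts to Python syntax.

```python
import re

# Hypothetical connector properties enabling RegexRouter under the alias
# "renameTopic"; the regex and replacement are illustrative values only.
SMT_CONFIG = {
    "transforms": "renameTopic",
    "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.renameTopic.regex": "mqtt-(.*)",
    "transforms.renameTopic.replacement": "iot-$1",
}


def java_to_python_replacement(replacement):
    """Convert simple Java-style group references ($1) to Python's \\g<1>."""
    return re.sub(r"\$(\d+)", r"\\g<\1>", replacement)


def route_topic(topic, regex, replacement):
    """Emulate the rename: rewrite the topic only if the whole name matches."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # Non-matching topic names pass through unchanged.
    return match.expand(java_to_python_replacement(replacement))


if __name__ == "__main__":
    regex = SMT_CONFIG["transforms.renameTopic.regex"]
    replacement = SMT_CONFIG["transforms.renameTopic.replacement"]
    print(route_topic("mqtt-source-1", regex, replacement))  # iot-source-1
    print(route_topic("other-topic", regex, replacement))    # other-topic
```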
Quick start¶
The MQTT Source connector receives messages from MQTT brokers and writes them to an Apache Kafka® topic. This quick start demonstrates how to configure an MQTT Source connector for the Eclipse Mosquitto broker.
Set up Mosquitto broker and clients¶
In this step, you set up an Eclipse Mosquitto broker using a Docker image.
Install the Mosquitto utilities for your operating system.
Create a config file named mosquitto.conf for the broker with the following contents.
persistence false
log_dest stdout
allow_anonymous true
connection_messages true
Tip
To start a broker that requires clients to authenticate using a username and password, change allow_anonymous to false and add password_file /etc/mosquitto/passwd. Use mosquitto_passwd -c password username to store passwords in this file.
Start the Docker container.
docker run --name mosquitto -p 1881:1881 -v `pwd`/mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
If the broker requires password authentication, start the Docker container using this command instead, which also mounts the password file.
docker run --name mosquitto -p 1881:1881 -v `pwd`/mosquitto.conf:/mosquitto/config/mosquitto.conf -v `pwd`/password:/etc/mosquitto/passwd eclipse-mosquitto
Verify that the broker is running by publishing a message to it.
mosquitto_pub -h localhost -p 1881 -t my-mqtt-topic -m "sample-msg-1"
Tip
If a username and password are needed to connect to the Mosquitto broker, use the -u and -P arguments of mosquitto_pub to pass in the correct credentials.
To subscribe to all messages on a Mosquitto topic, use the mosquitto_sub command.
mosquitto_sub -h localhost -p 1881 -t my-mqtt-topic
Install the MQTT Connector Plugin¶
Navigate to your Confluent Platform installation directory and run this command to install the latest version of the MQTT connector.
confluent connect plugin install confluentinc/kafka-connect-mqtt:latest
Tip
To install a specific version of the connector, replace latest with a version number (for example, 1.1.0-preview) in the command above.
Restart Connect to pick up the new plugin.
confluent local services connect stop && confluent local services connect start
Check if the MQTT plugin has been installed correctly and picked up by the plugin loader:
curl -sS localhost:8083/connector-plugins | jq -c '.[] | select( .class | contains("Mqtt") )'
This command should print the new MQTT connector plugins available to the worker.
{"class": "io.confluent.connect.mqtt.MqttSinkConnector","type": "sink","version": "1.1.0-preview"}
{"class": "io.confluent.connect.mqtt.MqttSourceConnector","type": "source","version": "1.1.0-preview"}
Configure the connector¶
Create a configuration file named mqtt-source-config.json with the following contents.
{
  "name": "source-mqtt",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://127.0.0.1:1881",
    "mqtt.topics": "hello",
    "kafka.topic": "mqtt-source-1",
    "mqtt.qos": "2",
    "mqtt.username": " Omit if Mqtt broker supports anonymous mode ",
    "mqtt.password": " Omit if Mqtt broker supports anonymous mode ",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": " Omit to enable trial mode "
  }
}
Run the Confluent CLI confluent local services connect connector load command to start the MQTT source connector.
Caution
You must include a double dash (--) between the topic name and your flag. For more information, see this post.
confluent local services connect connector load source-mqtt --config mqtt-source-config.json
Your output should resemble:
{
  "name": "source-mqtt",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://127.0.0.1:1881",
    "mqtt.topics": "hello",
    "kafka.topic": "mqtt-source-1",
    "mqtt.qos": "2",
    "mqtt.username": " ### ",
    "mqtt.password": " [hidden] ",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.license": " ### ",
    "confluent.topic.replication.factor": "1"
  },
  "tasks": [],
  "type": null
}
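The placeholder values in mqtt-source-config.json (for example, " Omit if Mqtt broker supports anonymous mode ") mark keys that should be left out, not sent literally. One way to generate the file is with a small script that adds the optional keys only when values are supplied. This is a minimal sketch: the function name `build_mqtt_source_config` and its parameters are hypothetical, and the fixed values are copied from the quick start.

```python
import json


def build_mqtt_source_config(server_uri, mqtt_topics, kafka_topic,
                             username=None, password=None, license_key=None):
    """Build the quick-start connector definition, omitting unset optional keys."""
    config = {
        "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
        "tasks.max": "1",
        "mqtt.server.uri": server_uri,
        "mqtt.topics": mqtt_topics,
        "kafka.topic": kafka_topic,
        "mqtt.qos": "2",
        "confluent.topic.bootstrap.servers": "kafka:9092",
        "confluent.topic.replication.factor": "1",
    }
    # Credentials are omitted when the broker allows anonymous clients.
    if username is not None:
        config["mqtt.username"] = username
    if password is not None:
        config["mqtt.password"] = password
    # Leaving the license out corresponds to trial mode in the quick start.
    if license_key is not None:
        config["confluent.license"] = license_key
    return {"name": "source-mqtt", "config": config}


if __name__ == "__main__":
    definition = build_mqtt_source_config(
        "tcp://127.0.0.1:1881", "hello", "mqtt-source-1")
    print(json.dumps(definition, indent=2))
```

Redirecting the script's output to mqtt-source-config.json yields a file that can be passed to the load command shown above.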
Examples¶
Property-based example¶
This configuration is typically used with standalone workers.
name=MqttSourceConnector1
connector.class=io.confluent.connect.mqtt.MqttSourceConnector
tasks.max=1
mqtt.server.uri=< Required Configuration >
mqtt.topics=< Required Configuration >
REST-based example¶
This configuration is typically used with distributed workers.
Write the following JSON to connector.json, configure all of the required
values, and use the command below to post the configuration to one of the
distributed Connect workers. For more information, see the
Kafka Connect REST API.
Connect Distributed REST example:
{
"config" : {
"name" : "MqttSourceConnector1",
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max" : "1",
"mqtt.server.uri" : "< Required Configuration >",
"mqtt.topics" : "< Required Configuration >"
}
}
Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
Create a new connector:
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
Update an existing connector:
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/MqttSourceConnector1/config
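The same two calls can be scripted without curl. The sketch below builds the HTTP requests with Python's standard library and does not send them. The helper name `connector_request` is hypothetical. The request bodies are an assumption based on the general Kafka Connect REST API, where creating a connector takes a `name` plus a `config` object and updating one takes the bare configuration map. Verify both against your Connect version.

```python
import json
import urllib.request


def connector_request(base_url, name, config, update=False):
    """Build (but do not send) a create or update request for a connector."""
    base = base_url.rstrip("/")
    if update:
        # Assumed shape: PUT /connectors/{name}/config with the config map.
        url = f"{base}/connectors/{name}/config"
        body, method = config, "PUT"
    else:
        # Assumed shape: POST /connectors with name and config.
        url = f"{base}/connectors"
        body, method = {"name": name, "config": config}, "POST"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method=method,
    )


if __name__ == "__main__":
    config = {
        "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
        "tasks.max": "1",
        "mqtt.server.uri": "< Required Configuration >",
        "mqtt.topics": "< Required Configuration >",
    }
    request = connector_request(
        "http://localhost:8083/", "MqttSourceConnector1", config)
    print(request.get_method(), request.full_url)
    # To send it to a running worker: urllib.request.urlopen(request)
```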