MQTT Sink Connector for Confluent Platform¶
The Kafka Connect MQTT Sink Connector integrates Kafka with existing MQTT servers: it connects to an MQTT broker and publishes data to an MQTT topic. The connector is compatible with Confluent Platform version 4.1 and later; earlier versions do not work with this connector. SSL is supported. For information on how to create SSL keys and certificates, see Creating SSL Keys and Certificates. For the relevant configuration properties, see the MQTT Sink Connector Configuration Properties.
Features¶
At least once delivery¶
This connector guarantees that records are delivered at least once from the Kafka topic.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
Multiple tasks¶
The MQTT Sink Connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve throughput when the connector reads from multiple topic partitions.
Schemas¶
The connector supports Avro, JSON Schema, and Protobuf input data formats. Schema Registry must be enabled to use a Schema Registry-based format. Note that the connector supports only bytes and string schemas; it does not support structs. If your data uses struct schemas, you can store the struct data as bytes and select bytes in the connector.
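As a rough sketch of the bytes option described above, the following writes a properties fragment that hands record values to the connector as raw bytes. The file name is hypothetical. ByteArrayConverter and StringConverter are converters that ship with Apache Kafka; whether either one suits your data depends on how the topic was produced.

```shell
# Hypothetical file name; merge these lines into your connector configuration.
cat > mqtt-sink-converter.properties <<'EOF'
# Pass record values through as raw bytes.
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# For plain-text payloads, StringConverter is the string-schema alternative:
# value.converter=org.apache.kafka.connect.storage.StringConverter
EOF

# Show the generated fragment.
cat mqtt-sink-converter.properties
```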
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.
Configuration Properties¶
For a complete list of configuration properties for this connector, see MQTT Sink Connector Configuration Properties.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.
Install the MQTT Sink Connector¶
You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
Install the connector using Confluent Hub¶
To install the latest connector version using the Confluent Hub Client, navigate to your Confluent Platform installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-mqtt:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent-hub install confluentinc/kafka-connect-mqtt:1.7.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Usage Notes¶
This connector publishes to the MQTT topic that has the same name as the Apache Kafka® topic. If you need to publish to a different topic name, use a transformation such as RegexRouter to change the topic name before the record is sent to the MQTT server.
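The topic renaming described above might look like the following sketch, which writes a properties fragment that uses the RegexRouter transformation. The file name, the transform alias route, and the devices/ prefix are illustrative assumptions; only the RegexRouter class and its regex and replacement properties come from Apache Kafka.

```shell
# Hypothetical file name; merge these lines into your connector configuration.
cat > mqtt-sink-regexrouter.properties <<'EOF'
# "route" is an arbitrary alias for this transformation.
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
# Match the whole Kafka topic name and capture it as group 1.
transforms.route.regex=(.*)
# Publish under an assumed "devices/" prefix, e.g. sensor-readings -> devices/sensor-readings.
transforms.route.replacement=devices/$1
EOF

# Show the generated fragment.
cat mqtt-sink-regexrouter.properties
```

The quoted heredoc delimiter keeps the shell from expanding $1, so the capture-group reference reaches the file unchanged.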
Examples¶
Property-based example¶
This configuration is typically used with standalone workers.
name=MqttSinkConnector1
connector.class=io.confluent.connect.mqtt.MqttSinkConnector
tasks.max=1
topics=< Required Configuration >
mqtt.server.uri=< Required Configuration >
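For illustration, a filled-in version of the properties above could look like the following sketch. The file name, the Kafka topic sensor-readings, and the broker address tcp://localhost:1883 are assumptions chosen for the example, not values from this page; substitute your own.

```shell
# Hypothetical file name and values; replace them with your own.
cat > mqtt-sink.properties <<'EOF'
name=MqttSinkConnector1
connector.class=io.confluent.connect.mqtt.MqttSinkConnector
tasks.max=1
# Kafka topic to read from (assumed name).
topics=sensor-readings
# MQTT broker address (assumes an unencrypted broker on port 1883).
mqtt.server.uri=tcp://localhost:1883
EOF

# A standalone worker would then be started with its own worker properties file, for example:
#   connect-standalone worker.properties mqtt-sink.properties
cat mqtt-sink.properties
```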
REST-based example¶
This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST API documentation.
Connect Distributed REST example:
{
  "name" : "MqttSinkConnector1",
  "config" : {
    "name" : "MqttSinkConnector1",
    "connector.class" : "io.confluent.connect.mqtt.MqttSinkConnector",
    "tasks.max" : "1",
    "topics" : "< Required Configuration >",
    "mqtt.server.uri" : "< Required Configuration >"
  }
}
Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
Create a new connector:
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
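Update an existing connector. This endpoint expects only the properties inside the "config" object as the request body, so save them as a flat JSON object in a separate file first (connector-config.json is an illustrative name):
curl -s -X PUT -H 'Content-Type: application/json' --data @connector-config.json http://localhost:8083/connectors/MqttSinkConnector1/config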
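After creating or updating the connector, you may want to confirm that it and its tasks are running. The sketch below queries the status endpoint of the Kafka Connect REST API; the worker address and connector name are assumed to match the examples above, and the fallback message is only there so the command reports something when no worker is reachable.

```shell
# Assumed worker address and connector name, matching the examples above.
CONNECT_URL="http://localhost:8083"
CONNECTOR_NAME="MqttSinkConnector1"
STATUS_URL="$CONNECT_URL/connectors/$CONNECTOR_NAME/status"

# Print the connector and task states; fall back to a message if no worker answers.
curl -s --max-time 5 "$STATUS_URL" || echo "No Connect worker reachable at $CONNECT_URL"
```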