MQTT Source Connector for Confluent Platform

Note

If you are using Confluent Cloud, see MQTT Source Connector for Confluent Cloud for the cloud Quick Start.

The Kafka Connect MQTT connector is used to integrate with existing MQTT servers. The connector is compatible with Confluent Platform, version 4.1 and later. Prior versions do not work with this connector. The Kafka Connect MQTT Source Connector connects to a MQTT broker and subscribes to the specified topics. SSL is supported. For information on how to create SSL keys and certificates, see Creating SSL Keys and Certificates. For the relevant configuration properties, see the MQTT Source Connector configuration reference.

Features

Multiple tasks

The MQTT Source Connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to huge performance gains when multiple files need to be parsed.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

For license properties, see Confluent Platform license and License topic configuration, and for information about the license topic, see License topic configuration .

Configuration Properties

For a complete list of configuration properties for this source connector, see MQTT Source Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Install the MQTT Source Connector

You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.
  • If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.

Install the connector using Confluent Hub

To install the latest connector version using Confluent Hub Client, navigate to your Confluent Platform installation directory and run the following command:

confluent-hub install confluentinc/kafka-connect-mqtt:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent-hub install confluentinc/kafka-connect-mqtt:1.5.4

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Usage Notes

If you need to change the topic take a look at the RegexRouter transformation which can be used to change the topic name before it is written to Apache Kafka®.

The output of this connector is an envelope with all of the properties of the incoming message. The body of the message is written to the value as bytes. The key is the topic the message was written to.

Examples

Property-based example

This configuration is used typically along with standalone workers.

 name=MqttSourceConnector1
 connector.class=io.confluent.connect.mqtt.MqttSourceConnector
 tasks.max=1
 mqtt.server.uri=< Required Configuration >
 mqtt.topics=< Required Configuration >

REST-based example

This configuration is used typically along with distributed workers. Write the following json to connector.json, configure all of the required values, and use the command below to post the configuration to one the distributed connect worker(s). Check here for more information about the Kafka Connect REST API

Connect Distributed REST example:

 {
   "config" : {
     "name" : "MqttSourceConnector1",
     "connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
     "tasks.max" : "1",
     "mqtt.server.uri" : "< Required Configuration >",
     "mqtt.topics" : "< Required Configuration >"
   }
 }

Use curl to post the configuration to one of the Kafka Connect Workers. Change http://localhost:8083/ the endpoint of one of your Kafka Connect worker(s).

  • Create a new connector:

    curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
    
  • Update an existing connector:

    curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/MqttSourceConnector1/config