MQTT Source Connector for Confluent Platform


If you are using Confluent Cloud, see MQTT Source Connector for Confluent Cloud for the cloud Quick Start.

The Kafka Connect MQTT Source Connector connects to an MQTT broker and subscribes to the specified topics. SSL is supported. For information on how to create SSL keys and certificates, see Creating SSL Keys and Certificates. For the relevant configuration properties, see the MQTT Source connector configuration reference.


At least once delivery

This connector guarantees that records are delivered to the Kafka topic at least once. If the connector restarts, there may be some duplicate records in the Kafka topic.

Some restrictions apply in order to guarantee this under normal operation. See the at least once delivery notes under Usage Notes below.

Multiple tasks

The MQTT Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Because the subscribed topics are distributed across tasks, running several tasks can improve throughput when the connector subscribes to multiple topics.
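
As a minimal sketch of a multi-task setup, the fragment below subscribes to three topics and allows up to three tasks. The topic names are hypothetical placeholders, the fragment assumes that mqtt.topics accepts a comma-separated list, and the grouping of topics onto tasks is decided by the connector.

 # Hypothetical topic names; replace with your own.
 mqtt.topics=sensors/temperature,sensors/humidity,sensors/pressure
 # Upper bound on the number of tasks the connector may start.
 tasks.max=3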

Configuration Properties

For a complete list of configuration properties for this source connector, see MQTT Source Connector Configuration Properties.


For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Usage Notes

If you need to change the topic, take a look at the RegexRouter transformation, which can be used to change the topic name before the record is written to Apache Kafka®.
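
The following fragment is a sketch of how RegexRouter might be added to the connector configuration. The alias addPrefix and the iot- prefix are illustrative assumptions; the regex matches any topic name and the replacement prepends the prefix to it.

 # "addPrefix" is an arbitrary alias chosen for this example.
 transforms=addPrefix
 transforms.addPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
 # Capture the whole original topic name and prepend "iot-".
 transforms.addPrefix.regex=(.*)
 transforms.addPrefix.replacement=iot-$1

With this in place, a record that would have been written to a Kafka topic named mqtt would be written to iot-mqtt instead.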

The output of this connector is an envelope with all of the properties of the incoming message. The body of the message is written to the value as bytes. The key is the topic the message was written to.


At least once delivery

To support at-least-once semantics, run the connector with mqtt.clean.session.enabled set to false. In this mode, the connector assigns each task a unique client ID, which is used for a persistent connection.

The topics are distributed across tasks. Confluent recommends that you do not change the topic assignments once the connector is up and running; that is, do not alter the task count or change the topic list. Doing so can lead to data loss, because the connector restarts with a different topic-to-task assignment and, hence, different client IDs.
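
A configuration along the following lines illustrates these recommendations. It is a sketch only: the topic names and task count are hypothetical, and the fragment assumes that mqtt.topics accepts a comma-separated list.

 # Keep the MQTT session persistent so the broker retains state for each task's client ID.
 mqtt.clean.session.enabled=false
 # Choose these once and leave them unchanged while the connector is deployed,
 # so that each task keeps the same topics and the same client ID.
 tasks.max=2
 mqtt.topics=plant/line1,plant/line2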


Property-based example

This configuration is typically used with standalone workers.

 name=MqttSourceConnector1
 connector.class=io.confluent.connect.mqtt.MqttSourceConnector
 mqtt.server.uri=< Required Configuration >
 mqtt.topics=< Required Configuration >

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST API documentation.

Connect Distributed REST example:

   {
     "name" : "MqttSourceConnector1",
     "config" : {
       "connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
       "tasks.max" : "1",
       "mqtt.server.uri" : "< Required Configuration >",
       "mqtt.topics" : "< Required Configuration >"
     }
   }

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

  • Create a new connector:

    curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
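  • Update an existing connector. This endpoint takes only the key/value pairs from inside "config" as the request body, so save them to a separate file first (named connector-config.json here for illustration):

    curl -s -X PUT -H 'Content-Type: application/json' --data @connector-config.json http://localhost:8083/connectors/MqttSourceConnector1/config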