MQTT Sink Connector for Confluent Platform

The Kafka Connect MQTT connector is used to integrate with existing MQTT servers. The connector is compatible with Confluent Platform version 4.1 and later; earlier versions are not supported. The Kafka Connect MQTT Sink connector connects to an MQTT broker and publishes data to an MQTT topic. SSL is supported. For information on how to create SSL keys and certificates, see Creating SSL Keys and Certificates. For the relevant configuration properties, see the Configuration Reference for MQTT Sink Connector for Confluent Platform.

Features

At least once delivery

This connector guarantees that records are delivered from the Kafka topic to the MQTT broker at least once.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The MQTT Sink connector supports running one or more tasks. You can specify the number of tasks with the tasks.max configuration parameter. Running multiple tasks can significantly improve throughput when the connector must publish a large volume of records.
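As an illustration, the fragment below (a sketch; the connector name, topic, and broker URI are placeholders) runs three tasks so that Connect can publish the Kafka topic's partitions in parallel:

```json
{
    "name": "mqtt-sink",
    "config": {
        "connector.class": "io.confluent.connect.mqtt.MqttSinkConnector",
        "tasks.max": "3",
        "topics": "kafka-topic-name",
        "mqtt.server.uri": "tcp://127.0.0.1:1881"
    }
}
```

Note that tasks.max is an upper bound: Connect never runs more tasks than there are topic partitions to assign.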

Schemas

The connector supports Avro, JSON Schema, and Protobuf input data formats. Schema Registry must be enabled to use a Schema Registry-based format. Note that the connector only supports bytes and string schemas; it does not support structs. If you need to handle struct data, serialize the structs to bytes upstream and configure the connector to treat record values as bytes.
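For example, to deliver record values to the MQTT broker as raw bytes, you can select the byte array converter that ships with Kafka Connect (a sketch showing only the converter properties of a connector configuration):

```json
{
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
```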

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys as well as enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.

Configuration properties

For a complete list of configuration properties for this connector, see Configuration Reference for MQTT Sink Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Connection validation and client IDs

The MQTT Sink connector validates the connection using a randomly generated client ID. This validation method works in most cases; however, issues may arise if the MQTT broker rejects clients based on their client IDs.

Once validated, if mqtt.clean.session.enabled is set to false, the connector generates a client ID in the format <connector-name>-<task-id>. For example, if a connector with the name “mymqtt” has 3 tasks, the following client IDs are generated: “mymqtt-0”, “mymqtt-1”, and “mymqtt-2”. If you set mqtt.clean.session.enabled to true, the connector uses a randomly generated client ID.
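For example, the fragment below (a sketch showing only the relevant properties; the connector name is a placeholder) requests durable sessions and therefore the predictable client IDs described above:

```json
{
    "name": "mymqtt",
    "config": {
        "mqtt.clean.session.enabled": "false"
    }
}
```

Predictable client IDs are useful when the broker enforces client ID allow-lists or when you want session state to survive task restarts.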

Install the MQTT Sink connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.
  • Java 8 or later. Note that Java 8 is deprecated in Confluent Platform versions 7.2 and later. For more details, see Java compatibility with Confluent Platform by version.
  • Confluent CLI
  • An installation of the latest connector version.

Install the connector using the Confluent CLI

To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-mqtt:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-mqtt:1.7.0

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
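As a sketch of the manual steps, assuming the ZIP has already been downloaded from Confluent Hub (the version number and plugin directory below are examples; use a directory listed in your worker's plugin.path):

```shell
# Extract the downloaded ZIP into a directory on the worker's plugin.path
# (the version and destination below are examples).
unzip confluentinc-kafka-connect-mqtt-1.7.0.zip -d /usr/local/share/kafka/plugins/

# The Connect worker configuration must include this directory, for example:
# plugin.path=/usr/local/share/kafka/plugins
```

Repeat this on every machine where Connect will run, then restart the workers so they pick up the new plugin.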

Usage notes

This connector publishes using the Apache Kafka® topic name. If you need to publish to a different MQTT topic, use a transformation such as RegexRouter to change the topic name before the record is sent to the MQTT server.
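For example, the following configuration fragment (the transform name and topic values are illustrative) uses the RegexRouter single message transform to rename records from the Kafka topic kafka-topic-name to the MQTT topic sensors/data before they are published:

```json
{
    "transforms": "renameTopic",
    "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.renameTopic.regex": "kafka-topic-name",
    "transforms.renameTopic.replacement": "sensors/data"
}
```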

Quick start

The MQTT Sink connector publishes records from an Apache Kafka® topic to MQTT brokers. This quick start demonstrates how to configure an MQTT Sink connector for the Eclipse Mosquitto broker.

Set up Mosquitto broker and clients

In this step, an Eclipse Mosquitto broker is set up by using Docker images.

  1. Install the Mosquitto utilities for your operating system.

  2. Create a config file named mosquitto.conf for the broker with the following contents. The listener line tells the broker to accept network connections on port 1881, which is the port used throughout this quick start.

    listener 1881
    persistence false
    log_dest stdout
    allow_anonymous true
    connection_messages true
    

    Tip

    To start a broker that requires clients to authenticate using a username and password, change allow_anonymous to false and add password_file /etc/mosquitto/passwd. Use mosquitto_passwd -c password username to create a local file named password that stores the credentials for user username.

  3. Start the Docker container. If the broker allows anonymous connections, mount only the configuration file.

    docker run --name mosquitto -p 1881:1881 -v `pwd`/mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
    
  4. If the broker requires a username and password, start the container with the password file mounted as well.

    docker run --name mosquitto -p 1881:1881 -v `pwd`/mosquitto.conf:/mosquitto/config/mosquitto.conf -v `pwd`/password:/etc/mosquitto/passwd eclipse-mosquitto
    
  5. Verify that the broker is running by publishing a message to it.

    mosquitto_pub -h localhost -p 1881 -t my-mqtt-topic -m "sample-msg-1"
    

    Tip

    If a username and password are needed to connect to the Mosquitto broker, use the -u and -P arguments of mosquitto_pub to pass in the correct credentials.

  6. To subscribe to all messages on a Mosquitto topic, use the mosquitto_sub command.

    mosquitto_sub -h localhost -p 1881 -t my-mqtt-topic
    

Install the MQTT Connector Plugin

  1. Navigate to your Confluent Platform installation directory and run this command to install the latest version of the MQTT connector.

    confluent connect plugin install confluentinc/kafka-connect-mqtt:latest
    

    Tip

    To install a specific version of the connector, replace latest with the version number (for example, 1.1.0-preview) in the command above.

  2. Restart Connect to pick up the new plugin.

    confluent local services connect stop && confluent local services connect start
    
  3. Check if the MQTT plugin has been installed correctly and picked up by the plugin loader:

    curl -sS localhost:8083/connector-plugins | jq -c '.[] | select( .class | contains("Mqtt") )'
    

    This command should print the new MQTT connector plugins available to the worker.

    {"class": "io.confluent.connect.mqtt.MqttSinkConnector","type": "sink","version": "1.1.0-preview"}
    {"class": "io.confluent.connect.mqtt.MqttSourceConnector","type": "source","version": "1.1.0-preview"}
    

Configure the connector

  1. Create a configuration file named mqtt-sink-config.json with the following contents.

    {
        "name": "mqtt-sink",
        "config": {
            "connector.class": "io.confluent.connect.mqtt.MqttSinkConnector",
            "tasks.max": "1",
            "mqtt.server.uri": "tcp://127.0.0.1:1881",
            "topics":"kafka-topic-name",
            "mqtt.qos": "2",
            "mqtt.username": " Omit if MQTT broker supports anonymous mode ",
            "mqtt.password": " Omit if MQTT broker supports anonymous mode ",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.storage.StringConverter",
            "confluent.topic.bootstrap.servers": "kafka:9092",
            "confluent.license": " Omit to enable trial mode ",
            "confluent.topic.replication.factor": "1"
        }
    }
    
  2. Start the MQTT Sink connector by loading its configuration with the Confluent CLI confluent local services connect connector load command.

    confluent local services connect connector load mqtt-sink --config mqtt-sink-config.json
    

    Your output should resemble:

    {
        "name": "mqtt-sink",
        "config": {
            "connector.class": "io.confluent.connect.mqtt.MqttSinkConnector",
            "tasks.max": "1",
            "mqtt.server.uri": "tcp://127.0.0.1:1881",
            "topics":"kafka-topic-name",
            "mqtt.qos": "2",
            "mqtt.username": " ##### ",
            "mqtt.password": " [hidden] ",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.storage.StringConverter",
            "confluent.topic.bootstrap.servers": "kafka:9092",
            "confluent.license": " [hidden] ",
            "confluent.topic.replication.factor": "1"
        },
        "tasks": [],
        "type": null
    }
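To verify end-to-end delivery, you can produce a record to the Kafka topic and watch it arrive on the MQTT side (a sketch; the topic name matches the configuration above, and a local Kafka broker on port 9092 is assumed):

```shell
# In one terminal, subscribe to the MQTT topic
# (the sink publishes using the Kafka topic name):
mosquitto_sub -h localhost -p 1881 -t kafka-topic-name

# In another terminal, produce a record to the Kafka topic:
echo "sample-sink-msg-1" | kafka-console-producer --topic kafka-topic-name --bootstrap-server localhost:9092
```

The message should appear in the mosquitto_sub terminal shortly after it is produced.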
    

Examples

Property-based example

This configuration is typically used with standalone workers.

 name=MqttSinkConnector1
 connector.class=io.confluent.connect.mqtt.MqttSinkConnector
 tasks.max=1
 topics=< Required Configuration >
 mqtt.server.uri=< Required Configuration >

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the commands below to post the configuration to one of the distributed Connect workers. See the Kafka Connect REST API documentation for more information.

Connect Distributed REST example:

 {
   "config" : {
     "name" : "MqttSinkConnector1",
     "connector.class" : "io.confluent.connect.mqtt.MqttSinkConnector",
     "tasks.max" : "1",
     "topics" : "< Required Configuration >",
     "mqtt.server.uri" : "< Required Configuration >"
   }
 }

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

  • Create a new connector:

    curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
    
  • Update an existing connector:

    curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/MqttSinkConnector1/config
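  • Check that the connector and its task are running (a sketch; the host and connector name match the example above, so adjust them for your deployment):

```shell
# Query the Kafka Connect REST API for the connector's status;
# jq pretty-prints the JSON response.
curl -s http://localhost:8083/connectors/MqttSinkConnector1/status | jq
```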