Introduction to MQTT Proxy

MQTT Proxy enables MQTT clients to use the MQTT 3.1.1 protocol to publish data directly to Apache Kafka®. These clients can publish MQTT messages at all three Quality-of-Service (QoS) levels defined by the MQTT protocol, over either encrypted or unencrypted connections. MQTT Proxy supports encryption and basic (username and password) authentication through Transport Layer Security (TLS).

Every instance of MQTT Proxy is stateless and independent of other instances. This allows MQTT Proxy to avoid redundant persistence of MQTT data and exhibit reduced lag in message publishing when compared to traditional MQTT brokers. To publish MQTT messages to Kafka, MQTT Proxy uses a simple mapping scheme of MQTT topics to Kafka topics that is based on regular expressions.

Installation

MQTT Proxy Quick start

To produce your first MQTT messages to Kafka with MQTT Proxy, follow the steps below.

Prerequisites

Start dependencies

You must have a running Kafka cluster before you start MQTT Proxy, so start Kafka and ZooKeeper first. You can start these services with a single Confluent CLI confluent local command:

confluent local services kafka start

Each service reads its configuration from its property files under etc.

Note

To manually start each service in its own terminal, run instead:

bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
bin/kafka-server-start ./etc/kafka/server.properties

See the Confluent Platform quickstart for a more detailed explanation of how to get these services up and running.

Configure MQTT Proxy

The full set of configuration options for MQTT Proxy is documented in MQTT Proxy Configuration Options. The minimum properties required for MQTT Proxy to work on a local node are shown below. They are set in the kafka-mqtt-dev.properties file that comes with your Confluent Platform distribution; that file also lists all the available configuration options for MQTT Proxy.

topic.regex.list=temperature:.*temperature, brightness:.*brightness
listeners=0.0.0.0:1883
bootstrap.servers=PLAINTEXT://localhost:9092
confluent.topic.replication.factor=1

To change the above properties, as well as any other MQTT Proxy setting, edit kafka-mqtt-dev.properties inside the directory etc/confluent-kafka-mqtt.
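To make the mapping concrete, here is a minimal sketch that emulates how the topic.regex.list value shown above could route an MQTT topic name to a Kafka topic. It is an illustration only, not MQTT Proxy's implementation: the helper name map_mqtt_topic is hypothetical, and the sketch assumes that each regular expression must match the whole MQTT topic name and that the first matching entry wins.

```shell
# Illustrative only: emulates the topic.regex.list mapping in plain POSIX shell.
# Assumptions (not confirmed by the MQTT Proxy documentation): the regex must
# match the entire MQTT topic name, and the first matching entry wins.
TOPIC_REGEX_LIST='temperature:.*temperature, brightness:.*brightness'

# map_mqtt_topic (hypothetical helper): prints the Kafka topic mapped to the
# given MQTT topic, or returns 1 if no entry matches.
map_mqtt_topic() {
  mqtt_topic=$1
  rest=$TOPIC_REGEX_LIST
  while [ -n "$rest" ]; do
    entry=${rest%%,*}                 # first "kafkaTopic:regex" entry
    case $rest in
      *,*) rest=${rest#*,} ;;         # keep the remaining entries
      *)   rest='' ;;                 # that was the last entry
    esac
    entry=${entry# }                  # drop the single space after a comma
    kafka_topic=${entry%%:*}          # text before the first colon
    regex=${entry#*:}                 # text after the first colon
    # grep -x requires the regex to match the whole line.
    if printf '%s\n' "$mqtt_topic" | grep -Eqx -- "$regex"; then
      printf '%s\n' "$kafka_topic"
      return 0
    fi
  done
  return 1
}
```

Under these assumptions, map_mqtt_topic car/engine/temperature prints temperature, and a topic such as car/engine/speed matches no entry.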

For information about communication settings for security, authentication, and encryption, see Communication Security Settings for MQTT Proxy.

Create Kafka topics

Based on the topic mapping described above, MQTT Proxy publishes messages to the Kafka topics temperature and brightness. To create these topics, run:

bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic temperature
bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic brightness
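Before starting MQTT Proxy, it can be worth confirming that both topics exist. The sketch below is one possible check: the check_topics helper is hypothetical, and it expects topic names on standard input, one per line, which is what kafka-topics --list prints.

```shell
# check_topics (hypothetical helper): reads existing topic names from stdin,
# one per line, and reports any expected topic that is not among them.
check_topics() {
  existing=$(cat)                     # e.g. the output of kafka-topics --list
  missing=0
  for topic in "$@"; do
    # -F: fixed string, -x: whole-line match, -q: no output
    if ! printf '%s\n' "$existing" | grep -Fqx -- "$topic"; then
      echo "missing topic: $topic" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Usage against the local broker from this quick start:
# bin/kafka-topics --list --bootstrap-server localhost:9092 | check_topics temperature brightness
```

The helper returns a non-zero status if any expected topic is missing, so it can gate a startup script.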

Start MQTT Proxy

After you configure MQTT Proxy, start it with:

bin/kafka-mqtt-start etc/confluent-kafka-mqtt/kafka-mqtt-dev.properties

Publishing data to Kafka

You can use any client that supports the MQTT protocol to publish data into Kafka. In this example, we use the Eclipse Mosquitto MQTT client.

Install MQTT client

Depending on your operating system, you may choose to install mosquitto as follows:

macOS:

brew install mosquitto

Ubuntu 16:

sudo apt-get update
sudo apt-get install -y software-properties-common
sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
sudo apt-get install -y mosquitto-clients

RHEL 7 and CentOS 7:

Use the following commands, as described in this tutorial.

sudo yum -y install epel-release
sudo yum -y install mosquitto

Instructions for more operating systems are available here.

Publish MQTT messages

This example uses QoS 2, the highest quality of service supported by the MQTT protocol.

mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "190F"
mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "200F"
mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "210F"

Verify messages in Kafka

As you can see below, the key of each Kafka record contains the MQTT topic name, and the value contains the MQTT payload.

bin/kafka-console-consumer --bootstrap-server localhost:9092 \
--topic temperature \
--property print.key=true \
--from-beginning
     car/engine/temperature    190F
     car/engine/temperature    200F
     car/engine/temperature    210F

MQTT Proxy also stores some additional MQTT metadata as Kafka record headers.

Note

To produce a continuous feed of MQTT messages, here’s an example that publishes a message every 200 ms:

while true; do echo $(( $RANDOM % (231-180) + 180)); sleep .2; done | \
    mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -l

Requirements