You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
MQTT Proxy enables MQTT clients to use the MQTT 3.1.1 protocol to publish data directly to Kafka. These clients can publish MQTT messages in all three Quality-of-Service (QoS) levels defined by the MQTT protocol. The clients do this over encrypted and unencrypted connections. MQTT Proxy supports encryption and basic authentication through Transport Layer Security (TLS).
Every instance of MQTT Proxy is stateless and independent of other instances. This allows MQTT Proxy to avoid redundant persistence of MQTT data and exhibit reduced lag in message publishing when compared to traditional MQTT brokers. To publish MQTT messages to Kafka, MQTT Proxy uses a simple mapping scheme of MQTT topics to Kafka topics that is based on regular expressions.
MQTT Proxy only supports devices publishing to Kafka via the MQTT Proxy. It does not support devices subscribing from Kafka via MQTT Proxy.
To produce your first MQTT messages to Kafka with MQTT Proxy follow the steps described below.
You must have a running Kafka cluster before you start MQTT Proxy, so first start Kafka and ZooKeeper. You can start these services in one command with Confluent CLI:
confluent start kafka
Each service reads its configuration from its property files under
To manually start each service in its own terminal, run instead:
bin/zookeeper-server-start ./etc/kafka/zookeeper.properties bin/kafka-server-start ./etc/kafka/server.properties
Again, see the Confluent Platform quickstart for a more detailed explanation of how to get these services up and running.
Configure MQTT Proxy¶
The full set of configuration options for MQTT Proxy are documented
here. Below is the bare minimum of required configs for MQTT Proxy to
work on a local node. These settings are present in the
kafka-mqtt-dev.properties file that
comes with your Confluent Platform distribution and lists all the available configuration options for MQTT Proxy:
topic.regex.list=temperature:.*temperature, brightness:.*brightness listeners=0.0.0.0:1883 bootstrap.servers=PLAINTEXT://localhost:9092 confluent.topic.replication.factor=1
To change the above properties, as well as any other MQTT Proxy setting, edit
kafka-mqtt-dev.properties inside the directory
Create Kafka topics¶
Based on topic mapping describe above, MQTT Proxy will publish messages into the Kafka
brightness. To create these topics, run:
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic temperature bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic brightness
Start MQTT Proxy¶
Once configured, MQTT Proxy can be started:
Publishing data to Kafka¶
You can use any client that supports the MQTT protocol to publish data into Kafka. In this example, we use the Eclipse Mosquitto MQTT client.
Install MQTT client¶
Depending on your operating system, you may choose to install
mosquitto as follows:
brew install mosquitto
sudo apt-get update sudo apt-get install -y software-properties-common sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa sudo apt-get install -y mosquitto-clients
sudo curl -o /etc/yum.repos.d/mqtt-rhel7.repo http://download.opensuse.org/repositories/home:/oojah:/mqtt/RedHat_RHEL-7/home:oojah:mqtt.repo sudo yum -y install mosquitto-clients
Instructions for more operating systems are available here.
Publish MQTT messages¶
This example uses QoS2, the highest quality of service supported by the MQTT protocol.
mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "190F" mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "200F" mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "210F"
Verify messages in Kafka¶
As you can see below, the key of each Kafka records contains the MQTT topic name and the value includes the MQTT payload.
kafka-console-consumer --bootstrap-server localhost:9092 \ --topic temperature \ --property print.key=true \ --from-beginning car/engine/temperature 190F car/engine/temperature 200F car/engine/temperature 210F
MQTT Proxy also stores a few additional MQTT metadata as Kafka record headers.
To produce a continuous feed of MQTT messages here’s an example that produces a message every 200ms:
while true; do echo $(( $RANDOM % (231-180) + 180)); sleep .2; done | \ mosquitto_pub -h 0.0 .0.0 -p 1883 -t car/engine/temperature -q 2 -l