.. _kafka_mqtt_intro: |mqtt| for |cp| =============== |mqtt| enables MQTT clients to use the `MQTT 3.1.1 protocol `__ to publish data directly to |ak-tm|. These clients can publish MQTT messages in all three Quality-of-Service (QoS) levels defined by the MQTT protocol. The clients do this over encrypted and unencrypted connections. |mqtt| supports encryption and HTTP Basic authentication through Transport Layer Security (TLS). Every instance of |mqtt| is stateless and independent of other instances. This allows |mqtt| to avoid redundant persistence of MQTT data and exhibit reduced lag in message publishing when compared to traditional MQTT brokers. To publish MQTT messages to |ak|, |mqtt| uses a simple mapping scheme of MQTT topics to |ak| topics that is based on regular expressions. Installation ------------ .. include:: ../.hidden/docs-common/home/includes/cloud-platform-cta.rst |mqtt| Quick start ------------------ To produce your first MQTT messages to |ak| with |mqtt| follow the steps described below. .. include:: ../includes/install-cli-prereqs.rst Start dependencies ~~~~~~~~~~~~~~~~~~ You must have a running |ak| cluster before you start |mqtt|, so first start a |ak| controller and then start |ak|. Each service reads its configuration from its property files under ``etc``. .. ifconfig:: platform_docs See the :ref:`Confluent Platform quickstart` for a more detailed explanation of how to get these services up and running. Configure |mqtt| ~~~~~~~~~~~~~~~~ The full set of configuration options for |mqtt| are documented :ref:`kafka_mqtt_configuration_options`. The minimum required properties for |mqtt| to work on a local node are provided below. These properties are configured in the ``kafka-mqtt-dev.properties`` file that comes with your |cp| distribution and lists all the available configuration options for |mqtt|. .. sourcecode:: properties topic.regex.list=temperature:.*temperature, brightness:.*brightness listeners=0.0.0.0:1883 bootstrap.servers=PLAINTEXT://localhost:9092 confluent.topic.replication.factor=1 To change the above properties, as well as any other |mqtt| setting, edit ``kafka-mqtt-dev.properties`` inside the directory ``etc/confluent-kafka-mqtt``. For information about communication settings for security, authentication, and encryption, see :ref:`mqtt_proxy-security-settings`. Create |ak| topics ~~~~~~~~~~~~~~~~~~~ Based on topic mapping describe above, |mqtt| will publish messages into the |ak| topics ``temperature`` and ``brightness``. To create these topics, run: .. sourcecode:: bash bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic temperature bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic brightness Start |mqtt| ~~~~~~~~~~~~ Once configured, |mqtt| can be started: .. sourcecode:: bash bin/kafka-mqtt-start etc/confluent-kafka-mqtt/kafka-mqtt-dev.properties Publishing data to |ak| ~~~~~~~~~~~~~~~~~~~~~~~~ You can use any client that supports the MQTT protocol to publish data into |ak|. In this example, we use the `Eclipse Mosquitto MQTT client `__. Install MQTT client ******************* Depending on your operating system, you may choose to install ``mosquitto`` as follows: *MacOS:* .. sourcecode:: bash brew install mosquitto *Ubuntu 16:* .. sourcecode:: bash sudo apt-get update sudo apt-get install -y software-properties-common sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa sudo apt-get install -y mosquitto-clients *RHEL 7 and CentOS 7:* Use the following commands, as described in `this tutorial `__. .. sourcecode:: bash sudo yum -y install epel-release sudo yum -y install mosquitto Instructions for more operating systems are available `here `__. Publish MQTT messages ~~~~~~~~~~~~~~~~~~~~~ This example uses QoS2, the highest quality of service supported by the MQTT protocol. .. sourcecode:: bash mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "190F" mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "200F" mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "210F" Verify messages in |ak| ~~~~~~~~~~~~~~~~~~~~~~~~ As you can see below, the key of each |ak| records contains the MQTT topic name and the value includes the MQTT payload. .. sourcecode:: bash bin/kafka-console-consumer --bootstrap-server localhost:9092 \ --topic temperature \ --property print.key=true \ --from-beginning car/engine/temperature 190F car/engine/temperature 200F car/engine/temperature 210F |mqtt| also stores a few additional MQTT metadata as |ak| record headers. .. note:: To produce a continuous feed of MQTT messages here's an example that produces a message every 200ms: .. sourcecode:: bash while true; do echo $(( $RANDOM % (231-180) + 180)); sleep .2; done | \ mosquitto_pub -h 0.0 .0.0 -p 1883 -t car/engine/temperature -q 2 -l Requirements ------------ - Kafka: 6.0.0-ccs - MQTT Clients supporting the `MQTT 3.1.1 protocol `_