はじめに

MQTT Proxy により、MQTT クライアントは MQTT 3.1.1 プロトコル を使用して、データを Apache Kafka® に直接パブリッシュすることができます。これらのクライアントは、MQTT プロトコルにより定義された 3 つのすべてのサービス品質(QoS)レベルで MQTT メッセージをパブリッシュすることができます。クライアントは暗号化接続および非暗号化接続を介して、これを行います。MQTT Proxy は、トランスポート層セキュリティ(TLS)による暗号化および HTTP 基本認証をサポートしています。

MQTT Proxy のすべてのインスタンスはステートレスであり、他のインスタンスから独立しています。これにより MQTT Proxy では、MQTT データの冗長な永続性を回避することができ、従来の MQTT ブローカーと比較すると、メッセージのパブリッシュにおけるラグの低減が見られます。MQTT メッセージを Kafka にパブリッシュするために、MQTT Proxy は、Kafka トピックへの MQTT トピックの単純なマッピングスキームを使用します。これは、正規表現に基づいています。

インストール

Confluent Platform の インストール手順 を参照してください。MQTT Proxy を起動する前に、Kafka を起動する必要があります。「Confluent Platform クイックスタート」では、これらのサービスをローカルでテスト用に起動する方法について説明しています。

MQTT Proxy クイックスタート

MQTT Proxy を使用して Kafka に対する最初の MQTT メッセージを生成するには、以下で説明する手順に従います。

前提条件

依存関係の開始

MQTT Proxy を起動する前に Kafka クラスターを実行している必要があるため、最初に Kafka および ZooKeeper を起動します。Confluent CLI confluent local コマンドを使用すると、これらのサービスを 1 つのコマンドで開始できます。

confluent local services kafka start

各サービスの構成は、etc 下の対応するプロパティファイルから読み取られます。

注釈

各サービスを独自のターミナルで手動で起動するには、代わりに次のコマンドを実行します。

bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
bin/kafka-server-start ./etc/kafka/server.properties

ここでも、これらのサービスを開始する方法について詳しくは、「Confluent Platform クイックスタート」を参照してください。

MQTT Proxy の構成

MQTT Proxy のすべての構成オプションは、「MQTT Proxy の構成オプション」に記載されています。以下では、ローカルノード上で MQTT Proxy が機能するための必要最小限のプロパティについて説明します。これらのプロパティは、Confluent Platform ディストリビューションに付属の kafka-mqtt-dev.properties ファイル内で構成されます。このファイルには、MQTT Proxy において使用可能なすべての構成オプションのリストが記載されています。

topic.regex.list=temperature:.*temperature, brightness:.*brightness
listeners=0.0.0.0:1883
bootstrap.servers=PLAINTEXT://localhost:9092
confluent.topic.replication.factor=1

その他の MQTT Proxy 設定と同様に、上記のプロパティを変更するには、etc/confluent-kafka-mqtt ディレクトリ内で kafka-mqtt-dev.properties を編集します。

セキュリティ、認証、および暗号化の通信設定の詳細については、「通信セキュリティ設定」を参照してください。

Kafka トピックの作成

前述のトピックマッピングに基づいて、MQTT Proxy は temperature および brightness の Kafka トピックにメッセージをパブリッシュします。これらのトピックを作成するには、以下を実行します。

bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic temperature
bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic brightness

MQTT Proxy の起動

構成したら、MQTT Proxy を起動することができます。

bin/kafka-mqtt-start etc/confluent-kafka-mqtt/kafka-mqtt-dev.properties

Kafka へのデータのパブリッシュ

MQTT プロトコルをサポートする任意のクライアントを使用して、データを Kafka にパブリッシュすることができます。この例では、Eclipse Mosquitto MQTT クライアント を使用します。

MQTT クライアントのインストール

オペレーティングシステムによっては、以下のように mosquitto をインストールすることを選択できます。

MacOS:

brew install mosquitto

Ubuntu 16:

sudo apt-get update
sudo apt-get install -y software-properties-common
sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
sudo apt-get install -y mosquitto-clients

RHEL 7 および CentOS 7:

以下のコマンドを使用します。このチュートリアル を参照してください。

sudo yum -y install epel-release
sudo yum -y install mosquitto

その他のオペレーティングシステムの手順は、こちら で確認することができます。

MQTT メッセージのパブリッシュ

この例では、MQTT プロトコルが提供する最高位のサービス品質である QoS2 を使用します。

mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "190F"
mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "200F"
mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "210F"

Kafka でのメッセージの検証

以下に示すように、各 Kafka レコードのキーには MQTT トピック名が含まれており、値には MQTT ペイロードが含まれています。

bin/kafka-console-consumer --bootstrap-server localhost:9092 \
--topic temperature \
--property print.key=true \
--from-beginning
     car/engine/temperature    190F
     car/engine/temperature    200F
     car/engine/temperature    210F

MQTT Proxy は、いくつかの追加の MQTT メタデータも Kafka レコードヘッダーとして格納します。

注釈

以下は、MQTT メッセージの継続的なフィードを生成するための、200 ms ごとにメッセージが生成される例です。

while true; do echo $(( $RANDOM % (231-180) + 180)); sleep .2; done | \
    mosquitto_pub -h 0.0 .0.0 -p 1883 -t car/engine/temperature -q 2 -l

要件