はじめに¶
MQTT Proxy により、MQTT クライアントは MQTT 3.1.1 プロトコル を使用して、データを Apache Kafka® に直接パブリッシュすることができます。これらのクライアントは、MQTT プロトコルにより定義された 3 つのすべてのサービス品質(QoS)レベルで MQTT メッセージをパブリッシュすることができます。クライアントは暗号化接続および非暗号化接続を介して、これを行います。MQTT Proxy は、トランスポート層セキュリティ(TLS)による暗号化および HTTP 基本認証をサポートしています。
MQTT Proxy のすべてのインスタンスはステートレスであり、他のインスタンスから独立しています。これにより MQTT Proxy では、MQTT データの冗長な永続性を回避することができ、従来の MQTT ブローカーと比較すると、メッセージのパブリッシュにおけるラグの低減が見られます。MQTT メッセージを Kafka にパブリッシュするために、MQTT Proxy は、Kafka トピックへの MQTT トピックの単純なマッピングスキームを使用します。これは、正規表現に基づいています。
インストール¶
Confluent Platform の インストール手順 を参照してください。MQTT Proxy を起動する前に、Kafka を起動する必要があります。「Confluent Platform クイックスタート」では、これらのサービスをローカルでテスト用に起動する方法について説明しています。
MQTT Proxy クイックスタート¶
MQTT Proxy を使用して Kafka に対する最初の MQTT メッセージを生成するには、以下で説明する手順に従います。
- 前提条件
- Confluent Platform
- Confluent CLI (個別のインストールが必要)
依存関係の開始¶
MQTT Proxy を起動する前に Kafka クラスターを実行している必要があるため、最初に Kafka および ZooKeeper を起動します。Confluent CLI confluent local コマンドを使用すると、これらのサービスを 1 つのコマンドで開始できます。
confluent local services kafka start
各サービスの構成は、etc
下の対応するプロパティファイルから読み取られます。
注釈
各サービスを独自のターミナルで手動で起動するには、代わりに次のコマンドを実行します。
bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
bin/kafka-server-start ./etc/kafka/server.properties
ここでも、これらのサービスを開始する方法について詳しくは、「Confluent Platform クイックスタート」を参照してください。
MQTT Proxy の構成¶
MQTT Proxy のすべての構成オプションは、「MQTT Proxy の構成オプション」に記載されています。以下では、ローカルノード上で MQTT Proxy が機能するための必要最小限のプロパティについて説明します。これらのプロパティは、Confluent Platform ディストリビューションに付属の kafka-mqtt-dev.properties
ファイル内で構成されます。このファイルには、MQTT Proxy において使用可能なすべての構成オプションのリストが記載されています。
topic.regex.list=temperature:.*temperature, brightness:.*brightness
listeners=0.0.0.0:1883
bootstrap.servers=PLAINTEXT://localhost:9092
confluent.topic.replication.factor=1
その他の MQTT Proxy 設定と同様に、上記のプロパティを変更するには、etc/confluent-kafka-mqtt
ディレクトリ内で kafka-mqtt-dev.properties
を編集します。
セキュリティ、認証、および暗号化の通信設定の詳細については、「通信セキュリティ設定」を参照してください。
Kafka トピックの作成¶
前述のトピックマッピングに基づいて、MQTT Proxy は temperature
および brightness
の Kafka トピックにメッセージをパブリッシュします。これらのトピックを作成するには、以下を実行します。
bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic temperature
bin/kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic brightness
MQTT Proxy の起動¶
構成したら、MQTT Proxy を起動することができます。
bin/kafka-mqtt-start etc/confluent-kafka-mqtt/kafka-mqtt-dev.properties
Kafka へのデータのパブリッシュ¶
MQTT プロトコルをサポートする任意のクライアントを使用して、データを Kafka にパブリッシュすることができます。この例では、Eclipse Mosquitto MQTT クライアント を使用します。
MQTT クライアントのインストール¶
オペレーティングシステムによっては、以下のように mosquitto
をインストールすることを選択できます。
MacOS:
brew install mosquitto
Ubuntu 16:
sudo apt-get update
sudo apt-get install -y software-properties-common
sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
sudo apt-get install -y mosquitto-clients
RHEL 7:
sudo curl -o /etc/yum.repos.d/mqtt-rhel7.repo https://download.opensuse.org/repositories/home:/oojah:/mqtt/RedHat_RHEL-7/home:oojah:mqtt.repo
sudo yum -y install mosquitto-clients
その他のオペレーティングシステムの手順は、こちら で確認することができます。
MQTT メッセージのパブリッシュ¶
この例では、MQTT プロトコルが提供する最高位のサービス品質である QoS2 を使用します。
mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "190F"
mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "200F"
mosquitto_pub -h 0.0.0.0 -p 1883 -t car/engine/temperature -q 2 -m "210F"
Kafka でのメッセージの検証¶
以下に示すように、各 Kafka レコードのキーには MQTT トピック名が含まれており、値には MQTT ペイロードが含まれています。
bin/kafka-console-consumer --bootstrap-server localhost:9092 \
--topic temperature \
--property print.key=true \
--from-beginning
car/engine/temperature 190F
car/engine/temperature 200F
car/engine/temperature 210F
MQTT Proxy は、いくつかの追加の MQTT メタデータも Kafka レコードヘッダーとして格納します。
注釈
以下は、MQTT メッセージの継続的なフィードを生成するための、200 ms ごとにメッセージが生成される例です。
while true; do echo $(( $RANDOM % (231-180) + 180)); sleep .2; done | \
mosquitto_pub -h 0.0 .0.0 -p 1883 -t car/engine/temperature -q 2 -l
要件¶
- Kafka: 6.0.0-ccs
- MQTT 3.1.1 プロトコル をサポートする MQTT クライアント