MQTT Source Connector for Confluent Platform¶
Kafka Connect MQTT Source Connector は、MQTT ブローカーに接続して、指定されたトピックをサブスクライブします。SSL がサポートされています。SSL キーと証明書の作成方法については、「SSL キーと証明書の作成」を参照してください。関連する構成プロパティについては、「MQTT Source Connector の構成リファレンス」を参照してください。
機能¶
少なくとも 1 回のデリバリー¶
このコネクターによって、レコードが Kafka のトピックに少なくとも 1 回は配信されることが保証されます。コネクターを再起動した場合は、Kafka のトピックに重複レコードが存在している可能性があります。
複数のタスク¶
MQTT Source Connector は、1 つまたは複数のタスクの実行をサポートしています。タスクの数は tasks.max
構成パラメーターで指定できます。これにより、複数のファイルを解析する必要がある場合に、パフォーマンスが大幅に向上する可能性があります。
構成プロパティ¶
このソースコネクターの構成プロパティの網羅的なリストについては、「MQTT Source Connector 構成プロパティ」を参照してください。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター」を参照してください。
使用上の注意¶
トピックを変更する場合は、 RegexRouter 変換を参照してください。これを使用して、Apache Kafka® に書き込まれる前にトピック名を変更できます。
このコネクターの出力は、着信メッセージのすべてのプロパティを含むエンベロープです。メッセージの本文は、バイトとして値に書き込まれます。キーは、メッセージが書き込まれたトピックです。
例¶
プロパティベースの例¶
この構成は、通常、 スタンドアロンワーカー と併せて使用されます。
name=MqttSourceConnector1
connector.class=io.confluent.connect.mqtt.MqttSourceConnector
tasks.max=1
mqtt.server.uri=< Required Configuration >
mqtt.topics=< Required Configuration >
REST ベースの例¶
この構成は通常、 分散ワーカー と併せて使用します。以下の json を connector.json
に書き込んで必要な値をすべて構成し、下記のコマンドを使用して分散コネクトワーカーのいずれかに構成をポストします。
Kafka Connect REST API の詳細については、 こちら を参照してください。
Connect 分散 REST の例 :
{
"config" : {
"name" : "MqttSourceConnector1",
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max" : "1",
"mqtt.server.uri" : "< Required Configuration >",
"mqtt.topics" : "< Required Configuration >"
}
}
いずれかの Kafka Connect ワーカーに構成をポストするには、curl を使用します。http://localhost:8083/ は、Kafka Connect ワーカーのいずれかのエンドポイントに変更してください。
新しいコネクターの作成:
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
既存のコネクターのアップデート:
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/MqttSourceConnector1/config