重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

MQTT Sink Connector for Confluent Cloud

注釈

Confluent Platform 用のコネクターをローカルにインストールする場合は、「MQTT Sink Connector for Confluent Platform」を参照してください。

フルマネージド型の Kafka Connect MQTT Sink Connector を使用すると、Apache Kafka® から MQTT ブローカーへのデータのストリーミングが可能になります。

機能

MQTT Sink Connector には、以下の機能があります。

  • 少なくとも 1 回のデリバリー: レコードが MQTT のトピックに少なくとも 1 回は配信されることが保証されます。
  • 複数のタスクのサポート: 1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。
  • スキーマ: このコネクターは、Avro、JSON スキーマ、および Protobuf 入力データフォーマットをサポートします。スキーマレジストリ ベースのフォーマットを使用するには、Schema Registry を有効にしておく必要があります。このコネクターではバイトスキーマと文字列スキーマのみがサポートされる点に注意してください。構造体はサポートされません。構造体型のスキーマが必要な場合は、構造体データをバイトとして格納し、コネクターでバイトを選択できます。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

クイックスタート

このクイックスタートを使用して、Confluent Cloud MQTT Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択して、イベントが MQTT ブローカーにストリーミングされるように構成するための基本的な方法について説明します。

前提条件
  • アマゾンウェブサービス (AWS)、Microsoft Azure (Azure)、または Google Cloud で Confluent Cloud クラスターへのアクセスを許可されていること。
  • MQTT ブローカーへのアクセス。
  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
  • ネットワークに関する考慮事項については、「Networking and DNS Considerations」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
  • Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
    • 既存の サービスアカウント のリソース ID を入力する。
    • コネクター用の Confluent Cloud サービスアカウント を作成する。サービスアカウントのドキュメント で、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
    • Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the MQTT Sink connector card.

MQTT Sink Connector Card

ステップ 4: コネクターの詳細情報を入力します。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク( * )は必須項目であることを示しています。

Add MQTT Sink Connector 画面で、以下を実行します。

既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。

新しいトピックを作成するには、+Add new topic をクリックします。

ステップ 5: ブローカーで結果を確認します。

新しいレコードが MQTT ブローカーに追加されていることを確認します。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png

Confluent CLI の使用

以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。

注釈

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe MySqlSink

出力例:

Following are the required configs:
connector.class: MqttSink
name
input.data.format
kafka.auth.mode
kafka.auth.mode
kafka.api.key
kafka.api.secret
mqtt.server.uri
tasks.max
topics

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "connector.class": "MqttSink",
  "name": "MqttSink_0",
  "input.data.format": "AVRO",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "<my-kafka-api-key>",
  "kafka.api.secret": "<my-kafka-api-secret>",
  "mqtt.server.uri" : ""tcp://192.0.0.1:1881",
  "topics" : "kafka_topic_0",
  "tasks.max" : "1"
}

以下のプロパティ定義に注意してください。

  • "name": 新しいコネクターの名前を設定します。
  • "connector.class": コネクターのプラグイン名を指定します。
  • "input.data.format": AVRO、BYTES、JSON、JSON_SR(JSON スキーマ)、または PROTOBUF をサポートします。スキーマベースのメッセージフォーマットを使用するには、有効なスキーマが Schema Registry に存在する必要があります。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "mqtt.server.uri": MQTT ブローカーの URI。<PROTOCOL>//:URI というフォーマットを使用する必要があります。サポートされるプロトコルは、TCP、SSL、WS、および WSS です。TLS 接続の場合は、認証情報を追加で指定し、キーストアファイルとトラストストアファイルをアップロードする必要があります。これらのプロパティの値と定義については、MQTT Sink の構成プロパティ を参照してください。

    注釈

    MQTT ブローカーで匿名モードがサポートされていない場合は、次の 2 つのプロパティを追加する必要があります。

    • "mqtt.username": "<mqtt_broker_username>"
    • "mqtt.password": "<user_password>"
  • "topics": MQTT ブローカーに送るデータがある Kafka トピック名(または、コンマで区切った複数のトピック名)。

  • "tasks.max": このコネクターで使用する タスク の数を入力します。このコネクターは複数のタスクに対応しています。タスクが多いほどパフォーマンスが向上する可能性があります。

注釈

データの送信先 MQTT トピック名は Kafka トピック名と同じです。

Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。

すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。

ステップ 4: 構成ファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config mqtt-server-sink-config.json

出力例:

Created connector MqttSink_0 lcc-ix4dl

ステップ 5: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect plugin list

出力例:

ID          |       Name   | Status  | Type
+-----------+--------------+---------+------+
lcc-ix4dl   | MqttSink_0   | RUNNING | sink

ステップ 6: データベースで結果を確認します。

新しいレコードが MQTT データベースに追加されていることを確認します。

Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

データの取得元とするトピック(Which topics do you want to get data from?)

topics

特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

  • 型: list
  • 重要度: 高

入力メッセージ(Input messages)

input.data.format

Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、JSON、または BYTES です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

MQTT ブローカーへの接続方法(How should we connect to MQTT Broker?)

mqtt.server.uri

MQTT ブローカーの URI。<PROTOCOL>//:URI というフォーマットで指定する必要があります。サポートされるプロトコルは、tcp、ssl、ws、および wss です。TLS を使用する接続については、キーストアとトラストストアを指定する必要があることに注意してください。

  • 型: list
  • 重要度: 高
mqtt.username

接続に使用するユーザー名、またはユーザー名が必須でない場合は空白にします。

  • 型: string
  • 重要度: 高
mqtt.password

接続に使用するパスワード、またはパスワードが必須でない場合は空白にします。

  • 型: password
  • デフォルト: [hidden]
  • 重要度: 高

MQTT へのセキュアな接続(MQTT secure connection)

mqtt.ssl.key.store.file

サーバーでの認証に使用されるプライベートキーを含む Java キーストアファイルの場所。

  • 型: password
  • デフォルト: [hidden]
  • 重要度: 低
mqtt.ssl.key.store.password

Java キーストアファイルを開くために使用されるパスワード。

  • 型: password
  • デフォルト: [hidden]
  • 重要度: 中
mqtt.ssl.key.password

Java キーストアに格納されるクライアント証明書のパスワード。

  • 型: password
  • デフォルト: [hidden]
  • 重要度: 高
mqtt.ssl.trust.store.file

サーバーとの SSL 接続の検証に要求される証明書が含まれる Java トラストストアファイルの場所。

  • 型: password
  • デフォルト: [hidden]
  • 重要度: 中
mqtt.ssl.trust.store.password

Java トラストストアファイルを開くために使用されるパスワード。

  • 型: password
  • デフォルト: [hidden]
  • 重要度: 中

接続の詳細(Connection Details)

mqtt.clean.session.enabled

クライアントとサーバーのステートを再起動および再接続後に引き継ぐ必要があるかどうかを設定します。再接続後に未受信のメッセージを受信するには、QOS を 1 以上に設定する必要があります。

  • 型: boolean
  • デフォルト: false
  • 重要度: 中
mqtt.connect.timeout.seconds

接続のタイムアウト値(秒)を設定します。

  • 型: int
  • デフォルト: 30
  • 重要度: 中
mqtt.keepalive.interval.seconds

この値は、秒単位で、メッセージの送受信の最長間隔を定義します。指定の期間中にデータ関連のメッセージがない場合、クライアントから非常に小さな ping メッセージが送信され、サーバーから確認応答が返されます。

  • 型: int
  • デフォルト: 60
  • 重要度: 中
max.retry.time.ms

失敗した操作(MQTT ブローカーとの接続およびレコードのパブリッシュ)に対してコネクターがバックオフおよび再試行に費やす最大時間(ミリ秒)。

  • 型: int
  • デフォルト: 30000(30 秒)
  • 重要度: 中
mqtt.retained.enabled

将来のクライアント用にメッセージを保持する場合は true を設定します。

  • 型: boolean
  • デフォルト: true
  • 重要度: 中
mqtt.qos

MQTT ブローカーにメッセージを書き込む際の QOS レベル。

  • 型: int
  • デフォルト: 0
  • 重要度: 中

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1、…]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png