RabbitMQ Source Connector for Confluent Cloud¶
注釈
If you are installing the connector locally for Confluent Platform, see RabbitMQ Source Connector for Confluent Platform.
フルマネージド型の RabbitMQ Source Connector は、AMQP プロトコルを使用して RabbitMQ サーバーと通信します。RabbitMQ Source Connector では、1 つまたは複数の RabbitMQ キューからデータが読み取られ、Apache Kafka® のトピックに保存されます。
機能¶
RabbitMQ Source Connector には、以下の機能があります。
- 少なくとも 1 回のデリバリー: レコードが Kafka のトピックに少なくとも 1 回は配信されることが保証されます。
- 複数のタスクのサポート: 1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud RabbitMQ Source Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、RabbitMQ のキューからデータを読み取って Apache Kafka® トピックに保存するようにコネクターを構成する方法について説明します。
- 前提条件
- アマゾンウェブサービス ( AWS )、Microsoft Azure ( Azure )、または Google Cloud Platform ( GCP )上の Confluent Cloud クラスターへのアクセスを許可されていること。
- RabbitMQ ホストサーバー、キュー、およびホストのセキュリティ情報へのアクセスが許可されていること。
- コネクターを起動する前に、トピックが 1 つ以上存在している必要があります。
- Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
- スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「環境の制限」を参照してください。
- ネットワークに関する考慮事項については、「ネットワークアクセス」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
- Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
- 既存の サービスアカウント のリソース ID を入力する。
- コネクター用の Confluent Cloud サービスアカウント を作成する。サービスアカウントのドキュメント で、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
- Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。
詳しくは、Cloud コネクターの制限事項 を参照してください。
注釈
このソースコネクターで使用される output.data.format
構成はありません。このコネクターのデフォルトの値が ByteArrayConverter
、デフォルトのキーが StringConverter
であるためです。このコネクターで、他のコンバーターは使用できません。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Confluent Cloud を使用した Apache Kafka のクイックスタート」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
ステップ 4: 接続をセットアップします。¶
以下の手順を実行して、Continue をクリックします。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- アスタリスク( * )は必須項目であることを示しています。
- コネクターの 名前 を入力します。
- Kafka Cluster credentials で Kafka クラスターの認証情報の指定方法を選択します。サービスアカウントのリソース ID を選択するか、API キーとシークレットを入力できます(または、Cloud Console でこれらを生成します)。
- topic name にトピック名を入力します。データの送信先とするトピックの名前を入力してください。コネクターを起動する前に、トピックが 1 つ以上存在している必要があります。
- connection details に、接続の詳細情報を追加します。
- RabbitMQ host: 接続先 RabbitMQ ホストサーバーのアドレス。例:
192.168.1.99
。RabbitMQ のユーザー名とパスワードを追加します。 - RabbitMQ virtual host: RabbitMQ で作成 された仮想ホストの名前。
- RabbitMQ port: コネクターがサーバーへの接続に使用するサーバーポート。デフォルト値は
5672
です。
- RabbitMQ host: 接続先 RabbitMQ ホストサーバーのアドレス。例:
- RabbitMQ details で、詳細情報を追加します。
- RabbitMQ queue(s): RabbitMQ ` キューの名前 <https://www.rabbitmq.com/queues.html#names>`__。読み取る複数の RabbitMQ キューを指定できます。
- RabbitMQ batch size: コネクターがバッチで処理して Kafka トピックに返すレコードの最大数。デフォルトのプロパティ値は
1024
件です。 - RabbitMQ backoff time (in milliseconds): RabbitMQ キューからレコードが返されない場合に待機する時間の長さ(ミリ秒)。デフォルトのプロパティ値は
100
ミリ秒です。
- Security プロトコルを選択します(省略可): 暗号化も認証もされていない接続を使用する場合は、
PLAINTEXT
を選択します。セキュアな接続を使用するには、SSL
を選択します。デフォルト値はPLAINTEXT
です。SSL
を選択すると、Key Store および Trust Store のファイルアップロードボタンが、次のプロパティとともに表示されます。- SSL key password: キーストア内のプライベートキーのパスワード。
- Key Store password: キーストアのパスワード。
- Key Store type: キーストアファイルのフォーマット。デフォルト値は
JKS
です。 - Trust Store password: トラストストアのパスワード。PEM ファイルではサポートされていません。
- Trust Store type: トラストストアファイルのフォーマット。デフォルト値は
JKS
です。
- このコネクターで使用する タスク の数を入力します。コネクターは、1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。
- Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
See 構成プロパティ for all property values and definitions.
ステップ 5: コネクターを起動します。¶
実行中の構成をプレビューして、接続の詳細情報を確認します。プロパティの構成に問題がないことが確認できたら、Launch をクリックします。
ちなみに
コネクターの出力のプレビューについては、「コネクターのデータプレビュー」を参照してください。
ステップ 6: コネクターのステータスを確認します。¶
コネクターのステータスが Provisioning から Running に変わります。ステータスが変わるまで数分かかる場合があります。
ステップ 7: Kafka トピックを確認します。¶
コネクターが実行中になったら、メッセージが Kafka トピックに取り込まれていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。
Confluent CLI の使用¶
以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行 <https://docs.confluent.io/confluent-cli/current/migrate.html#cli-migrate>`__」を参照してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent plugin describe RabbitMQSource
出力例:
Following are the required configs:
connector.class: RabbitMQSource
name
kafka.api.key
kafka.api.secret
kafka.topic
rabbitmq.host
rabbitmq.username
rabbitmq.password
rabbitmq.queue
tasks.max
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"connector.class": "RabbitMQSource",
"name": "RabbitMQSource_0",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret" : "<my-kafka-api-secret>",
"kafka.topic" : "topic_0"
"rabbitmq.host" : "192.168.1.99",
"rabbitmq.username" : "<username>",
"rabbitmq.password": "<password>",
"rabbitmq.queue": "<queue-name>",
"tasks.max" : "1"
}
以下のプロパティ定義に注意してください。
"name"
: 新しいコネクターの名前を設定します。"connector.class"
: コネクターのプラグイン名を指定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"kafka.topic"
: データの送信先とするトピック名を入力します。コネクターを起動する前に、トピックが 1 つ以上存在している必要があります。"rabbitmq...."
: RabbitMQ 認証とキューの詳細を入力します。コンマ区切りのリストを使用して、読み取る複数のrabbitmq.queue
名を指定できます。"tasks.max"
: このコネクターで使用する タスク の数を入力します。コネクターは、1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。
Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
See 構成プロパティ for all property values and definitions.
ステップ 4: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config rabbitmq-source.json
出力例:
Created connector RabbitMQSourceConnector_0 lcc-ix4dl
ステップ 5: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect plugin list
出力例:
ID | Name | Status | Type
+-----------+---------------------------+---------+-------+
lcc-ix4dl | RabbitMQSourceConnector_0 | RUNNING | source
ステップ 6: Kafka トピックを確認します。¶
コネクターが実行中になったら、メッセージが Kafka トピックに取り込まれていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
構成プロパティ¶
Use the following configuration properties with this connector.
How should we connect to your data?¶
name
Sets a name for your connector.
- 型: string
- Valid Values: A string at most 64 characters long
- 重要度: 高
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- 型: string
- Default: KAFKA_API_KEY
- Valid Values: SERVICE_ACCOUNT, KAFKA_API_KEY
- 重要度: 高
kafka.api.key
- 型: password
- 重要度: 高
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- 型: string
- 重要度: 高
kafka.api.secret
- 型: password
- 重要度: 高
Which topic do you want to send data to?¶
kafka.topic
Identifies the topic name to write the data to.
- 型: string
- 重要度: 高
How should we connect to RabbitMQ?¶
rabbitmq.host
RabbitMQ host to connect to.
- 型: string
- 重要度: 高
rabbitmq.username
Username to authenticate with RabbitMQ.
- 型: string
- 重要度: 高
rabbitmq.password
Password to authenticate with RabbitMQ.
- 型: password
- 重要度: 高
rabbitmq.virtual.host
The virtual host to use when connecting to the RabbitMQ broker.
- 型: string
- デフォルト: /
- 重要度: 高
rabbitmq.port
RabbitMQ port to connect to. This should be set in accordance with chosen Security protocol.
- 型: int
- デフォルト: 5672
- Valid Values: [0,...]
- 重要度: 高
RabbitMQ details¶
rabbitmq.queue
RabbitMQ queue(s) to read from.
- 型: list
- 重要度: 高
batch.size
The maximum number of records to return to Connect for each poll, if there are more than this number of records already available from RabbitMQ.
- 型: int
- デフォルト: 1024
- Valid Values: [1,...]
- 重要度: 中
backoff.time.ms
The number of milliseconds to wait when no records are returned from the RabbitMQ queue.
- 型: int
- デフォルト: 100
- Valid Values: [1,...]
- 重要度: 中
Security¶
rabbitmq.security.protocol
The security protocol to use when connecting to RabbitMQ. Valid values are PLAINTEXT and SSL.
- 型: string
- デフォルト: PLAINTEXT
- 重要度: 高
rabbitmq.https.ssl.key.password
The password of the private key in the key store file. This is optional for client.
- 型: password
- 重要度: 高
rabbitmq.https.ssl.keystorefile
The key store containing server certificate. Only required if using SSL.
- 型: password
- 重要度: 高
rabbitmq.https.ssl.keystore.password
The store password for the key store file. This is optional for a client and is only needed if key store is configured.
- 型: password
- 重要度: 高
rabbitmq.https.ssl.keystore.type
The file format of the key store file. This is optional for client.
- 型: string
- デフォルト: JKS
- 重要度: 中
rabbitmq.https.ssl.truststorefile
The trust store containing server CA certificate. Only required if using SSL.
- 型: password
- 重要度: 高
rabbitmq.https.ssl.truststore.password
The password for the trust store file. If a password is not set, trust store file configured will still be used, but integrity checking is disabled. Trust store password is not supported for PEM format.
- 型: password
- 重要度: 高
rabbitmq.https.ssl.truststore.type
The file format of the trust store file.
- 型: string
- デフォルト: JKS
- 重要度: 中
Number of tasks for this connector¶
tasks.max
- 型: int
- Valid Values: [1,...]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。