重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
Azure Cosmos DB Source Connector for Confluent Cloud¶
フルマネージド型の Microsoft Azure Cosmos DB Source Connector for Confluent Cloud は、Azure Cosmos データベースからレコードを読み取り、Confluent Cloud の Apache Kafka® トピックにデータを書き込みます。
機能¶
Azure Cosmos DB Source Connector は、次の機能をサポートしています。
- トピックからコンテナーへのマッピング: コネクターでは、コンテナー(テーブル)を個々の Kafka トピックにマップできます(例:
topic1#con1,topic2#con2
)。 - 少なくとも 1 回のデリバリー: コネクターによって、Kafka のトピックからのレコードが少なくとも 1 回は配信されることが保証されます。
- 複数のタスクのサポート: 1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。1 つのコンテナー(テーブル)が 1 つのタスクにより処理できることに注意してください。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
制限¶
以下の情報を確認してください。
- コネクターの制限事項については、Azure CosmosDB Source Connector の制限事項を参照してください。
- 1 つ以上の Single Message Transforms(SMT)を使用する場合は、「SMT の制限」を参照してください。
- Confluent Cloud Schema Registry を使用する場合は、「スキーマレジストリ Enabled Environments」を参照してください。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud Azure Cosmos DB Source Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、データベースから Kafka にイベントをストリーミングするようにコネクターを構成するための基本的な方法について説明します。
- 前提条件
アマゾンウェブサービス (AWS)、Microsoft Azure (Azure)、または Google Cloud Platform (GCP)上の Confluent Cloud クラスターへのアクセスを許可されていること。
Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。
Azure Cosmos のデータへの読み取りアクセスを許可されていること。詳細については、「Azure Cosmos DB のデータへのアクセスをセキュリティで保護する」を参照してください。
Azure Cosmos DB が Core(SQL)API を使用するように構成されていること。
Core(SQL)API の選択¶
Confluent Cloud Console を使用する場合¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Quick Start for Confluent Cloud」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
ステップ 4: 接続をセットアップします。¶
注釈
- すべての 前提条件 を満たしていることを確認してください。
- アスタリスク( * )は必須項目であることを示しています。
- 1 つまたは複数のトピックを選択します。
- Name にコネクター名を入力します。
- Kafka Cluster credentials で Kafka クラスターの認証情報の指定方法を選択します。サービスアカウントのリソース ID を選択するか、API キーとシークレットを入力できます(または、Cloud Console でこれらを生成します)。
- データベース接続の詳細情報を入力します。
- Cosmos Endpoint: Azure Cosmos データベースエンドポイント URL。たとえば、
https://confluent-azure-cosmosdb.documents.azure.com:443/
のように指定します。 - Cosmos Connection Key: Azure Cosmos データベース接続の プライマリキー。
- Cosmos Database name: コネクターがデータを読み取る Azure Cosmos データベースの名前。
- Cosmos Endpoint: Azure Cosmos データベースエンドポイント URL。たとえば、
- Topic-Container Map に Cosmos コンテナーにマップされる Kafka トピックのコンマ区切りのリストを入力します。たとえば、
topic1#con1,topic2#con2
のように指定します。このフィールドでは、正規表現パターン*[\\w.-]+ *#[^,]+(, *[\\w.-]+ *#[^,]+)*
を使用できます。 - コネクタータスクの詳細情報を入力します。
- Task timeout: コネクターのタスクが、Kafka に送信する前にドキュメントを読み取る時間の最大値(ミリ秒)。デフォルトは
5000
ミリ秒です。 - Task reader buffer size: コネクターのタスクが、Kafka にドキュメントを送信する前にバッファする最大バッファサイズ(バイト)。デフォルトは
10000
バイトです。 - Task batch size: コネクターで、Kafka に送信する前にバッチ処理されるドキュメントの最大数。デフォルトは
100
です。 - Task poll interval: コネクターソースのタスクが変更をポーリングする間隔(ミリ秒)。デフォルトは
1000
ミリ秒です。
- Task timeout: コネクターのタスクが、Kafka に送信する前にドキュメントを読み取る時間の最大値(ミリ秒)。デフォルトは
- 出力メッセージ(レコード)の詳細情報を設定します。
- Output Kafka record value format で、Kafka 出力レコード値のフォーマット(Kafka トピックに送られるデータ)を AVRO、JSON_SR(JSON スキーマ)、PROTOBUF、または JSON(スキーマレス)から選択します。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
- Kafka message key enabled: Kafka メッセージキーを設定するかどうかを指定します。デフォルトは
id
です。 - Kafka message key field: デフォルトキー
id
が使用されない場合に Kafka メッセージキーで使用されるドキュメントフィールド。
- タスク の数を入力します。コネクターは、1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。1 つのコンテナー(テーブル)が 1 つのタスクにより処理できることに注意してください。
- Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。
ステップ 5: コネクターを起動します。¶
接続の詳細情報を確認し、Launch をクリックします。
ステップ 6: コネクターのステータスを確認します。¶
コネクターのステータスが Provisioning から Running に変わります。
ステップ 7: ファイルを確認します。¶
データが Kafka で生成されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
Confluent CLI の使用¶
Confluent CLI でコネクターをセットアップして実行するには、次の手順を実行します。
注釈
すべての 前提条件 を満たしていることを確認してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe CosmosDbSource
出力例:
The following are required configs:
connector.class : CosmosDbSource
name : ["name" is required]
kafka.api.key : ["kafka.api.key" is required when "kafka.auth.mode==KAFKA_API_KEY"]
kafka.api.secret : ["kafka.api.secret" is required when "kafka.auth.mode==KAFKA_API_KEY"]
connect.cosmos.connection.endpoint
connect.cosmos.master.key
connect.cosmos.databasename
connect.cosmos.containers.topicmap
output.data.format
tasks.max
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"name": "CosmosDbSourceConnector_0",
"config": {
"connector.class": "CosmosDbSource",
"name": "CosmosDbSourceConnector_0",
"connect.cosmos.connection.endpoint": "https://confluent-azure-cosmosdb.documents.azure.com:443/",
"connect.cosmos.master.key": "****************************************",
"connect.cosmos.databasename": "ToDoList",
"connect.cosmos.containers.topicmap": "Kafka-Items#Items",
"output.data.format": "AVRO",
"connect.cosmos.messagekey.enabled": "true",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "****************",
"kafka.api.secret": "**********************************",
"tasks.max": "1"
}
}
以下のプロパティ定義に注意してください。
"connector.class"
: コネクターのプラグイン名を指定します。"name"
: 新しいコネクターの名前を設定します。"connect.cosmos.containers.topicmap"
: Cosmos コンテナーにマップされる Kafka トピックのコンマ区切りのリストを入力します。たとえば、topic1#con1,topic2#con2
のように指定します。このフィールドでは、正規表現パターン*[\\w.-]+ *#[^,]+(, *[\\w.-]+ *#[^,]+)*
を使用できます。"output.data.format"
(Kafka トピックに送られるデータ): AVRO、JSON_SR(JSON スキーマ)、PROTOBUF、または JSON(スキーマレス)をサポートしています。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。"connect.cosmos.messagekey.enabled"
: Kafka メッセージキーを設定するかどうかを指定します。デフォルトはid
です。メッセージキーに別のフィールドを設定するには、構成プロパティconnect.cosmos.messagekey.field
を追加します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"tasks.max"
: このコネクターで使用できる タスク の最大数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります。
Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。
ステップ 3: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config azure-cosmos-source-config.json
出力例:
Created connector CosmosDbSourceConnector_0 lcc-do6vzd
ステップ 4: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type | Trace
+------------+----------------------------+---------+--------+-------+
lcc-do6vzd |CosmosDbSourceConnector_0 | RUNNING | Source | |
ステップ 5: ファイルを確認します。¶
データが Kafka で生成されていることを確認します。
Confluent Cloud API for Connect の詳細と使用例については、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
構成プロパティ¶
このコネクターでは、以下のコネクター構成プロパティを使用します。
データへの接続方法(How should we connect to your data?)¶
name
コネクターの名前を設定します。
- 型: string
- 指定可能な値: 最大 64 文字の文字列
- 重要度: 高
Kafka クラスターの認証情報(Kafka Cluster credentials)¶
kafka.auth.mode
Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。
- 型: string
- デフォルト: KAFKA_API_KEY
- 指定可能な値: SERVICE_ACCOUNT、KAFKA_API_KEY
- 重要度: 高
kafka.api.key
- 型: password
- 重要度: 高
kafka.service.account.id
Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。
- 型: string
- 重要度: 高
kafka.api.secret
- 型: password
- 重要度: 高
Azure Cosmos DB への接続方法(How should we connect to your Azure Cosmos DB?)¶
connect.cosmos.connection.endpoint
Cosmos エンドポイント URL。
- 型: string
- 重要度: 高
connect.cosmos.master.key
Cosmos 接続マスター(プライマリ)キー。
- 型: password
- 重要度: 高
connect.cosmos.databasename
読み取り元データベースの名前。
- 型: string
- 重要度: 高
データベースの詳細(Database details)¶
connect.cosmos.containers.topicmap
Cosmos コンテナーにマップされる Kafka のトピックのコンマ区切りのリスト。たとえば、「topic1#con1,topic2#con2」のように指定します。
- 型: string
- 指定可能な値: 正規表現
*[\w.-]+ *#[^,]+(, *[\w.-]+ *#[^,]+)*
に一致することが必要 - 重要度: 高
接続の詳細(Connection details)¶
connect.cosmos.task.timeout
ソースのタスクで、Kafka に送信する前にドキュメントの読み取りに使用される時間の最大値(ミリ秒)。
- 型: int
- デフォルト: 5000
- 重要度: 低
connect.cosmos.task.buffer.size
ソースのタスクで、Kafka に送信する前にバッファ処理されるドキュメントのコンテナーの最大サイズ(バイト)。
- 型: int
- デフォルト: 10000
- 指定可能な値: [1,...,1000000]
- 重要度: 低
connect.cosmos.task.batch.size
ソースのタスクで、Kafka に送信する前にバッファ処理されるドキュメントの最大数。
- 型: int
- デフォルト: 100
- 指定可能な値: [1,...]
- 重要度: 低
connect.cosmos.task.poll.interval
ソースのタスクが変更をポーリングする間隔(ミリ秒)。
- 型: int
- デフォルト: 1000
- 重要度: 低
出力メッセージ(Output messages)¶
output.data.format
Kafka 出力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。
- 型: string
- 重要度: 高
connect.cosmos.messagekey.enabled
Kafka メッセージキーを設定するかどうかを指定します。
- 型: boolean
- デフォルト: true
- 重要度: 高
connect.cosmos.messagekey.field
メッセージキーとして使用するドキュメントフィールド。
- 型: string
- デフォルト: id
- 重要度: 高
このコネクターのタスク数(Number of tasks for this connector)¶
tasks.max
- 型: int
- 指定可能な値: [1,...]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。