重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
Azure Cosmos DB Sink Connector for Confluent Cloud¶
The Microsoft Azure Cosmos DB Sink connector for Confluent Cloud writes data to a Microsoft Azure Cosmos database. The connector polls data from Apache Kafka® and writes to database containers.
機能¶
The Azure Cosmos DB Sink connector supports the following features:
- トピックマッピング: Kafka トピックを Azure Cosmos DB コンテナーにマッピングします。
- 複数のキーの処理方法:
FullKeyStrategy
: 生成される ID は Kafka レコードキーです。デフォルトではこの方法が使用されます。KafkaMetadataStrategy
: 生成される ID は、Kafka トピック、パーティション、オフセットが連結されたものです。たとえば、${topic}-${partition}-${offset}
のようになります。ProvidedInKeyStrategy
: 生成される ID は、キーオブジェクトで見つかったid
フィールドです。ProvidedInValueStrategy
: 生成される ID は、値オブジェクトで見つかったid
フィールドです。どのレコードにも(小文字の)id
フィールドが必要です。これは、Azure Cosmos DB の要件です。小文字の id の前提条件 を参照してください。
以下は、それぞれの処理方法と Azure Cosmos に生成される id
の例です。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
制限¶
以下の情報を確認してください。
- コネクターの制限事項については、Azure Cosmos DB Sink Connector の制限事項を参照してください。
- 1 つ以上の Single Message Transforms(SMT)を使用する場合は、「SMT の制限」を参照してください。
- Confluent Cloud Schema Registry を使用する場合は、「スキーマレジストリ Enabled Environments」を参照してください。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud Azure Cosmos DB Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、Azure Cosmos DB コンテナーに Kafka イベントをストリーミングするようにコネクターを構成するための基本的な方法について説明します。
- 前提条件
Azure 上の Confluent Cloud クラスターへのアクセスを許可されていること。
Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
シンクコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。
Azure Cosmos DB と Kafka クラスターは、同じリージョンに存在している必要があります。
Azure Cosmos DB では、どのレコードにも
id
フィールドが必要です。「ID の処理方法」で、それぞれの方法がどのように機能するか、例を参照してください。ID を生成する処理方法として、以下が用意されています。FullKeyStrategy
: 生成される ID は Kafka レコードキーです。デフォルトではこの方法が使用されます。KafkaMetadataStrategy
: 生成される ID は、Kafka トピック、パーティション、オフセットが連結されたものです。たとえば、${topic}-${partition}-${offset}
のようになります。ProvidedInKeyStrategy
: 生成される ID は、キーオブジェクトで見つかったid
フィールドです。ProvidedInValueStrategy
: 生成される ID は、値オブジェクトで見つかったid
フィールドです。この ID の処理方法を選択した場合は、id
という名前の新規フィールドを作成する必要があります。以下の ksqlDB ステートメント も使用できます。以下の例は、orders
という名前のトピックを使用します。CREATE STREAM ORDERS_STREAM WITH ( KAFKA_TOPIC = 'orders', VALUE_FORMAT = 'AVRO' ); CREATE STREAM ORDER_AUGMENTED AS SELECT ORDERID AS `id`, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS FROM ORDERS_STREAM;
注釈
- コネクターでは、
id
に基づくUpsert
がサポートされています。 - コネクターでは、tombstone レコードの
Delete
はサポートされていません。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Quick Start for Confluent Cloud」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
ステップ 4: コネクターの詳細情報を入力します。¶
注釈
- すべての 前提条件 を満たしていることを確認してください。
- アスタリスク( * )は必須項目であることを示しています。
Add Azure Cosmos DB Sink Connector 画面で、以下を実行します。
既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。
新しいトピックを作成するには、+Add new topic をクリックします。
- Kafka Cluster credentials で Kafka クラスターの認証情報の指定方法を選択します。以下のいずれかのオプションを選択できます。
- Global Access: コネクターは、ユーザーがアクセス権限を持つすべての対象にアクセスできます。グローバルアクセスの場合、コネクターのアクセス権限は、ユーザーのアカウントにリンクされます。このオプションは本稼働環境では推奨されません。
- Granular access: コネクターのアクセスが制限されます。コネクターのアクセス権限は サービスアカウント から制御できます。本稼働環境にはこのオプションをお勧めします。
- Use an existing API key: 保存済みの API キーおよびシークレット部分を入力できます。API キーとシークレットを入力するか Cloud Console でこれらを生成することもできます。
- Continue をクリックします。
- Cosmos DB 接続の詳細情報を入力します。
- Cosmos Endpoint: Cosmos エンドポイント URL。たとえば、
https://connect-cosmosdb.documents.azure.com:443/
のように指定します。 - Cosmos Connection Key: Cosmos 接続マスター(プライマリ)キー。
- Cosmos Database Name: Cosmos データベースの名前。
- Cosmos Endpoint: Cosmos エンドポイント URL。たとえば、
- Continue をクリックします。
注釈
Cloud Console に表示されない構成プロパティでは、デフォルト値が使用されます。すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。
Input Kafka record value で、Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を AVRO、PROTOBUF、JSON_SR(JSON スキーマ)、または JSON(スキーマレス)から選択します。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
Topic-Container Map フィールドに、Cosmos DB コンテナーにマップされる Kafka トピックのコンマ区切りのリストを入力します(Kafka トピックと Azure Cosmos DB コンテナーのマッピング)。たとえば、
topic#container1,topic2#container2
のように指定します。Show advanced configurations
Id Strategy: 一意のドキュメント ID を生成するために使用する IdStrategy クラス名。
FullKeyStrategy
: 生成される ID は Kafka レコードキーです。KafkaMetadataStrategy
: 生成される ID は、Kafka トピック、パーティション、オフセットが連結されたものです。たとえば、${topic}-${partition}-${offset}
のようになります。ProvidedInKeyStrategy
: 生成される ID は、キーオブジェクトで見つかったid
フィールドです。ProvidedInValueStrategy
: 生成される ID は、値オブジェクトで見つかったid
フィールドです。どのレコードにも(小文字の)id
フィールドが必要です。これは、Azure Cosmos DB の要件です。小文字の id の前提条件 を参照してください。
Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。
Continue をクリックします。
選択するトピックのパーティション数に基づいて、推奨タスク数が表示されます。
- 推奨されたタスク数を変更するには、Tasks フィールドに、コネクターで使用する タスク の数を入力します。
- Continue をクリックします。
接続の詳細情報を確認します。
Launch をクリックします。
コネクターのステータスが Provisioning から Running に変わります。
ステップ 5: レコードを確認します。¶
レコードが Azure Cosmos データベースに生成されていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
Confluent CLI の使用¶
以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行 <https://docs.confluent.io/confluent-cli/current/migrate.html#cli-migrate>`__」を参照してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe CosmosDbSink
出力例:
Following are the required configs:
connector.class: CosmosDbSink
name
input.data.format
kafka.auth.mode
kafka.api.key
kafka.api.secret
connect.cosmos.connection.endpoint
connect.cosmos.master.key
connect.cosmos.databasename
connect.cosmos.containers.topicmap
tasks.max
topics
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"name": "CosmosDbSinkConnector_0",
"config": {
"connector.class": "CosmosDbSink",
"name": "CosmosDbSinkConnector_0",
"input.data.format": "AVRO",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "****************",
"kafka.api.secret": "**********************************************",
"topics": "pageviews",
"connect.cosmos.connection.endpoint": "https://myaccount.documents.azure.com:443/",
"connect.cosmos.master.key": "****************************************",
"connect.cosmos.databasename": "myDBname",
"connect.cosmos.containers.topicmap": "pageviews#Container2",
"cosmos.id.strategy": "FullKeyStrategy",
"tasks.max": "1"
}
}
以下のプロパティ定義に注意してください。
"connector.class"
: コネクターのプラグイン名を指定します。"input.data.format"
: Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(たとえば、Avro、JSON_SR(JSON スキーマ)、および Protobuf)を使用するには、Confluent Cloud Schema Registry を構成しておく必要があります。"name"
: 新しいコネクターの名前を設定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"connect.cosmos.connection.endpoint"
:https://ccloud-cosmos-db-1.documents.azure.com:443/
というフォーマットを使用する URI。"connect.cosmos.master.key"
: Azure Cosmos マスターキー。"connect.cosmos.databasename"
: Cosmos DB の名前。"connect.cosmos.containers.topicmap"
: Cosmos DB コンテナーにマッピングされる Kafka トピックのコンマ区切りのリスト。Kafka トピックと Azure Cosmos DB コンテナーのマッピングです。たとえば、topic#container1,topic2#container2
のように指定します。(省略可能)
"cosmos.id.strategy"
: デフォルトはFullKeyStrategy
です。以下のいずれかの処理方法を入力します。FullKeyStrategy
: 生成される ID は Kafka レコードキーです。KafkaMetadataStrategy
: 生成される ID は、Kafka トピック、パーティション、オフセットが連結されたものです。たとえば、${topic}-${partition}-${offset}
のようになります。ProvidedInKeyStrategy
: 生成される ID は、キーオブジェクトで見つかったid
フィールドです。どのレコードにも(小文字の)id
フィールドが必要です。これは、Azure Cosmos DB の要件です。小文字の id の前提条件 を参照してください。ProvidedInValueStrategy
: 生成される ID は、値オブジェクトで見つかったid
フィールドです。どのレコードにも(小文字の)id
フィールドが必要です。これは、Azure Cosmos DB の要件です。小文字の id の前提条件 を参照してください。
「ID の処理方法」で、それぞれの方法がどのように機能するか、例を参照してください。
"tasks"
: このコネクターで使用する タスク の数。Confluent Cloud と Confluent Cloud Enterprise では、組織はタスク 1 つとコネクター 1 つに制限されます。このコネクターは期間限定で無料で使用できます。
Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。
ステップ 4: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config azure-cosmos-sink-config.json
出力例:
Created connector CosmosDbSinkConnector_0 lcc-do6vzd
ステップ 4: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type | Trace
+------------+-------------------------------+---------+------+-------+
lcc-do6vzd | CosmosDbSinkConnector_0 | RUNNING | sink | |
ステップ 5: レコードを確認します。¶
レコードがエンドポイントに取り込まれていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
構成プロパティ¶
このコネクターでは、以下のコネクター構成プロパティを使用します。
データへの接続方法(How should we connect to your data?)¶
name
コネクターの名前を設定します。
- 型: string
- 指定可能な値: 最大 64 文字の文字列
- 重要度: 高
入力メッセージ(Input messages)¶
input.data.format
Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。
- 型: string
- 重要度: 高
Kafka クラスターの認証情報(Kafka Cluster credentials)¶
kafka.auth.mode
Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。
- 型: string
- デフォルト: KAFKA_API_KEY
- 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
- 重要度: 高
kafka.api.key
- 型: password
- 重要度: 高
kafka.service.account.id
Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。
- 型: string
- 重要度: 高
kafka.api.secret
- 型: password
- 重要度: 高
データの取得元とするトピック(Which topics do you want to get data from?)¶
topics
特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
- 型: list
- 重要度: 高
How should we connect to your Azure Cosmos DB?¶
connect.cosmos.connection.endpoint
Cosmos エンドポイント URL。たとえば、https://connect-cosmosdb.documents.azure.com:443/ のように指定します。
- 型: string
- 重要度: 高
connect.cosmos.master.key
Cosmos 接続マスター(プライマリ)キー。
- 型: password
- 重要度: 高
connect.cosmos.databasename
レコードを書き込む Cosmos ターゲットデータベース。
- 型: string
- 重要度: 高
connect.cosmos.containers.topicmap
Cosmos コンテナーにマップされる Kafka トピックのコンマ区切りのリスト。たとえば、「topic1#con1,topic2#con2」のように指定します。
- 型: string
- 重要度: 高
データベースの詳細(Database details)¶
cosmos.id.strategy
一意のドキュメント ID(id)を生成するために使用する IdStrategy クラス名。
FullKeyStrategy
では、ID として完全なレコードキーを使用します。KafkaMetadataStrategy
では、ID として、Kafka のトピック、パーティション、オフセットをダッシュで区切って連結したものを使用します。たとえば、${topic}-${partition}-${offset}
のように指定します。ProvidedInKeyStrategy
とProvidedInValueStrategy
では、ID として、キーおよび値オブジェクトで見つかったid
フィールドを使用します。- 型: string
- デフォルト: FullKeyStrategy
- 指定可能な値: FullKeyStrategy、KafkaMetadataStrategy、ProvidedInKeyStrategy、ProvidedInValueStrategy
- 重要度: 低
このコネクターのタスク数(Number of tasks for this connector)¶
tasks.max
- 型: int
- 指定可能な値: [1,...]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。