Azure Cosmos DB Sink Connector for Confluent Cloud¶
The Microsoft Azure Cosmos Sink connector for Confluent Cloud writes data to a Microsoft Azure Cosmos database. The connector polls data from Apache Kafka® and writes to database containers.
機能¶
Azure Cosmos Sink Connector は、次の機能をサポートしています。
- トピックマッピング: Kafka トピックを Azure Cosmos DB コンテナーにマッピングします。
- 複数のキーの処理方法:
FullKeyStrategy
: 生成される ID は Kafka レコードキーです。デフォルトではこの方法が使用されます。KafkaMetadataStrategy
: 生成される ID は、Kafka トピック、パーティション、オフセットが連結されたものです。たとえば、${topic}-${partition}-${offset}
のようになります。ProvidedInKeyStrategy
: 生成される ID は、キーオブジェクトで見つかったid
フィールドです。ProvidedInValueStrategy
: 生成される ID は、値オブジェクトで見つかったid
フィールドです。どのレコードにも(小文字の)id
フィールドが必要です。これは、Azure Cosmos DB の要件です。小文字の id の前提条件 を参照してください。
以下は、それぞれの処理方法と Azure Cosmos に生成される id
の例です。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud Azure Cosmos DB Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、Azure Cosmos DB コンテナーに Kafka イベントをストリーミングするようにコネクターを構成するための基本的な方法について説明します。
- 前提条件
Azure 上の Confluent Cloud クラスターへのアクセスを許可されていること。
Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「環境の制限」を参照してください。
シンクコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。
Azure Cosmos DB と Kafka クラスターは、同じリージョンに存在している必要があります。
Azure Cosmos DB では、どのレコードにも
id
フィールドが必要です。「ID の処理方法」で、それぞれの方法がどのように機能するか、例を参照してください。ID を生成する処理方法として、以下が用意されています。FullKeyStrategy
: 生成される ID は Kafka レコードキーです。デフォルトではこの方法が使用されます。KafkaMetadataStrategy
: 生成される ID は、Kafka トピック、パーティション、オフセットが連結されたものです。たとえば、${topic}-${partition}-${offset}
のようになります。ProvidedInKeyStrategy
: 生成される ID は、キーオブジェクトで見つかったid
フィールドです。ProvidedInValueStrategy
: 生成される ID は、値オブジェクトで見つかったid
フィールドです。この ID の処理方法を選択した場合は、id
という名前の新規フィールドを作成する必要があります。以下の ksqlDB ステートメント も使用できます。以下の例は、orders
という名前のトピックを使用します。CREATE STREAM ORDERS_STREAM WITH ( KAFKA_TOPIC = 'orders', VALUE_FORMAT = 'AVRO' ); CREATE STREAM ORDER_AUGMENTED AS SELECT ORDERID AS `id`, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS FROM ORDERS_STREAM;
注釈
- コネクターでは、
id
に基づくUpsert
がサポートされています。 - コネクターでは、tombstone レコードの
Delete
はサポートされていません。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Confluent Cloud を使用した Apache Kafka のクイックスタート」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
Step 4: Enter the connector details.¶
注釈
- Ensure you have all your prerequisites completed.
- アスタリスク( * )は必須項目であることを示しています。
At the Add Azure Cosmos DB Sink Connector screen, complete the following:
If you've already populated your Kafka topics, select the topic(s) you want to connect from the Topics list.
To create a new topic, click +Add new topic.
- Select the way you want to provide Kafka Cluster credentials. You can
choose one of the following options:
- Global Access: Allows your connector to access everything you have access to. With global access, connector access will be linked to your account. This option is not recommended for production.
- Granular access: Limits the access for your connector. You will be able to manage connector access through a service account. This option is recommended for production.
- Use an existing API key: Allows you to enter an API key and secret part you have stored. You can enter an API key and secret (or generate these in the Cloud Console).
- Click Continue.
- Cosmos DB 接続の詳細情報を入力します。
- Cosmos Endpoint: Cosmos endpoint URL. For example,
https://connect-cosmosdb.documents.azure.com:443/
. - Cosmos Connection Key: The Cosmos connection master (primary) key.
- Cosmos Database Name: Cosmos データベースの名前。
- Cosmos Endpoint: Cosmos endpoint URL. For example,
- Click Continue.
注釈
Configuration properties that are not shown in the Cloud Console use the default values. See 構成プロパティ for all property values and definitions.
Input Kafka record value で、Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を AVRO、PROTOBUF、JSON_SR(JSON スキーマ)、または JSON(スキーマレス)から選択します。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。詳細については、「環境の制限」を参照してください。
In the Topic-Container Map field, input a comma-delimited list of Kafka topics mapped to Cosmos DB containers–the mapping between Kafka topics and Azure Cosmos DB containers. For example,
topic#container1,topic2#container2
.Show advanced configurations
Id Strategy: The IdStrategy class name to use for generating a unique document ID:
FullKeyStrategy
: 生成される ID は Kafka レコードキーです。KafkaMetadataStrategy
: 生成される ID は、Kafka トピック、パーティション、オフセットが連結されたものです。たとえば、${topic}-${partition}-${offset}
のようになります。ProvidedInKeyStrategy
: 生成される ID は、キーオブジェクトで見つかったid
フィールドです。ProvidedInValueStrategy
: 生成される ID は、値オブジェクトで見つかったid
フィールドです。どのレコードにも(小文字の)id
フィールドが必要です。これは、Azure Cosmos DB の要件です。小文字の id の前提条件 を参照してください。
Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
See 構成プロパティ for all property values and definitions.
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
- To change the number of recommended tasks, enter the number of tasks for the connector to use in the Tasks field.
- Click Continue.
Verify the connection details.
Click Launch.
コネクターのステータスが Provisioning から Running に変わります。
ステップ 5: レコードを確認します。¶
レコードが Azure Cosmos データベースに生成されていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
Confluent CLI の使用¶
以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行 <https://docs.confluent.io/confluent-cli/current/migrate.html#cli-migrate>`__」を参照してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe CosmosDbSink
出力例:
Following are the required configs:
connector.class: CosmosDbSink
name
input.data.format
kafka.auth.mode
kafka.api.key
kafka.api.secret
connect.cosmos.connection.endpoint
connect.cosmos.master.key
connect.cosmos.databasename
connect.cosmos.containers.topicmap
tasks.max
topics
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"name": "CosmosDbSinkConnector_0",
"config": {
"connector.class": "CosmosDbSink",
"name": "CosmosDbSinkConnector_0",
"input.data.format": "AVRO",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "****************",
"kafka.api.secret": "**********************************************",
"topics": "pageviews",
"connect.cosmos.connection.endpoint": "https://myaccount.documents.azure.com:443/",
"connect.cosmos.master.key": "****************************************",
"connect.cosmos.databasename": "myDBname",
"connect.cosmos.containers.topicmap": "pageviews#Container2",
"cosmos.id.strategy": "FullKeyStrategy",
"tasks.max": "1"
}
}
以下のプロパティ定義に注意してください。
"connector.class"
: コネクターのプラグイン名を指定します。"input.data.format"
: Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(たとえば、Avro、JSON_SR(JSON スキーマ)、および Protobuf)を使用するには、Confluent Cloud Schema Registry を構成しておく必要があります。"name"
: 新しいコネクターの名前を設定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"connect.cosmos.connection.endpoint"
:https://ccloud-cosmos-db-1.documents.azure.com:443/
というフォーマットを使用する URI。"connect.cosmos.master.key"
: Azure Cosmos マスターキー。"connect.cosmos.databasename"
: Cosmos DB の名前。"connect.cosmos.containers.topicmap"
: Cosmos DB コンテナーにマッピングされる Kafka トピックのコンマ区切りのリスト。Kafka トピックと Azure Cosmos DB コンテナーのマッピングです。たとえば、topic#container1,topic2#container2
のように指定します。(省略可能)
"cosmos.id.strategy"
: デフォルトはFullKeyStrategy
です。以下のいずれかの処理方法を入力します。FullKeyStrategy
: 生成される ID は Kafka レコードキーです。KafkaMetadataStrategy
: 生成される ID は、Kafka トピック、パーティション、オフセットが連結されたものです。たとえば、${topic}-${partition}-${offset}
のようになります。ProvidedInKeyStrategy
: 生成される ID は、キーオブジェクトで見つかったid
フィールドです。どのレコードにも(小文字の)id
フィールドが必要です。これは、Azure Cosmos DB の要件です。小文字の id の前提条件 を参照してください。ProvidedInValueStrategy
: 生成される ID は、値オブジェクトで見つかったid
フィールドです。どのレコードにも(小文字の)id
フィールドが必要です。これは、Azure Cosmos DB の要件です。小文字の id の前提条件 を参照してください。
「ID の処理方法」で、それぞれの方法がどのように機能するか、例を参照してください。
"tasks"
: このコネクターで使用する タスク の数。Confluent Cloud と Confluent Cloud Enterprise では、組織はタスク 1 つとコネクター 1 つに制限されます。このコネクターは期間限定で無料で使用できます。
Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
See 構成プロパティ for all property values and descriptions.
ステップ 4: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config azure-cosmos-sink-config.json
出力例:
Created connector CosmosDbSinkConnector_0 lcc-do6vzd
ステップ 4: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type | Trace
+------------+-------------------------------+---------+------+-------+
lcc-do6vzd | CosmosDbSinkConnector_0 | RUNNING | sink | |
ステップ 5: レコードを確認します。¶
レコードがエンドポイントに取り込まれていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
構成プロパティ¶
Use the following configuration properties with this connector.
How should we connect to your data?¶
name
Sets a name for your connector.
- 型: string
- Valid Values: A string at most 64 characters long
- 重要度: 高
Input messages¶
input.data.format
Sets the input Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
- 型: string
- 重要度: 高
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- 型: string
- Default: KAFKA_API_KEY
- Valid Values: SERVICE_ACCOUNT, KAFKA_API_KEY
- 重要度: 高
kafka.api.key
- Type: password
- 重要度: 高
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- 型: string
- 重要度: 高
kafka.api.secret
- Type: password
- 重要度: 高
Which topics do you want to get data from?¶
topics
Identifies the topic name or a comma-separated list of topic names.
- Type: list
- 重要度: 高
How should we connect to your Cosmos DB database?¶
connect.cosmos.connection.endpoint
Cosmos endpoint URL. For example: https://connect-cosmosdb.documents.azure.com:443/.
- 型: string
- 重要度: 高
connect.cosmos.master.key
Cosmos connection master (primary) key.
- Type: password
- 重要度: 高
connect.cosmos.databasename
Cosmos target database to write records into.
- 型: string
- 重要度: 高
connect.cosmos.containers.topicmap
A comma delimited list of Kafka topics mapped to Cosmos containers. For example: topic1#con1,topic2#con2.
- 型: string
- 重要度: 高
Database details¶
cosmos.id.strategy
The IdStrategy class name to use for generating a unique document id (id).
FullKeyStrategy
uses the full record key as ID.KafkaMetadataStrategy
uses a concatenation of the kafka topic, partition, and offset as ID, with dashes as separator. i.e.${topic}-${partition}-${offset}
.ProvidedInKeyStrategy
andProvidedInValueStrategy
use theid
field found in the key and value objects respectively as ID.- 型: string
- Default: FullKeyStrategy
- Valid Values: KafkaMetadataStrategy, FullKeyStrategy, ProvidedInKeyStrategy, ProvidedInValueStrategy
- Importance: low
Number of tasks for this connector¶
tasks.max
- Type: int
- Valid Values: [1,...]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。