Databricks Delta Lake Sink Connector for Confluent Cloud¶
注釈
If you are installing the connector locally for Confluent Platform, see Databricks Delta Lake Sink Connector for Confluent Platform.
Databricks Delta Lake Sink Connector は、Apache Kafka® のデータを定期的にポーリングしてデータを Amazon S3 ステージングバケットにコピーします。その後、それらのレコードを Databricks Delta Lake インスタンスにコミットします。
次の考慮事項に注意してください。
- コネクターは、アマゾンウェブサービス (AWS)でのみ利用できます。
- コネクターで行えるのはデータの追加のみです。
- データは Amazon S3 バケットにステージングされます。このバケット内のファイルを削除した場合は、"厳密に 1 回" のセマンティクス(EOS)は失われます。
- Amazon S3 バケット、Delta Lake インスタンス、Kafka クラスターは、同じリージョンに存在している必要があります。
- コネクターは、
partition
という名前のフィールドを追加します。Delta Lake テーブルには、INT 型で partition という名前のフィールド(partition INT
)を含める必要があります。 - Confluent Cloud と Confluent Cloud Enterprise では、組織はタスク 1 つとコネクター 1 つに制限されます。
詳細については、Cloud コネクターの制限事項 を参照してください。
機能¶
Databricks Delta Lake Sink Connector には、以下の機能があります。
- フラッシュ間隔を設定した厳密に 1 回のデリバリー: パーティショナーを使用してエクスポートされたレコードが、"厳密に 1 回" のセマンティクスで配信されます。コミットのタイミングはフラッシュ間隔の構成プロパティ(
flush.interval.ms
)に基づいています。 - サポートされるデータフォーマット: このコネクターは、Avro、JSON スキーマおよび Protobuf フォーマットの Kafka トピックからの入力データをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「環境の制限」を参照してください。
- トピックの自動作成: テーブル名を指定しない場合、このコネクターは送信元の Kafka トピック名を使用してトピックを作成できます(つまり、構成プロパティのデフォルトが
${topic}
)。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
詳細については、Cloud コネクターの制限事項 を参照してください。
構成プロパティの値と詳細については、「構成プロパティ」を参照してください。
クイックスタート¶
重要
コネクターを構成する前に、必ず「Databricks Delta Lake(AWS)のセットアップ」のタスクを確認して完了してください。
このクイックスタートを使用して、Confluent Cloud Databricks Delta Lake Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、データをストリーミングするようにコネクターを構成するための基本的な方法について説明します。
- 前提条件
- AWS 上の Confluent Cloud クラスターへのアクセスを許可されていること。
- Databricks Delta Lake と AWS CloudFormation の手順をすべて完了していること。「Databricks Delta Lake(AWS)のセットアップ」を参照してください。
- Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
- スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「環境の制限」を参照してください。
- ネットワークに関する考慮事項については、「ネットワークアクセス」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
- バケットにアクセスするために構成された AWS S3 の IAM ポリシー。
- アクセスキー を構成した AWS アカウント。コネクターを設定するときに、これらのアクセスキーを使用します。
- Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
- 既存の サービスアカウント のリソース ID を入力する。
- コネクター用の Confluent Cloud サービスアカウント を作成します。「サービスアカウント」のドキュメントで、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
- Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Confluent Cloud を使用した Apache Kafka のクイックスタート」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
ステップ 4: コネクターの詳細を入力します。¶
注釈
- Ensure you have all your prerequisites completed.
- アスタリスク ( * ) は必須項目であることを示しています。
At the Add Databricks Delta Lake Sink Connector screen, complete the following:
If you've already populated your Kafka topics, select the topic(s) you want to connect from the Topics list.
To create a new topic, click +Add new topic.
- Select the way you want to provide Kafka Cluster credentials. You
can choose one of the following options:
- Global Access: Allows your connector to access everything you have access to. With global access, connector access will be linked to your account. This option is not recommended for production.
- Granular access: Limits the access for your connector. You will be able to manage connector access through a service account. This option is recommended for production.
- Use an existing API key: Allows you to enter an API key and secret part you have stored. You can enter an API key and secret (or generate these in the Cloud Console).
- Click Continue.
- Enter the following Databricks Delta Lake connection details. These fields use information you
get from Databricks and AWS. See the Databricks Delta Lake setup
procedure.
- Delta Lake Host Name: The host name used to connect to Delta Lake.
- Delta Lake HTTP Path: The HTTP path used to connect to Delta Lake.
- Delta Lake Token: The personal access token used to authenticate the user when connecting to Delta Lake using JDBC.
- Delta Lake Catalog: The destination catalog under which the destination database and tables are located.
- Delta Lake Database: The destination database under which the destination tables are located.
- In the S3 Staging Bucket Name field, enter the S3 staging bucket where files get written to from Kafka and subsequently copied into the Databricks Delta Lake table.
- Visit your https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys and select a Key ID to provide S3 access to this Confluent connector. Enter the Access Key ID in the In the Staging S3 Access Key ID field.
- In the Staging S3 Secret Access Key field, enter your S3 secret access key.
- Click Continue.
注釈
Configuration properties that are not shown in the Cloud Console use the default values. See 構成プロパティ for all property values and definitions.
Input Kafka record value で Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を AVRO、JSON_SR(JSON スキーマ)、または PROTOBUF から選択します。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。詳細については、「環境の制限」を参照してください。
Show advanced configurations
Delta Lake Table Format: A format string for the destination table name, which may contain
${topic}
as a placeholder for the originating topic name. For example, to create a table namedkafka-orders
based on a Kafka topic namedorders
, you would enterkafka-${topic}
in this field.Delta Lake Topic2Table Map: Map of topics to tables
Delta Lake Table Auto Create: Specifies whether to create the destination table based on record schema if it does not exist.
Delta Lake Tables Location: The underlying location where the data in the Delta Lake table(s) is stored.
Delta Lake Table2Partition Map: Map of tables to partition fields.
Flush Interval (ms): The time interval in milliseconds to periodically invoke file commits. This configuration ensures that file commits are invoked at every configured interval.
For Transforms and Predicates, see the Single Message Transforms (SMT) documentation for details.
See 構成プロパティ for all property values and definitions.
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
- To change the number of recommended tasks, enter the number of tasks for the connector to use in the Tasks field.
- Click Continue.
Verify the connection details.
Click Launch.
コネクターのステータスが Provisioning から Running に変わります。
Step 5: Check the S3 bucket.¶
レコードがステージング Amazon S3 バケットに取り込まれ、Databricks Delta Lake テーブルに取り込まれていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。
Confluent CLI を使用する場合¶
以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行」を参照してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe DatabricksDeltaLakeSink
出力例:
Following are the required configs:
connector.class: DatabricksDeltaLakeSink
topics
input.data.format
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
delta.lake.host.name
delta.lake.http.path
delta.lake.token
staging.s3.access.key.id
staging.s3.secret.access.key
staging.bucket.name
flush.interval.ms
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティとオプションのプロパティを示しています。
{
"name": "DatabricksDeltaLakeSinkConnector_0",
"config": {
"topics": "clickstreams, pageviews",
"input.data.format": "AVRO",
"connector.class": "DatabricksDeltaLakeSink",
"name": "DatabricksDeltaLakeSinkConnector_0",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "****************",
"kafka.api.secret": "**************************************************",
"delta.lake.host.name": "dbc-e12345cd-e12345ed.cloud.databricks.com",
"delta.lake.http.path": "sql/protocolv1/o/1234567891811460/0000-01234-str6jlpz",
"delta.lake.token": "************************************",
"delta.lake.topic2table.map": "pageviews:pageviews,clickstreams:clickstreams-test",
"delta.lake.table.auto.create": "false",
"staging.s3.access.key.id": "********************",
"staging.s3.secret.access.key": "****************************************",
"staging.bucket.name": "databricks0",
"flush.interval.ms": "100",
"tasks.max": "1"
}
}
以下の必須プロパティの定義に注意してください。
"name"
: 新しいコネクターの名前を設定します。"connector.class"
: コネクターのプラグイン名を指定します。"topics"
: 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"input.data.format"
: Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVRO、JSON_SR、および PROTOBUF です。スキーマベースのメッセージフォーマット(たとえば、Avro、JSON_SR(JSON スキーマ)、および Protobuf)を使用するには、Confluent Cloud Schema Registry を構成しておく必要があります。"delta.lake...."
: この情報をどこで取得できるかについては、Databricks Delta Lake のセットアップ手順 を参照してください。その他のプロパティの値と詳細については、「構成プロパティ」を参照してください。"staging...."
: これらのプロパティには、Databricks と AWS で取得した情報を使用します。Databricks Delta Lake のセットアップ手順 を参照してください。"flush.interval.ms"
: ファイルのコミットを定期的に呼び出す間隔(ミリ秒)。このプロパティにより、構成された間隔で確実にコネクターがファイルコミットを呼び出すようになります。コミットする時刻は00:00
UTC に合わせて調整されます。前回のコミット時刻やメッセージ数にかかわらず、スケジュールで指定された時刻にコミットが実行されます。この構成は、毎正時など、サーバーの現在時刻に基づいてデータをコミットする必要がある場合に役立ちます。使用されるデフォルト値は10000
ミリ秒(10 秒)です。"tasks.max"
: このコネクターで使用できる タスク の最大数を入力します。このコネクターは、コネクターインスタンスごとに 1 つのタスクの実行をサポートします。
Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。このコネクターでサポートされていない SMT のリストについては、「サポートされない変換」を参照してください。
構成プロパティの値と詳細については、「構成プロパティ」を参照してください。
ステップ 4: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config databricks-delta-lake-sink-config.json
出力例:
Created connector DatabricksDeltaLakeSinkConnector_0 lcc-ix4dl
ステップ 5: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type
+-----------+------------------------------------+---------+------+
lcc-ix4dl | DatabricksDeltaLakeSinkConnector_0 | RUNNING | sink
ステップ 6: S3 バケットを確認します。¶
レコードがステージング Amazon S3 バケットに取り込まれ、Databricks Delta Lake テーブルに取り込まれていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
構成プロパティ¶
The following connector configuration properties are used for this connector.
Which topics do you want to get data from?¶
topics
Identifies the topic name or a comma-separated list of topic names.
- Type: list
- Importance: high
Input messages¶
input.data.format
Sets the input Kafka record value format. Valid entries are AVRO, JSON_SR, or PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
- Type: string
- Importance: high
How should we connect to your data?¶
name
Sets a name for your connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- Type: string
- Default: KAFKA_API_KEY
- Valid Values: SERVICE_ACCOUNT, KAFKA_API_KEY
- Importance: high
kafka.api.key
- Type: password
- Importance: high
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- Type: string
- Importance: high
kafka.api.secret
- Type: password
- Importance: high
How should we connect to your Databricks Delta Lake?¶
delta.lake.host.name
The host name used to connect to Delta Lake.
- Type: string
- Importance: high
delta.lake.http.path
The HTTP path used to connect to Delta Lake.
- Type: string
- Importance: high
delta.lake.token
The personal access token used to authenticate the user when connecting to Delta Lake via JDBC.
- Type: password
- Importance: high
delta.lake.catalog
The destination catalog under which the destination database and tables are located.
- Type: string
- Default: ""
- Importance: low
delta.lake.database
The destination database under which the destination tables are located.
- Type: string
- Default: default
- Importance: low
delta.lake.table.format
A format string for the destination table name, which may contain '${topic}' as a placeholder for the originating topic name. For example,
kafka_${topic}
for the topic 'orders' will map to the table name 'kafka_orders'.- Type: string
- Default: ${topic}
- Importance: medium
delta.lake.topic2table.map
Map of topics to tables (optional). Format: comma-seperated tuples, e.g. <topic-1>:<table-1>,<topic-2>:<table-2>,...
- Type: string
- Default: ""
- Importance: low
delta.lake.table.auto.create
Whether to automatically create the destination table based on record schema if it does not exist.
- Type: boolean
- Default: false
- Importance: medium
delta.lake.tables.location
The underlying location where the data in the Delta Lake table(s) is stored.
- Type: string
- Default: ""
- Importance: medium
Amazon S3 details¶
staging.s3.access.key.id
- Type: password
- Importance: high
staging.s3.secret.access.key
- Type: password
- Importance: high
flush.interval.ms
The time interval in milliseconds to periodically invoke file commits. This configuration ensures that file commits are invoked at every configured interval. Time of commit will be adjusted to 00:00 of selected timezone. The commit will be performed at the scheduled time, regardless of the previous commit time or number of messages. This configuration is useful when you have to commit your data based on current server time, for example at the beginning of every hour.
- Type: long
- Default: 10000 (10 seconds)
- Importance: medium
staging.bucket.name
The S3 staging bucket where files get written to from Kafka and subsequently copied into the Databricks Delta Lake table. Must be in the same region as your Confluent Cloud cluster.
- Type: string
- Importance: high
Number of tasks for this connector¶
tasks.max
- Type: int
- Valid Values: [1,...,1]
- Importance: high
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。