重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
Databricks Delta Lake Sink Connector for Confluent Cloud¶
注釈
Confluent Platform 用にコネクターをローカルにインストールする場合は、「Databricks Delta Lake Sink Connector for Confluent Platform」を参照してください。
Databricks Delta Lake Sink Connector は、Apache Kafka® のデータを定期的にポーリングしてデータを Amazon S3 ステージングバケットにコピーします。その後、それらのレコードを Databricks Delta Lake インスタンスにコミットします。
次の考慮事項に注意してください。
- コネクターは、アマゾンウェブサービス (AWS)でのみ利用できます。
- コネクターで行えるのはデータの追加のみです。
- データは Amazon S3 バケットにステージングされます。このバケット内のファイルを削除した場合は、"厳密に 1 回" のセマンティクス(EOS)は失われます。
- Amazon S3 バケット、Delta Lake インスタンス、Kafka クラスターは、同じリージョンに存在している必要があります。
- コネクターは、
partition
という名前のフィールドを追加します。Delta Lake テーブルには、INT 型で partition という名前のフィールド(partition INT
)を含める必要があります。 - Confluent Cloud と Confluent Cloud Enterprise では、組織はタスク 1 つとコネクター 1 つに制限されます。
詳細については、Cloud コネクターの制限事項 を参照してください。
機能¶
Databricks Delta Lake Sink Connector には、以下の機能があります。
- フラッシュ間隔を設定した厳密に 1 回のデリバリー: パーティショナーを使用してエクスポートされたレコードが、"厳密に 1 回" のセマンティクスで配信されます。コミットのタイミングはフラッシュ間隔の構成プロパティ(
flush.interval.ms
)に基づいています。 - サポートされるデータフォーマット: このコネクターは、Avro、JSON スキーマおよび Protobuf フォーマットの Kafka トピックからの入力データをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
- Automatically creates tables: If you do not provide a table name, the connector can create a table using the originating Kafka topic name (that is–the configuration property defaults to
${topic}
).
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
詳細については、Cloud コネクターの制限事項 を参照してください。
構成プロパティの値と詳細については、「構成プロパティ」を参照してください。
クイックスタート¶
重要
コネクターを構成する前に、必ず「Databricks Delta Lake(AWS)のセットアップ」のタスクを確認して完了してください。
このクイックスタートを使用して、Confluent Cloud Databricks Delta Lake Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、データをストリーミングするようにコネクターを構成するための基本的な方法について説明します。
- 前提条件
- AWS 上の Confluent Cloud クラスターへのアクセスを許可されていること。
- Databricks Delta Lake と AWS CloudFormation の手順をすべて完了していること。「Databricks Delta Lake(AWS)のセットアップ」を参照してください。
- Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
- スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
- ネットワークに関する考慮事項については、「Networking and DNS Considerations」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
- バケットにアクセスするために構成された AWS S3 の IAM ポリシー。
- アクセスキー を構成した AWS アカウント。コネクターを設定するときに、これらのアクセスキーを使用します。
- Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
- 既存の サービスアカウント のリソース ID を入力する。
- コネクター用の Confluent Cloud サービスアカウント を作成します。「サービスアカウント」のドキュメントで、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
- Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Quick Start for Confluent Cloud」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
ステップ 4: コネクターの詳細を入力します。¶
注釈
- すべての 前提条件 を満たしていることを確認してください。
- アスタリスク ( * ) は必須項目であることを示しています。
Add Databricks Delta Lake Sink Connector 画面で、以下を実行します。
既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。
新しいトピックを作成するには、+Add new topic をクリックします。
- Kafka Cluster credentials で Kafka クラスターの認証情報の指定方法を選択します。以下のいずれかのオプションを選択できます。
- Global Access: コネクターは、ユーザーがアクセス権限を持つすべての対象にアクセスできます。グローバルアクセスの場合、コネクターのアクセス権限は、ユーザーのアカウントにリンクされます。このオプションは本稼働環境では推奨されません。
- Granular access: コネクターのアクセスが制限されます。コネクターのアクセス権限は サービスアカウント から制御できます。本稼働環境にはこのオプションをお勧めします。
- Use an existing API key: 保存済みの API キーおよびシークレット部分を入力できます。API キーとシークレットを入力するか Cloud Console でこれらを生成することもできます。
- Continue をクリックします。
- 以下の Databricks Delta Lake 接続の詳細情報を入力します。これらのフィールドには、Databricks と AWS で取得した情報を使用します。Databricks Delta Lake のセットアップ手順 を参照してください。
- Delta Lake Host Name: Delta Lake との接続に使用されるホスト名。
- Delta Lake HTTP Path: Delta Lake との接続に使用される HTTP パス。
- Delta Lake Token: Delta Lake に JDBC を使用して接続する際に、ユーザーの認証に使用される個人アクセストークン。
- Delta Lake Catalog: 送信先データベースとテーブルが配置されている送信先カタログ。
- Delta Lake Database: 送信先テーブルが配置されている送信先データベース。
- S3 Staging Bucket Name フィールドに、Kafka からファイルが書き込まれてから Databricks Delta Lake テーブルにコピーされる S3 ステージングバケットを入力します。
- https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys にアクセスし、この Confluent コネクターへの S3 アクセスを提供するキー ID を選択します。Staging S3 Access Key ID フィールドに、アクセスキー ID を入力します。
- Staging S3 Secret Access Key フィールドに、S3 シークレットアクセスキーを入力します。
- Continue をクリックします。
注釈
Cloud Console に表示されない構成プロパティでは、デフォルト値が使用されます。すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。
Input Kafka record value で Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を AVRO、JSON_SR(JSON スキーマ)、または PROTOBUF から選択します。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
Show advanced configurations
Delta Lake Table Format: 送信先テーブルの名前のフォーマット文字列。元のトピック名を表すプレースホルダーとして
${topic}
を含めることができます。たとえば、orders
という名前の Kafka トピックに基づいてkafka-orders
という名前のテーブルを作成するには、このフィールドにkafka-${topic}
と入力します。Delta Lake Topic2Table Map: トピックとテーブルのマッピング。
Delta Lake Table Auto Create: 送信先テーブルが存在しない場合に、レコードスキーマに基づいてテーブルを作成するかどうかを指定します。
Delta Lake Tables Location: Delta Lake テーブルのデータが保管される場所。
Delta Lake Table2Partition Map: テーブルとパーティションフィールドのマッピング。
Flush Interval (ms): ファイルのコミットを定期的に呼び出す時間間隔(ミリ秒)。この構成により、ファイルのコミットが、構成された間隔で呼び出されようになります。
Transforms and Predicates については、Single Message Transforms(SMT) のドキュメントを参照してください。
すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。
Continue をクリックします。
選択するトピックのパーティション数に基づいて、推奨タスク数が表示されます。
- 推奨されたタスク数を変更するには、Tasks フィールドに、コネクターで使用する タスク の数を入力します。
- Continue をクリックします。
接続の詳細情報を確認します。
Launch をクリックします。
コネクターのステータスが Provisioning から Running に変わります。
ステップ 5: S3 バケットを確認します。¶
レコードがステージング Amazon S3 バケットに取り込まれ、Databricks Delta Lake テーブルに取り込まれていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。
Confluent CLI を使用する場合¶
以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行」を参照してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe DatabricksDeltaLakeSink
出力例:
Following are the required configs:
connector.class: DatabricksDeltaLakeSink
topics
input.data.format
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
delta.lake.host.name
delta.lake.http.path
delta.lake.token
staging.s3.access.key.id
staging.s3.secret.access.key
staging.bucket.name
flush.interval.ms
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティとオプションのプロパティを示しています。
{
"name": "DatabricksDeltaLakeSinkConnector_0",
"config": {
"topics": "clickstreams, pageviews",
"input.data.format": "AVRO",
"connector.class": "DatabricksDeltaLakeSink",
"name": "DatabricksDeltaLakeSinkConnector_0",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "****************",
"kafka.api.secret": "**************************************************",
"delta.lake.host.name": "dbc-e12345cd-e12345ed.cloud.databricks.com",
"delta.lake.http.path": "sql/protocolv1/o/1234567891811460/0000-01234-str6jlpz",
"delta.lake.token": "************************************",
"delta.lake.topic2table.map": "pageviews:pageviews,clickstreams:clickstreams-test",
"delta.lake.table.auto.create": "false",
"staging.s3.access.key.id": "********************",
"staging.s3.secret.access.key": "****************************************",
"staging.bucket.name": "databricks0",
"flush.interval.ms": "100",
"tasks.max": "1"
}
}
以下の必須プロパティの定義に注意してください。
"name"
: 新しいコネクターの名前を設定します。"connector.class"
: コネクターのプラグイン名を指定します。"topics"
: 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"input.data.format"
: Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVRO、JSON_SR、および PROTOBUF です。スキーマベースのメッセージフォーマット(たとえば、Avro、JSON_SR(JSON スキーマ)、および Protobuf)を使用するには、Confluent Cloud Schema Registry を構成しておく必要があります。"delta.lake...."
: この情報をどこで取得できるかについては、Databricks Delta Lake のセットアップ手順 を参照してください。その他のプロパティの値と詳細については、「構成プロパティ」を参照してください。"staging...."
: これらのプロパティには、Databricks と AWS で取得した情報を使用します。Databricks Delta Lake のセットアップ手順 を参照してください。"flush.interval.ms"
: ファイルのコミットを定期的に呼び出す間隔(ミリ秒)。このプロパティにより、構成された間隔で確実にコネクターがファイルコミットを呼び出すようになります。コミットする時刻は00:00
UTC に合わせて調整されます。前回のコミット時刻やメッセージ数にかかわらず、スケジュールで指定された時刻にコミットが実行されます。この構成は、毎正時など、サーバーの現在時刻に基づいてデータをコミットする必要がある場合に役立ちます。使用されるデフォルト値は10000
ミリ秒(10 秒)です。"tasks.max"
: このコネクターで使用できる タスク の最大数を入力します。このコネクターは、コネクターインスタンスごとに 1 つのタスクの実行をサポートします。
Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。このコネクターでサポートされていない SMT のリストについては、「サポートされない変換」を参照してください。
構成プロパティの値と詳細については、「構成プロパティ」を参照してください。
ステップ 4: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config databricks-delta-lake-sink-config.json
出力例:
Created connector DatabricksDeltaLakeSinkConnector_0 lcc-ix4dl
ステップ 5: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type
+-----------+------------------------------------+---------+------+
lcc-ix4dl | DatabricksDeltaLakeSinkConnector_0 | RUNNING | sink
ステップ 6: S3 バケットを確認します。¶
レコードがステージング Amazon S3 バケットに取り込まれ、Databricks Delta Lake テーブルに取り込まれていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
構成プロパティ¶
このコネクターでは、以下のコネクター構成プロパティが使用されます。
データの取得元とするトピック(Which topics do you want to get data from?)¶
topics
特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
- 型: list
- 重要度: 高
入力メッセージ(Input messages)¶
input.data.format
Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、または PROTOBUF です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。
- 型: string
- 重要度: 高
input.key.format
Sets the input Kafka record key format. Valid entries are AVRO, BYTES, JSON, JSON_SR, PROTOBUF, or STRING. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF
- 型: string
- Default: JSON
- Valid Values: AVRO, BYTES, JSON, JSON_SR, PROTOBUF, STRING
- 重要度: 高
データへの接続方法(How should we connect to your data?)¶
name
コネクターの名前を設定します。
- 型: string
- 指定可能な値: 最大 64 文字の文字列
- 重要度: 高
Kafka クラスターの認証情報(Kafka Cluster credentials)¶
kafka.auth.mode
Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。
- 型: string
- デフォルト: KAFKA_API_KEY
- 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
- 重要度: 高
kafka.api.key
- 型: password
- 重要度: 高
kafka.service.account.id
Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。
- 型: string
- 重要度: 高
kafka.api.secret
- 型: password
- 重要度: 高
Databricks Delta Lake への接続方法(How should we connect to your Databricks Delta Lake?)¶
delta.lake.host.name
Delta Lake との接続に使用されるホスト名。
- 型: string
- 重要度: 高
delta.lake.http.path
Delta Lake との接続に使用される HTTP パス。
- 型: string
- 重要度: 高
delta.lake.token
Delta Lake に JDBC を使用して接続する際に、ユーザーの認証に使用される個人アクセストークン。
- 型: password
- 重要度: 高
delta.lake.catalog
送信先データベースとテーブルが配置されている送信先カタログ。
- 型: string
- デフォルト: ""
- 重要度: 低
delta.lake.database
送信先テーブルが配置されている送信先データベース。
- 型: string
- デフォルト: default
- 重要度: 低
delta.lake.table.format
送信先テーブル名の形式制御文字列。送信元のトピック名を表すプレースホルダーとして「${topic}」を含めることができます。たとえば、トピック「orders」の場合、
kafka_${topic}
はテーブル名「kafka_orders」にマップされます。- 型: string
- デフォルト: ${topic}
- 重要度: 中
delta.lake.topic2table.map
トピックとテーブルのマッピング(省略可)。フォーマット: コンマ区切りのタプル。たとえば、<topic-1>:<table-1>,<topic-2>:<table-2>,... のように指定します。
- 型: string
- デフォルト: ""
- 重要度: 低
delta.lake.table.auto.create
送信先テーブルが存在しない場合に、レコードスキーマに基づいてテーブルを自動的に作成するかどうかを指定します。
- 型: boolean
- デフォルト: false
- 重要度: 中
delta.lake.tables.location
Delta Lake テーブルのデータが保管される場所。s3://<your-s3-bucket>/tmp/ を設定した場合、Delta Lake データは s3://<your-s3-bucket>/tmp/ に保管されます。Databricks Delta Lake インスタンスの AWS IAM に、指定したディレクトリにレコードを書き込むアクセス許可があることと、指定したディレクトリ(tmp など)が存在することを確認します。
- 型: string
- デフォルト: ""
- 重要度: 中
delta.lake.table2partition.map
Map of tables to partition fields (optional). Format: comma-separated tuples. For example: <table-1>:<partition-1>,<table-2>:<partition-2>,... Note that you can specify multiple partitions per table. Be sure to add a separate tuple for each partition. For example: <table-1>:<partition-1>, <table-1>:<partition-2>), <table-2>:<partition-3>
- 型: string
- デフォルト: ""
- 重要度: 低
Amazon S3 の詳細(Amazon S3 details)¶
staging.s3.access.key.id
- 型: password
- 重要度: 高
staging.s3.secret.access.key
- 型: password
- 重要度: 高
flush.interval.ms
ファイルのコミットを定期的に呼び出すための間隔(ミリ秒)。この構成により、ファイルのコミットが、構成された間隔で呼び出されようになります。コミットする時間は選択されたタイムゾーンの 00:00 に調整されます。前回コミットされた時間やメッセージ数に関係なく、スケジュールで指定された時間にコミットが実行されます。この構成は、毎正時など、サーバーの現在時刻に基づいてデータをコミットする必要がある場合に役立ちます。
- 型: long
- Default: 300000 (5 minutes)
- 重要度: 中
staging.bucket.name
Kafka からファイルが書き込まれてから Databricks Delta Lake テーブルにコピーされる S3 ステージングバケット。Confluent Cloud クラスターと同じリージョンに存在している必要があります。
- 型: string
- 重要度: 高
このコネクターのタスク数(Number of tasks for this connector)¶
tasks.max
- 型: int
- Valid Values: [1,...]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。