重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Databricks Delta Lake Sink Connector for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「Databricks Delta Lake Sink Connector for Confluent Platform」を参照してください。

Databricks Delta Lake Sink Connector は、Apache Kafka® のデータを定期的にポーリングしてデータを Amazon S3 ステージングバケットにコピーします。その後、それらのレコードを Databricks Delta Lake インスタンスにコミットします。

次の考慮事項に注意してください。

  • コネクターは、アマゾンウェブサービス (AWS)でのみ利用できます。
  • コネクターで行えるのはデータの追加のみです。
  • データは Amazon S3 バケットにステージングされます。このバケット内のファイルを削除した場合は、"厳密に 1 回" のセマンティクス(EOS)は失われます。
  • Amazon S3 バケット、Delta Lake インスタンス、Kafka クラスターは、同じリージョンに存在している必要があります。
  • コネクターは、partition という名前のフィールドを追加します。Delta Lake テーブルには、INT 型で partition という名前のフィールド(partition INT)を含める必要があります。
  • Confluent Cloud と Confluent Cloud Enterprise では、組織はタスク 1 つとコネクター 1 つに制限されます。

詳細については、Cloud コネクターの制限事項 を参照してください。

機能

Databricks Delta Lake Sink Connector には、以下の機能があります。

  • フラッシュ間隔を設定した厳密に 1 回のデリバリー: パーティショナーを使用してエクスポートされたレコードが、"厳密に 1 回" のセマンティクスで配信されます。コミットのタイミングはフラッシュ間隔の構成プロパティ(flush.interval.ms)に基づいています。
  • サポートされるデータフォーマット: このコネクターは、Avro、JSON スキーマおよび Protobuf フォーマットの Kafka トピックからの入力データをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
  • Automatically creates tables: If you do not provide a table name, the connector can create a table using the originating Kafka topic name (that is–the configuration property defaults to ${topic}).

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

詳細については、Cloud コネクターの制限事項 を参照してください。

構成プロパティの値と詳細については、「構成プロパティ」を参照してください。

クイックスタート

重要

コネクターを構成する前に、必ず「Databricks Delta Lake(AWS)のセットアップ」のタスクを確認して完了してください。

このクイックスタートを使用して、Confluent Cloud Databricks Delta Lake Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、データをストリーミングするようにコネクターを構成するための基本的な方法について説明します。

前提条件
  • Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
    • 既存の サービスアカウント のリソース ID を入力する。
    • コネクター用の Confluent Cloud サービスアカウント を作成します。「サービスアカウント」のドキュメントで、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
    • Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the Databricks Delta Lake Sink connector card.

Databricks Delta Lake Sink Connector Card

ステップ 4: コネクターの詳細を入力します。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク ( * ) は必須項目であることを示しています。

Add Databricks Delta Lake Sink Connector 画面で、以下を実行します。

既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。

新しいトピックを作成するには、+Add new topic をクリックします。

ステップ 5: S3 バケットを確認します。

レコードがステージング Amazon S3 バケットに取り込まれ、Databricks Delta Lake テーブルに取り込まれていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../../_images/topology.ja.png

Confluent CLI を使用する場合

以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行」を参照してください。

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe DatabricksDeltaLakeSink

出力例:

Following are the required configs:
connector.class: DatabricksDeltaLakeSink
topics
input.data.format
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
delta.lake.host.name
delta.lake.http.path
delta.lake.token
staging.s3.access.key.id
staging.s3.secret.access.key
staging.bucket.name
flush.interval.ms

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティとオプションのプロパティを示しています。

{
  "name": "DatabricksDeltaLakeSinkConnector_0",
  "config": {
    "topics": "clickstreams, pageviews",
    "input.data.format": "AVRO",
    "connector.class": "DatabricksDeltaLakeSink",
    "name": "DatabricksDeltaLakeSinkConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "****************",
    "kafka.api.secret": "**************************************************",
    "delta.lake.host.name": "dbc-e12345cd-e12345ed.cloud.databricks.com",
    "delta.lake.http.path": "sql/protocolv1/o/1234567891811460/0000-01234-str6jlpz",
    "delta.lake.token": "************************************",
    "delta.lake.topic2table.map": "pageviews:pageviews,clickstreams:clickstreams-test",
    "delta.lake.table.auto.create": "false",
    "staging.s3.access.key.id": "********************",
    "staging.s3.secret.access.key": "****************************************",
    "staging.bucket.name": "databricks0",
    "flush.interval.ms": "100",
    "tasks.max": "1"
  }
}

以下の必須プロパティの定義に注意してください。

  • "name": 新しいコネクターの名前を設定します。
  • "connector.class": コネクターのプラグイン名を指定します。
  • "topics": 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "input.data.format": Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVROJSON_SR、および PROTOBUF です。スキーマベースのメッセージフォーマット(たとえば、Avro、JSON_SR(JSON スキーマ)、および Protobuf)を使用するには、Confluent Cloud Schema Registry を構成しておく必要があります。

  • "delta.lake....": この情報をどこで取得できるかについては、Databricks Delta Lake のセットアップ手順 を参照してください。その他のプロパティの値と詳細については、「構成プロパティ」を参照してください。

  • "staging....": これらのプロパティには、Databricks と AWS で取得した情報を使用します。Databricks Delta Lake のセットアップ手順 を参照してください。

  • "flush.interval.ms": ファイルのコミットを定期的に呼び出す間隔(ミリ秒)。このプロパティにより、構成された間隔で確実にコネクターがファイルコミットを呼び出すようになります。コミットする時刻は 00:00 UTC に合わせて調整されます。前回のコミット時刻やメッセージ数にかかわらず、スケジュールで指定された時刻にコミットが実行されます。この構成は、毎正時など、サーバーの現在時刻に基づいてデータをコミットする必要がある場合に役立ちます。使用されるデフォルト値は 10000 ミリ秒(10 秒)です。

  • "tasks.max": このコネクターで使用できる タスク の最大数を入力します。このコネクターは、コネクターインスタンスごとに 1 つのタスクの実行をサポートします。

Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。このコネクターでサポートされていない SMT のリストについては、「サポートされない変換」を参照してください。

構成プロパティの値と詳細については、「構成プロパティ」を参照してください。

ステップ 4: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config databricks-delta-lake-sink-config.json

出力例:

Created connector DatabricksDeltaLakeSinkConnector_0 lcc-ix4dl

ステップ 5: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID          |       Name                         | Status  | Type
+-----------+------------------------------------+---------+------+
lcc-ix4dl   | DatabricksDeltaLakeSinkConnector_0 | RUNNING | sink

ステップ 6: S3 バケットを確認します。

レコードがステージング Amazon S3 バケットに取り込まれ、Databricks Delta Lake テーブルに取り込まれていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティが使用されます。

データの取得元とするトピック(Which topics do you want to get data from?)

topics

特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

  • 型: list
  • 重要度: 高

入力メッセージ(Input messages)

input.data.format

Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、または PROTOBUF です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • 重要度: 高
input.key.format

Sets the input Kafka record key format. Valid entries are AVRO, BYTES, JSON, JSON_SR, PROTOBUF, or STRING. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF

  • 型: string
  • Default: JSON
  • Valid Values: AVRO, BYTES, JSON, JSON_SR, PROTOBUF, STRING
  • 重要度: 高

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

Databricks Delta Lake への接続方法(How should we connect to your Databricks Delta Lake?)

delta.lake.host.name

Delta Lake との接続に使用されるホスト名。

  • 型: string
  • 重要度: 高
delta.lake.http.path

Delta Lake との接続に使用される HTTP パス。

  • 型: string
  • 重要度: 高
delta.lake.token

Delta Lake に JDBC を使用して接続する際に、ユーザーの認証に使用される個人アクセストークン。

  • 型: password
  • 重要度: 高
delta.lake.catalog

送信先データベースとテーブルが配置されている送信先カタログ。

  • 型: string
  • デフォルト: ""
  • 重要度: 低
delta.lake.database

送信先テーブルが配置されている送信先データベース。

  • 型: string
  • デフォルト: default
  • 重要度: 低
delta.lake.table.format

送信先テーブル名の形式制御文字列。送信元のトピック名を表すプレースホルダーとして「${topic}」を含めることができます。たとえば、トピック「orders」の場合、kafka_${topic} はテーブル名「kafka_orders」にマップされます。

  • 型: string
  • デフォルト: ${topic}
  • 重要度: 中
delta.lake.topic2table.map

トピックとテーブルのマッピング(省略可)。フォーマット: コンマ区切りのタプル。たとえば、<topic-1>:<table-1>,<topic-2>:<table-2>,... のように指定します。

  • 型: string
  • デフォルト: ""
  • 重要度: 低
delta.lake.table.auto.create

送信先テーブルが存在しない場合に、レコードスキーマに基づいてテーブルを自動的に作成するかどうかを指定します。

  • 型: boolean
  • デフォルト: false
  • 重要度: 中
delta.lake.tables.location

Delta Lake テーブルのデータが保管される場所。s3://<your-s3-bucket>/tmp/ を設定した場合、Delta Lake データは s3://<your-s3-bucket>/tmp/ に保管されます。Databricks Delta Lake インスタンスの AWS IAM に、指定したディレクトリにレコードを書き込むアクセス許可があることと、指定したディレクトリ(tmp など)が存在することを確認します。

  • 型: string
  • デフォルト: ""
  • 重要度: 中
delta.lake.table2partition.map

Map of tables to partition fields (optional). Format: comma-separated tuples. For example: <table-1>:<partition-1>,<table-2>:<partition-2>,... Note that you can specify multiple partitions per table. Be sure to add a separate tuple for each partition. For example: <table-1>:<partition-1>, <table-1>:<partition-2>), <table-2>:<partition-3>

  • 型: string
  • デフォルト: ""
  • 重要度: 低

Amazon S3 の詳細(Amazon S3 details)

staging.s3.access.key.id
  • 型: password
  • 重要度: 高
staging.s3.secret.access.key
  • 型: password
  • 重要度: 高
flush.interval.ms

ファイルのコミットを定期的に呼び出すための間隔(ミリ秒)。この構成により、ファイルのコミットが、構成された間隔で呼び出されようになります。コミットする時間は選択されたタイムゾーンの 00:00 に調整されます。前回コミットされた時間やメッセージ数に関係なく、スケジュールで指定された時間にコミットが実行されます。この構成は、毎正時など、サーバーの現在時刻に基づいてデータをコミットする必要がある場合に役立ちます。

  • 型: long
  • Default: 300000 (5 minutes)
  • 重要度: 中
staging.bucket.name

Kafka からファイルが書き込まれてから Databricks Delta Lake テーブルにコピーされる S3 ステージングバケット。Confluent Cloud クラスターと同じリージョンに存在している必要があります。

  • 型: string
  • 重要度: 高

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • Valid Values: [1,...]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../../_images/topology.ja.png