重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
Azure Synapse Analytics Sink Connector for Confluent Cloud¶
注釈
Confluent Platform 用にコネクターをローカルにインストールする場合は、「Azure Synapse Analytics Sink Connector for Confluent Platform」を参照してください。
Azure Synapse Analytics Sink Connector を使用すると、Apache Kafka® トピックから Azure Synapse Analytics にデータをエクスポートできます。コネクターによって、Kafka からデータがポーリングされ、トピックのサブスクリプションに基づいてデータウェアハウスにデータが書き込まれます。テーブルの自動作成と、制限付きの自動進化もサポートされています。このコネクターは、Azure Synapse Analytics SQL プール と互換性があります。
機能¶
Azure Synapse Analytics Sink Connector は、次の機能をサポートしています。
少なくとも 1 回のデリバリー: コネクターによって、Kafka のトピックからのレコードが少なくとも 1 回は配信されることが保証されます。
複数のタスクのサポート: このコネクターは、1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。
自動作成と自動進化のサポート:
Auto create table (
auto.create
)を有効にすると、送信先テーブルが存在しない場合に、コネクターでテーブルを作成できます。コネクターは、テーブル定義のベースとしてレコードスキーマを使用します。テーブルは、レコードがトピックから消費される際に作成されます。Auto add columns (
auto.evolve
)を有効にすると、存在しない列がある新しいレコードでは、コネクターが送信先テーブルに対してalter
コマンドを発行して、制限付きの自動進化を実行できます。コネクターは、新しいレコードに列のみを追加します。既存のレコードでは、新しい列の値は"null"
となります。重要
スキーマ進化の後方互換性を維持するため、レコードスキーマの新規フィールドは、省略可能にするか、デフォルト値を設定する必要があります。
サポートされるデータフォーマット: このコネクターは、Avro、JSON スキーマ(JSON_SR)、および Protobuf の入力フォーマットをサポートします。これらの スキーマレジストリ ベースのフォーマットを使用するには、Schema Registry を有効にしておく必要があります。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
制限¶
以下の情報を確認してください。
- コネクターの制限事項については、Azure Synapse Analytics Sink Connector の制限事項を参照してください。
- 1 つ以上の Single Message Transforms(SMT)を使用する場合は、「SMT の制限」を参照してください。
- Confluent Cloud Schema Registry を使用する場合は、「スキーマレジストリ Enabled Environments」を参照してください。
クイックスタート¶
このクイックスタートを使用して、フルマネージド型の Azure Synapse Analytics Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、イベントをストリーミングするようにコネクターを構成するための基本的な方法について説明します。
- 前提条件
- Microsoft Azure (Azure)上の Confluent Cloud クラスターへのアクセスを許可されていること。
- コネクターの構成で使用する、認可された SQL データウェアハウスユーザーとパスワード。
- Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
- スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
- シンクコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Quick Start for Confluent Cloud」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
ステップ 4: コネクターの詳細情報を入力します。¶
注釈
- すべての 前提条件 を満たしていることを確認してください。
- アスタリスク( * )は必須項目であることを示しています。
Add Azure Synapse Analytics Sink Connector 画面で、以下を実行します。
既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。
新しいトピックを作成するには、+Add new topic をクリックします。
- Kafka Cluster credentials で Kafka クラスターの認証情報の指定方法を選択します。以下のいずれかのオプションを選択できます。
- Global Access: コネクターは、ユーザーがアクセス権限を持つすべての対象にアクセスできます。グローバルアクセスの場合、コネクターのアクセス権限は、ユーザーのアカウントにリンクされます。このオプションは本稼働環境では推奨されません。
- Granular access: コネクターのアクセスが制限されます。コネクターのアクセス権限は サービスアカウント から制御できます。本稼働環境にはこのオプションをお勧めします。
- Use an existing API key: 保存済みの API キーおよびシークレット部分を入力できます。API キーとシークレットを入力するか Cloud Console でこれらを生成することもできます。
- Continue をクリックします。
- SQL Server Name に Azure SQL サーバー名を入力します。SQL データウェアハウスサーバー名のフォーマットは、
<my_server_name>.database.windows.net
です。 - SQL login フィールドに、専用の SQL プール(または SQL データベース)のログインを入力します。
- Login password に、SQL ログインに関連付けられているパスワードを入力します。
- Dedicated SQL pool フィールドに、専用の SQL プールの名前を入力します。
注釈
Cloud Console に表示されない構成プロパティでは、デフォルト値が使用されます。すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。
Input Kafka record value で Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を AVRO、JSON_SR(JSON スキーマ)、または PROTOBUF から選択します。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、有効なスキーマが Schema Registry に存在する必要があります。
Show advanced configurations
Table name format: 送信先テーブルの名前のフォーマット文字列。元のトピック名を表すプレースホルダーとして
${topic}
を含めることができます。たとえば、orders
という名前の Kafka トピックに基づいてkafka-orders
という名前のテーブルを作成するには、このフィールドにkafka-${topic}
と入力します。Database timezone: コネクターで時間ベースの値を挿入する場合に使用する必要がある JDBC タイムゾーンの名前。
Batch size: 送信先テーブルへの挿入で、バッチ処理を試みる際のレコード数を指定します(可能な場合)。
Auto create table: 送信先テーブルが存在しない場合に、
CREATE
を発行して、テーブルを自動的に作成するかどうかを指定します。Auto add columns: レコードスキーマに関連する列がテーブルスキーマに存在しない場合に、
ALTER
を発行して、列を自動的に追加するかどうかを指定します。When to quote SQL identifiers: SQL ステートメントでテーブル名、列名、その他の識別子をいつクォートするかを指定します。
Fields included: レコード値フィールド名のコンマ区切りリスト。空の場合は、レコード値からすべてのフィールドが使用されます。リストを設定した場合は、目的のフィールドのフィルター処理に使用されます。
すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。
Continue をクリックします。
選択するトピックのパーティション数に基づいて、推奨タスク数が表示されます。
- 推奨されたタスク数を変更するには、Tasks フィールドに、コネクターで使用する タスク の数を入力します。
- Continue をクリックします。
接続の詳細情報を確認します。
Launch をクリックします。
コネクターのステータスが Provisioning から Running に変わります。
ステップ 5: レコードを確認します。¶
データが Kafka からデータウェアハウスにエクスポートされていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
Confluent CLI の使用¶
以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行 <https://docs.confluent.io/confluent-cli/current/migrate.html#cli-migrate>`__」を参照してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe AzureSqlDwSink
出力例:
Following are the required configs:
connector.class: AzureSqlDwSink
input.data.format
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
azure.sql.dw.server.name
azure.sql.dw.user
azure.sql.dw.password
azure.sql.dw.database.name
tasks.max
topics
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"name": "AzureSqlDwSinkConnector_0",
"config": {
"topics": "pageviews",
"input.data.format": "AVRO",
"connector.class": "AzureSqlDwSink",
"name": "AzureSqlDwSinkConnector_0",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret": "<my-kafka-api-secret>",
"azure.sql.dw.server.name": "azure-sql-dw-sink.db.windows.net",
"azure.sql.dw.user": "<db_user>",
"azure.sql.dw.password": "**************",
"azure.sql.dw.database.name": "<db_name>",
"db.timezone": "UTC",
"auto.create": "true",
"auto.evolve": "true",
"tasks.max": "1"
}
}
以下のプロパティ定義に注意してください。
"name"
: 新しいコネクターの名前を設定します。"topics"
: 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。"input.data.format"
: Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVRO、JSON_SR、および PROTOBUF です。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用する場合は、Schema Registry を構成しておく必要があります。"connector.class"
: コネクターのプラグイン名を指定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"azure.sql.<>"
: Azure SQL データウェアハウス接続の詳細情報を入力します。Azure SQL データウェアハウスサーバー名は、<my_server_name>.db.windows.net
というフォーマットになります。"db.timezone"
: 有効なデータベースのタイムゾーン を入力します。デフォルトはUTC
です。"auto.create"
:true
に設定すると、送信先テーブルが存在しない場合に、コネクターがテーブルを作成します。コネクターでは、テーブル定義のベースとしてレコードスキーマが使用されます。テーブルは、レコードがトピックから消費される際に作成されます。"auto.evolve"
:true
に設定すると、コネクターでは制限付きで自動進化を実行できます。存在しない列がある新しいレコードでは、コネクターが送信先テーブルに対してalter
コマンドを発行します。コネクターは、新しいレコードに列のみを追加します。既存のレコードでは、新しい列の値は"null"
となります。"tasks.max"
: このコネクターで使用できる タスク の最大数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります。
Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。
ステップ 4: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config azure-synapse-analytics-sink-config.json
出力例:
Created connector AzureSqlDwSinkConnector_0 lcc-do6vzd
ステップ 5: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type | Trace
+------------+----------------------------+---------+------+-------+
lcc-do6vzd | AzureSqlDwSinkConnector_0 | RUNNING | sink | |
ステップ 6: レコードを確認します。¶
データが Kafka からデータウェアハウスにエクスポートされていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
構成プロパティ¶
このコネクターでは、以下のコネクター構成プロパティを使用します。
データの取得元とするトピック(Which topics do you want to get data from?)¶
topics
特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
- 型: list
- 重要度: 高
入力メッセージ(Input messages)¶
input.data.format
Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、または PROTOBUF です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。
- 型: string
- 重要度: 高
データへの接続方法(How should we connect to your data?)¶
name
コネクターの名前を設定します。
- 型: string
- 指定可能な値: 最大 64 文字の文字列
- 重要度: 高
Kafka クラスターの認証情報(Kafka Cluster credentials)¶
kafka.auth.mode
Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。
- 型: string
- デフォルト: KAFKA_API_KEY
- 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
- 重要度: 高
kafka.api.key
- 型: password
- 重要度: 高
kafka.service.account.id
Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。
- 型: string
- 重要度: 高
kafka.api.secret
- 型: password
- 重要度: 高
Azure SQL Data Warehouse¶
azure.sql.dw.server.name
有効な形式の完全な Azure SQL サーバー名。たとえば、<server-name>.database.windows.net のように設定します。
- 型: string
- 重要度: 高
azure.sql.dw.user
専用の SQL プール(または SQL データベース)のログイン。
- 型: string
- 重要度: 高
azure.sql.dw.password
SQL ログインに関連付けられているパスワード。
- 型: password
- 重要度: 高
azure.sql.dw.database.name
専用の SQL プール(または SQL データベース)の名前。
- 型: string
- 重要度: 高
データマッピング(Data Mapping)¶
table.name.format
送信先テーブル名のフォーマット文字列。送信元のトピック名を表すプレースホルダーとして「${topic}」を含めることができます。
たとえば、トピック「orders」の場合、
kafka_${topic}
はテーブル名「kafka_orders」にマップされます。- 型: string
- デフォルト: ${topic}
- 重要度: 中
fields.whitelist
レコード値フィールド名のコンマ区切りのリスト。空の場合は、レコード値のすべてのフィールドが利用されます。リストを設定した場合は、目的のフィールドのフィルター処理に使用されます。
- 型: list
- 重要度: 中
db.timezone
コネクターで時間ベースの値を挿入する場合に使用する必要がある JDBC タイムゾーンの名前。デフォルトは UTC です。
- 型: string
- デフォルト: UTC
- 重要度: 中
書き込み(Writes)¶
batch.size
書き込み先テーブルへの挿入で、バッチ処理を試みる際のレコード数を指定します(可能な場合)。
- 型: int
- デフォルト: 3000
- 指定可能な値: [1,...,3000]
- 重要度: 中
SQL/DDL サポート(SQL/DDL Support)¶
auto.create
書き込み先テーブルが存在しない場合に、
CREATE
を発行して、レコードスキーマに基づくテーブルを自動的に作成するかどうかを指定します。- 型: boolean
- デフォルト: false
- 重要度: 中
auto.evolve
レコードスキーマに関連する列がテーブルスキーマに存在しない場合に、
ALTER
を発行して、列を自動的に追加するかどうかを指定します。- 型: boolean
- デフォルト: false
- 重要度: 中
quote.sql.identifiers
SQL ステートメントで、テーブル名、列名、その他の識別子をいつクォートするかを指定します。後方互換性のため、デフォルトは「always」となっています。
- 型: string
- デフォルト: ALWAYS
- 指定可能な値: ALWAYS、NEVER
- 重要度: 中
このコネクターのタスク数(Number of tasks for this connector)¶
tasks.max
- 型: int
- 指定可能な値: [1,...]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。