重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Azure Synapse Analytics Sink Connector for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「Azure Synapse Analytics Sink Connector for Confluent Platform」を参照してください。

Azure Synapse Analytics Sink Connector を使用すると、Apache Kafka® トピックから Azure Synapse Analytics にデータをエクスポートできます。コネクターによって、Kafka からデータがポーリングされ、トピックのサブスクリプションに基づいてデータウェアハウスにデータが書き込まれます。テーブルの自動作成と、制限付きの自動進化もサポートされています。このコネクターは、Azure Synapse Analytics SQL プール と互換性があります。

機能

Azure Synapse Analytics Sink Connector は、次の機能をサポートしています。

  • 少なくとも 1 回のデリバリー: コネクターによって、Kafka のトピックからのレコードが少なくとも 1 回は配信されることが保証されます。

  • 複数のタスクのサポート: このコネクターは、1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。

  • 自動作成と自動進化のサポート:

    • Auto create tableauto.create)を有効にすると、送信先テーブルが存在しない場合に、コネクターでテーブルを作成できます。コネクターは、テーブル定義のベースとしてレコードスキーマを使用します。テーブルは、レコードがトピックから消費される際に作成されます。

    • Auto add columnsauto.evolve)を有効にすると、存在しない列がある新しいレコードでは、コネクターが送信先テーブルに対して alter コマンドを発行して、制限付きの自動進化を実行できます。コネクターは、新しいレコードに列のみを追加します。既存のレコードでは、新しい列の値は "null" となります。

      重要

      スキーマ進化の後方互換性を維持するため、レコードスキーマの新規フィールドは、省略可能にするか、デフォルト値を設定する必要があります。

  • サポートされるデータフォーマット: このコネクターは、Avro、JSON スキーマ(JSON_SR)、および Protobuf の入力フォーマットをサポートします。これらの スキーマレジストリ ベースのフォーマットを使用するには、Schema Registry を有効にしておく必要があります。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

クイックスタート

このクイックスタートを使用して、フルマネージド型の Azure Synapse Analytics Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、イベントをストリーミングするようにコネクターを構成するための基本的な方法について説明します。

前提条件
  • Microsoft Azure (Azure)上の Confluent Cloud クラスターへのアクセスを許可されていること。
  • コネクターの構成で使用する、認可された SQL データウェアハウスユーザーとパスワード。
  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
  • シンクコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the ServiceNow Sink connector card.

Azure Synapse Analytics Sink Connector Card

ステップ 4: コネクターの詳細情報を入力します。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク( * )は必須項目であることを示しています。

Add Azure Synapse Analytics Sink Connector 画面で、以下を実行します。

既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。

新しいトピックを作成するには、+Add new topic をクリックします。

ステップ 5: レコードを確認します。

データが Kafka からデータウェアハウスにエクスポートされていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

Confluent CLI の使用

以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。

注釈

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe AzureSqlDwSink

出力例:

Following are the required configs:
connector.class: AzureSqlDwSink
input.data.format
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
azure.sql.dw.server.name
azure.sql.dw.user
azure.sql.dw.password
azure.sql.dw.database.name
tasks.max
topics

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "name": "AzureSqlDwSinkConnector_0",
  "config": {
    "topics": "pageviews",
    "input.data.format": "AVRO",
    "connector.class": "AzureSqlDwSink",
    "name": "AzureSqlDwSinkConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "azure.sql.dw.server.name": "azure-sql-dw-sink.db.windows.net",
    "azure.sql.dw.user": "<db_user>",
    "azure.sql.dw.password": "**************",
    "azure.sql.dw.database.name": "<db_name>",
    "db.timezone": "UTC",
    "auto.create": "true",
    "auto.evolve": "true",
    "tasks.max": "1"
  }
}

以下のプロパティ定義に注意してください。

  • "name": 新しいコネクターの名前を設定します。
  • "topics": 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
  • "input.data.format": Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVROJSON_SR、および PROTOBUF です。スキーマベースのメッセージフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用する場合は、Schema Registry を構成しておく必要があります。
  • "connector.class": コネクターのプラグイン名を指定します。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "azure.sql.<>": Azure SQL データウェアハウス接続の詳細情報を入力します。Azure SQL データウェアハウスサーバー名は、<my_server_name>.db.windows.net というフォーマットになります。

  • "db.timezone": 有効なデータベースのタイムゾーン を入力します。デフォルトは UTC です。

  • "auto.create": true に設定すると、送信先テーブルが存在しない場合に、コネクターがテーブルを作成します。コネクターでは、テーブル定義のベースとしてレコードスキーマが使用されます。テーブルは、レコードがトピックから消費される際に作成されます。

  • "auto.evolve": true に設定すると、コネクターでは制限付きで自動進化を実行できます。存在しない列がある新しいレコードでは、コネクターが送信先テーブルに対して alter コマンドを発行します。コネクターは、新しいレコードに列のみを追加します。既存のレコードでは、新しい列の値は "null" となります。

  • "tasks.max": このコネクターで使用できる タスク の最大数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります。

Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。

すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。

ステップ 4: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config azure-synapse-analytics-sink-config.json

出力例:

Created connector AzureSqlDwSinkConnector_0 lcc-do6vzd

ステップ 5: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID           |             Name           | Status  | Type | Trace
+------------+----------------------------+---------+------+-------+
lcc-do6vzd   | AzureSqlDwSinkConnector_0  | RUNNING | sink |       |

ステップ 6: レコードを確認します。

データが Kafka からデータウェアハウスにエクスポートされていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データの取得元とするトピック(Which topics do you want to get data from?)

topics

特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

  • 型: list
  • 重要度: 高

入力メッセージ(Input messages)

input.data.format

Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、または PROTOBUF です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • 重要度: 高

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

Azure SQL Data Warehouse

azure.sql.dw.server.name

有効な形式の完全な Azure SQL サーバー名。たとえば、<server-name>.database.windows.net のように設定します。

  • 型: string
  • 重要度: 高
azure.sql.dw.user

専用の SQL プール(または SQL データベース)のログイン。

  • 型: string
  • 重要度: 高
azure.sql.dw.password

SQL ログインに関連付けられているパスワード。

  • 型: password
  • 重要度: 高
azure.sql.dw.database.name

専用の SQL プール(または SQL データベース)の名前。

  • 型: string
  • 重要度: 高

データマッピング(Data Mapping)

table.name.format

送信先テーブル名のフォーマット文字列。送信元のトピック名を表すプレースホルダーとして「${topic}」を含めることができます。

たとえば、トピック「orders」の場合、kafka_${topic} はテーブル名「kafka_orders」にマップされます。

  • 型: string
  • デフォルト: ${topic}
  • 重要度: 中
fields.whitelist

レコード値フィールド名のコンマ区切りのリスト。空の場合は、レコード値のすべてのフィールドが利用されます。リストを設定した場合は、目的のフィールドのフィルター処理に使用されます。

  • 型: list
  • 重要度: 中
db.timezone

コネクターで時間ベースの値を挿入する場合に使用する必要がある JDBC タイムゾーンの名前。デフォルトは UTC です。

  • 型: string
  • デフォルト: UTC
  • 重要度: 中

書き込み(Writes)

batch.size

書き込み先テーブルへの挿入で、バッチ処理を試みる際のレコード数を指定します(可能な場合)。

  • 型: int
  • デフォルト: 3000
  • 指定可能な値: [1,...,3000]
  • 重要度: 中

SQL/DDL サポート(SQL/DDL Support)

auto.create

書き込み先テーブルが存在しない場合に、CREATE を発行して、レコードスキーマに基づくテーブルを自動的に作成するかどうかを指定します。

  • 型: boolean
  • デフォルト: false
  • 重要度: 中
auto.evolve

レコードスキーマに関連する列がテーブルスキーマに存在しない場合に、ALTER を発行して、列を自動的に追加するかどうかを指定します。

  • 型: boolean
  • デフォルト: false
  • 重要度: 中
quote.sql.identifiers

SQL ステートメントで、テーブル名、列名、その他の識別子をいつクォートするかを指定します。後方互換性のため、デフォルトは「always」となっています。

  • 型: string
  • デフォルト: ALWAYS
  • 指定可能な値: ALWAYS、NEVER
  • 重要度: 中

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1,...]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png