重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Azure Service Bus Source Connector for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「Azure Service Bus Source Connector for Confluent Platform」を参照してください。

Azure Service Bus は、アプリケーションとサービス間の情報送信に使用できる、マルチテナントのクラウドメッセージングサービスです。Kafka Connect Azure Service Bus Source Connector for Confluent Cloud では、Azure Service Bus キューまたはトピックからデータが読み込まれ、Kafka のトピックに保存されます。

機能

Azure Service Bus Source Connector は、次の機能をサポートしています。

  • トピックの自動作成: このコネクターは Kafka トピックを自動的に作成できます。
  • 少なくとも 1 回のデリバリー: コネクターによって、レコードが Kafka のトピックに少なくとも 1 回は配信されることが保証されます。
  • 複数のタスクのサポート: このコネクターは、1 つまたは複数のタスクの実行をサポートしています。
  • サポートされるデータフォーマット: このコネクターは、Avro、JSON スキーマ(JSON-SR)、Protobuf、および JSON(スキーマレス)出力フォーマットをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON スキーマ、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

クイックスタート

このクイックスタートを使用して、Confluent Cloud Azure Service Bus Source Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、イベントをストリーミングするようにコネクターを構成するための基本的な方法について説明します。

前提条件
  • アマゾンウェブサービス (AWS)、Microsoft Azure (Azure)、または Google Cloud Platform (GCP)上の Confluent Cloud クラスターへのアクセスを許可されていること。
  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
  • Azure Service Bus 接続の詳細情報が必要です。詳細については、Azure Service Bus のドキュメント を参照してください。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the Azure Service Bus Source connector card.

Azure Service Bus Source Connector Card

Step 4: Enter the connector details.

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク ( * ) は必須項目であることを示しています。

At the Add Azure Service Bus Source Connector screen, complete the following:

Select the topic you want to send data to from the Topics list. To create a new topic, click +Add new topic.

Step 5: Check for records.

レコードが Kafka のトピックに生成されていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

Confluent CLI の使用

以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。

注釈

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe AzureServiceBusSource

出力例:

Following are the required configs:
connector.class: AzureServiceBusSource
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
kafka.topic
azure.servicebus.namespace
azure.servicebus.sas.keyname
azure.servicebus.sas.key
azure.servicebus.entity.name
output.data.format
tasks.max

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "connector.class": "AzureServiceBusSource",
  "name": "AzureServiceBusSource_0",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "****************",
  "kafka.api.secret": "************************************************",
  "kafka.topic": "<topic-name>",
  "azure.servicebus.namespace": "<namespace>",
  "azure.servicebus.sas.keyname": "<keyname>",
  "azure.servicebus.sas.key": "****************************************",
  "azure.servicebus.entity.name": "<entity>",
  "output.data.format": "AVRO",
  "tasks.max": "1",
}

以下のプロパティ定義に注意してください。

  • "connector.class": コネクターのプラグイン名を指定します。
  • "name": 新しいコネクターの名前を設定します。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "kafka.topic": データの送信先となるトピック名を入力します。

  • "azure.servicebus.<>": Azure Service Bus の詳細情報を入力します。詳細については、Azure Service Bus のドキュメント を参照してください。Shared Access Signature(SAS)認可の詳細については、「共有アクセス承認ポリシー」を参照してください。

  • "output.data.format": Kafka 出力レコード値のフォーマット(Kafka トピックに送られるデータ)として AVRO、JSON_SR(JSON スキーマ)、PROTOBUF、または JSON(スキーマレス)を入力します。スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。

  • "tasks.max": このコネクターで使用できる タスク の最大数を入力します。

Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。

すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。

ステップ 4: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config azure-service-bus-source-config.json

出力例:

Created connector AzureServiceBusSource_0 lcc-do6vzd

ステップ 5: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID           |             Name              | Status  | Type | Trace
+------------+-------------------------------+---------+------+-------+
lcc-do6vzd   | AzureServiceBusSource_0       | RUNNING | sink |       |

ステップ 6: レコードを確認します。

レコードが Kafka のトピックに生成されていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

データの送信先トピック(Which topic do you want to send data to?)

kafka.topic

データの書き込み先トピック名を指定します。

  • 型: string
  • 重要度: 高

Azure Service Bus への接続方法(How should we connect to Azure Service Bus?)

azure.servicebus.namespace

メッセージエンティティが属する Azure Service Bus の名前空間。

  • 型: string
  • 重要度: 高
azure.servicebus.sas.keyname

アクセス認証に使用する共有アクセスポリシー名。

  • 型: string
  • 重要度: 高
azure.servicebus.sas.key

アクセス認証に使用する共有アクセスキー。

  • 型: password
  • 重要度: 高
azure.servicebus.entity.name

Azure Service Bus メッセージエンティティの名前。メッセージをポーリングする必要のあるキューまたはトピックの名前にしてください。

  • 型: string
  • 重要度: 高
azure.servicebus.subscription

メッセージをポーリングする必要のある構成済みトピックに使用する Azure Service Bus のサブスクリプション名にしてください。メッセージエンティティの種類が topic の場合に限って構成してください。

  • 型: string
  • 重要度: 高

出力メッセージ(Output messages)

output.data.format

Kafka 出力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、または JSON です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください

  • 型: string
  • 重要度: 高

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1,...]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png