重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Azure Cognitive Search Sink Connector for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「Azure Cognitive Search Sink Connector for Confluent Platform」を参照してください。

Azure Cognitive Search Sink Connector for Confluent Cloud を使用すると、Apache Kafka® から Azure Cognitive Search にデータを移動できます。このコネクターは、各イベントを Kafka のトピックから(ドキュメントとして)|az| Cognitive Search のインデックスに書き込みます。コネクターでは、レコードをドキュメントとして送信するために、Azure Cognitive Search の REST API が使用されます。

機能

Azure Cognitive Search Sink Connector では、以下の機能をサポートしています。

  • 少なくとも 1 回のデリバリー: コネクターによって、Kafka のトピックからのレコードが少なくとも 1 回は配信されることが保証されます。

  • 複数のタスクのサポート: このコネクターは、1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。

  • 順序どおりの書き込み: レコードはコネクターによって受信されたとおりの順序で書き込まれます。また、一意性確保のために、ドキュメントキーとして Kafka 座標(トピック、パーティション、オフセット)を使用できます。それ以外の場合は、コネクターでドキュメントキーとしてレコードキーが使用されます。

  • トピックの自動作成: コネクターの起動時に、以下の 3 つのトピックが自動的に作成されます。

    各トピック名にはサフィックスとして、コネクターの論理 ID が付けられます。以下の例では、コネクターのトピックが 3 つと、pageviews という名前の既存の Kafka トピックが 1 つ含まれています。

    シンクコネクターの自動作成トピック

    コネクターのトピック

    トピックに送信されたレコードが正しいフォーマットではない場合、またはレコード内に重要なフィールドが存在しない場合は、エラートピックにエラーが記録され、コネクターは動作を継続します。

  • 自動再試行: Azure Cognitive Search サービスで障害が発生した場合は、すべてのリクエスト(再試行可能なもの)がコネクターによって再試行されます。コネクターによる再試行期間の最大値を指定するには、max.retry.ms 構成プロパティを使用します。

  • サポートされるデータフォーマット: このコネクターは、Avro、JSON スキーマ(JSON-SR)、および Protobuf 入力フォーマットをサポートします。これらの スキーマレジストリ ベースのフォーマットを使用するには、Schema Registry を有効にしておく必要があります。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

Azure サービスプリンシパル

コネクターを実行するには、Azure RBAC サービスプリンシパルが必要です。サービスプリンシパルを作成する場合の Azure CLI コマンドからの出力結果に、必要な認証および認可の詳細が含まれます。これをコネクターの構成に追加します。

注釈

CLI の代わりに Azure ポータルを使用して既存のサービスプリンシパルにロールを割り当てる場合は、「Azure portal を使用して Azure ロールを割り当てる」を参照してください。

Azure CLI を使用してサービスプリンシパルキーを作成するには、以下の手順を実行します。

  1. Azure CLI にログインします。

    az login
    
  2. 以下のコマンドを入力して、サービスプリンシパルを作成します。

    az ad sp create-for-rbac --name <Name of service principal> --scopes \
    /subscriptions/<SubscriptionID>/resourceGroups/<Resource_Group>
    

    以下に例を示します。

    az ad sp create-for-rbac --name azure_search --scopes /subscriptions/
    d92eeba4-...omitted...-37c2bd9259d0/resourceGroups/connect-azure
    
    Creating 'Contributor' role assignment under scope '/subscriptions/
    d92eeba4-...omitted...-37c2bd9259d0/resourceGroups/connect-azure'
    
    The output includes credentials that you must protect. Be sure that you
    do not include these credentials in your code or check the credentials
    into your source control.
    
    {
       "appId": "8ec186f9-...omitted...-e575b928b00a",
       "displayName": "azure_search",
       "name": "8ec186f9-...omitted...-e575b928b00a",
       "password": "jdGzGTwCKQ...omitted...QwE3hx",
       "tenant": "0893715b-...omitted...-2789e1ead045"
    }
    

    コネクター構成で使用するために、以下の詳細を保存します。

    • "appId" の出力は、コネクターの UI フィールド Azure Client ID (CLI プロパティ azure.search.client.id)に使用します。

    • "password" の出力は、コネクターの UI フィールド Azure Client Secret (CLI プロパティ azure.search.client.secret)に使用します。

    • "tenant" の出力は、コネクターの UI フィールド Azure Tenant ID (CLI プロパティ azure.search.tenant.id)に使用します。

      ちなみに

      必要な場合は、サービスプリンシパルをさらに細かく設定できます。たとえば、以下のコマンドでは、特に Azure 検索サービスにアクセスするための contributor ロールの割り当てを作成します。

      az ad sp create-for-rbac --name <Name of service principal> --scopes
       /subscriptions/<SubscriptionID>/resourceGroups/<Resource Group>
       /providers/Microsoft.Search/searchServices/<Search Service Name>
       --role Reader
      

クイックスタート

このクイックスタートを使用して、Confluent Cloud Azure Cognitive Search Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、イベントをストリーミングするようにコネクターを構成するための基本的な方法について説明します。

前提条件
  • Microsoft Azure (Azure)上の Confluent Cloud クラスターへのアクセスを許可されていること。
  • Azure サービスプリンシパル、Azure Cognitive Search の API キー、および契約プランの詳細情報(コネクター構成で使用)。
  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
  • Azure Cognitive Search に少なくとも 1 つのインデックスが存在する必要があります。
  • すべてのレコードスキーマフィールドが Azure 検索サービスのインデックスフィールドとして存在する必要があります。
  • シンクコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the Azure Cognitive Search Sink connector card.

Azure Cognitive Search Sink Connector Card

ステップ 4: コネクターの詳細情報を入力します。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク( * )は必須項目であることを示しています。

Add Azure Cognitive Search Sink Connector 画面で、以下を実行します。

既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。

新しいトピックを作成するには、+Add new topic をクリックします。

ステップ 5: ドキュメントを確認します。

ドキュメントが検索インデックスに取り込まれていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

Confluent CLI の使用

以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。

注釈

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを入力して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

以下に例を示します。

confluent connect plugin describe AzureCognitiveSearchSink

出力例:

Following are the required configs:
connector.class: AzureCognitiveSearchSink
input.data.format
name
kafka.api.key
kafka.api.secret
azure.search.service.name
azure.search.api.key
azure.search.client.id
azure.search.client.secret
azure.search.tenant.id
azure.search.subscription.id
azure.search.resourcegroup.name
index.name
tasks.max
topics

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "connector.class": "AzureCognitiveSearchSink",
  "input.data.format": "AVRO",
  "name": "AzureCognitiveSearchSink_0",
  "kafka.api.key": "****************",
  "kafka.api.secret": "************************************************",
  "azure.search.service.name": "<service_name>",
  "azure.search.api.key": "<api_key>",
  "azure.search.client.id": "<client_id>",
  "azure.search.client.secret": "<client_secret>",
  "azure.search.tenant.id": "<tenant_id>",
  "azure.search.subscription.id": "<subscription_id>",
  "azure.search.resourcegroup.name": "<resource_group>",
  "index.name": "<index_name>",
  "tasks.max": "1",
  "topics": "<topic_name>"
}

以下のプロパティ定義に注意してください。

  • "connector.class": コネクターのプラグイン名を指定します。
  • "input.data.format": Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVROJSON_SR、および PROTOBUF です。スキーマベースのメッセージフォーマット(たとえば、Avro、JSON_SR(JSON スキーマ)、および Protobuf)を使用するには、Confluent Cloud Schema Registry を構成しておく必要があります。
  • "name": 新しいコネクターの名前を設定します。
  • "kafka.api.key" および "kafka.api.secret": これらの認証情報として、クラスター API キーとシークレットを使用するか、サービスアカウント の API キーとシークレットを使用します。
  • azure.search.<...>: 必須の Azure および Azure 検索接続の詳細情報。プロパティの詳細については、「Azure サービスプリンシパル」および Azure Cognitive Search の API キー を参照してください。
  • "index.name": レコードを(ドキュメントとして)書き込む先の検索インデックスの名前。
  • "tasks.max": このコネクターで使用できる タスク の最大数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります。
  • "topics": 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。このコネクターでサポートされていない SMT のリストについては、「サポートされない変換」を参照してください。

すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。

ステップ 4: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

以下に例を示します。

confluent connect create --config azure-search-sink-config.json

出力例:

Created connector AzureCognitiveSearchSink_0 lcc-do6vzd

ステップ 5: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID           |             Name             | Status  | Type | Trace
+------------+------------------------------+---------+------+-------+
lcc-do6vzd   | AzureCognitiveSearchSink_0   | RUNNING | sink |       |

ステップ 6: ドキュメントを確認します。

Azure 検索インデックスに取り込まれていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に作成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データの取得元とするトピック(Which topics do you want to get data from?)

topics

特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

  • 型: list
  • 重要度: 高

入力メッセージ(Input messages)

input.data.format

Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、および PROTOBUF です。Confluent Cloud Schema Registry を構成しておく必要があります。

  • 型: string
  • 重要度: 高

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

Azure Search Service への接続方法(How should we connect to your Azure Search Service?)

azure.search.service.name

Azure Search サービスの名前。

  • 型: string
  • 重要度: 高
azure.search.api.key

Azure Search サービスの API キー。

  • 型: password
  • 重要度: 高
azure.search.client.id

サブスクリプションのサービスプリンシパルのクライアント ID。

  • 型: password
  • 重要度: 高
azure.search.client.secret

サブスクリプションのサービスプリンシパルのクライアントシークレット。

  • 型: password
  • 重要度: 高
azure.search.tenant.id

サブスクリプションのサービスプリンシパルのテナント ID。

  • 型: password
  • 重要度: 高
azure.search.subscription.id

Azure アカウントの Azure サブスクリプション ID。

  • 型: password
  • 重要度: 高
azure.search.resourcegroup.name

Azure Search サービスが存在する ResourceGroup。

  • 型: string
  • 重要度: 高

Search サービスの書き込みの詳細情報(Search Service Write Details)

index.name

レコードをドキュメントとして書き込む先のインデックスの名前。レコードのトピックを指定するには、パターン内で ${topic} を使用します。

  • 型: string
  • 重要度: 高
write.method

Kafka レコードのインデックスへの書き込みに使用する方式。使用できる方式には次のものがあります。Upload: アップサートと同様に機能します。ドキュメントが存在しなかった場合は挿入され、存在していた場合はアップデートまたは置換されます。MergeOrUpload: 指定されたフィールドで既存のドキュメントをアップデートします。ドキュメントが存在しなかった場合は、Upload と同様の動作となります。

  • 型: string
  • デフォルト: Upload
  • 重要度: 高
delete.enabled

レコード値が null の場合にドキュメントが削除されるかどうかを設定します。

  • 型: boolean
  • デフォルト: false
  • 重要度: 高
key.mode

ドキュメントキー ID として何を使用するかを指定します。使用可能なモードには次のものがあります。KEY: ドキュメントキーとして Kafka レコードキーが使用されます。COORDINATES: Kafka 座標(トピック、パーティション、オフセット)が連結されてドキュメントキーを形成します。これにより、一意のドキュメントキーが生成されます。

  • 型: string
  • デフォルト: KEY
  • 重要度: 中
max.batch.size

リクエストごとの送信 Kafka レコードの最大数。レコードのバッチ処理を無効にするには、この値を 1 に設定します

  • 型: int
  • デフォルト: 1
  • 指定可能な値: [1,...,1000]
  • 重要度: 高
max.retry.ms

コネクターによってリクエストが試行されてからそれが中止されるまでの最大時間(ミリ秒)。

  • 型: int
  • デフォルト: 300000(5 分)
  • 指定可能な値: [0,...]
  • 重要度: 低

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1,...]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png