重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Azure Functions Sink Connector for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「Azure Functions Sink Connector for Confluent Platform」を参照してください。

Kafka Connect Azure Functions Sink Connector for Confluent Cloud を使用すると、Apache Kafka® を Azure Functions と統合できます。Azure 関数の作成について詳しくは、『Azure Functions の概要』を参照してください。

このコネクターは、Kafka トピックからレコードを消費して Azure Functions を実行します。Azure Functions に送信される各リクエストには、max.batch.size で指定した件数までレコードを含めることができます。

機能

Azure Functions Sink Connector には、以下の機能があります。

  • Azure Functions の結果は、以下のトピックに格納されます。
    • success-<connector-id>
    • error-<connector-id>
  • サポートされる入力データのフォーマットは、Bytes、AVRO、JSON_SR(JSON スキーマ)、JSON(スキーマレス)および PROTOBUF です。スキーマが定義されていない場合、値はプレーンテキストの文字列としてエンコードされます。たとえば、"name": "Kimberley Human"name=Kimberley Human としてエンコードされます。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

クイックスタート

このクイックスタートを使用して、Confluent Cloud Azure Functions Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、ターゲットの Azure Functions にイベントをストリームするようにコネクターを構成するための基本的な方法について説明します。

前提条件
  • Microsoft Azure 上の Confluent Cloud クラスターへのアクセスを許可されていること。
  • Azure 関数へのアクセス。関数の基本的な情報については、『Azure Functions の概要』を参照してください。
  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
  • ターゲットの Azure 関数 と Kafka クラスターは、同じリージョンに存在している必要があります。
  • Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
    • 既存の サービスアカウント のリソース ID を入力する。
    • コネクター用の Confluent Cloud サービスアカウント を作成します。「サービスアカウント」のドキュメントで、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
    • Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the Azure Functions Sink connector card.

Azure Functions Sink Connector Card

ステップ 4: コネクターの詳細情報を入力します。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク( * )は必須項目であることを示しています。

Add Azure Functions Sink Connector 画面で、以下を実行します。

既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。

新しいトピックを作成するには、+Add new topic をクリックします。

ステップ 5: レコードを確認します。

レコードが生成されていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に生成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png

Confluent CLI を使用する場合

以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。

注釈

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe AzureFunctionSink

出力例:

Following are the required configs:
connector.class
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
function.url
function.key
topics
tasks.max

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "topics":"pageviews",
  "input.data.format": "AVRO",
  "connector.class": "AzureFunctionsSink",
  "name": "AzureFunctionsSinkConnector_0",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "****************",
  "kafka.api.secret": "****************************************************************",
  "function.url": "https://myfunctionapp-dev.azurewebsites.net/api/HttpTrigger1",
  "function.key": "***************",
  "tasks.max": "1"
}

以下のプロパティ定義に注意してください。

  • "topics": 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
  • "input.data.format": Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVROJSON_SRPROTOBUFJSON、または BYTES です。スキーマベースのメッセージフォーマット(たとえば、Avro、JSON_SR(JSON スキーマ)、および Protobuf)を使用するには、Confluent Cloud Schema Registry を構成しておく必要があります。
  • "connector.class": コネクターのプラグイン名を指定します。
  • "name": 新しいコネクターの名前を設定します。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "function.url": 事前定義した Azure 関数の URL。

  • "function.key": 事前定義した Azure 関数のキー。

(オプション)

  • "behavior.on.error": レコードの処理中に構成済みの Azure 関数がエラーを返した場合のコネクターのエラー処理の動作を設定します。デフォルトは log です。指定可能なオプションは logfail です。log の場合、エラーメッセージが error-<connector-id> に記録されて処理が続行されます。fail の場合、エラーが発生すると接続を停止します。
  • "max.batch.size": 1 回の Azure 関数の呼び出しにまとめることができるレコードの最大件数。デフォルトは 1 (バッチ処理は無効)です。1 から 1000 までの値を指定できます。
  • "max.pending.requests": Azure Functions に対して同時に要求できる処理待ちリクエストの最大数。デフォルトは 1 です。
  • "request.timeout": コネクターが Azure Functions へのリクエストを試行できる最長時間(ミリ秒)。これを過ぎるとタイムアウト(ソケットタイムアウト)します。デフォルトは 300000 ミリ秒(5 分)です。
  • "retry.timeout": 失敗したリクエスト(スロットリング時など)がコネクターによって指数関数的にバックオフおよび再試行される合計時間(ミリ秒)。再試行される応答コードは HTTP 429 Too BusyHTTP 502 Bad Gateway です。デフォルトは 300000 ミリ秒(5 分)です。永続的に再試行するようにこのプロパティを構成するには -1 を入力します。

Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。

すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。

ステップ 4: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config azure-functions-sink-config.json

出力例:

Created connector AzureFunctionsSinkConnector_0 lcc-ix4dl

ステップ 5: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID          |       Name                    | Status  | Type
+-----------+-------------------------------+---------+------+
lcc-ix4dl   | AzureFunctionsSinkConnector_0 | RUNNING | sink

ステップ 6: レコードを確認します。

レコードが生成されていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に生成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データの取得元とするトピック(Which topics do you want to get data from?)

topics

特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

  • 型: list
  • 重要度: 高

入力メッセージ(Input messages)

input.data.format

Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、JSON、または BYTES です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • 重要度: 高

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

関数への接続方法(How should we connect to your functions)

function.url

定義済みの Azure 関数を呼び出す Azure 関数の URL

  • 型: string
  • 重要度: 高
function.key

定義済みの Azure 関数を呼び出す Azure 関数のキー

  • 型: password
  • デフォルト: [hidden]
  • 重要度: 中

関数の詳細(Function Details)

max.batch.size

1 回の関数呼び出しにまとめることができる Kafka レコードの最大数。レコードのバッチ処理を無効にするには、この値を 1 に設定します

  • 型: int
  • デフォルト: 1
  • 指定可能な値: [1、…]
  • 重要度: 高
max.pending.requests

Azure Functions 宛てとして同時に存在できる保留中のリクエストの最大数。

  • 型: int
  • デフォルト: 1
  • 指定可能な値: [1,...,128]
  • 重要度: 中
request.timeout

コネクターによる Azure Functions へのリクエスト試行がタイムアウト(ソケットタイムアウト)するまでの最大時間(ミリ秒)

  • 型: int
  • デフォルト: 300000
  • 指定可能な値: [1、…]
  • 重要度: 低
retry.timeout

スロットリング時などの失敗したリクエストがコネクターによって指数関数的にバックオフおよび再試行される合計時間(ミリ秒)。再試行される応答コードは HTTP 429 Too Busy と HTTP 502 Bad Gateway です。値が -1 の場合は、永続的に再試行されます。

  • 型: int
  • デフォルト: 300000
  • 指定可能な値: [-1、...]
  • 重要度: 低

エラーの処理方法(How should we handle errors?)

behavior.on.error

呼び出された Azure 関数がエラーを返した場合のコネクターの動作。指定可能なオプションは「log」と「fail」です。「log」の場合、エラーメッセージが記録されて処理が続行されます。「fail」の場合、エラーが発生すると接続を停止します。

  • 型: string
  • デフォルト: log
  • 重要度: 低

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1、…]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png