Azure Functions Sink Connector for Confluent Platform¶
Kafka Connect Azure Functions Sink Connector により、Kafka が Azure Functions と統合されます。
注釈
Confluent Cloud を使用している場合は、「Azure Functions Sink Connector for Confluent Cloud」でクラウドのクイックスタートを参照してください。
このコネクターでは、Kafka トピックからのレコードを消費し、Azure Function を実行します。Azure Functions に送信される各リクエストには、最大 max.batch.size
レコードを含めることができます。また、max.outstanding.requests
で設定された複数のリクエストを同時に送信することもできます。
以下の JSON フォーマットリクエストを受け付けられるようにターゲット関数を構成する必要があります。
[
{
"key": ...,
"value": ...,
"topic": string,
"partition": <number>,
"offset": <number>,
"timestamp": <number>
},
...,
]
注釈
キーと値は以下のようにエンコードされます。
String
、int
、long
、float
、double
、boolean
、null
はそのまま JSON にエンコードされます。Structs
は JSON に変換され、スキーマ "なし" でエクスポートされます。byte[]
は base64 のString
としてエンコードされ、JSON 文字列として送信されます。- その他すべての Java オブジェクトは、
toString()
を使用してString
に変換されてから JSON 文字列として送信されます。
コネクターでは、Azure Function から受信された応答が、HTTP Response コードに応じて、(構成に設定されている)結果またはエラーのトピックに書き込まれます。応答コード 400
以上はエラー、400 未満は成功と見なされます。
コネクターでは、対応するトピックへの生成前に、各応答と単一レコードとのマップが試行されます。Azure Function からの応答は、以下の 3 フォーマットで受信できます。
1 つ目のフォーマットは JSON です。
[ { "payload": { "result": ..., "topic": string, "partition": <number>, "offset": <number>, } }, ... ]
このリストは、レコードが提供された順序とは一致していないことがあります。コネクターでは、Kafka 座標に基づいて結果がレコードと正しく対応付けられます。ただし、このリストは、リクエストで送信されたレコードのリストと 1 対 1 で対応している必要があります。
2 つ目のフォーマットは JSON リストです。
[ ..., ..., ... ]
このリストは、レコードのリストと 1 対 1 対応である限り、対応するレコードと同じ順序で一致すると想定されます。
3 つ目のフォーマットは、上述のどちらにも該当しない任意のフォーマットです。コネクターでは、個々のレコードについて応答全体がレポートされます(1 対多の相関)。
機能¶
Azure Functions Sink Connector には、以下の機能があります。
少なくとも 1 回のデリバリー¶
このコネクターによって、Kafka のトピックのレコードが少なくとも 1 回は配信されることが保証されます。
複数のタスク¶
Azure Functions Sink Connector は、1 つまたは複数のタスクの実行をサポートしています。タスクの数は tasks.max
構成パラメーターで指定できます。これにより、複数のファイルを解析する必要がある場合に、パフォーマンスが大幅に向上する可能性があります。
Azure Functions コネクターのインストール¶
このコネクターは、Confluent Hub クライアントのインストール手順 に従うか、手作業で ZIP ファイルをダウンロードしてインストールします。
前提条件¶
注釈
このコネクターは、Connect が実行されるすべてのマシンにインストールする必要があります。
Confluent Hub クライアント のインストール。
注釈
これは、Confluent Enterprise とともにデフォルトでインストールされます。
コネクターの最新(
latest
)バージョンのインストール。コネクターの
latest
バージョンをインストールするには、Confluent Platform のインストールディレクトリに移動し、次のコマンドを実行します。confluent-hub install confluentinc/kafka-connect-azure-functions:latest
特定のバージョンをインストールするには、次の例に示すように
latest
をバージョン番号に置き換えます。confluent-hub install confluentinc/kafka-connect-azure-functions:1.0.0-preview
コネクターの手動インストール¶
コネクターの ZIP ファイル をダウンロードして展開し、コネクターの手動インストール 手順 に従ってください。
ライセンス¶
このコネクターは、ライセンスキーがなくても 30 日間試用できます。
30 日間経過後は、 Confluent エンタープライズライセンス を取得することで、このコネクターを利用できます。Confluent では、契約者に Confluent エンタープライズライセンス キーを発行します。併せて、Confluent Platform とコネクターに関する エンタープライズレベルのサポート を提供します。既にご契約されている場合は、詳細について Confluent サポート(support@confluent.io)にお問い合わせください。
ライセンスのプロパティについては、「Confluent Platform ライセンス」を参照してください。ライセンストピックの詳細については、「ライセンストピックの構成」を参照してください。
構成プロパティ¶
このコネクターの構成プロパティに関する網羅的なリストについては、「Azure Functions Sink Connector 構成プロパティ」を参照してください。
注釈
Kafka Connect を Confluent Cloud に接続する方法の例については、「分散クラスター」を参照してください。
クイックスタート¶
このクイックスタートでは、Azure Functions Sink Connector を使用して、レコードを消費し、Azure Function の例を実行します。
- 前提条件
- Confluent Platform
- Confluent CLI (個別のインストールが必要)
コネクターを開始する前に、Azure Functions インスタンスを作成してデプロイします。
- Microsoft Azure Portal へ移動します。
- この クイックスタートガイド に従って FunctionApp を作成します。
重要
ポータル内で、Node.js ランタイムスタックを選択し、関数を作成してください。
このコードをコピーして、ポータルで
index.js
ファイルに貼り付けます。module.exports = async function (context, req) { context.log('JavaScript HTTP trigger function processed a request.'); context.res = { status: 200, body: req.body }; };
ポータルから関数の URL とキー(オプション)をコピーして保存し、後で使用できるようにします。これで、Azure Functions がコネクター用にセットアップされました。
注釈
このサンプルはリクエストがミラーされるだけの関数であり、リクエスト本文が応答に含められて送り返されます。
Confluent Hub クライアント を使用してコネクターをインストールします。
# run from your CP installation directory confluent-hub install confluentinc/kafka-connect-azure-functions:latest
Confluent CLI コマンドを使用して Confluent Platform を起動します。
ちなみに
Confluent CLI 開発用コマンドのコマンド構文が、5.3.0 で変更されています。該当するコマンドは
confluent local
に移行されています。たとえば、confluent start
の構文は、confluent local services start
に変わりました。詳しくは、「confluent local」を参照してください。confluent local start
Confluent CLI confluent local produce コマンドを使用して、Kafka の
functions-test
トピックにテストデータを生成します。echo key1,value1 | confluent local produce functions-test --property parse.key=true --property key.separator=, echo key2,value2 | confluent local produce functions-test --property parse.key=true --property key.separator=, echo key3,value3 | confluent local produce functions-test --property parse.key=true --property key.separator=,
azure-functions-sink.json
ファイルを以下の内容で作成します。{ "name": "azure-functions", "config": { "topics": "functions-test", "tasks.max": "1", "connector.class": "io.confluent.connect.azure.functions.AzureFunctionsSinkConnector", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"org.apache.kafka.connect.storage.StringConverter", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "function.url": "<the copied function url>", "function.key": "<the copied function key>", "reporter.bootstrap.servers": "localhost:9092", "reporter.error.topic.name": "test-error", "reporter.error.topic.replication.factor": 1, "reporter.error.topic.key.format": "string", "reporter.error.topic.value.format": "string", "reporter.result.topic.name": "test-result", "reporter.result.topic.key.format": "string", "reporter.result.topic.value.format": "string", "reporter.result.topic.replication.factor": 1 } }
注意
JSON ファイルの
function.url
値をコピーした関数 URL に忘れずに置き換えてください。Azure Functions Sink Connector を読み込みます。
注意
トピック名とフラグの間には、ダブルダッシュ(
--
)を含める必要があります。詳細については、こちらの投稿 を参照してください。confluent local load azure-functions --config path/to/azure-functions-sink.json
重要
Confluent CLI コマンドは本稼働環境では使用しないでください。
コネクターが
RUNNING
状態であることを確認します。confluent local status azure-functions
メッセージが Kafka の結果トピックに配信されたことを確認します。
confluent local consume test-result --from-beginning
リソースをクリーンアップします。
ソースコネクターを削除します。
confluent local unload azure-functions
Confluent Platform を停止します。
confluent local stop
Azure ポータルで作成した Azure 関数を削除します。