重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
AWS Lambda Sink Connector for Confluent Cloud¶
注釈
Confluent Platform 用にコネクターをローカルにインストールする場合は、「AWS Lambda Sink Connector for Confluent Platform」を参照してください。
このコネクターでは、AWS Lambda 関数を同期的または非同期的に呼び出すことができます。
- 同期モード の場合、1 つのトピックおよび 1 つのパーティションの中では、レコードは順次処理されます。異なるトピックのパーティション内のレコードは並列処理することができます。AWS Lambda からの応答は
success-<connector-id>
トピックに書き込まれます。このコネクターは、Lambda の実行でエラーが発生した場合、そのエラーをerror-<connector-id>
トピックに書き込んで処理を進めるように構成されています。Lambda 呼び出しの詳細については、『同期呼び出し』を参照してください。 - 非同期モード の場合、コネクターは "受領応答を待たない(fire-and-forget)モード" で実行されます。レコードは、ベストエフォート方式で順次処理されます。コネクターは再試行を一切行いません。AWS Lambda が自動的に最大 2 回再試行します。AWS Lambda は、その後にそのリクエストをデッドレターキューに移動することができます。Lambda 呼び出しの詳細については、『非同期呼び出し』を参照してください。
機能¶
AWS Lambda Sink Connector には、以下の機能があります。
同期および非同期の Lambda 関数呼び出し: このコネクターを使用して、AWS Lambda 関数を同期または非同期で呼び出すことができます。
少なくとも 1 回のデリバリー: このコネクターは、少なくとも 1 回の処理セマンティクスを保証します。特定の状況下では、同じレコードが 2 回以上処理される可能性があります。AWS Lambda 関数は、べき等 になるように設計する必要があります。Lambda 関数からの応答を Kafka のトピックに記録するようにコネクターを構成した場合は、重複したレコードがトピックに含まれる可能性があります。トピックで Kafka ログ圧縮を有効にして、重複レコードを削除することができます。あるいは、一定期間における重複レコードを検出する ksqlDB クエリを作成することもできます。
複数のタスクのサポート: このコネクターは、1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。
結果のトピック: 同期モードでは、AWS Lambda の結果は
success-<connector-id>
およびerror-<connector-id>
トピックに保管されます。スキーマありまたはスキーマなしの入力データフォーマット: このコネクターは、Avro、JSON スキーマ(JSON_SR)、Protobuf、または JSON(スキーマレス)フォーマットになっている Kafka トピックの入力データをサポートします。スキーマレジストリ ベースのフォーマットを使用するには、Schema Registry を有効にしておく必要があります。
注釈
スキーマが定義されていない場合、値はプレーンテキストの文字列としてエンコードされます。たとえば、
"name": "Kimberley Human"
はname=Kimberley Human
としてエンコードされます。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
制限¶
以下の情報を確認してください。
- コネクターの制限事項については、AWS Lambda Sink Connector の制限事項を参照してください。
- 1 つ以上の Single Message Transforms(SMT)を使用する場合は、「SMT の制限」を参照してください。
- Confluent Cloud Schema Registry を使用する場合は、「スキーマレジストリ Enabled Environments」を参照してください。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud AWS Lambda Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択してから、AWS Lambda にレコードを送信するようにコネクターを構成するための基本的な方法について説明します。
- 前提条件
AWS 上の Confluent Cloud クラスターへのアクセスを許可されていること。
Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
注釈
スキーマが定義されていない場合、値はプレーンテキストの文字列としてエンコードされます。たとえば、
"name": "Kimberley Human"
はname=Kimberley Human
としてエンコードされます。ネットワークに関する考慮事項については、「Networking and DNS Considerations」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。
AWS Lambda プロジェクトは、コネクターを実行する Confluent Cloud クラスターと同じリージョンに存在している必要があります。
アクセスキー を構成した AWS アカウント。
lambda:InvokeFunction
およびlambda:GetFunction
を許可するようにアカウントの Lambda IAM ポリシーを構成する必要があります。以下に、このポリシーを設定するための JSON の例を示します。{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunction" ], "Resource": "*" } ] }
- Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
- 既存の サービスアカウント のリソース ID を入力する。
- コネクター用の Confluent Cloud サービスアカウント を作成します。「サービスアカウント」のドキュメントで、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
- Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Quick Start for Confluent Cloud」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
ステップ 4: コネクターの詳細情報を入力します。¶
注釈
- すべての 前提条件 を満たしていることを確認してください。
- アスタリスク ( * ) は必須項目であることを示しています。
Add AWS Lambda Sink Connector 画面で、以下を実行します。
既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。
新しいトピックを作成するには、+Add new topic をクリックします。
- Kafka Cluster credentials で Kafka クラスターの認証情報の指定方法を選択します。以下のいずれかのオプションを選択できます。
- Global Access: コネクターは、ユーザーがアクセス権限を持つすべての対象にアクセスできます。グローバルアクセスの場合、コネクターのアクセス権限は、ユーザーのアカウントにリンクされます。このオプションは本稼働環境では推奨されません。
- Granular access: コネクターのアクセスが制限されます。コネクターのアクセス権限は サービスアカウント から制御できます。本稼働環境にはこのオプションをお勧めします。
- Use an existing API key: 保存済みの API キーおよびシークレット部分を入力できます。API キーとシークレットを入力するか Cloud Console でこれらを生成することもできます。
- Continue をクリックします。
- 以下の情報を入力します。
- AWS access key ID: AWS Lambda への接続に使用される Amazon アクセスキー。
- AWS secret access key: AWS Lambda への接続に使用される Amazon シークレットキー。
- AWS Lambda function name: 呼び出す AWS Lambda 関数を入力します。
- Continue をクリックします。
注釈
Cloud Console に表示されない構成プロパティでは、デフォルト値が使用されます。すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。
Input Kafka record value で、Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を AVRO、JSON_SR、PROTOBUF、JSON、または BYTES から選択します。スキーマベースのメッセージフォーマットを使用するには、有効なスキーマが Schema Registry に存在する必要があります。
Show advanced configurations
AWS Lambda invocation type: AWS Lambda 関数を呼び出すモード。sync と async の 2 つのモードがサポートされています。Lambda 呼び出しの詳細については、「同期呼び出し」または「非同期呼び出し」を参照してください。
Batch size: 1 回の AWS Lambda 関数呼び出しにまとめることができる Kafka レコードの最大数。これは、AWS Lambda 呼び出しのペイロード制限を超えない範囲で、可能な限り大きい値に設定する必要があります。レコードのバッチ処理を無効にするには、この値を 1 に設定します。
Record Converter Class: Kafka レコードを AWS Lambda ペイロードに変換するレコードコンバーターのクラス。
Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。
Continue をクリックします。
選択するトピックのパーティション数に基づいて、推奨タスク数が表示されます。
- 推奨されたタスク数を変更するには、Tasks フィールドに、コネクターで使用する タスク の数を入力します。
- Continue をクリックします。
接続の詳細情報を確認します。
Launch をクリックします。
コネクターのステータスが Provisioning から Running に変わります。
ステップ 5: Lambda 関数のメトリクスを確認します。¶
AWS Lambda コンソールに移動して、Lambda 関数を開き、レコードが処理されていることを確認します。Lambda 関数ページの Monitoring タブで処理状況を確認できます。同期モード では、AWS Lambda の結果は以下のトピックに保管されます。
success-<connector-id>
error-<connector-id>
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に生成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。
Confluent CLI を使用する場合¶
以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行 <https://docs.confluent.io/confluent-cli/current/migrate.html#cli-migrate>`__」を参照してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe LambdaSink
出力例:
Following are the required configs:
connector.class: LambdaSink
name
topics
input.data.format
connector.class
kafka.auth.mode
kafka.api.key
kafka.api.secret
aws.access.key.id
aws.secret.access.key
aws.lambda.function.name
aws.lambda.invocation.type
tasks.max
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"connector.class": "LambdaSink",
"name": "LambdaSinkConnector_0",
"topics": "users",
"input.data.format": "JSON",
"connector.class": "LambdaSink",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "****************",
"kafka.api.secret": "*************************************************",
"aws.access.key.id": "****************",
"aws.secret.access.key": "********************************************",
"aws.lambda.function.name": "myLambdaFunction",
"aws.lambda.invocation.type": "sync",
"behavior.on.error": "fail",
"tasks.max": "1"
}
以下の必須プロパティの定義に注意してください。
"connector.class"
: コネクターのプラグイン名を指定します。"name"
: 新しいコネクターの名前を設定します。"topics"
: 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"input.data.format"
: Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVRO、JSON_SR (JSON スキーマ)、PROTOBUF、JSON (スキーマレス)、BYTES です。スキーマベースのメッセージフォーマットを使用する場合は、Confluent Cloud Schema Registry を構成しておく必要があります。注釈
スキーマが定義されていない場合、値はプレーンテキストの文字列としてエンコードされます。たとえば、
"name": "Kimberley Human"
はname=Kimberley Human
としてエンコードされます。"aws.access.key.id"
および"aws.secret.access.key"
: AWS のアクセスキー ID とシークレットを入力します。これらのセットアップ方法については、『アクセスキー』を参照してください。"aws.lambda.function.name"
: 呼び出す Lambda 関数を入力します。詳しくは、『AWS Lambda とは』を参照してください。"aws.lambda.invocation.type"
:"sync"
: 1 つのトピックおよび 1 つのパーティションの中では、レコードは順次処理されます。異なるトピックのパーティション内のレコードとは並列処理することができます。構成すれば、AWS Lambda からの応答を Kafka トピックに書き込むことができます。Lambda の実行でエラーが発生した場合に、コネクターの構成によって、無視して続行したり、エラーをログに記録したり、コネクターを完全に停止したりすることができます。Lambda 呼び出しの詳細については、『同期呼び出し』を参照してください。"async"
: コネクターは "受領応答を待たない(fire-and-forget)モード" で実行されます。レコードは、ベストエフォート方式で順次処理されます。コネクターは再試行を一切行いません。AWS Lambda が自動的に最大 2 回再試行します。AWS Lambda は、その後にそのリクエストをデッドレターキューに移動することができます。Lambda 呼び出しの詳細については、『非同期呼び出し』を参照してください。
"behavior.on.error"
: AWS Lambda 関数の呼び出しのエラー処理の動作を選択します。"fail"
: エラーが発生した場合、コネクターを停止します。"ignore"
: 次のレコードセットの処理に進みます。"log"
: エラーメッセージをerror-<connector-id>
トピックに記録して処理を続行します。
注釈
この設定は、
aws.lambda.invocation.type
がsync
モードに設定されている場合にのみ適用されます。"tasks.max"
: このコネクターで使用する タスク の数を入力します。詳しくは、Confluent Cloud コネクターの制限事項 を参照してください。
Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。
ステップ 4: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config lambda-sink-config.json
出力例:
Created connector LambdaSinkConnector_0 lcc-ix4dl
ステップ 5: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type
+-----------+-----------------------+---------+------+
lcc-ix4dl | LambdaSinkConnector_0 | RUNNING | sink
ステップ 6: Lambda 関数のメトリクスを確認します。¶
AWS Lambda コンソールに移動して、Lambda 関数を開き、レコードが処理されていることを確認します。Lambda 関数ページの Monitoring タブで処理状況を確認できます。同期モード では、AWS Lambda の結果は以下のトピックに保管されます。
success-<connector-id>
error-<connector-id>
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に生成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
構成プロパティ¶
このコネクターでは、以下のコネクター構成プロパティを使用します。
データの取得元とするトピック(Which topics do you want to get data from?)¶
topics
特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
- 型: list
- 重要度: 高
入力メッセージ(Input messages)¶
input.data.format
Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、JSON、または BYTES です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。
- 型: string
- 重要度: 高
データへの接続方法(How should we connect to your data?)¶
name
コネクターの名前を設定します。
- 型: string
- 指定可能な値: 最大 64 文字の文字列
- 重要度: 高
Kafka クラスターの認証情報(Kafka Cluster credentials)¶
kafka.auth.mode
Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。
- 型: string
- デフォルト: KAFKA_API_KEY
- 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
- 重要度: 高
kafka.api.key
- 型: password
- 重要度: 高
kafka.service.account.id
Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。
- 型: string
- 重要度: 高
kafka.api.secret
- 型: password
- 重要度: 高
AWS 認証情報(AWS credentials)¶
aws.access.key.id
AWS Lambda への接続に使用される Amazon アクセスキー。
- 型: password
- 重要度: 高
aws.secret.access.key
AWS Lambda への接続に使用される Amazon シークレットキー。
- 型: password
- 重要度: 高
AWS Lambda への接続方法(How should we connect to AWS Lambda?)¶
aws.lambda.configuration.mode
The mode in connector is configured to invoke a single AWS Lambda function or multiple Lambda functions
- 型: string
- Default: single
- 重要度: 高
aws.lambda.function.name
呼び出す AWS Lambda 関数。
- 型: string
- 重要度: 高
aws.lambda.topic2function.map
Map of Kafka topics to Lambda functions. Format: comma-separated tuples, e.g. <topic-1>;<function-1>,<topic-2>;<function-2>,...
- 型: list
- 重要度: 高
aws.lambda.region
lambda が定義されている AWS のリージョン。このプロパティの値が指定されていない場合、「kafka.region」プロパティに指定されている値が使用されます。
- 型: string
- 重要度: 低
Lambda の詳細(Lambda details)¶
aws.lambda.invocation.type
AWS Lambda 関数を呼び出すモード。2 つのモードがサポートされています。まず "sync" モードでは、レコードが順次処理されます。必要に応じて、AWS Lambda 関数の結果またはエラーを Kafka のトピック success-<connector-id> または error-<connector-id> に保存することができます。次に "async" モードでは、レコードが順次処理されない可能性があります。このコネクターは、AWS Lambda 関数の実行の結果もエラーも保存しません。AWS に送達不能キューを構成することで、関数のエラーを記録できます。
- 型: string
- デフォルト: sync
- 重要度: 高
aws.lambda.batch.size
1 回の AWS Lambda 関数呼び出しにまとめることができる Kafka レコードの最大数。これは、AWS Lambda 呼び出しのペイロード制限を超えない範囲で、可能な限り大きい値に設定する必要があります。レコードのバッチ処理を無効にするには、この値を 1 に設定します。
- 型: int
- デフォルト: 20
- 指定可能な値: [1、…]
- 重要度: 低
record.converter.class
Kafka レコードを AWS Lambda ペイロードに変換するレコードコンバーターのクラス。指定可能なエントリは「JsonKeyValueConverter」および「EscapedJsonKeyValueConverter」です。JsonKeyValueConverter の場合、「key」と「value」はスキーマに応じて JSON プリミティブまたはオブジェクトに変換されます。スキーマが定義されていない場合、プレーンテキストの文字列としてエンコードされます。EscapedJsonKeyValueConverter の場合も JsonKeyValueConverter と同様です。ただし、スキーマが定義されていない場合、「key」と「value」はエスケープされた json 文字列としてエンコードされます。詳細については、コネクターのドキュメントを参照してください。
- 型: string
- デフォルト: JsonKeyValueConverter
- 重要度: 低
エラーの処理方法(How should we handle errors?)¶
behavior.on.error
lambda 関数を実行した結果、エラーが返された場合のコネクターの動作。この設定は、AWS Lambda の呼び出しタイプが「sync」モードに設定されている場合のみ適用されます。指定可能なオプションは、「log」、「fail」、「ignore」です。「log」の場合、エラーメッセージが error-<connector-id> トピックに記録されて処理が続行されます。「fail」の場合、エラーが発生すると接続を停止します。「ignore」の場合、次のレコードセットの処理に進みます。
- 型: string
- デフォルト: log
- 重要度: 低
このコネクターのタスク数(Number of tasks for this connector)¶
tasks.max
- 型: int
- 指定可能な値: [1、…]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。