重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

AWS Lambda Sink Connector for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「AWS Lambda Sink Connector for Confluent Platform」を参照してください。

このコネクターでは、AWS Lambda 関数を同期的または非同期的に呼び出すことができます。

  • 同期モード の場合、1 つのトピックおよび 1 つのパーティションの中では、レコードは順次処理されます。異なるトピックのパーティション内のレコードは並列処理することができます。AWS Lambda からの応答は success-<connector-id> トピックに書き込まれます。このコネクターは、Lambda の実行でエラーが発生した場合、そのエラーを error-<connector-id> トピックに書き込んで処理を進めるように構成されています。Lambda 呼び出しの詳細については、『同期呼び出し』を参照してください。
  • 非同期モード の場合、コネクターは "受領応答を待たない(fire-and-forget)モード" で実行されます。レコードは、ベストエフォート方式で順次処理されます。コネクターは再試行を一切行いません。AWS Lambda が自動的に最大 2 回再試行します。AWS Lambda は、その後にそのリクエストをデッドレターキューに移動することができます。Lambda 呼び出しの詳細については、『非同期呼び出し』を参照してください。

機能

AWS Lambda Sink Connector には、以下の機能があります。

  • 同期および非同期の Lambda 関数呼び出し: このコネクターを使用して、AWS Lambda 関数を同期または非同期で呼び出すことができます。

  • 少なくとも 1 回のデリバリー: このコネクターは、少なくとも 1 回の処理セマンティクスを保証します。特定の状況下では、同じレコードが 2 回以上処理される可能性があります。AWS Lambda 関数は、べき等 になるように設計する必要があります。Lambda 関数からの応答を Kafka のトピックに記録するようにコネクターを構成した場合は、重複したレコードがトピックに含まれる可能性があります。トピックで Kafka ログ圧縮を有効にして、重複レコードを削除することができます。あるいは、一定期間における重複レコードを検出する ksqlDB クエリを作成することもできます。

  • 複数のタスクのサポート: このコネクターは、1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります。

  • 結果のトピック: 同期モードでは、AWS Lambda の結果は success-<connector-id> および error-<connector-id> トピックに保管されます。

  • スキーマありまたはスキーマなしの入力データフォーマット: このコネクターは、Avro、JSON スキーマ(JSON_SR)、Protobuf、または JSON(スキーマレス)フォーマットになっている Kafka トピックの入力データをサポートします。スキーマレジストリ ベースのフォーマットを使用するには、Schema Registry を有効にしておく必要があります。

    注釈

    スキーマが定義されていない場合、値はプレーンテキストの文字列としてエンコードされます。たとえば、"name": "Kimberley Human"name=Kimberley Human としてエンコードされます。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

クイックスタート

このクイックスタートを使用して、Confluent Cloud AWS Lambda Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択してから、AWS Lambda にレコードを送信するようにコネクターを構成するための基本的な方法について説明します。

前提条件
  • AWS 上の Confluent Cloud クラスターへのアクセスを許可されていること。

  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。

  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。

    注釈

    スキーマが定義されていない場合、値はプレーンテキストの文字列としてエンコードされます。たとえば、"name": "Kimberley Human"name=Kimberley Human としてエンコードされます。

  • ネットワークに関する考慮事項については、「Networking and DNS Considerations」を参照してください。静的なエグレス IP を使用する方法については、「静的なエグレス IP アドレス」を参照してください。

  • AWS Lambda プロジェクトは、コネクターを実行する Confluent Cloud クラスターと同じリージョンに存在している必要があります。

  • アクセスキー を構成した AWS アカウント。

  • lambda:InvokeFunction および lambda:GetFunction を許可するようにアカウントの Lambda IAM ポリシーを構成する必要があります。以下に、このポリシーを設定するための JSON の例を示します。

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "lambda:InvokeFunction",
                    "lambda:GetFunction"
                ],
                "Resource": "*"
            }
        ]
    }
    
  • Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。
    • 既存の サービスアカウント のリソース ID を入力する。
    • コネクター用の Confluent Cloud サービスアカウント を作成します。「サービスアカウント」のドキュメントで、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
    • Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the AWS Lambda Sink connector card.

AWS Lambda Sink Connector Card

ステップ 4: コネクターの詳細情報を入力します。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク ( * ) は必須項目であることを示しています。

Add AWS Lambda Sink Connector 画面で、以下を実行します。

既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。

新しいトピックを作成するには、+Add new topic をクリックします。

ステップ 5: Lambda 関数のメトリクスを確認します。

AWS Lambda コンソールに移動して、Lambda 関数を開き、レコードが処理されていることを確認します。Lambda 関数ページの Monitoring タブで処理状況を確認できます。同期モード では、AWS Lambda の結果は以下のトピックに保管されます。

  • success-<connector-id>
  • error-<connector-id>

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に生成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png

Confluent CLI を使用する場合

以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。

注釈

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe LambdaSink

出力例:

Following are the required configs:
connector.class: LambdaSink
name
topics
input.data.format
connector.class
kafka.auth.mode
kafka.api.key
kafka.api.secret
aws.access.key.id
aws.secret.access.key
aws.lambda.function.name
aws.lambda.invocation.type
tasks.max

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "connector.class": "LambdaSink",
  "name": "LambdaSinkConnector_0",
  "topics": "users",
  "input.data.format": "JSON",
  "connector.class": "LambdaSink",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "****************",
  "kafka.api.secret": "*************************************************",
  "aws.access.key.id": "****************",
  "aws.secret.access.key": "********************************************",
  "aws.lambda.function.name": "myLambdaFunction",
  "aws.lambda.invocation.type": "sync",
  "behavior.on.error": "fail",
  "tasks.max": "1"
}

以下の必須プロパティの定義に注意してください。

  • "connector.class": コネクターのプラグイン名を指定します。
  • "name": 新しいコネクターの名前を設定します。
  • "topics": 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "input.data.format": Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVROJSON_SR (JSON スキーマ)、PROTOBUFJSON (スキーマレス)、BYTES です。スキーマベースのメッセージフォーマットを使用する場合は、Confluent Cloud Schema Registry を構成しておく必要があります。

    注釈

    スキーマが定義されていない場合、値はプレーンテキストの文字列としてエンコードされます。たとえば、"name": "Kimberley Human"name=Kimberley Human としてエンコードされます。

  • "aws.access.key.id" および "aws.secret.access.key": AWS のアクセスキー ID とシークレットを入力します。これらのセットアップ方法については、『アクセスキー』を参照してください。

  • "aws.lambda.function.name": 呼び出す Lambda 関数を入力します。詳しくは、『AWS Lambda とは』を参照してください。

  • "aws.lambda.invocation.type":

    • "sync": 1 つのトピックおよび 1 つのパーティションの中では、レコードは順次処理されます。異なるトピックのパーティション内のレコードとは並列処理することができます。構成すれば、AWS Lambda からの応答を Kafka トピックに書き込むことができます。Lambda の実行でエラーが発生した場合に、コネクターの構成によって、無視して続行したり、エラーをログに記録したり、コネクターを完全に停止したりすることができます。Lambda 呼び出しの詳細については、『同期呼び出し』を参照してください。
    • "async": コネクターは "受領応答を待たない(fire-and-forget)モード" で実行されます。レコードは、ベストエフォート方式で順次処理されます。コネクターは再試行を一切行いません。AWS Lambda が自動的に最大 2 回再試行します。AWS Lambda は、その後にそのリクエストをデッドレターキューに移動することができます。Lambda 呼び出しの詳細については、『非同期呼び出し』を参照してください。
  • "behavior.on.error": AWS Lambda 関数の呼び出しのエラー処理の動作を選択します。

    • "fail": エラーが発生した場合、コネクターを停止します。
    • "ignore": 次のレコードセットの処理に進みます。
    • "log": エラーメッセージを error-<connector-id> トピックに記録して処理を続行します。

    注釈

    この設定は、aws.lambda.invocation.typesync モードに設定されている場合にのみ適用されます。

  • "tasks.max": このコネクターで使用する タスク の数を入力します。詳しくは、Confluent Cloud コネクターの制限事項 を参照してください。

Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。

すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。

ステップ 4: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config lambda-sink-config.json

出力例:

Created connector LambdaSinkConnector_0 lcc-ix4dl

ステップ 5: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID          |       Name            | Status  | Type
+-----------+-----------------------+---------+------+
lcc-ix4dl   | LambdaSinkConnector_0 | RUNNING | sink

ステップ 6: Lambda 関数のメトリクスを確認します。

AWS Lambda コンソールに移動して、Lambda 関数を開き、レコードが処理されていることを確認します。Lambda 関数ページの Monitoring タブで処理状況を確認できます。同期モード では、AWS Lambda の結果は以下のトピックに保管されます。

  • success-<connector-id>
  • error-<connector-id>

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に生成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データの取得元とするトピック(Which topics do you want to get data from?)

topics

特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

  • 型: list
  • 重要度: 高

入力メッセージ(Input messages)

input.data.format

Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、JSON、または BYTES です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • 重要度: 高

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

AWS 認証情報(AWS credentials)

aws.access.key.id

AWS Lambda への接続に使用される Amazon アクセスキー。

  • 型: password
  • 重要度: 高
aws.secret.access.key

AWS Lambda への接続に使用される Amazon シークレットキー。

  • 型: password
  • 重要度: 高

AWS Lambda への接続方法(How should we connect to AWS Lambda?)

aws.lambda.function.name

呼び出す AWS Lambda 関数。

  • 型: string
  • 重要度: 高
aws.lambda.region

lambda が定義されている AWS のリージョン。このプロパティの値が指定されていない場合、「kafka.region」プロパティに指定されている値が使用されます。

  • 型: string
  • 重要度: 低

Lambda の詳細(Lambda details)

aws.lambda.invocation.type

AWS Lambda 関数を呼び出すモード。2 つのモードがサポートされています。まず "sync" モードでは、レコードが順次処理されます。必要に応じて、AWS Lambda 関数の結果またはエラーを Kafka のトピック success-<connector-id> または error-<connector-id> に保存することができます。次に "async" モードでは、レコードが順次処理されない可能性があります。このコネクターは、AWS Lambda 関数の実行の結果もエラーも保存しません。AWS に送達不能キューを構成することで、関数のエラーを記録できます。

  • 型: string
  • デフォルト: sync
  • 重要度: 高
aws.lambda.batch.size

1 回の AWS Lambda 関数呼び出しにまとめることができる Kafka レコードの最大数。これは、AWS Lambda 呼び出しのペイロード制限を超えない範囲で、可能な限り大きい値に設定する必要があります。レコードのバッチ処理を無効にするには、この値を 1 に設定します。

  • 型: int
  • デフォルト: 20
  • 指定可能な値: [1、…]
  • 重要度: 低
record.converter.class

Kafka レコードを AWS Lambda ペイロードに変換するレコードコンバーターのクラス。指定可能なエントリは「JsonKeyValueConverter」および「EscapedJsonKeyValueConverter」です。JsonKeyValueConverter の場合、「key」と「value」はスキーマに応じて JSON プリミティブまたはオブジェクトに変換されます。スキーマが定義されていない場合、プレーンテキストの文字列としてエンコードされます。EscapedJsonKeyValueConverter の場合も JsonKeyValueConverter と同様です。ただし、スキーマが定義されていない場合、「key」と「value」はエスケープされた json 文字列としてエンコードされます。詳細については、コネクターのドキュメントを参照してください。

  • 型: string
  • デフォルト: JsonKeyValueConverter
  • 重要度: 低

エラーの処理方法(How should we handle errors?)

behavior.on.error

lambda 関数を実行した結果、エラーが返された場合のコネクターの動作。この設定は、AWS Lambda の呼び出しタイプが「sync」モードに設定されている場合のみ適用されます。指定可能なオプションは、「log」、「fail」、「ignore」です。「log」の場合、エラーメッセージが error-<connector-id> トピックに記録されて処理が続行されます。「fail」の場合、エラーが発生すると接続を停止します。「ignore」の場合、次のレコードセットの処理に進みます。

  • 型: string
  • デフォルト: log
  • 重要度: 低

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1、…]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png