重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

Google Cloud Functions Sink Connector for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「Google Functions Sink Connector for Confluent Platform」を参照してください。

Kafka Connect Google Cloud Functions Sink Connector for Confluent Cloud を使用すると、Apache Kafka® を Google Cloud Functions と統合できます。関数の基本的な情報については、Google Cloud Functions のドキュメント を参照してください。

このコネクターは、Kafka のトピックからレコードを消費して、Google Cloud Functions を実行します。Google Cloud Functions に送信される各リクエストには、max.batch.size で指定した件数までレコードを含めることができます。

機能

Google Cloud Functions Sink Connector には、以下の機能があります。

  • Google Cloud Functions の結果は、以下のトピックに格納されます。
    • success-<connector-id>
    • error-<connector-id>
  • サポートされる入力データのフォーマットは、Bytes、AVRO、JSON_SR(JSON スキーマ)、JSON(スキーマレス)および PROTOBUF です。スキーマが定義されていない場合、値はプレーンテキストの文字列としてエンコードされます。たとえば、"name": "Kimberley Human"name=Kimberley Human としてエンコードされます。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

クイックスタート

このクイックスタートを使用して、Confluent Cloud Google Cloud Functions Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、ターゲットの Google Cloud Function にイベントをストリーミングするようにコネクターを構成するための基本的な方法について説明します。

前提条件
  • GCP 上の Confluent Cloud クラスターへのアクセスを許可されていること。
  • Google Cloud の関数へのアクセス。関数の基本的な情報については、Google Cloud Functions のドキュメント を参照してください。
  • GCP サービスアカウント。サービスアカウントの 認証情報を JSON ファイル 形式でダウンロードします。コネクター構成プロパティをセットアップするときに、この認証情報ファイルをアップロードします。
  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
  • ターゲットの Google Cloud の関数と Kafka クラスターは、同じリージョンに存在している必要があります。
  • Kafka クラスターの認証情報。次のいずれかの方法で認証情報を指定できます。

    • 既存の サービスアカウント のリソース ID を入力する。
    • コネクター用の Confluent Cloud サービスアカウント を作成します。「サービスアカウント」のドキュメントで、必要な ACL エントリを確認してください。一部のコネクターには固有の ACL 要件があります。
    • Confluent Cloud の API キーとシークレットを作成する。キーとシークレットを作成するには、confluent api-key create を使用するか、コネクターのセットアップ時に Cloud Console で直接 API キーとシークレットを自動生成します。
  • 以下の connector-service アカウントロールがプロジェクトで有効になっている必要があります。

    Google Project Connector のロール
  • Trigger typeHTTP に設定する必要があります。Require authentication を選択します。

    HTTP トリガータイプ

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the Google Cloud Functions Sink connector card.

Google Cloud Functions Sink Connector Card

ステップ 4: コネクターの詳細情報を入力します。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク( * )は必須項目であることを示しています。

Add Google Cloud Functions Sink Connector 画面で、以下を実行します。

既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。

新しいトピックを作成するには、+Add new topic をクリックします。

ステップ 5: レコードを確認します。

レコードが生成されていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に生成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png

Confluent CLI を使用する場合

以下の手順に従うと、Confluent CLI を使用してコネクターをセットアップし、実行できます。

注釈

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe GoogleCloudFunctionSink

出力例:

Following are the required configs:
connector.class
name
topics
input.data.format
kafka.auth.mode
kafka.api.key
kafka.api.secret
function.name
project.id
gcf.credentials.json
tasks.max

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "connector.class": "GoogleCloudFunctionsSink",
  "name": "GoogleCloudFunctionsSinkConnector_0",
  "topics": "pageviews_proto",
  "input.data.format": "PROTOBUF",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "****************",
  "kafka.api.secret": "****************************************************************",
  "function.name": "dev-test",
  "project.id": "connect-2021",
  "gcf.credentials.json": "*",
  "tasks.max": "1"
}

以下のプロパティ定義に注意してください。

  • "connector.class": コネクターのプラグイン名を指定します。
  • "name": 新しいコネクターの名前を設定します。
  • "topics": 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
  • "input.data.format": Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVROJSON_SRPROTOBUFJSON、または BYTES です。スキーマベースのメッセージフォーマット(たとえば、Avro、JSON_SR(JSON スキーマ)、および Protobuf)を使用するには、Confluent Cloud Schema Registry を構成しておく必要があります。
  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "function.name": 事前に定義した Google Cloud 関数の名前。

  • "project.id": GCP プロジェクト ID。

  • "gcf.credentials.json": ダウンロードした JSON ファイルの内容が入ります。ダウンロードした認証情報ファイルの内容を、フォーマットを変更して使用する方法について詳しくは、「GCP 認証情報のフォーマットの変更」を参照してください。

オプション

  • "max.batch.size": Google Cloud 関数の 1 回の呼び出しにまとめることができるレコードの最大件数。デフォルトは 1 (バッチ処理は無効)です。1 から 1000 までの値を指定できます。
  • "max.pending.requests": Google Cloud 関数に対して同時に要求できる処理待ちリクエストの最大数。デフォルトは 1 です。
  • "request.timeout": コネクターが Google Cloud Functions へのリクエストを試行できる最長時間(ミリ秒)。これを過ぎるとタイムアウト(ソケットタイムアウト)します。デフォルトは 300000 ミリ秒(5 分)です。
  • "retry.timeout": 失敗したリクエスト(スロットリング時など)がコネクターによって指数関数的にバックオフおよび再試行される合計時間(ミリ秒)。再試行される応答コードは HTTP 429 Too BusyHTTP 502 Bad Gateway です。デフォルトは 300000 ミリ秒(5 分)です。永続的に再試行するようにこのプロパティを構成するには -1 を入力します。

Single Message Transforms: CLI を使用した SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。このコネクターでサポートされていない SMT のリストについては、「サポートされない変換」を参照してください。

すべてのプロパティの値と定義については、「構成プロパティ」を参照してください。

GCP 認証情報のフォーマットの変更

ダウンロードした認証情報ファイルの内容は、コネクター構成で使用する前に、文字列フォーマットに変換する必要があります。

  1. JSON ファイルの内容を文字列フォーマットに変換します。これは、オンラインのコンバーターツールを使用して実行できます。たとえば、JSON to String Online Converter などがあります。

  2. Private Key セクションの \n のすべての出現箇所の前にエスケープ文字 \ を追加します。これで、各セクションの先頭が \\n になります(以下の強調表示された行を参照してください)。以下の例は、\\n の出現箇所がわかりすいようにフォーマットを整えています。認証情報キーの大部分は省略しています。

    ちなみに

    認証情報を文字列に変換し、さらに必要に応じてエスケープ文字 \ を追加するスクリプトも用意されています。Stringify GCP Credentials を参照してください。

      {
          "connector.class": "GoogleCloudFunctionsSink",
          "name": "GoogleCloudFunctionsSinkConnector_0",
          "kafka.api.key": "<my-kafka-api-key>",
          "kafka.api.secret": "<my-kafka-api-secret>",
          "topics": "<topic-name>",
          "data.format": "AVRO",
          "function.name": "dev-test",
          "project.id": "connect-2021",
          "gcf.credentials.json": "{\"type\":\"service_account\",\"project_id\":\"connect-
          1234567\",\"private_key_id\":\"omitted\",
          \"private_key\":\"-----BEGIN PRIVATE KEY-----
          \\nMIIEvAIBADANBgkqhkiG9w0BA
          \\n6MhBA9TIXB4dPiYYNOYwbfy0Lki8zGn7T6wovGS5pzsIh
          \\nOAQ8oRolFp\rdwc2cC5wyZ2+E+bhwn
          \\nPdCTW+oZoodY\\nOGB18cCKn5mJRzpiYsb5eGv2fN\/J
          \\n...rest of key omitted...
          \\n-----END PRIVATE KEY-----\\n\",
          \"client_email\":\"pub-sub@connect-123456789.iam.gserviceaccount.com\",
          \"client_id\":\"123456789\",\"auth_uri\":\"https:\/\/accounts.google.com\/o\/oauth2\/
          auth\",\"token_uri\":\"https:\/\/oauth2.googleapis.com\/
          token\",\"auth_provider_x509_cert_url\":\"https:\/\/
          www.googleapis.com\/oauth2\/v1\/
          certs\",\"client_x509_cert_url\":\"https:\/\/www.googleapis.com\/
          robot\/v1\/metadata\/x509\/pub-sub%40connect-
          123456789.iam.gserviceaccount.com\"}",
          "tasks.max": "1"
      }
    
  3. 変換したすべての文字列の内容を、上記の例のように構成ファイルの "gcf.credentials.json" セクションに追加します。

ステップ 4: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config google-functions-sink-config.json

出力例:

Created connector GoogleCloudFunctionsSinkConnector_0 lcc-ix4dl

ステップ 5: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID          |       Name                          | Status  | Type
+-----------+-------------------------------------+---------+------+
lcc-ix4dl   | GoogleCloudFunctionsSinkConnector_0 | RUNNING | sink

ステップ 6: レコードを確認します。

レコードが生成されていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に生成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データの取得元とするトピック(Which topics do you want to get data from?)

topics

特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

  • 型: list
  • 重要度: 高

入力メッセージ(Input messages)

input.data.format

Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、JSON、または BYTES です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • 重要度: 高

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

関数への接続方法(How should we connect to your function?)

function.name

呼び出す Google Cloud 関数。

  • 型: string
  • 重要度: 高
project.id

関数をデプロイする Google Cloud Project の ID。

  • 型: string
  • 重要度: 高

GCP 認証情報(GCP credentials)

gcf.credentials.json

関数の呼び出しアクセス許可を持つ GCP サービスアカウントの JSON ファイル。

  • 型: password
  • 重要度: 高

クラウド関数の詳細(Cloud Function details)

max.batch.size

1 回の関数呼び出しにまとめることができる Kafka レコードの最大数。レコードのバッチ処理を無効にするには、この値を 1 に設定します。

  • 型: int
  • デフォルト: 1
  • 指定可能な値: [1、…]
  • 重要度: 低
max.pending.requests

Google Cloud Functions 宛てとして同時に存在できる保留中のリクエストの最大数。

  • 型: int
  • デフォルト: 1
  • 指定可能な値: [1,...,128]
  • 重要度: 低
request.timeout.ms

コネクターによる Google Cloud Functions へのリクエスト試行がタイムアウト(ソケットタイムアウト)するまでの最大時間(ミリ秒)。

  • 型: int
  • デフォルト: 300000(5 分)
  • 指定可能な値: [0、…]
  • 重要度: 低
retry.timeout.ms

スロットリング時などの失敗したリクエストがコネクターによって指数関数的にバックオフおよび再試行される合計時間(ミリ秒)。再試行される応答コードは HTTP 401 Unauthorized と HTTP 500 Internal Server Error です。値が -1 の場合は、永続的に再試行されます。

  • 型: int
  • デフォルト: 300000(5 分)
  • 指定可能な値: [-1、...]
  • 重要度: 低

エラーの処理方法(How should we handle errors?)

behavior.on.error

呼び出された GCP 関数がエラーを返した場合のコネクターの動作。指定可能なオプションは「log」と「fail」です。「log」の場合、エラーメッセージが記録されて処理が続行されます。「fail」の場合、エラーが発生すると接続を停止します。

  • 型: string
  • デフォルト: log
  • 重要度: 低

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1、…]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png