重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

HTTP Sink Connector for Confluent Cloud

注釈

Confluent Platform 用にコネクターをローカルにインストールする場合は、「HTTP Sink Connector for Confluent Platform」を参照してください。

Kafka Connect HTTP Sink Connector for Confluent Cloud を使用すると、HTTP または HTTPS を介して Apache Kafka® を API と統合できます。

コネクターによって、Kafka のトピックのレコードが消費され、各レコードの値が文字列または JSON フォーマットに変換されます。その後、構成された http.api.url にリクエスト本文で送信されます。API URL はレコードキーまたはトピック名も参照できます。ターゲットである API では、POST リクエストまたは PUT リクエストをサポートしている必要があります。

コネクターでは、バッチリクエストを API に送信する前に、Batch max sizebatch.max.size)に設定された数までのレコードがバッチにまとめられます。各レコードは、その文字列表現、または Request Body Format で JSON 表現(request.body.format=json の場合)に変換された後、Batch separatorbatch.separator)で区切られます。構成プロパティの詳細については、「構成プロパティ」を参照してください。

HTTP Sink Connector は、基本認証、OAuth2、またはプロキシ認証サーバーを使用した、API との SSL 接続をサポートしています。

機能

HTTP Sink Connector は、次の機能をサポートしています。

  • At least once delivery: コネクターによって、Kafka のトピックからのレコードが少なくとも 1 回は配信されることが保証されます。

  • 複数のタスクのサポート: このコネクターは、1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります(複数のタスクを実行するとコンシューマーラグが減少します)。

  • トピックの自動作成: コネクターの起動時に、以下の 3 つのトピックが自動的に作成されます。

    各トピック名にはサフィックスとして、コネクターの論理 ID が付けられます。以下の例では、コネクターのトピックが 3 つと、pageviews という名前の既存の Kafka トピックが 1 つ含まれています。

    HTTP Sink Connector のトピック

    コネクターのトピック

    トピックに送信されたレコードが正しいフォーマットではない場合、またはレコード内に重要なフィールドが存在しない場合は、エラートピックにエラーが記録され、コネクターは動作を継続します。

  • サポートされるデータフォーマット: このコネクターは、Avro、JSON スキーマ(JSON-SR)、Protobuf、JSON(スキーマレス)、および Bytes フォーマットをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON スキーマ、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。

  • 正規表現の置換: コネクターでは、多数の正規表現パターンと置換文字列を設定して、送信先 API に送信する前にレコードに適用することができます。このためには、コネクターでは、構成オプション regex.patternsregex.replacements、および regex.separator が使用されます。

  • バッチ処理のサポート: コネクターでは、効率化のため、HTTP API に送信されるリクエストをバッチにまとめます。バッチは、構成オプション batch.prefixbatch.suffix、および batch.separator でビルドできます。すべての正規表現のオプションは、バッチ処理時に適用され、個別のレコードに適用されてからバッチに送られます。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

制限

以下の情報を確認してください。

クイックスタート

このクイックスタートを使用して、Confluent Cloud HTTP Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、HTTP エンドポイントにイベントをストリーミングするようにコネクターを構成するための基本的な方法について示します。

前提条件
  • アマゾンウェブサービス (AWS)、Microsoft Azure (Azure)、または Google Cloud Platform (GCP)上の Confluent Cloud クラスターへのアクセスを許可されていること。
  • Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
  • スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「スキーマレジストリ Enabled Environments」を参照してください。
  • シンクコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。

Confluent Cloud Console の使用

ステップ 1: Confluent Cloud クラスターを起動します。

インストール手順については、「Quick Start for Confluent Cloud」を参照してください。

ステップ 2: コネクターを追加します。

左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。

ステップ 3: コネクターを選択します。

Click the HTTP Sink connector card.

HTTP Sink Connector Card

ステップ 4: コネクターの詳細情報を入力します。

注釈

  • すべての 前提条件 を満たしていることを確認してください。
  • アスタリスク(* )は必須項目であることを示しています。

Add HTTP Sink Connector 画面で、以下を実行します。

既に Kafka トピックを用意している場合は、Topics リストから接続するトピックを選択します。

新しいトピックを作成するには、+Add new topic をクリックします。

ステップ 5: レコードを確認します。

レコードがエンドポイントに生成されていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に生成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

Confluent CLI の使用

Confluent CLI でコネクターをセットアップして実行するには、次の手順を実行します。

注釈

ステップ 1: 使用可能なコネクターをリスト表示します。

以下のコマンドを入力して、使用可能なコネクターをリスト表示します。

confluent connect plugin list

ステップ 2: コネクターの必須の構成プロパティを表示します。

以下のコマンドを実行して、コネクターの必須プロパティを表示します。

confluent connect plugin describe <connector-catalog-name>

例:

confluent connect plugin describe HttpSink

出力例:

Following are the required configs:
connector.class: HttpSink
input.data.format
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
http.api.url
tasks.max
topics

ステップ 3: コネクターの構成ファイルを作成します。

コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。

{
  "connector.class": "HttpSink",
  "input.data.format": "JSON",
  "name": "HttpSinkConnector_0",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "<my-kafka-api-key>",
  "kafka.api.secret": "<my-kafka-api-secret>",
  "http.api.url": "http:://eshost1:9200/",
  "request.method": "POST",
  "tasks.max": "1",
  "topics": "orders",
}

以下のプロパティ定義にご注意ください。

  • "connector.class": コネクターのプラグイン名を特定します。

  • "input.data.format": Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVROJSON_SRPROTOBUFJSON、または BYTES です。スキーマベースのメッセージフォーマット(たとえば、Avro、JSON_SR(JSON スキーマ)、および Protobuf)を使用するには、Confluent Cloud Schema Registry を構成しておく必要があります。

    ちなみに

    文字列データを消費するためのスキーマレス JSON を選択します。

  • "name": 新しいコネクターの名前を設定します。

  • "kafka.auth.mode": 使用するコネクターの認証モードを指定します。オプションは SERVICE_ACCOUNT または KAFKA_API_KEY (デフォルト)です。API キーとシークレットを使用するには、構成プロパティ kafka.api.keykafka.api.secret を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティ kafka.service.account.id=<service-account-resource-ID>リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。

    confluent iam service-account list
    

    例:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "http.api.url": HTTP または HTTPS 接続 URL を入力します。たとえば、http://eshost1:9200 または https://eshost3:9200 のように入力します。接続 URL が HTTPS の場合は、すべての接続に HTTPS が使用されます。プロトコルなしの URL は HTTP とみなされます。

  • "request.method": HTTP API リクエストメソッド(PUT または POST)を入力します。

  • "tasks.max": このコネクターで使用できる タスク の最大数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります(複数のタスクを実行するとコンシューマーラグが減少します)。

  • "topics": 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。

すべてのプロパティの値と説明については、「構成プロパティ」を参照してください。

ステップ 3: プロパティファイルを読み込み、コネクターを作成します。

以下のコマンドを入力して、構成を読み込み、コネクターを起動します。

confluent connect create --config <file-name>.json

例:

confluent connect create --config http-sink-config.json

出力例:

Created connector HttpSinkConnector_0 lcc-do6vzd

ステップ 4: コネクターのステータスを確認します。

以下のコマンドを入力して、コネクターのステータスを確認します。

confluent connect list

出力例:

ID           |             Name              | Status  | Type | Trace
+------------+-------------------------------+---------+------+-------+
lcc-do6vzd   | HttpSinkConnector_0           | RUNNING | sink |       |

ステップ 5: レコードを確認します。

レコードがエンドポイントに取り込まれていることを確認します。

Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。

ちなみに

コネクターを起動すると、デッドレターキューのトピックが自動的に生成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。

構成プロパティ

このコネクターでは、以下のコネクター構成プロパティを使用します。

データの取得元とするトピック(Which topics do you want to get data from?)

topics

特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。

  • 型: list
  • 重要度: 高

入力メッセージ(Input messages)

input.data.format

Kafka 入力レコード値のフォーマットを設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、JSON、または BYTES です。スキーマベースのメッセージフォーマット(AVRO、JSON_SR、PROTOBUF など)を使用する場合は、Confluent Cloud Schema Registry を構成しておく必要がある点に注意してください。

  • 型: string
  • 重要度: 高

データへの接続方法(How should we connect to your data?)

name

コネクターの名前を設定します。

  • 型: string
  • 指定可能な値: 最大 64 文字の文字列
  • 重要度: 高

Kafka クラスターの認証情報(Kafka Cluster credentials)

kafka.auth.mode

Kafka の認証モード。KAFKA_API_KEY または SERVICE_ACCOUNT を指定できます。デフォルトは KAFKA_API_KEY モードです。

  • 型: string
  • デフォルト: KAFKA_API_KEY
  • 指定可能な値: KAFKA_API_KEY、SERVICE_ACCOUNT
  • 重要度: 高
kafka.api.key
  • 型: password
  • 重要度: 高
kafka.service.account.id

Kafka クラスターとの通信用の API キーを生成するために使用されるサービスアカウント。

  • 型: string
  • 重要度: 高
kafka.api.secret
  • 型: password
  • 重要度: 高

HTTP サーバーの詳細

http.api.url
  • 型: string
  • 重要度: 高
request.method
  • 型: string
  • デフォルト: POST
  • 重要度: 高
headers

すべてのリクエストに含まれる HTTP ヘッダー。個々のヘッダーは、ヘッダー区切り文字で区切る必要があります

  • 型: string
  • 重要度: 高
header.separator

ヘッダーで使用される区切り文字。

  • 型: string
  • 重要度: 高
behavior.on.null.values

null 以外のキーと null 値を含むレコード(Kafka tombstone レコードなど)を処理する方法。使用可能なオプションは、ignoredeletefail です

  • 型: string
  • デフォルト: ignore
  • 重要度: 低

HTTP サーバーのエラー処理(HTTP server error handling)

behavior.on.error

HTTP リクエストのエラー応答を処理する際のエラー処理の動作の構成。

  • 型: string
  • デフォルト: ignore
  • 重要度: 中
report.errors.as

エラートピックに対して生成されるレコードのコンテンツを指定します。error_string に設定すると、その値は失敗を説明する可読形式の文字列になります。値には、HTTP 応答コード、理由のフレーズ、送信されたペイロード、URL、応答コンテンツ、例外、エラーメッセージの情報の一部または全部が含まれます(利用可能な場合)。http_response に設定すると、その値は、レコードの書き込みに失敗したリクエストに関するプレーンな応答コンテンツとなります。どちらのモードでも、失敗に関する情報はエラーレコードのヘッダーにも含まれます。

  • 型: string
  • デフォルト: error_string
  • 重要度: 中

HTTP サーバーのバッチ処理(HTTP server batches)

request.body.format

JSON フォーマットまたは文字列フォーマットのどちらでリクエスト本文を生成するかの指定に使用します。

  • 型: string
  • デフォルト: string
  • 重要度: 中
batch.key.pattern

特定のバッチのキーのビルドに使用されるパターン。${key} と ${topic} を使用して、ここにメッセージの属性を含めることができます。

  • 型: string
  • 重要度: 高
batch.max.size

HTTP API が呼び出されるまでにバッチに蓄積される最大レコード数。

  • 型: int
  • デフォルト: 1
  • 重要度: 高
batch.prefix

レコードバッチに追加されるプレフィックス。これは、レコードのバッチの最初に 1 回のみ適用されます。

  • 型: string
  • 重要度: 高
batch.suffix

レコードバッチに追加されるサフィックス。これは、レコードのバッチの最後に 1 回のみ適用されます。

  • 型: string
  • 重要度: 高
batch.separator

バッチ内のレコードの区切り文字。

  • 型: string
  • 重要度: 高
batch.json.as.array

配列を使用して json レコードをバンドルするかどうかを指定します。request.body.format が json に設定されている場合のみ使用します。これは batch.max.size が 1 に設定されている場合のみ、無効にできます。

  • 型: boolean
  • 重要度: 高

HTTP サーバー認証(HTTP server authentication)

auth.type
  • 型: string
  • デフォルト: NONE
  • 重要度: 高
connection.user

認証を要求するエンドポイントで使用されるユーザー名。

  • 型: string
  • 重要度: 高
connection.password

認証を要求するエンドポイントで使用されるパスワード。

  • 型: password
  • 重要度: 高
oauth2.token.url

OAuth2 トークンのフェッチで使用される URL。クライアント認証情報は、唯一サポートされている付与タイプです。

  • 型: string
  • 重要度: 高
oauth2.client.id

OAuth2 トークンをフェッチする際に使用されるクライアント ID

  • 型: string
  • 重要度: 高
oauth2.client.secret

OAuth2 トークンをフェッチする際に使用されるシークレット

  • 型: password
  • 重要度: 高
oauth2.token.property

HTTP プロキシによって返される OAuth2 トークンを含むプロパティの名前。

  • 型: string
  • デフォルト: access_token
  • 重要度: 高
oauth2.client.auth.mode

OAuth2 認可リクエストで client_idclient_secret をエンコードする方法を指定します。「header」を設定した場合、認証情報は 'Authorization: Basic <base-64 encoded client_id:client_secret>' HTTP ヘッダーとしてエンコードされます。「url」と設定した場合、client_idclient_secret は URL エンコードされたパラメーターとして送信されます。

  • 型: string
  • デフォルト: header
  • 重要度: 低
oauth2.client.scope

OAuth2 トークンのフェッチで使用されるスコープ。空白の場合、認可リクエストにこのパラメーターは設定されません

  • 型: string
  • デフォルト: any
  • 重要度: 低
oauth2.client.headers

HTTP headers to be included in the OAuth2 client endpoint. Individual headers should be separated by OAuth2 Client Headers Separator

  • 型: string
  • 重要度: 低
oauth2.client.header.separator

Separator character used in OAuth2 Client Headers

  • 型: string
  • 重要度: 低

HTTP サーバーの再試行(HTTP server retries)

retry.on.status.codes

再試行対象の HTTP エラーコード。再試行対象のコードまたはコードの範囲のコンマ区切りリスト。範囲の最初と最後(省略可)のコードで指定します。範囲の最初と最後のコードも範囲に含まれます。たとえば、「400-」は 400 以上のすべてのコードを含みます。「400-500」は、400 ~ 500(500 を含む)のコードを含みます。再試行動作をきめ細かく制御するには、範囲と個別のコードを複数組み合わせて指定できます。たとえば、「404,408,500-」では、404 NOT FOUND、408 REQUEST TIMEOUT、および 5xx のすべてのエラーコードの場合に再試行されます。

  • 型: string
  • デフォルト: 400-
  • 重要度: 中
max.retries

エラー時に再試行する最大回数。これを超えるとタスクは失敗します。

  • 型: int
  • デフォルト: 10
  • 重要度: 中
retry.backoff.ms

エラーの後、再試行するまでの時間(ミリ秒)。

  • 型: int
  • デフォルト: 3000(3 秒)
  • 指定可能な値: [100,…]
  • 重要度: 中
http.connect.timeout.ms

接続の確立を待機する最長時間(ミリ秒)。

  • 型: int
  • デフォルト: 30000(30 秒)
  • 重要度: 中
http.request.timeout.ms

サーバーからのリクエストの応答を待つ時間(ミリ秒)。

  • 型: int
  • デフォルト: 30000(30 秒)
  • 重要度: 中

HTTP サーバーの正規表現(HTTP server regular expressions)

regex.patterns

HTTP サービスに送信されたメッセージで置換に使用される正規表現パターン。複数の正規表現パターンを指定できますが、regex.separator で区切る必要があります。

  • 型: string
  • 重要度: 中
regex.replacements

regex.patterns のパターンで使用する正規表現の置換。複数の置換を指定できますが、regex.separator で区切る必要があります。ここでは ${key}${topic} を使用できます。

  • 型: string
  • 重要度: 中
regex.separator

regex.patterns プロパティおよび regex.replacements プロパティで使用される区切り文字。

  • 型: string
  • 重要度: 中

HTTP サーバーの SSL(HTTP server SSL)

https.ssl.key.password

キーストアファイル内のプライベートキーのパスワード。クライアントでは省略できます。

  • 型: password
  • 重要度: 高
https.ssl.keystorefile

サーバー証明書が含まれるキーストア。HTTPS を使用する場合にのみ必要です。

  • 型: password
  • 重要度: 低
https.ssl.keystore.password

キーストアファイルのストアパスワード。クライアントでは省略可能です。https.ssl.keystore.location を構成した場合にのみ必要です。

  • 型: password
  • 重要度: 高
https.ssl.truststorefile

サーバー CA 証明書情報が含まれるトラストストア。HTTPS を使用する場合にのみ必要です。

  • 型: password
  • 重要度: 高
https.ssl.truststore.password

サーバー CA 証明書情報が含まれるトラストストアのパスワード。HTTPS を使用する場合にのみ必要です。

  • 型: password
  • 重要度: 高
https.ssl.protocol

SSL 接続に使用するプロトコル。

  • 型: string
  • デフォルト: TLSv1.3
  • 重要度: 中
https.host.verifier.enabled

SSL ホストの検証を有効にする必要がある場合は true にします。

  • 型: boolean
  • デフォルト: true
  • 重要度: 中

このコネクターのタスク数(Number of tasks for this connector)

tasks.max
  • 型: int
  • 指定可能な値: [1,…]
  • 重要度: 高

次のステップ

参考

フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。

../_images/topology.ja.png