HTTP Sink Connector for Confluent Cloud¶
注釈
If you are installing the connector locally for Confluent Platform, see HTTP Sink Connector for Confluent Platform.
Kafka Connect HTTP Sink Connector for Confluent Cloud を使用すると、HTTP または HTTPS を介して Apache Kafka® を API と統合できます。
コネクターによって、Kafka のトピックのレコードが消費され、各レコードの値が文字列または JSON フォーマットに変換されます。その後、構成された http.api.url
にリクエスト本文で送信されます。API URL はレコードキーまたはトピック名も参照できます。ターゲットである API では、POST
リクエストまたは PUT
リクエストをサポートしている必要があります。
コネクターでは、バッチリクエストを API に送信する前に、Batch max size (batch.max.size
)に設定された数までのレコードがバッチにまとめられます。各レコードは、その文字列表現、または Request Body Format で JSON 表現(request.body.format=json
の場合)に変換された後、Batch separator (batch.separator
)で区切られます。構成プロパティの詳細については、「構成プロパティ」を参照してください。
HTTP Sink Connector は、基本認証、OAuth2、またはプロキシ認証サーバーを使用した、API との SSL 接続をサポートしています。
機能¶
HTTP Sink Connector は、次の機能をサポートしています。
At least once delivery: コネクターによって、Kafka のトピックからのレコードが少なくとも 1 回は配信されることが保証されます。
複数のタスクのサポート: このコネクターは、1 つまたは複数のタスクの実行をサポートしています。タスクが多いほどパフォーマンスが向上する可能性があります(複数のタスクを実行するとコンシューマーラグが減少します)。
トピックの自動作成: コネクターの起動時に、以下の 3 つのトピックが自動的に作成されます。
- 成功トピック
- エラートピック
- デッドレターキュー(DLQ)トピック
各トピック名にはサフィックスとして、コネクターの論理 ID が付けられます。以下の例では、コネクターのトピックが 3 つと、pageviews という名前の既存の Kafka トピックが 1 つ含まれています。
コネクターのトピック¶
トピックに送信されたレコードが正しいフォーマットではない場合、またはレコード内に重要なフィールドが存在しない場合は、エラートピックにエラーが記録され、コネクターは動作を継続します。
サポートされるデータフォーマット: このコネクターは、Avro、JSON スキーマ(JSON-SR)、Protobuf、JSON(スキーマレス)、および Bytes フォーマットをサポートします。スキーマレジストリ ベースのフォーマット(Avro、JSON スキーマ、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「環境の制限」を参照してください。
正規表現の置換: コネクターでは、多数の正規表現パターンと置換文字列を設定して、送信先 API に送信する前にレコードに適用することができます。このためには、コネクターでは、構成オプション
regex.patterns
、regex.replacements
、およびregex.separator
が使用されます。バッチ処理のサポート: コネクターでは、効率化のため、HTTP API に送信されるリクエストをバッチにまとめます。バッチは、構成オプション
batch.prefix
、batch.suffix
、およびbatch.separator
でビルドできます。すべての正規表現のオプションは、バッチ処理時に適用され、個別のレコードに適用されてからバッチに送られます。
構成プロパティの値と詳細については、「構成プロパティ」を参照してください。Cloud コネクターの制限事項 も参照してください。
クイックスタート¶
このクイックスタートを使用して、Confluent Cloud HTTP Sink Connector の利用を開始することができます。このクイックスタートでは、コネクターを選択し、HTTP エンドポイントにイベントをストリーミングするようにコネクターを構成するための基本的な方法について示します。
- 前提条件
- アマゾンウェブサービス (AWS)、Microsoft Azure (Azure)、または Google Cloud Platform (GCP)上の Confluent Cloud クラスターへのアクセスを許可されていること。
- Confluent CLI がインストールされ、クラスター用に構成されていること。「Confluent CLI のインストール」を参照してください。
- スキーマレジストリ ベースのフォーマット(Avro、JSON_SR(JSON スキーマ)、Protobuf など)を使用するには、Schema Registry を有効にしておく必要があります。詳細については、「環境の制限」を参照してください。
- シンクコネクターを作成する前に、Confluent Cloud クラスター上にソース Kafka トピックが 1 つ以上存在している必要があります。
Confluent Cloud Console の使用¶
ステップ 1: Confluent Cloud クラスターを起動します。¶
インストール手順については、「Confluent Cloud を使用した Apache Kafka のクイックスタート」を参照してください。
ステップ 2: コネクターを追加します。¶
左のナビゲーションメニューの Data integration をクリックし、Connectors をクリックします。クラスター内に既にコネクターがある場合は、+ Add connector をクリックします。
Step 4: Enter the connector details.¶
注釈
- Ensure you have all your prerequisites completed.
- アスタリスク(* )は必須項目であることを示しています。
At the Add HTTP Sink Connector screen, complete the following:
If you've already populated your Kafka topics, select the topic(s) you want to connect from the Topics list.
To create a new topic, click +Add new topic.
- Select the way you want to provide Kafka Cluster credentials. You can
choose one of the following options:
- Global Access: Allows your connector to access everything you have access to. With global access, connector access will be linked to your account. This option is not recommended for production.
- Granular access: Limits the access for your connector. You will be able to manage connector access through a service account. This option is recommended for production.
- Use an existing API key: Allows you to enter an API key and secret part you have stored. You can enter an API key and secret (or generate these in the Cloud Console).
- Click Continue.
- HTTP URL を入力します。HTTP または HTTPS 接続 URL を使用します。たとえば、http://eshost1:9200 または https://eshost3:9200 のように入力します。接続 URL が HTTPS の場合は、すべての接続に HTTPS が使用されます。プロトコルなしの URL は HTTP とみなされます。
- Enter the following authentication details:
- Auth username: The username to be used with an endpoint requiring authentication.
- Auth password: The password to be used with an endpoint requiring authentication.
- OAuth2 token url: The URL to be used for fetching the OAuth2 token in the field. Client Credentials is the only supported grant type.
- OAuth2 client id: The client ID used when fetching the OAuth2 token.
- OAuth2 secret: The secret used when fetching the OAuth2 token.
- OAuth2 token property name: The name of the property containing the OAuth2 token returned by the HTTP proxy.
- OAuth2 auth mode mechanism: Specifies how to encode
client_id
andclient_secret
in the OAuth2 authorization request. - OAuth2 scope: The scope used when fetching OAuth2 token. If empty, this parameter is not set in the authorization request
- Key Password: The password of the private key in the key store file. This is optional for client.
- Key Store: The key store containing server certificate. Only required if using HTTPS.
- Keystore Password: The store password for the key store file.
This is optional for a client and is only needed if
https.ssl.keystore.location
is configured. - Trust store: The trust store containing server CA certificate. Only required if using HTTPS.
- Trust store password: The trust store password containing server CA certificate. Only required if using HTTPS.
- SSL Protocol: The protocol to use for SSL connections.
- Enable host verification: True if SSL host verification should be enabled.
- Endpoint Authentication type: Endpoint authentication type. You
can select either
NONE
,BASIC
, orAUTH
.
- Click Continue.
注釈
Configuration properties that are not shown in the Cloud Console use the default values. See 構成プロパティ for all property values and definitions.
Select the Input Kafka record value format (data coming from the Kafka topic): AVRO, PROTOBUF, JSON_SR, JSON, or BYTES. A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON Schema, or Protobuf). See 環境の制限 for additional information.
Show advanced configurations
HTTP Request Method: If the PagerDuty Sink connector fails to connect to the PagerDuty endpoint it automatically retries the connection using exponential backoff. This property controls how long the connector retries requests (in milliseconds). The default value is
10000
ms (10 seconds). Be sure to set this value to at least1000
ms (1 second).HTTP Headers: HTTP headers to be included in all requests. Individual headers should be separated by the Header Separator.
HTTP Headers Separator: Separator character used in headers.
Behavior for null valued records: How to handle records with a non-null key and a null value (that is–Kafka tombstone records). Valid options are
ignore
,delete
andfail
.Behavior on errors: Error handling behavior config for handling error responses from HTTP requests.
Report errors as: Dictates the content of records produced to the error topic. If set to
error_string
, the value would be a human readable string describing the failure. The value will include some or all of the following information if available: HTTP response code, reason phrase, submitted payload, URL, response content, exception and error message.Retry on HTTP codes: Comma-separated list of HTTP codes or range of codes to retry on.
Maximum Retries: The maximum number of times to retry on errors before failing the task.
Retry Backoff (milliseconds): The time in milliseconds to wait following an error before a retry attempt is made.
Connect timeout (milliseconds): The time in milliseconds to wait for a connection to be established.
Request timeout (milliseconds): The time in milliseconds to wait for a request response from the server.
Request Body Format: Used to produce request body in either JSON or String format.
Batch key pattern: Pattern used to build the key for a given batch.
Batch max size: The number of records accumulated in a batch before the HTTP API is invoked.
Batch prefix: Prefix added to record batches. This is applied once at the beginning of the batch of records.
Batch suffix: Suffix added to record batches. This is applied once at the end of the batch of records.
Batch separator: Separator for records in a batch.
Batch json as array: Whether or not to use an array to bundle JSON records. Only used when
request.body.format
is set to JSON.Regular expression patterns: Regular expression patterns used for replacements in the message sent to the HTTP service.
Regular expression replacements: Regex replacements to use with the patterns in
regex.patterns
.Regular expression separator: Separator character used in
regex.patterns
andregex.replacements
property.Transforms and Predicates: 詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
See 構成プロパティ for all property values and definitions.
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
- To change the number of recommended tasks, enter the number of tasks for the connector to use in the Tasks field.
- Click Continue.
ステップ 5: レコードを確認します。¶
レコードがエンドポイントに生成されていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に生成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
Confluent CLI の使用¶
Confluent CLI でコネクターをセットアップして実行するには、次の手順を実行します。
注釈
- すべての 前提条件 を満たしていることを確認してください。
- コマンド例では Confluent CLI バージョン 2 を使用しています。詳細については、「Confluent CLI v2 への移行 <https://docs.confluent.io/confluent-cli/current/migrate.html#cli-migrate>`__」を参照してください。
ステップ 2: コネクターの必須の構成プロパティを表示します。¶
以下のコマンドを実行して、コネクターの必須プロパティを表示します。
confluent connect plugin describe <connector-catalog-name>
例:
confluent connect plugin describe HttpSink
出力例:
Following are the required configs:
connector.class: HttpSink
input.data.format
name
kafka.auth.mode
kafka.api.key
kafka.api.secret
http.api.url
tasks.max
topics
ステップ 3: コネクターの構成ファイルを作成します。¶
コネクター構成プロパティを含む JSON ファイルを作成します。以下の例は、コネクターの必須プロパティを示しています。
{
"connector.class": "HttpSink",
"input.data.format": "JSON",
"name": "HttpSinkConnector_0",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret": "<my-kafka-api-secret>",
"http.api.url": "http:://eshost1:9200/",
"request.method": "POST",
"tasks.max": "1",
"topics": "orders",
}
以下のプロパティ定義にご注意ください。
"connector.class"
: コネクターのプラグイン名を特定します。"input.data.format"
: Kafka 入力レコード値のフォーマット(Kafka トピックから送られるデータ)を設定します。指定可能なエントリは、AVRO、JSON_SR、PROTOBUF、JSON、または BYTES です。スキーマベースのメッセージフォーマット(たとえば、Avro、JSON_SR(JSON スキーマ)、および Protobuf)を使用するには、Confluent Cloud Schema Registry を構成しておく必要があります。"name"
: 新しいコネクターの名前を設定します。
"kafka.auth.mode"
: 使用するコネクターの認証モードを指定します。オプションはSERVICE_ACCOUNT
またはKAFKA_API_KEY
(デフォルト)です。API キーとシークレットを使用するには、構成プロパティkafka.api.key
とkafka.api.secret
を構成例(前述)のように指定します。サービスアカウント を使用するには、プロパティkafka.service.account.id=<service-account-resource-ID>
に リソース ID を指定します。使用できるサービスアカウントのリソース ID のリストを表示するには、次のコマンドを使用します。confluent iam service-account list
例:
confluent iam service-account list Id | Resource ID | Name | Description +---------+-------------+-------------------+------------------- 123456 | sa-l1r23m | sa-1 | Service account 1 789101 | sa-l4d56p | sa-2 | Service account 2
"http.api.url"
: HTTP または HTTPS 接続 URL を入力します。たとえば、http://eshost1:9200 または https://eshost3:9200 のように入力します。接続 URL が HTTPS の場合は、すべての接続に HTTPS が使用されます。プロトコルなしの URL は HTTP とみなされます。"request.method"
: HTTP API リクエストメソッド(PUT
またはPOST
)を入力します。"tasks.max"
: このコネクターで使用できる タスク の最大数を入力します。タスクが多いほどパフォーマンスが向上する可能性があります(複数のタスクを実行するとコンシューマーラグが減少します)。"topics"
: 特定のトピック名を指定するか、複数のトピック名をコンマ区切りにしたリストを指定します。
Single Message Transforms: CLI を使用する SMT の追加の詳細については、Single Message Transforms(SMT) のドキュメントを参照してください。
See 構成プロパティ for all property values and descriptions.
ステップ 3: プロパティファイルを読み込み、コネクターを作成します。¶
以下のコマンドを入力して、構成を読み込み、コネクターを起動します。
confluent connect create --config <file-name>.json
例:
confluent connect create --config http-sink-config.json
出力例:
Created connector HttpSinkConnector_0 lcc-do6vzd
ステップ 4: コネクターのステータスを確認します。¶
以下のコマンドを入力して、コネクターのステータスを確認します。
confluent connect list
出力例:
ID | Name | Status | Type | Trace
+------------+-------------------------------+---------+------+-------+
lcc-do6vzd | HttpSinkConnector_0 | RUNNING | sink | |
ステップ 5: レコードを確認します。¶
レコードがエンドポイントに取り込まれていることを確認します。
Connect 用の Confluent Cloud API の使用に関する詳細とサンプルについては、「Confluent Cloud API for Connect」セクションを参照してください。
ちなみに
コネクターを起動すると、デッドレターキューのトピックが自動的に生成されます。詳細については、「Confluent Cloud デッドレターキュー」を参照してください。
構成プロパティ¶
Use the following configuration properties with this connector.
Which topics do you want to get data from?¶
topics
Identifies the topic name or a comma-separated list of topic names.
- 型: list
- 重要度: 高
Input messages¶
input.data.format
Sets the input Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, JSON or BYTES. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
- 型: string
- 重要度: 高
How should we connect to your data?¶
name
Sets a name for your connector.
- 型: string
- Valid Values: A string at most 64 characters long
- 重要度: 高
Kafka Cluster credentials¶
kafka.auth.mode
Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- 型: string
- Default: KAFKA_API_KEY
- Valid Values: SERVICE_ACCOUNT, KAFKA_API_KEY
- 重要度: 高
kafka.api.key
- 型: password
- 重要度: 高
kafka.service.account.id
The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- 型: string
- 重要度: 高
kafka.api.secret
- 型: password
- 重要度: 高
HTTP サーバーの詳細¶
http.api.url
- 型: string
- 重要度: 高
request.method
- 型: string
- Default: POST
- 重要度: 高
headers
HTTP headers to be included in all requests. Individual headers should be separated by the Header Separator
- 型: string
- 重要度: 高
header.separator
Separator character used in headers
- 型: string
- 重要度: 高
behavior.on.null.values
How to handle records with a non-null key and a null value (i.e. Kafka tombstone records). Valid options are
ignore
,delete
andfail
- 型: string
- Default: ignore
- 重要度: 低
HTTP server error handling¶
behavior.on.error
Error handling behavior config for handling error responses from HTTP requests
- 型: string
- Default: ignore
- 重要度: 中
report.errors.as
Dictates the content of records produced to the error topic. If set to
error_string
, the value would be a human readable string describing the failure. The value will include some or all of the following information if available: http response code, reason phrase, submitted payload, url, response content, exception and error message. If set tohttp_response
, the value would be the plain response content for the request which failed to write the record. In both modes, any information about the failure will also be included in the error record's headers- 型: string
- Default: error_string
- 重要度: 中
HTTP server batches¶
request.body.format
Used to produce request body in either JSON or String format
- 型: string
- デフォルト: string
- 重要度: 中
batch.key.pattern
Pattern used to build the key for a given batch. ${key} and ${topic} can be used to include message attributes here
- 型: string
- 重要度: 高
batch.max.size
The number of records accumulated in a batch before the HTTP API is invoked
- 型: int
- デフォルト: 1
- 重要度: 高
batch.prefix
Prefix added to record batches. This is applied once at the beginning of the batch of records
- 型: string
- 重要度: 高
batch.suffix
Suffix added to record batches. This is applied once at the end of the batch of records
- 型: string
- 重要度: 高
batch.separator
Separator for records in a batch
- 型: string
- 重要度: 高
batch.json.as.array
Whether or not to use an array to bundle json records. Only used when request.body.format is set to json. This can be disabled only when batch.max.size is set to 1.
- 型: boolean
- 重要度: 高
HTTP server authentication¶
auth.type
- 型: string
- デフォルト: NONE
- 重要度: 高
connection.user
The username to be used with an endpoint requiring authentication
- 型: string
- 重要度: 高
connection.password
The password to be used with an endpoint requiring authentication
- 型: password
- 重要度: 高
oauth2.token.url
OAuth2 トークンのフェッチで使用される URL。クライアント認証情報は、唯一サポートされている付与タイプです。
- 型: string
- 重要度: 高
oauth2.client.id
The client id used when fetching OAuth2 token
- 型: string
- 重要度: 高
oauth2.client.secret
The secret used when fetching OAuth2 token
- 型: password
- 重要度: 高
oauth2.token.property
The name of the property containing the OAuth2 token returned by the http proxy.
- 型: string
- デフォルト: access_token
- 重要度: 高
oauth2.client.auth.mode
OAuth2 認可リクエストで
client_id
とclient_secret
をエンコードする方法を指定します。「header」を設定した場合、認証情報は'Authorization: Basic <base-64 encoded client_id:client_secret>'
HTTP ヘッダーとしてエンコードされます。「url」と設定した場合、client_id
とclient_secret
は URL エンコードされたパラメーターとして送信されます。- 型: string
- デフォルト: header
- 重要度: 低
oauth2.client.scope
The scope used when fetching OAuth2 token. If empty, this parameter is not set in the authorization request
- 型: string
- デフォルト: any
- 重要度: 低
HTTP server retries¶
retry.on.status.codes
The HTTP error codes to retry on. Comma-separated list of codes or range of codes to retry on. Ranges are specified with start and optional end code. Range boundaries are inclusive. For instance, '400-' includes all codes greater than or equal to 400. '400-500' includes codes from 400 to 500, including 500. Multiple ranges and single codes can be specified together to achieve fine grained control over retry behavior. For example, '404,408,500-' will retry on 404 NOT FOUND, 408 REQUEST TIMEOUT, and all 5xx error codes
- 型: string
- デフォルト: 400-
- 重要度: 中
max.retries
The maximum number of times to retry on errors before failing the task
- 型: int
- デフォルト: 10
- 重要度: 中
retry.backoff.ms
The time in milliseconds to wait following an error before a retry attempt is made
- 型: int
- Default: 3000 (3 seconds)
- 指定可能な値: [100,…]
- 重要度: 中
http.connect.timeout.ms
The time in milliseconds to wait for a connection to be established
- 型: int
- Default: 30000 (30 seconds)
- 重要度: 中
http.request.timeout.ms
The time in milliseconds to wait for a request response from the server
- 型: int
- Default: 30000 (30 seconds)
- 重要度: 中
HTTP server regular expressions¶
regex.patterns
Regular expression patterns used for replacements in the message sent to the HTTP service. Multiple regular expression patterns can be specified, but must be separated by
regex.separator
- 型: string
- 重要度: 中
regex.replacements
Regex replacements to use with the patterns in
regex.patterns
. Multiple replacements can be specified, but must be separated byregex.separator
.${key}
and${topic}
can be used here.- 型: string
- 重要度: 中
regex.separator
regex.patterns
プロパティおよびregex.replacements
プロパティで使用される区切り文字。- 型: string
- 重要度: 中
HTTP server SSL¶
https.ssl.key.password
The password of the private key in the key store file. This is optional for client
- 型: password
- 重要度: 高
https.ssl.keystorefile
The key store containing server certificate. Only required if using https
- 型: password
- 重要度: 低
https.ssl.keystore.password
The store password for the key store file. This is optional for a client and is only needed if https.ssl.keystore.location is configured
- 型: password
- 重要度: 高
https.ssl.truststorefile
The trust store containing server CA certificate. Only required if using https
- 型: password
- 重要度: 高
https.ssl.truststore.password
The trust store password containing server CA certificate. Only required if using https
- 型: password
- 重要度: 高
https.ssl.protocol
The protocol to use for SSL connections
- 型: string
- Default: TLSv1.3
- 重要度: 中
https.host.verifier.enabled
True if SSL host verification should be enabled
- 型: boolean
- デフォルト: true
- 重要度: 中
Number of tasks for this connector¶
tasks.max
- 型: int
- 指定可能な値: [1,…]
- 重要度: 高
次のステップ¶
参考
フルマネージド型の Confluent Cloud コネクターが Confluent Cloud ksqlDB でどのように動作するかを示す例については、「Cloud ETL のデモ」を参照してください。この例では、Confluent CLI を使用して Confluent Cloud のリソースを管理する方法についても説明しています。