重要
このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。
マネージド型コネクターで使用する Single Message Transforms¶
Confluent Cloud マネージド型コネクターは、1 つ以上の Single Message Transforms(SMT)を使用するようコネクターを構成して、メッセージの値、キー、およびヘッダーに簡単で軽微な変更を加えることができます。これは、フィールドの挿入、情報のマスク、イベントのルーティング、その他の小規模なデータの調整 などに役立つ場合があります。SMT は、ソースコネクターでもシンクコネクターでも使用できます。
ソースコネクターで複数の SMT が使用される場合、Connect がコネクターで生成された各ソースレコードを最初の SMT に渡すと、変更が加えられます。この更新済みのソースレコードがチェーン内の次の SMT に渡されます。このようにして、チェーン内の残りの SMT も順に実行されます。最終的に更新されたソースレコードが バイナリ形式(バイト)に変換 され、Apache Kafka® に書き込まれます。
シンクコネクターで複数の SMT が使用される場合、Connect はまず Kafka からレコードを読み取り、バイトをシンクレコードに変換します。次に、Connect が、レコードを SMT に渡すと、変更が加えられます。更新済みのシンクレコードがチェーン内の次の SMT に渡されます。このようにして残りの SMT も実行されます。最終的に更新されたシンクレコードがシンクコネクターに渡されて、処理されます。

マネージド型コネクターで使用する Single Message Transforms¶
注釈
- SMT は非常に便利ですが、シンプルなデータ変換にのみ使用してください。より洗練された変換やデータ統合は、ksqlDB および Kafka ストリーム処理 を使用して処理してください。
- 詳細については、「SMT の制限」を参照してください。
Cloud Console で SMT を使用する方法¶
重要
多くのコネクターで、アップデートされた UI を利用できます。コネクターの UI の SMT 部分が以下の例のようになっている場合、コネクターではアップデートされた UI が使用されています。

SMT UI のアップデート¶
以下の UI の違いに注意してください。
- Transforms オプションは UI の詳細構成セクションにあります。SMT を追加するには、Show advanced configurations をクリックします。
- Add a single message transform ボタンが 1 つあります。UI は異なりますが、SMT を作成するための機能は、以下の手順で説明されているものから変更されていません。
- Predicate 機能は現在、アップデートされた UI では構成できません。CLI を使用してコネクター構成を作成する場合は、述語を作成できます。
以下の例の手順は、Confluent Cloud Console を使用した SMT の使用方法です。この手順では、Datagen Source Connector for Confluent Cloud を使用した例を示します。
注釈
以下の手順では、既に Cloud Console でコネクターの構成を開始しているものとします。
Cloud Console のコネクター構成の UI には、SMT をサポートする 2 つの初期フィールド、Transforms および Predicates があります。Transforms フィールドに、SMT のわかりやすい名前(エイリアス)を入力します。たとえば、カスタマー ID をマスクする(
MaskField$Value
)場合は、エイリアスとしてmask_userid
と入力できます。変換¶
Add Transform ボタンをクリックします。変換
org.apache.kafka.connect.transforms.MaskField$Value
を選択すると、Transforms ダイアログボックスが表示されます。Mask フィールドのオプション¶
- fields (必須): マスクするレコードフィールドの名前を入力します。この例では、フィールド
userid
がマスクされます。 - negate (省略可能): negate オプションは述語を反転します(使用される場合)。
true
を選択すると、"定義された述語の条件と一致しない" レコードにのみ述語が適用されます。使用しない場合は、false
がデフォルトとして使用されます。 - predicate (省略可能): 述語オプションを使用すると、SMT に述語の条件を使用する、または使用しないことができます。この SMT で述語の条件を使用する場合は、Predicates フィールドに、作成したエイリアスを入力します。
- replacement (省略可能): このフィールドでは、置換文字列を設定します。この例では、
userid
が置き換えられ、******
と表示されます。
注釈
SMT 構成プロパティは、選択した変換タイプによってさまざまです。変換の構成プロパティと説明については、SMT の説明 を参照してください。
- fields (必須): マスクするレコードフィールドの名前を入力します。この例では、フィールド
最初の変換の入力が終わったら、必要に応じて SMT を追加できます。
ちなみに
SMT は、Transforms フィールドにエイリアスが表示されている順に処理されます。
Predicates の使用はオプションです。述語を追加する場合は、このフィールドに、使用する述語のエイリアスを入力します。述語のエイリアスは、SMT のプロパティの構成時に使用されます。この例では、述語のエイリアス
tombstone_records
が、述語org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
に使用されます。述語¶
述語を SMT に適用すると、述語に応じてコネクターが SMT を条件付きで適用します。述語では、コネクターが処理対象の各メッセージの評価に使用する述語の条件を指定します。レコードが評価される際に、コネクターは、まず述語の条件に対してレコードをチェックします。レコードが条件を満たしている場合、コネクターは、SMT をレコードに適用します。述語と一致しないレコードは、変更されないままで渡されます。negate オプションは述語を反転します。
negate
にtrue
を設定すると、"定義された述語の条件と一致しない" レコードにのみ述語が適用されます。変換と述語の追加が終わったら、Next をクリックします。コネクター構成が表示されます。
コネクターの構成¶
(省略可能) Data preview ボタン をクリックし、
"record"
セクションのデータ出力が想定どおりになっていることを確認します。プレビューの生成には数分かかることがあります。データのプレビューが想定どおりだった場合は、戻って、認証情報を追加し、コネクターを起動します。
コネクターが起動されると、トピックに変換済みのレコードが表示されます。

変換済みのレコード¶
Confluent CLI で SMT を使用する方法¶
CLI を使用して読み込む JSON 構成ファイルを作成するには、以下のドキュメントを参照してください。
- マネージド型コネクター のドキュメントには、各コネクターの構成プロパティおよび JSON ファイルの例が含まれています。
- SMT のドキュメント には、基本的なコネクター構成に追加する必要がある変換の構成プロパティが含まれています。
- 構成と変換の例については、「SMT の例」を参照してください。
失敗したレコード¶
ソースコネクター の場合、SMT の構成に誤りがあると、コネクターが失敗します。以下にトラブルシューティング情報を示します。
- マネージド型コネクターの データ出力のプレビュー を確認できます。ここでは、失敗したレコードの完全なスタックトレースが表示されます。出力プレビューのチェックで何を確認するかについては、「SMT およびデータプレビュー」を参照してください。
- 例外の根本原因を特定するには、コネクターイベントを表示 します。
シンクコネクター の場合、コネクターの実行が継続されます。失敗したレコードは Confluent Cloud デッドレターキュー に送信されます。DLQ には、失敗したレコードの完全なスタックトレースが表示されます。
注釈
詳細については、「変換エイリアスの検証」を参照してください。
SMT およびデータプレビュー¶
SMT の場合、データプレビュー では、Kafka レコードに適用される変換ごとにレコードが作成されます。複数の変換の場合、各レコードが、Kafka レコードに順次適用された変換の 1 つを示します。このため、n 個の変換を構成した場合、データプレビュー は、n + 1 個のレコードを生成します。変換をプレビューする場合は、この点に注意してください。
以下の例では、ソースコネクターに 2 つの SMT が構成されています。以下の例で、変換ステップは、total_step:n
中の current_step:n
で表されています。エントリ transformation_name
および transformation_type
は、ステップで適用される変換を示しています。
SMT の制限¶
次の制限事項に注意してください。
変換エイリアスの検証¶
いくつかのマネージド型コネクターでは、既に内部変換が設定されています。コネクターの内部構成テンプレートに存在するエイリアスと競合する変換エイリアスを追加した場合、Connect によって、以下の例のような検証エラーがスローされます。
Invalid value [internalxform, ..., internalxform] for configuration transforms: Duplicate alias provided.
上のエラーメッセージ内の internalxform
は、追加したエイリアスと競合する、構成テンプレート内の内部エイリアスです。変換では、以下のエイリアスは使用できません。
マネージド型コネクター | コネクタープラグイン | 内部エイリアス |
---|---|---|
Amazon S3 Sink | S3_SINK | requireTimestampTransform |
Google BigQuery Sink | BigQuerySink | requireMapTransform |
HTTP Sink | HttpSink | requireTimestampTransform |
Microsoft SQL Server CDC Source(Debezium) | SqlServerCDC | unwrap |
MySQL CDC Source(Debezium) | MySqlCDC | unwrap |
Postgres CDC Source(Debezium) | PostgresCDC | unwrap |
サポートされない変換¶
- 以下の SMT は現在、マネージド型コネクターではサポートされていません。
- RegexRouter: ソースコネクターの場合、TopicRegexRouter SMT を使用する方法があります。
- InsertHeader
- DropHeaders
- 一部の シンク コネクターでは、以下の変換をサポートしていません。
org.apache.kafka.connect.transforms.TimestampRouter
io.confluent.connect.transforms.MessageTimestampRouter
io.confluent.connect.transforms.ExtractTopic$Header
io.confluent.connect.transforms.ExtractTopic$Key
io.confluent.connect.transforms.ExtractTopic$Value
io.confluent.connect.cloud.transforms.TopicRegexRouter
- 一部の ソース コネクターでは、以下の変換をサポートしていません。
org.apache.kafka.connect.transforms.HoistField$Value
org.apache.kafka.connect.transforms.ValueToKey
SMT リスト¶
注釈
すべてのマネージド型コネクターで、リストされている SMT がすべてサポートされているわけではありません。「サポートされない変換」を参照してください。
変換 | 説明 |
---|---|
Cast | フィールドや、キーまたは値の全体を特定の型に変換(キャスト)します(整数のフィールドを狭範囲の整数のフィールドに変換するなど)。 |
Drop | レコードの 1 つのキーまたは値をドロップ(破棄)して null を設定します。 |
DropHeaders | 現在、マネージド型コネクターでは利用できません。 各レコードから 1 つ以上のヘッダーをドロップします。 |
ExtractField | スキーマが存在する場合は Struct から、スキーマのないデータの場合は Map から、指定されたフィールドを抽出します。null 値はすべてそのまま渡されます。 |
ExtractTopic | レコードトピックを、キーまたは値から派生された新しいトピックに置き換えます。 |
Filter(Apache Kafka) | すべてのレコードをドロップします。述語 と組み合わせて使用します。 |
Filter(Confluent) | 構成可能な filter.condition と一致するレコードを含めるか、ドロップします。 |
Flatten | ネストしたデータ構造をフラット化します。これにより、階層ごとにフィールド名が区切り文字で連結されて、それぞれのフィールド名が生成されます(区切り文字は変更することができます)。 |
GzipDecompress | Not currently available for managed connectors. Gzip-decompress the entire byteArray key or value input. |
HeaderFrom | Not currently available for managed connectors. Moves or copies fields in a record key or value into the record's header. |
HoistField | スキーマが存在する場合は Struct の、スキーマのないデータの場合は Map の、指定されたフィールド名を使用してデータをラップします。 |
InsertField | レコードメタデータの属性または構成された静的な値を使用してフィールドを挿入します。 |
InsertHeader | 現在、マネージド型コネクターでは利用できません。 レコードヘッダーとしてリテラル値を挿入します。 |
MaskField | 指定されたフィールドを、そのフィールドの型に対して有効な null 値でマスクします。 |
MessageTimeStampRouter | 元のトピックの値とレコードのタイムスタンプフィールドに応じて、レコードのトピックフィールドを更新します。 |
RegexRouter | 現在、マネージド型コネクターでは利用できません。 構成されている正規表現と置換文字列を使用して、レコードのトピックを更新します。 |
ReplaceField | フィールドにフィルターをかけるか、フィールドの名前を変更します。 |
SetSchemaMetadata | レコードのキーまたは値のスキーマに対して、スキーマ名とバージョンのいずれか、または両方を設定します。 |
TimestampConverter | Unix エポック、文字列、Connect の Date 型および Timestamp 型など、タイムスタンプのフォーマットを別のフォーマットに変換します。 |
TimestampRouter | 元のトピックの値とレコードのタイムスタンプに応じて、レコードのトピックフィールドを更新します。 |
TombstoneHandler | tombstone レコードを管理します。tombstone レコードとは、ValueSchema の有無に関係なく、値フィールド全体が null になっているレコードと定義されています。 |
TopicRegexRouter | マネージド型ソースコネクターのみで利用できます。 構成されている正規表現と置換文字列を使用して、レコードのトピックをアップデートします。 |
ValueToKey | レコードキーを、レコード値のフィールドのサブセットから形成された新しいキーに置き換えます。 |
SMT の例¶
The following examples show connector configuration and transformation examples for source and sink connectors. The examples show configurations for the Datagen Source and the Google Cloud Functions Sink connectors.
注釈
- 一部の SMT(
Cast
など)には、レコード値とレコードキーのいずれかに適用されるものがあります。Cast
を例として使用すると、コネクターは、org.apache.kafka.connect.transforms.Cast$Value
を Kafka レコード値に適用します。また、org.apache.kafka.connect.transforms.Cast$Key
を Kafka レコードキーに適用します。 - SMT の定義、構成プロパティの値、その他の例については、SMT のドキュメント を参照してください。
Cast¶
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-cast",
"max.interval": "3000",
"name": "DatagenSourceSmtCast",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "castValues",
"transforms.castValues.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castValues.spec": "zipcode:float64, orderid:string, orderunits:int32"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "${GCP_FUNCTION_NAME}",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtCast",
"project.id": "${PROJECT_ID}",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "castValues",
"transforms.castValues.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castValues.spec": "zipcode:float64, orderid:string, orderunits:int32"
}
変換の例:
{
"ordertime": 1512446289869,
"orderid": 417794,
"itemid": "Item_430",
"orderunits": 5.085317150755766,
"address": {
"city": "City_19",
"state": "State_88",
"zipcode": 72688
}
}
{
"ordertime": 1512446289869,
"orderid": "417794",
"itemid": "Item_430",
"orderunits": 5,
"address": {
"city": "City_19",
"state": "State_88",
"zipcode": 72688.00
}
}
Drop¶
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-drop",
"max.interval": "3000",
"name": "DatagenSourceSmtDrop",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "dropValue",
"transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtDrop",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "dropValue",
"transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value"
}
変換の例:
{
"ordertime": 1512446289869,
"orderid": 417794,
"itemid": "Item_430",
"orderunits": 5.085317150755766,
"address": {
"city": "City_19",
"state": "State_88",
"zipcode": 72688
}
}
null
Extract Field¶
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-extract-field",
"max.interval": "3000",
"name": "DatagenSourceSmtExtractField",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "extractAddress",
"transforms.extractAddress.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractAddress.field": "address"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtExtractField",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "extractAddress",
"transforms.extractAddress.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractAddress.field": "address"
}
変換の例:
{
"ordertime": 1512446289869,
"orderid": 417794,
"itemid": "Item_430",
"orderunits": 5.085317150755766,
"address": {
"city": "City_19",
"state": "State_88",
"zipcode": 72688
}
}
{
"city": "City_19",
"state": "State_88",
"zipcode": 72688
}
Extract Topic¶
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-extract-topic",
"max.interval": "3000",
"name": "DatagenSourceSmtExtractTopic",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "setTopic",
"transforms.setTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.setTopic.field": "itemid"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtExtractTopic",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "setTopic",
"transforms.setTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.setTopic.field": "itemid"
}
変換の例:
{
"ordertime": 1512446289869,
"orderid": 417794,
"itemid": "Item_430",
"orderunits": 5.085317150755766,
"address": {
"city": "City_19",
"state": "State_88",
"zipcode": 72688
}
}
構成されているトピックの代わりにトピック Item_430
に移動します。
{
"ordertime": 1512446289869,
"orderid": 417794,
"itemid": "Item_430",
"orderunits": 5.085317150755766,
"address": {
"city": "City_19",
"state": "State_88",
"zipcode": 72688
}
}
Filter(Apache Kafka)¶
注釈
追加の Tombstone 述語を表示します。
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-filter-ak",
"max.interval": "3000",
"name": "DatagenSourceSmtFilterAk",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"predicates": "isNull",
"predicates.isNull.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
"transforms": "dropValue, filterNull",
"transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
"transforms.filterNull.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterNull.predicate": "isNull"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtFilterAk",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"predicates": "isNull",
"predicates.isNull.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
"transforms": "dropValue, filterNull",
"transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
"transforms.filterNull.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.filterNull.predicate": "isNull"
}
変換の例:
{
"ordertime": 1512446289869,
"orderid": 417794,
"itemid": "Item_430",
"orderunits": 5.085317150755766,
"address": {
"city": "City_19",
"state": "State_88",
"zipcode": 72688
}
}
Record is dropped.
Filter(Confluent)¶
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-filter-cp",
"max.interval": "3000",
"name": "DatagenSourceSmtFilterCp",
"output.data.format": "JSON",
"quickstart": "RATINGS",
"tasks.max": "1",
"transforms": "filterValue",
"transforms.filterValue.filter.condition": "$[?(@.channel == 'ios')]",
"transforms.filterValue.filter.type": "include",
"transforms.filterValue.type": "io.confluent.connect.transforms.Filter$Value"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtFilterCp",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "filterValue",
"transforms.filterValue.filter.condition": "$[?(@.channel == 'ios')]",
"transforms.filterValue.filter.type": "include",
"transforms.filterValue.type": "io.confluent.connect.transforms.Filter$Value"
}
変換の例:
[
{
"rating_id": 140,
"user_id": 3,
"stars": 4,
"route_id": 7425,
"rating_time": 1669,
"channel": "ios",
"message": "thank you for the most friendly, helpful experience today at your new lounge"
},
{
"rating_id": 491,
"user_id": 7,
"stars": 4,
"route_id": 3302,
"rating_time": 5881,
"channel": "iOS-test",
"message": "more peanuts please"
}
]
[
{
"rating_id": 140,
"user_id": 3,
"stars": 4,
"route_id": 7425,
"rating_time": 1669,
"channel": "ios",
"message": "thank you for the most friendly, helpful experience today at your new lounge"
}
]
Flatten¶
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-flatten",
"max.interval": "3000",
"name": "DatagenSourceSmtFlatten",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtFlatten",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
}
変換の例:
{
"ordertime": 1491310657544,
"orderid": 9,
"itemid": "Item_826",
"orderunits": 4.188698361592631,
"address": {
"city": "City_73",
"state": "State_47",
"zipcode": 54450
}
}
{
"ordertime": 1491310657544,
"orderid": 9,
"itemid": "Item_826",
"orderunits": 4.188698361592631,
"address_city": "City_73",
"address_state": "State_47",
"address_zipcode": 54450
}
Hoist Field¶
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-hoist-field",
"max.interval": "3000",
"name": "DatagenSourceSmtHoistField",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "hoist",
"transforms.hoist.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.hoist.field": "wrapperField"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtHoistField",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "hoist",
"transforms.hoist.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.hoist.field": "wrapperField"
}
変換の例:
{
"registertime": 1506959883575,
"userid": "User_2",
"regionid": "Region_1",
"gender": "MALE"
}
{
"wrapperField": {
"registertime": 1506959883575,
"userid": "User_2",
"regionid": "Region_1",
"gender": "MALE"
}
}
Insert Field¶
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-insert-field",
"max.interval": "3000",
"name": "DatagenSourceSmtInsertField",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "insert",
"transforms.insert.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insert.partition.field": "PartitionField",
"transforms.insert.static.field": "InsertedStaticField",
"transforms.insert.static.value": "SomeValue",
"transforms.insert.timestamp.field": "TimestampField",
"transforms.insert.topic.field": "TopicField"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtInsertField",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "insert",
"transforms.insert.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insert.offset.field": "offsetField",
"transforms.insert.partition.field": "partitionField",
"transforms.insert.static.field": "staticField",
"transforms.insert.static.value": "staticValue",
"transforms.insert.timestamp.field": "timestampField",
"transforms.insert.topic.field": "topicField"
}
変換の例:
{
"registertime": 1506959883575,
"userid": "User_2",
"regionid": "Region_1",
"gender": "MALE"
}
{
"registertime": 1506959883575,
"userid": "User_2",
"regionid": "Region_1",
"gender": "MALE",
"TopicField": "insert_topic",
"PartitionField": null,
"TimestampField": null,
"InsertedStaticField": "SomeValue"
}
Mask Field¶
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-mask-field",
"max.interval": "3000",
"name": "DatagenSourceSmtMaskField",
"output.data.format": "JSON",
"quickstart": "USERS",
"tasks.max": "1",
"transforms": "mask",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "gender",
"transforms.mask.replacement": "REDACTED"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtMaskField",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-users-json",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "gender",
"transforms.mask.replacement": "REDACTED"
}
変換の例:
{
"registertime": 1499746213074,
"userid": "User_5",
"regionid": "Region_4",
"gender": "MALE"
}
{
"registertime": 1499746213074,
"userid": "User_5",
"regionid": "Region_4",
"gender": "REDACTED"
}
Message Timestamp Router¶
構成の例:
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtMessageTimestampRouter",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "tsConverter,tsRouter",
"transforms.tsConverter.field": "ordertime",
"transforms.tsConverter.format": "yyyy-MM-dd",
"transforms.tsConverter.target.type": "string",
"transforms.tsConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsRouter.type": "io.confluent.connect.transforms.MessageTimestampRouter",
"transforms.tsRouter.message.timestamp.keys": "ordertime"
}
変換の例:
{
"key": "User_8",
"value": "{registertime=2021-09-03, gender=OTHER, regionid=Region_7, userid=User_8}",
"topic": "msg_timestamp_router_topic",
"partition": 0,
"offset": 812925,
"timestamp": 1628486671963
}
{
"key": "User_8",
"value": "{registertime=2021-09-03, gender=OTHER, regionid=Region_7, userid=User_8}",
"topic": "msg_timestamp_router_topic-2021.09.03",
"partition": 0,
"offset": 812925,
"timestamp": 1628486671963
}
Replace Field¶
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-replace-field",
"max.interval": "3000",
"name": "DatagenSourceSmtReplaceField",
"output.data.format": "JSON",
"quickstart": "USERS",
"tasks.max": "1",
"transforms": "replacefield",
"transforms.replacefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.replacefield.exclude": "userid",
"transforms.replacefield.include": "regionid",
"transforms.replacefield.renames": "regionid:Region_Id"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtReplaceField",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "replaceField",
"transforms.replaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.replaceField.include": "regionid"
}
変換の例:
{
"topic": "TestTopic",
"key": {
"test": "value"
},
"value": {
"userid": 1234,
"regionid": "Region_3",
"ModifiedBy": "XYZ"
}
}
{
"topic": "TestTopic",
"key": {
"test": "value"
},
"value": {
"Region_id": "Region_3"
}
}
Set Schema Metadata¶
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-set-schema-metadata",
"max.interval": "3000",
"name": "DatagenSourceSmtSetSchemaMetadata",
"output.data.format": "AVRO",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "setSchemaMetadata",
"transforms.setSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.setSchemaMetadata.schema.name": "schema_name",
"transforms.setSchemaMetadata.schema.version": "12"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "AVRO",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtSetSchemaMetadata",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "setSchemaMetadata",
"transforms.setSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.setSchemaMetadata.schema.name": "schema_name",
"transforms.setSchemaMetadata.schema.version": "12"
}
変換の例:
{
"connect.name": "ksql.users",
"fields": [
{
"name": "registertime",
"type": "long"
},
{
"name": "userid",
"type": "string"
},
{
"name": "regionid",
"type": "string"
},
{
"name": "gender",
"type": "string"
}
],
"name": "users",
"namespace": "ksql",
"type": "record"
}
{
"connect.name": "schema_name",
"connect.version": 12,
"fields": [
{
"name": "registertime",
"type": "long"
},
{
"name": "userid",
"type": "string"
},
{
"name": "regionid",
"type": "string"
},
{
"name": "gender",
"type": "string"
}
],
"name": "schema_name",
"type": "record"
}
Timestamp Converter¶
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-ts-converter",
"max.interval": "3000",
"name": "DatagenSourceSmtTimestampConverter",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "tsConverter",
"transforms.tsConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsConverter.field": "ordertime",
"transforms.tsConverter.format": "yyyy-MM-dd",
"transforms.tsConverter.target.type": "string"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDENTIALS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtTimestampConverter",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "tsConverter",
"transforms.tsConverter.field": "ordertime",
"transforms.tsConverter.format": "yyyy-MM-dd",
"transforms.tsConverter.target.type": "string",
"transforms.tsConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value"
}
変換の例:
{
"topic": "TestTopic",
"key": {
"test": "value"
},
"value": {
"ordertime": 1628035200000,
"orderid": "ABC",
"itemid": "XYZ"
}
}
{
"topic": "TestTopic",
"key": {
"test": "value"
},
"value": {
"ordertime": "2021-08-04",
"orderid": "ABC",
"itemid": "XYZ"
}
}
Timestamp Router¶
構成の例:
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtTimestampRouter",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-users-json",
"transforms": "tsRouter",
"transforms.tsRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.tsRouter.timestamp.format": "YYYYMM",
"transforms.tsRouter.topic.format": "foo-${topic}-${timestamp}"
}
変換の例:
{
"key": "User_8",
"value": "{registertime=1491516816009, gender=FEMALE, regionid=Region_4, userid=User_8}",
"topic": "topic"
}
{
"key": "User_8",
"value": "{registertime=1491516816009, gender=FEMALE, regionid=Region_4, userid=User_8}",
"topic": "foo-topic-202108"
}
Tombstone Handler¶
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-tombstone-handler",
"max.interval": "3000",
"name": "DatagenSourceSmtTombstoneHandler",
"output.data.format": "JSON",
"quickstart": "ORDERS",
"tasks.max": "1",
"transforms": "dropValue,tombstoneFail",
"transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
"transforms.tombstoneFail.type": "io.confluent.connect.transforms.TombstoneHandler",
"transforms.tombstoneFail.behavior": "fail"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtTombstoneHandler",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-orders-json",
"transforms": "dropValue,tombstoneFail",
"transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
"transforms.tombstoneFail.type": "io.confluent.connect.transforms.TombstoneHandler",
"transforms.tombstoneFail.behavior": "fail"
}
変換の例:
{
"topic": "TestTopic",
"key": {
"test": "value"
},
"value": null
}
The connector should either ignore (log) or fail.
トピック正規表現ルーター¶
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-users-json",
"max.interval": "3000",
"name": "DatagenSourceSmtTopicRegexRouter",
"output.data.format": "JSON",
"quickstart": "USERS",
"tasks.max": "1",
"transforms": "addPrefixRegex",
"transforms.addPrefixRegex.type": "io.confluent.connect.cloud.transforms.TopicRegexRouter",
"transforms.addPrefixRegex.regex": ".*",
"transforms.addPrefixRegex.replacement": "prefix_$0"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtTopicRegexRouter",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-users-json",
"transforms": "addPrefixRegex",
"transforms.addPrefixRegex.type": "io.confluent.connect.cloud.transforms.TopicRegexRouter",
"transforms.addPrefixRegex.regex": ".*",
"transforms.addPrefixRegex.replacement": "prefix_$0"
}
変換の例:
{
"key": "User_8",
"value": "{registertime=2021-09-03, gender=OTHER, regionid=Region_7, userid=User_8}",
"topic": "datagen-source-users-json",
"partition": 0,
"offset": 812925,
"timestamp": 1628486671963
}
{
"key": "User_8",
"value": "{registertime=2021-09-03, gender=OTHER, regionid=Region_7, userid=User_8}",
"topic": "prefix_datagen-source-users-json",
"partition": 0,
"offset": 812925,
"timestamp": 1628486671963
}
Value To Key¶
構成の例:
{
"connector.class": "DatagenSource",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"kafka.topic": "datagen-source-smt-value-to-key",
"max.interval": "3000",
"name": "DatagenSourceSmtValueToKey",
"output.data.format": "JSON",
"quickstart": "USERS",
"tasks.max": "1",
"transforms": "valueToKey",
"transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.valueToKey.fields": "registertime, userid"
}
{
"connector.class": "GoogleCloudFunctionsSink",
"function.name": "smt-function",
"gcf.credentials.json": "${ESCAPED_CREDS_JSON}",
"input.data.format": "JSON",
"kafka.api.key": "${KEY}",
"kafka.api.secret": "${SECRET}",
"name": "GcfSinkSmtValueToKey",
"project.id": "connect-205118",
"tasks.max": "1",
"topics": "datagen-source-users-json",
"transforms": "valueToKey",
"transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.valueToKey.fields": "registertime, userid"
}
変換の例:
{
"topic": "TestTopic",
"key": {
"test": "value"
},
"value": {
"userid": 1234,
"registertime": "ABC",
"ModifiedBy": "XYZ"
}
}
{
"topic": "TestTopic",
"key": {
"userid": 1234,
"registertime": "ABC",
},
"value": {
"userid": 1234,
"registertime": "ABC",
"ModifiedBy": "XYZ"
}
}