重要

このページの日本語コンテンツは古くなっている可能性があります。最新の英語版コンテンツをご覧になるには、こちらをクリックしてください。

マネージド型コネクターで使用する Single Message Transforms

Confluent Cloud マネージド型コネクターは、1 つ以上の Single Message Transforms(SMT)を使用するようコネクターを構成して、メッセージの値、キー、およびヘッダーに簡単で軽微な変更を加えることができます。これは、フィールドの挿入、情報のマスク、イベントのルーティング、その他の小規模なデータの調整 などに役立つ場合があります。SMT は、ソースコネクターでもシンクコネクターでも使用できます。

ソースコネクターで複数の SMT が使用される場合、Connect がコネクターで生成された各ソースレコードを最初の SMT に渡すと、変更が加えられます。この更新済みのソースレコードがチェーン内の次の SMT に渡されます。このようにして、チェーン内の残りの SMT も順に実行されます。最終的に更新されたソースレコードが バイナリ形式(バイト)に変換 され、Apache Kafka® に書き込まれます。

シンクコネクターで複数の SMT が使用される場合、Connect はまず Kafka からレコードを読み取り、バイトをシンクレコードに変換します。次に、Connect が、レコードを SMT に渡すと、変更が加えられます。更新済みのシンクレコードがチェーン内の次の SMT に渡されます。このようにして残りの SMT も実行されます。最終的に更新されたシンクレコードがシンクコネクターに渡されて、処理されます。

Single Message Transforms

マネージド型コネクターで使用する Single Message Transforms

注釈

  • SMT は非常に便利ですが、シンプルなデータ変換にのみ使用してください。より洗練された変換やデータ統合は、ksqlDB および Kafka ストリーム処理 を使用して処理してください。
  • 詳細については、「SMT の制限」を参照してください。

Cloud Console で SMT を使用する方法

重要

多くのコネクターで、アップデートされた UI を利用できます。コネクターの UI の SMT 部分が以下の例のようになっている場合、コネクターではアップデートされた UI が使用されています。

アップデートされた SMT UI

SMT UI のアップデート

以下の UI の違いに注意してください。

  • Transforms オプションは UI の詳細構成セクションにあります。SMT を追加するには、Show advanced configurations をクリックします。
  • Add a single message transform ボタンが 1 つあります。UI は異なりますが、SMT を作成するための機能は、以下の手順で説明されているものから変更されていません。
  • Predicate 機能は現在、アップデートされた UI では構成できません。CLI を使用してコネクター構成を作成する場合は、述語を作成できます。

以下の例の手順は、Confluent Cloud Console を使用した SMT の使用方法です。この手順では、Datagen Source Connector for Confluent Cloud を使用した例を示します。

注釈

以下の手順では、既に Cloud Console でコネクターの構成を開始しているものとします。

  1. Cloud Console のコネクター構成の UI には、SMT をサポートする 2 つの初期フィールド、Transforms および Predicates があります。Transforms フィールドに、SMT のわかりやすい名前(エイリアス)を入力します。たとえば、カスタマー ID をマスクする(MaskField$Value)場合は、エイリアスとして mask_userid と入力できます。

    Single Message Transforms の初期 UI フィールド

    変換

  2. Add Transform ボタンをクリックします。変換 org.apache.kafka.connect.transforms.MaskField$Value を選択すると、Transforms ダイアログボックスが表示されます。

    Single Message Transform の Mask フィールド

    Mask フィールドのオプション

    • fields (必須): マスクするレコードフィールドの名前を入力します。この例では、フィールド userid がマスクされます。
    • negate (省略可能): negate オプションは述語を反転します(使用される場合)。true を選択すると、"定義された述語の条件と一致しない" レコードにのみ述語が適用されます。使用しない場合は、false がデフォルトとして使用されます。
    • predicate (省略可能): 述語オプションを使用すると、SMT に述語の条件を使用する、または使用しないことができます。この SMT で述語の条件を使用する場合は、Predicates フィールドに、作成したエイリアスを入力します。
    • replacement (省略可能): このフィールドでは、置換文字列を設定します。この例では、userid が置き換えられ、****** と表示されます。

    注釈

    SMT 構成プロパティは、選択した変換タイプによってさまざまです。変換の構成プロパティと説明については、SMT の説明 を参照してください。

  3. 最初の変換の入力が終わったら、必要に応じて SMT を追加できます。

    ちなみに

    SMT は、Transforms フィールドにエイリアスが表示されている順に処理されます。

  4. Predicates の使用はオプションです。述語を追加する場合は、このフィールドに、使用する述語のエイリアスを入力します。述語のエイリアスは、SMT のプロパティの構成時に使用されます。この例では、述語のエイリアス tombstone_records が、述語 org.apache.kafka.connect.transforms.predicates.RecordIsTombstone に使用されます。

    Single Message Transforms の初期 UI フィールド

    述語

    述語を SMT に適用すると、述語に応じてコネクターが SMT を条件付きで適用します。述語では、コネクターが処理対象の各メッセージの評価に使用する述語の条件を指定します。レコードが評価される際に、コネクターは、まず述語の条件に対してレコードをチェックします。レコードが条件を満たしている場合、コネクターは、SMT をレコードに適用します。述語と一致しないレコードは、変更されないままで渡されます。negate オプションは述語を反転します。negatetrue を設定すると、"定義された述語の条件と一致しない" レコードにのみ述語が適用されます。

  5. 変換と述語の追加が終わったら、Next をクリックします。コネクター構成が表示されます。

    Single Message Transforms のコネクター構成

    コネクターの構成

  6. (省略可能) Data preview ボタン をクリックし、"record" セクションのデータ出力が想定どおりになっていることを確認します。プレビューの生成には数分かかることがあります。データのプレビューが想定どおりだった場合は、戻って、認証情報を追加し、コネクターを起動します。

コネクターが起動されると、トピックに変換済みのレコードが表示されます。

Single Message Transforms が適用されたレコード

変換済みのレコード

Confluent CLI で SMT を使用する方法

CLI を使用して読み込む JSON 構成ファイルを作成するには、以下のドキュメントを参照してください。

  • マネージド型コネクター のドキュメントには、各コネクターの構成プロパティおよび JSON ファイルの例が含まれています。
  • SMT のドキュメント には、基本的なコネクター構成に追加する必要がある変換の構成プロパティが含まれています。
  • 構成と変換の例については、「SMT の例」を参照してください。

失敗したレコード

ソースコネクター の場合、SMT の構成に誤りがあると、コネクターが失敗します。以下にトラブルシューティング情報を示します。

シンクコネクター の場合、コネクターの実行が継続されます。失敗したレコードは Confluent Cloud デッドレターキュー に送信されます。DLQ には、失敗したレコードの完全なスタックトレースが表示されます。

注釈

詳細については、「変換エイリアスの検証」を参照してください。

SMT およびデータプレビュー

SMT の場合、データプレビュー では、Kafka レコードに適用される変換ごとにレコードが作成されます。複数の変換の場合、各レコードが、Kafka レコードに順次適用された変換の 1 つを示します。このため、n 個の変換を構成した場合、データプレビュー は、n + 1 個のレコードを生成します。変換をプレビューする場合は、この点に注意してください。

以下の例では、ソースコネクターに 2 つの SMT が構成されています。以下の例で、変換ステップは、total_step:n 中の current_step:n で表されています。エントリ transformation_name および transformation_type は、ステップで適用される変換を示しています。

  1. 1 つ目のレコードは、コネクターで実際に受信されたソースレコードです。変換はまだ適用されていません。

    Single Message Transforms とデータプレビュー(1/3)

    変換は未適用

  2. 最初の変換(ValueToKey)が Kafka レコードに適用されます。Kafka レコードキーが、gender フィールド値として設定されます。

    Single Message Transforms とデータプレビュー(2/3)

    1 つ目の変換を適用

  3. 2 つ目の変換(MaskField$Value)が Kafka レコードに適用されます。Kafka レコード値が変換され、gender フィールド値が削除されます。

    Single Message Transforms とデータプレビュー(3/3)

    2 つ目の変換を適用

SMT の制限

次の制限事項に注意してください。

変換エイリアスの検証

いくつかのマネージド型コネクターでは、既に内部変換が設定されています。コネクターの内部構成テンプレートに存在するエイリアスと競合する変換エイリアスを追加した場合、Connect によって、以下の例のような検証エラーがスローされます。

Invalid value [internalxform, ..., internalxform] for configuration transforms: Duplicate alias provided.

上のエラーメッセージ内の internalxform は、追加したエイリアスと競合する、構成テンプレート内の内部エイリアスです。変換では、以下のエイリアスは使用できません。

マネージド型コネクター コネクタープラグイン 内部エイリアス
Amazon S3 Sink S3_SINK requireTimestampTransform
Google BigQuery Sink BigQuerySink requireMapTransform
HTTP Sink HttpSink requireTimestampTransform
Microsoft SQL Server CDC Source(Debezium) SqlServerCDC unwrap
MySQL CDC Source(Debezium) MySqlCDC unwrap
Postgres CDC Source(Debezium) PostgresCDC unwrap

サポートされない変換

  • 以下の SMT は現在、マネージド型コネクターではサポートされていません。
    • RegexRouter: ソースコネクターの場合、TopicRegexRouter SMT を使用する方法があります。
    • InsertHeader
    • DropHeaders
  • 一部の シンク コネクターでは、以下の変換をサポートしていません。
    • org.apache.kafka.connect.transforms.TimestampRouter
    • io.confluent.connect.transforms.MessageTimestampRouter
    • io.confluent.connect.transforms.ExtractTopic$Header
    • io.confluent.connect.transforms.ExtractTopic$Key
    • io.confluent.connect.transforms.ExtractTopic$Value
    • io.confluent.connect.cloud.transforms.TopicRegexRouter
  • 一部の ソース コネクターでは、以下の変換をサポートしていません。
    • org.apache.kafka.connect.transforms.HoistField$Value
    • org.apache.kafka.connect.transforms.ValueToKey

SMT リスト

注釈

すべてのマネージド型コネクターで、リストされている SMT がすべてサポートされているわけではありません。「サポートされない変換」を参照してください。

変換 説明
Cast フィールドや、キーまたは値の全体を特定の型に変換(キャスト)します(整数のフィールドを狭範囲の整数のフィールドに変換するなど)。
Drop レコードの 1 つのキーまたは値をドロップ(破棄)して null を設定します。
DropHeaders 現在、マネージド型コネクターでは利用できません。 各レコードから 1 つ以上のヘッダーをドロップします。
ExtractField スキーマが存在する場合は Struct から、スキーマのないデータの場合は Map から、指定されたフィールドを抽出します。null 値はすべてそのまま渡されます。
ExtractTopic レコードトピックを、キーまたは値から派生された新しいトピックに置き換えます。
Filter(Apache Kafka) すべてのレコードをドロップします。述語 と組み合わせて使用します。
Filter(Confluent) 構成可能な filter.condition と一致するレコードを含めるか、ドロップします。
Flatten ネストしたデータ構造をフラット化します。これにより、階層ごとにフィールド名が区切り文字で連結されて、それぞれのフィールド名が生成されます(区切り文字は変更することができます)。
GzipDecompress Not currently available for managed connectors. Gzip-decompress the entire byteArray key or value input.
HeaderFrom Not currently available for managed connectors. Moves or copies fields in a record key or value into the record's header.
HoistField スキーマが存在する場合は Struct の、スキーマのないデータの場合は Map の、指定されたフィールド名を使用してデータをラップします。
InsertField レコードメタデータの属性または構成された静的な値を使用してフィールドを挿入します。
InsertHeader 現在、マネージド型コネクターでは利用できません。 レコードヘッダーとしてリテラル値を挿入します。
MaskField 指定されたフィールドを、そのフィールドの型に対して有効な null 値でマスクします。
MessageTimeStampRouter 元のトピックの値とレコードのタイムスタンプフィールドに応じて、レコードのトピックフィールドを更新します。
RegexRouter 現在、マネージド型コネクターでは利用できません。 構成されている正規表現と置換文字列を使用して、レコードのトピックを更新します。
ReplaceField フィールドにフィルターをかけるか、フィールドの名前を変更します。
SetSchemaMetadata レコードのキーまたは値のスキーマに対して、スキーマ名とバージョンのいずれか、または両方を設定します。
TimestampConverter Unix エポック、文字列、Connect の Date 型および Timestamp 型など、タイムスタンプのフォーマットを別のフォーマットに変換します。
TimestampRouter 元のトピックの値とレコードのタイムスタンプに応じて、レコードのトピックフィールドを更新します。
TombstoneHandler tombstone レコードを管理します。tombstone レコードとは、ValueSchema の有無に関係なく、値フィールド全体が null になっているレコードと定義されています。
TopicRegexRouter マネージド型ソースコネクターのみで利用できます。 構成されている正規表現と置換文字列を使用して、レコードのトピックをアップデートします。
ValueToKey レコードキーを、レコード値のフィールドのサブセットから形成された新しいキーに置き換えます。

SMT の例

The following examples show connector configuration and transformation examples for source and sink connectors. The examples show configurations for the Datagen Source and the Google Cloud Functions Sink connectors.

注釈

  • 一部の SMT(Cast など)には、レコード値とレコードキーのいずれかに適用されるものがあります。Cast を例として使用すると、コネクターは、org.apache.kafka.connect.transforms.Cast$Value を Kafka レコード値に適用します。また、org.apache.kafka.connect.transforms.Cast$Key を Kafka レコードキーに適用します。
  • SMT の定義、構成プロパティの値、その他の例については、SMT のドキュメント を参照してください。

Cast

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-cast",
  "max.interval": "3000",
  "name": "DatagenSourceSmtCast",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "castValues",
  "transforms.castValues.type": "org.apache.kafka.connect.transforms.Cast$Value",
  "transforms.castValues.spec": "zipcode:float64, orderid:string, orderunits:int32"
}

変換の例:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}

Drop

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-drop",
  "max.interval": "3000",
  "name": "DatagenSourceSmtDrop",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "dropValue",
  "transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value"
}

変換の例:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}

Extract Field

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-extract-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtExtractField",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "extractAddress",
  "transforms.extractAddress.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.extractAddress.field": "address"
}

変換の例:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}

Extract Topic

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-extract-topic",
  "max.interval": "3000",
  "name": "DatagenSourceSmtExtractTopic",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "setTopic",
  "transforms.setTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
  "transforms.setTopic.field": "itemid"
}

変換の例:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}

Filter(Apache Kafka)

注釈

追加の Tombstone 述語を表示します。

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-filter-ak",
  "max.interval": "3000",
  "name": "DatagenSourceSmtFilterAk",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "predicates": "isNull",
  "predicates.isNull.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
  "transforms": "dropValue, filterNull",
  "transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
  "transforms.filterNull.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.filterNull.predicate": "isNull"
}

変換の例:

{
  "ordertime": 1512446289869,
  "orderid": 417794,
  "itemid": "Item_430",
  "orderunits": 5.085317150755766,
  "address": {
    "city": "City_19",
    "state": "State_88",
    "zipcode": 72688
  }
}

Filter(Confluent)

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-filter-cp",
  "max.interval": "3000",
  "name": "DatagenSourceSmtFilterCp",
  "output.data.format": "JSON",
  "quickstart": "RATINGS",
  "tasks.max": "1",
  "transforms": "filterValue",
  "transforms.filterValue.filter.condition": "$[?(@.channel == 'ios')]",
  "transforms.filterValue.filter.type": "include",
  "transforms.filterValue.type": "io.confluent.connect.transforms.Filter$Value"
}

変換の例:

[
  {
    "rating_id": 140,
    "user_id": 3,
    "stars": 4,
    "route_id": 7425,
    "rating_time": 1669,
    "channel": "ios",
    "message": "thank you for the most friendly, helpful experience today at your new lounge"
  },
  {
    "rating_id": 491,
    "user_id": 7,
    "stars": 4,
    "route_id": 3302,
    "rating_time": 5881,
    "channel": "iOS-test",
    "message": "more peanuts please"
  }
]

Flatten

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-flatten",
  "max.interval": "3000",
  "name": "DatagenSourceSmtFlatten",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "flatten",
  "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.flatten.delimiter": "_"
}

変換の例:

{
  "ordertime": 1491310657544,
  "orderid": 9,
  "itemid": "Item_826",
  "orderunits": 4.188698361592631,
  "address": {
    "city": "City_73",
    "state": "State_47",
    "zipcode": 54450
  }
}

Hoist Field

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-hoist-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtHoistField",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "hoist",
  "transforms.hoist.type": "org.apache.kafka.connect.transforms.HoistField$Value",
  "transforms.hoist.field": "wrapperField"
}

変換の例:

{
  "registertime": 1506959883575,
  "userid": "User_2",
  "regionid": "Region_1",
  "gender": "MALE"
}

Insert Field

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-insert-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtInsertField",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "insert",
  "transforms.insert.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insert.partition.field": "PartitionField",
  "transforms.insert.static.field": "InsertedStaticField",
  "transforms.insert.static.value": "SomeValue",
  "transforms.insert.timestamp.field": "TimestampField",
  "transforms.insert.topic.field": "TopicField"
}

変換の例:

{
  "registertime": 1506959883575,
  "userid": "User_2",
  "regionid": "Region_1",
  "gender": "MALE"
}

Mask Field

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-mask-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtMaskField",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1",
  "transforms": "mask",
  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "gender",
  "transforms.mask.replacement": "REDACTED"
}

変換の例:

{
  "registertime": 1499746213074,
  "userid": "User_5",
  "regionid": "Region_4",
  "gender": "MALE"
}

Message Timestamp Router

構成の例:

現時点では使用不可

変換の例:

{
  "key": "User_8",
  "value": "{registertime=2021-09-03, gender=OTHER, regionid=Region_7, userid=User_8}",
  "topic": "msg_timestamp_router_topic",
  "partition": 0,
  "offset": 812925,
  "timestamp": 1628486671963
}

Replace Field

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-replace-field",
  "max.interval": "3000",
  "name": "DatagenSourceSmtReplaceField",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1",
  "transforms": "replacefield",
  "transforms.replacefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.replacefield.exclude": "userid",
  "transforms.replacefield.include": "regionid",
  "transforms.replacefield.renames": "regionid:Region_Id"
}

変換の例:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": {
    "userid": 1234,
    "regionid": "Region_3",
    "ModifiedBy": "XYZ"
  }
}

Set Schema Metadata

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-set-schema-metadata",
  "max.interval": "3000",
  "name": "DatagenSourceSmtSetSchemaMetadata",
  "output.data.format": "AVRO",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "setSchemaMetadata",
  "transforms.setSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
  "transforms.setSchemaMetadata.schema.name": "schema_name",
  "transforms.setSchemaMetadata.schema.version": "12"
}

変換の例:

{
  "connect.name": "ksql.users",
  "fields": [
    {
      "name": "registertime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "regionid",
      "type": "string"
    },
    {
      "name": "gender",
      "type": "string"
    }
  ],
  "name": "users",
  "namespace": "ksql",
  "type": "record"
}

Timestamp Converter

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-ts-converter",
  "max.interval": "3000",
  "name": "DatagenSourceSmtTimestampConverter",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "tsConverter",
  "transforms.tsConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.tsConverter.field": "ordertime",
  "transforms.tsConverter.format": "yyyy-MM-dd",
  "transforms.tsConverter.target.type": "string"
}

変換の例:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": {
    "ordertime": 1628035200000,
    "orderid": "ABC",
    "itemid": "XYZ"
  }
}

Timestamp Router

構成の例:

現時点では使用不可

変換の例:

{
  "key": "User_8",
  "value": "{registertime=1491516816009, gender=FEMALE, regionid=Region_4, userid=User_8}",
  "topic": "topic"
}

Tombstone Handler

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-tombstone-handler",
  "max.interval": "3000",
  "name": "DatagenSourceSmtTombstoneHandler",
  "output.data.format": "JSON",
  "quickstart": "ORDERS",
  "tasks.max": "1",
  "transforms": "dropValue,tombstoneFail",
  "transforms.dropValue.type": "io.confluent.connect.transforms.Drop$Value",
  "transforms.tombstoneFail.type": "io.confluent.connect.transforms.TombstoneHandler",
  "transforms.tombstoneFail.behavior": "fail"
}

変換の例:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": null
}

トピック正規表現ルーター

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-users-json",
  "max.interval": "3000",
  "name": "DatagenSourceSmtTopicRegexRouter",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1",
  "transforms": "addPrefixRegex",
  "transforms.addPrefixRegex.type": "io.confluent.connect.cloud.transforms.TopicRegexRouter",
  "transforms.addPrefixRegex.regex": ".*",
  "transforms.addPrefixRegex.replacement": "prefix_$0"
}

変換の例:

{
  "key": "User_8",
  "value": "{registertime=2021-09-03, gender=OTHER, regionid=Region_7, userid=User_8}",
  "topic": "datagen-source-users-json",
  "partition": 0,
  "offset": 812925,
  "timestamp": 1628486671963
}

Value To Key

構成の例:

{
  "connector.class": "DatagenSource",
  "kafka.api.key": "${KEY}",
  "kafka.api.secret": "${SECRET}",
  "kafka.topic": "datagen-source-smt-value-to-key",
  "max.interval": "3000",
  "name": "DatagenSourceSmtValueToKey",
  "output.data.format": "JSON",
  "quickstart": "USERS",
  "tasks.max": "1",
  "transforms": "valueToKey",
  "transforms.valueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.valueToKey.fields": "registertime, userid"
}

変換の例:

{
  "topic": "TestTopic",
  "key": {
    "test": "value"
  },
  "value": {
    "userid": 1234,
    "registertime": "ABC",
    "ModifiedBy": "XYZ"
  }
}