ValueToKey¶
ここでは、Apache Kafka® SMT org.apache.kafka.connect.transforms.ValueToKey
の使用方法を説明します。
説明¶
レコードキーを、レコード値のフィールドのサブセットから形成された新しいキーに置き換えます。
例¶
以下の例は、ValueToKey
を単独で使用する方法、および 2 つ目の SMT と組み合わせて使用する方法を示しています。
フィールドのメッセージキーへの変換¶
この構成スニペットは、ValueToKey
を使用して、UserId
、city
、state
フィールドをメッセージキーに変換する方法を示しています。
"transforms": "ValueToKey",
"transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "userId,city,state"
変換前 : {"userId": 12, "address": "1942 Wilhelm Boulevard", "city": "Topeka", "state": "KS", "country": "US"}
変換後: {"userId": 12, "city": "Topeka", "state": "KS"}
変換のチェーン化¶
SMT を組み合わせて使用すると、より複雑な変換を実行できます。
以下の例は、ValueToKey
と ExtractField
の SMT をチェーン化して、JDBC コネクター からのデータのキーを設定する方法を示しています。変換では、ValueToKey
によりメッセージ c1
フィールドがメッセージキーにコピーされ、ExtractField
によりそのフィールドの整数部分のみが抽出されます。
"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "c1",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "c1"
変換前のメッセージの状態を以下に示します。
"./bin/kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--property print.key=true \
--from-beginning \
--topic mysql-foobar
null {"c1":{"int":1},"c2":{"string":"foo"},"create_ts":1501796305000,"update_ts":1501796305000}
null {"c1":{"int":2},"c2":{"string":"foo"},"create_ts":1501796665000,"update_ts":1501796665000}
コネクターの構成が適用された後、新しい行が MySQL テーブルに挿入(パイプ)されます。
"echo "insert into foobar (c1,c2) values (100,'bar');"|mysql --user=username --password=pw demo
以下の内容が Avro コンソールコンシューマーに表示されます。キー(行の最初の値)が変換で定義されていた c1 の値と一致していることに注目してください。
100 {"c1":{"int":100},"c2":{"string":"bar"},"create_ts":1501799535000,"update_ts":1501799535000}
プロパティ¶
名前 | 説明 | 型 | デフォルト | 指定可能な値 | 重要度 |
---|---|---|---|---|---|
fields |
レコードキーとして抽出する、レコード値のフィールド名。 | list | 空でないリスト | 高 |
述語¶
"述語" を使用することにより、一定の条件を満たすレコードのみに変換が適用されるように、変換を構成することができます。述語は変換チェーンで使用することができ、Filter(Apache Kafka) と組み合わせると、条件に基づいて特定のレコードを除外できます。詳細と例については、「述語」を参照してください。