ValueToKey

ここでは、Apache Kafka® SMT org.apache.kafka.connect.transforms.ValueToKey の使用方法を説明します。

説明

レコードキーを、レコード値のフィールドのサブセットから形成された新しいキーに置き換えます。

以下の例は、ValueToKey を単独で使用する方法、および 2 つ目の SMT と組み合わせて使用する方法を示しています。

フィールドのメッセージキーへの変換

この構成スニペットは、ValueToKey を使用して、UserIdcitystate フィールドをメッセージキーに変換する方法を示しています。

"transforms": "ValueToKey",
"transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "userId,city,state"

変換前 : {"userId": 12, "address": "1942 Wilhelm Boulevard", "city": "Topeka", "state": "KS", "country": "US"}

変換後: {"userId": 12, "city": "Topeka", "state": "KS"}

変換のチェーン化

SMT を組み合わせて使用すると、より複雑な変換を実行できます。

以下の例は、ValueToKeyExtractField の SMT をチェーン化して、JDBC コネクター からのデータのキーを設定する方法を示しています。変換では、ValueToKey によりメッセージ c1 フィールドがメッセージキーにコピーされ、ExtractField によりそのフィールドの整数部分のみが抽出されます。

"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "c1",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "c1"

変換前のメッセージの状態を以下に示します。

"./bin/kafka-avro-console-consumer \
                              --bootstrap-server localhost:9092 \
                              --property schema.registry.url=http://localhost:8081 \
                              --property print.key=true \
                              --from-beginning \
                              --topic mysql-foobar

null {"c1":{"int":1},"c2":{"string":"foo"},"create_ts":1501796305000,"update_ts":1501796305000}
null {"c1":{"int":2},"c2":{"string":"foo"},"create_ts":1501796665000,"update_ts":1501796665000}

コネクターの構成が適用された後、新しい行が MySQL テーブルに挿入(パイプ)されます。

"echo "insert into foobar (c1,c2) values (100,'bar');"|mysql --user=username --password=pw demo

以下の内容が Avro コンソールコンシューマーに表示されます。キー(行の最初の値)が変換で定義されていた c1 の値と一致していることに注目してください。

100 {"c1":{"int":100},"c2":{"string":"bar"},"create_ts":1501799535000,"update_ts":1501799535000}

プロパティ

名前 説明 デフォルト 指定可能な値 重要度
fields レコードキーとして抽出する、レコード値のフィールド名。 list   空でないリスト

述語

"述語" を使用することにより、一定の条件を満たすレコードのみに変換が適用されるように、変換を構成することができます。述語は変換チェーンで使用することができ、Filter(Apache Kafka) と組み合わせると、条件に基づいて特定のレコードを除外できます。詳細と例については、「述語」を参照してください。