Kafka Connect Flatten (Confluent) SMT Usage Reference for Confluent Cloud

The following provides usage information for the Confluent Single Message Transform (SMT) io.confluent.connect.transforms.Flatten.

Description

The Confluent Flatten SMT transforms a record’s key or value by converting nested structures (such as structs) into flat, top-level fields, joining the field names at each level with a delimiter. The SMT also handles maps nested within struct schemas. Set the behavior.on.nested.map property to fail to stop processing records when a nested map is found, or to include to pass the map through as is, without flattening its contents. The default delimiter is the period (.) character, which is also the delimiter to use for JSON data.

You can use the concrete transformation type designed for the record key (io.confluent.connect.transforms.Flatten$Key) or value (io.confluent.connect.transforms.Flatten$Value).

When using an Avro schema, you cannot use the default period (.) delimiter, because Avro field names cannot contain periods. Use an underscore (_) instead. The following properties change the delimiter:

"transforms": "flatten",
"transforms.flatten.type": "io.confluent.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"

JSON example

This configuration snippet shows how to use Flatten to flatten a record’s value while including a nested map without flattening its internal structure. The default delimiter (.) is used.

"transforms": "flatten",
"transforms.flatten.type": "io.confluent.connect.transforms.Flatten$Value",
"transforms.flatten.behavior.on.nested.map": "INCLUDE",
"transforms.flatten.delimiter": "."

Before:

{
  "user": {
    "id": "1234",
    "attributes": { "role": "admin", "dept": "engineering" },
    "profile": {
      "address": {
        "city": "Bangalore",
        "metadata": { "zone": "east", "block": "A" }
      },
      "age": 30
    }
  }
}

After:

{
  "user.id": "1234",
  "user.attributes": { "role": "admin", "dept": "engineering" },
  "user.profile.address.city": "Bangalore",
  "user.profile.address.metadata": { "zone": "east", "block": "A" },
  "user.profile.age": 30
}

If transforms.flatten.behavior.on.nested.map is set to fail, records with nested maps are not processed, and the connector will fail to launch.
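The Before and After records above can be reproduced with a short Python sketch. This is an illustration of the flattening rule only, not the SMT's implementation: the Struct class and the flatten function are hypothetical names, and the sketch assumes that structs are expanded recursively while maps are either passed through whole (include) or rejected (fail).

```python
class Struct(dict):
    """Hypothetical stand-in for a Connect struct; plain dicts stand in for maps."""


def flatten(record, delimiter=".", on_nested_map="include", prefix=""):
    """Flatten nested Struct values into top-level fields joined by `delimiter`."""
    mode = on_nested_map.lower()
    out = {}
    for name, value in record.items():
        key = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(value, Struct):
            # Structs are expanded; their fields become prefixed top-level fields.
            out.update(flatten(value, delimiter, mode, key))
        elif isinstance(value, dict):
            # A plain dict models a map nested inside a struct.
            if mode == "fail":
                raise ValueError(f"nested map found at field {key!r}")
            out[key] = value  # include: keep the map as is, unflattened
        else:
            out[key] = value
    return out


record = Struct(
    user=Struct(
        id="1234",
        attributes={"role": "admin", "dept": "engineering"},
        profile=Struct(
            address=Struct(
                city="Bangalore",
                metadata={"zone": "east", "block": "A"},
            ),
            age=30,
        ),
    )
)

flat = flatten(record)
for field, value in flat.items():
    print(field, "=", value)
```

Calling flatten(record, on_nested_map="fail") raises an error at user.attributes in this sketch, which mirrors the fail setting rejecting records that contain nested maps.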

Avro example

The Avro schema specification only allows alphanumeric characters and the underscore _ character in field names. The configuration snippet below shows how to use Flatten to concatenate field names with the underscore _ delimiter character.

"transforms": "flatten",
"transforms.flatten.type": "io.confluent.connect.transforms.Flatten$Value",
"transforms.flatten.behavior.on.nested.map": "INCLUDE",
"transforms.flatten.delimiter": "_"

Before:

{
  "user": {
    "id": "1234",
    "attributes": { "role": "admin", "dept": "engineering" },
    "profile": {
      "address": {
        "city": "Bangalore",
        "metadata": { "zone": "east", "block": "A" }
      },
      "age": 30
    }
  }
}

After:

{
  "user_id": "1234",
  "user_attributes": { "role": "admin", "dept": "engineering" },
  "user_profile_address_city": "Bangalore",
  "user_profile_address_metadata": { "zone": "east", "block": "A" },
  "user_profile_age": 30
}

If transforms.flatten.behavior.on.nested.map is set to fail, records with nested maps are not processed, and the connector will fail to launch.
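The Avro naming rule behind the underscore delimiter can be checked with a small Python sketch. The regular expression encodes the Avro specification's rule that a name starts with a letter or underscore and continues with letters, digits, or underscores. The helper names and the sample field path are hypothetical and chosen only for illustration.

```python
import re

# Avro names: first character is a letter or underscore, the rest are
# letters, digits, or underscores.
AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_avro_name(name: str) -> bool:
    """Return True if `name` is a legal Avro field name."""
    return AVRO_NAME.fullmatch(name) is not None


def join_path(parts, delimiter):
    """Join nested field names the way a flattened field name is built."""
    return delimiter.join(parts)


path = ["user", "profile", "age"]  # hypothetical nested field path
for delimiter in (".", "_"):
    name = join_path(path, delimiter)
    print(f"{name!r}: valid Avro name = {is_valid_avro_name(name)}")
```

In this sketch the period-joined name fails the check and the underscore-joined name passes, which is why the Avro configuration sets the delimiter to _.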

Tip

For additional examples, see Flatten for managed connectors.

Properties

Name | Description | Type | Default | Valid Values | Importance
delimiter | Delimiter to insert between field names from the input record when generating field names for the output record. | string | . | _ | medium

Predicates

Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Cloud, predicates can conditionally filter out specific records. For details and examples, see Predicates.
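As a rough illustration of how a predicate attaches to this transform, the Python sketch below builds a configuration in which Flatten runs only for records from matching topics. The predicate alias isOrdersTopic and the pattern orders-.* are hypothetical. TopicNameMatches is one of the predicates that ships with Apache Kafka; whether a given managed connector supports it is an assumption to verify against the Predicates documentation.

```python
import json

predicate_alias = "isOrdersTopic"  # hypothetical alias, any name works

config = {
    "transforms": "flatten",
    "transforms.flatten.type": "io.confluent.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": "_",
    # Apply the transform only to records for which the predicate is true.
    "transforms.flatten.predicate": predicate_alias,
    "predicates": predicate_alias,
    f"predicates.{predicate_alias}.type":
        "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    # Hypothetical topic pattern (a Java regular expression).
    f"predicates.{predicate_alias}.pattern": "orders-.*",
}

print(json.dumps(config, indent=2))
```

Adding "transforms.flatten.negate": "true" would invert the condition, so the transform applies to every record except those from matching topics.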