Schemas with Tableflow in Confluent Cloud

Schemas play a crucial role in defining the structure and format of materialized tables in Tableflow.

With Tableflow, schemas are central to the process of tiering Kafka topics into Apache Iceberg™ tables. When a Kafka topic is enabled for Tableflow, the data in the topic is transformed into an Iceberg table by using the schema associated with the topic. If no schema is configured, the default binary key-value schema is applied. Topics that have multiple schemas are materialized into a binary key-value format. This schema-driven tiering ensures that the data is correctly formatted and can be efficiently queried using the Iceberg REST Catalog.
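
For illustration, a topic with no registered schema (or with multiple schemas) is represented with raw binary key and value columns. The following DDL is a sketch that assumes the key/value column convention used in the generated tables shown later on this page:

CREATE TABLE cluster-id.topic_name (
  key binary,
  value binary
);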

Type mappings

Table representation of a Kafka topic

It’s important to recognize that Tableflow creates a table representation of a Kafka topic. Tableflow automatically uses the latest schema in Schema Registry to create the associated table schema. Topics and their schemas are the source of truth for the table representation of the underlying data.

For example, given the following Avro schema in Schema Registry:

{
  "fields": [
    {
      "name": "side",
      "type": "string"
    },
    {
      "name": "quantity",
      "type": "int"
    },
    {
      "name": "symbol",
      "type": "string"
    },
    {
      "name": "price",
      "type": "int"
    },
    {
      "name": "account",
      "type": "string"
    },
    {
      "name": "userid",
      "type": "string"
    }
  ],
  "name": "StockTrade",
   "type": "record"
}

Tableflow generates the following table DDL:

CREATE TABLE cluster-id.topic_name (
  key binary,
  side string,
  quantity int,
  symbol string,
  price int,
  account string,
  userid string
);

You can use CREATE TABLE statements in Confluent Cloud for Apache Flink® to generate a table-compatible schema in Schema Registry. For example, the following Flink SQL DDL results in the same Avro schema and table schema shown previously:

CREATE TABLE `environment`.`cluster`.`topicname` (
  `key` VARBINARY(2147483647),
  `side` VARCHAR(2147483647),
  `quantity` INT,
  `symbol` VARCHAR(2147483647),
  `price` INT,
  `account` VARCHAR(2147483647),
  `userid` VARCHAR(2147483647)
);

Key schemas

In the default append usage for Tableflow, key schemas act as additional fields in the table definition.

For example, given the following KEY and VALUE schema:

{
  "fields": [
    {
      "name": "userid",
      "type": "int"
    }
  ],
  "name": "tableflowschema_key",
  "namespace": "org.apache.flink.avro.generated.record",
  "type": "record"
}

{
  "fields": [
    {
      "default": null,
      "name": "textbox",
      "type": [
        "null",
        "string"
      ]
    }
  ],
  "name": "tableflowschema_value",
  "namespace": "org.apache.flink.avro.generated.record",
  "type": "record"
}

Tableflow generates the following table DDL:

CREATE TABLE cluster-id.topicname (
  userid int,
  textbox string
);

Schema compatibility and evolution

Tableflow supports schema evolution by using Confluent Schema Registry as the source of truth. You can define schemas and set compatibility modes within the Schema Registry. When evolving event schemas for Kafka topics, schemas must adhere to the defined compatibility rules.

Consumers of tables materialized by Tableflow can access them only in read-only mode, meaning consumers can query the data but can’t modify the schema. Schema evolution is managed at the Kafka event level and validated through Schema Registry before being applied during materialization. Tableflow enforces schema consistency, ensuring that any changes originate from the data source and adhere to predefined evolution rules.


Tableflow validates schema changes against the Schema Registry during table materialization to ensure compliance. If Tableflow encounters data that doesn’t conform to the specified schema evolution rules, it suspends materialization for the affected topic.

When Tableflow is enabled on an existing topic, it uses the current schema and materializes the data from the beginning of the log, so Tableflow must be able to read the source topic from the beginning. Use the FULL_TRANSITIVE schema compatibility mode in Schema Registry so that all existing records remain compatible with the schema used for materialization.
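
As a sketch, you can set this compatibility mode for the topic's value subject through the Schema Registry REST API. The endpoint URL, subject name, and credential variables below are placeholders:

curl -u "$SR_API_KEY:$SR_API_SECRET" \
  -X PUT \
  -H "Content-Type: application/json" \
  --data '{"compatibility": "FULL_TRANSITIVE"}' \
  "https://<schema-registry-endpoint>/config/<topic-name>-value"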

Tableflow supports the following schema changes.

  • Add optional fields without default values. When you add a new column, make the new field optional (nullable) to ensure backward compatibility with data written under older versions of the schema; Iceberg does not fully support default values. A combined evolution example appears after this list.

    The code example shows an optional field in Avro format.

    {
      "name": "firstName",
      "type": [
        "null",
        "string"
      ]
    }
    
  • Type widening for the following types.

    • int to long
    • float to double
    • Decimal precision increase
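
As a combined sketch of these supported changes, the following hypothetical schemas show version 1 evolving to version 2 by adding an optional field and widening an int field to long (names are illustrative):

// Version 1
{
  "type": "record",
  "name": "Order",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "quantity", "type": "int" }
  ]
}

// Version 2: adds an optional field and widens quantity from int to long
{
  "type": "record",
  "name": "Order",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "quantity", "type": "long" },
    {
      "name": "note",
      "type": [
        "null",
        "string"
      ]
    }
  ]
}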

Other schema changes are not supported in backward compatibility mode.

Compatibility types other than backward are not supported.

Using schemas without Schema Registry serializers

Tableflow supports cases in which a schema exists in Schema Registry under TopicNameStrategy, but the data has not been serialized with a Schema Registry serializer, so no magic byte is present in the Kafka messages. This enables retroactively applying a schema to a topic without requiring application changes. The data must still be serialized in the same format as the schema in Schema Registry: for example, if the Kafka data is serialized in Avro format, you must register an Avro schema in Schema Registry.

The following example shows how to accomplish this.

Example: Enable Tableflow without using a Schema Registry Serializer

Suppose you have published data that is serialized using the following Avro schema.

// Product.avsc
{
  "type": "record",
  "name": "Product",
  "namespace": "com.example",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "name", "type": "string" },
    { "name": "description", "type": "string" }
  ]
}

The following example Java producer code publishes Avro-serialized data using the provided schema but does not rely on Schema Registry.

Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
...

// Create the producer from the configured properties
// (bootstrap servers and security settings are omitted above)
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

// Create and serialize an Avro Product instance
Product product = new Product(1, "Product-1", "Description for Product-1");
byte[] avroBytes;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
    DatumWriter<Product> writer = new SpecificDatumWriter<>(Product.class);
    writer.write(product, encoder);
    encoder.flush();
    avroBytes = outputStream.toByteArray();
} catch (IOException e) {
    throw new RuntimeException("Serialization error: " + e.getMessage(), e);
}

// Publish the message
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, String.valueOf(product.getId()), avroBytes);
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.println("Sent: " + product.getName() + " -> Topic: " + metadata.topic() + " Offset: " + metadata.offset());
    } else {
        exception.printStackTrace();
    }
});

producer.flush();
producer.close();

After you publish data to this topic, follow these steps to enable Tableflow.

  1. In Confluent Cloud Console, navigate to the topic that has data serialized using the previous Avro schema. Click Enable Tableflow, and you are prompted to create a schema.
  2. Create a new schema using the same format as the data serialized in this topic, ensuring it matches the correct schema type, which in this example is Avro.
  3. Continue with the next steps and enable Tableflow on the topic. Tableflow can now materialize the data using the configured schema.

Schema limitations with Tableflow

  • Schema strategy: Only TopicNameStrategy is supported.

  • No schema changes are allowed for keys.

  • Dropping columns is not supported.

  • Conditional schemas are not supported.

  • Cyclic schemas are not supported.

  • Adding new nested fields to an existing schema is not supported.

  • Avro specific limitations:

    • Tableflow does not support the enum type. An Enum is parsed as a string.
    • Tableflow does not support timestamp-micros. This type is parsed as bigint / long.
    • Tableflow does not support uuid. This type is parsed as string.
    • Tableflow does not support duration. This type is parsed as binary / fixed.
    • union(null, type) becomes nullable(type).
    • union(type1,type2,...) becomes row(type1, type2, ...).
    • Tableflow does not support raw union schemas, for example, ["string", "int"].
    • Tableflow does not support multiset schemas.
  • JSON specific limitations:

    • Tableflow does not support AllOf, AnyOf, and OneOf schemas.

    • If then else schemas are not supported.

    • NullSchema is not supported.

    • Records with extra fields that are not defined in the JSON schema may throw a null pointer exception or a cast exception in the deserializer.

      For example, if the schema has fields a, b, and c, and you submit a record with a = 1 and an extra field set to null, the extra field is interpreted as b; if b isn’t nullable, an exception is thrown. The Kafka serializer permits this record to be written.
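
      A hypothetical illustration, assuming a JSON schema with numeric fields a, b, and c:

      Registered schema properties:

      {
        "a": { "type": "number" },
        "b": { "type": "number" },
        "c": { "type": "number" }
      }

      Submitted record (accepted by the Kafka serializer, but the extra field can be misread as b during deserialization):

      {
        "a": 1,
        "randomField": null
      }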

Avro schema type mapping

The following sections describe all of the Avro types that are supported by Tableflow in Confluent Cloud. They include primitives, complex types, logical types, Connect types and types supported by Confluent Cloud for Apache Flink.

For an Avro schema registered in Schema Registry, the root type must always be a record.

Avro primitive types

The following section describes Tableflow support for materializing primitive types.

Avro types are shown with corresponding Iceberg types and example Avro schemas.

boolean, bytes, double, fixed, float, int, string

boolean

  • Iceberg type: boolean
  • Support status: Supported
Example Avro schema
{
  "name" : "column0",
  "type" : "boolean"
}

bytes

  • Iceberg type: binary
  • Support status: Supported
Example Avro schema
{
  "name" : "column0",
  "type" : "bytes"
}

double

  • Iceberg type: double
  • Support status: Supported
Example Avro schema
{
  "name" : "column0",
  "type" : "double"
}

fixed

  • Iceberg type: fixed
  • Support status: Supported
Example Avro schema
{
  "name" : "column0",
  "type" : {
    "type" : "fixed",
    "name" : "fixedColumn",
    "size" : 16
  }
}

float

  • Iceberg type: float
  • Support status: Supported
Example Avro schema
{
  "name" : "column0",
  "type" : "float"
}

int

  • Iceberg type: int
  • Support status: Supported
Example Avro schema
{
  "name" : "column0",
  "type" : "int"
}

string

  • Iceberg type: string
  • Support status: Supported
Example Avro schema
{
  "name" : "column0",
  "type" : "string"
}

Avro logical types

The following section describes Tableflow support for materializing Avro logical types.

Avro types are shown with corresponding Iceberg types and example Avro schemas.

date, decimal, duration, local-timestamp-micros, local-timestamp-millis, time-micros, time-millis, timestamp-micros, timestamp-millis, uuid

date

  • Iceberg type: date
  • Support status: Supported
Value range
  • min = -2147483648 (Integer.MIN_VALUE)
  • max = 2147483647 (Integer.MAX_VALUE)

The date type represents the number of days from the unix epoch, 1 January 1970.

Example Avro schema
{
  "name": "column0",
  "type": {
    "type": "int",
    "logicalType": "date"
  }
}

decimal

  • Iceberg type: decimal
  • Support status: Supported
Value range
  • min = -10^(precision-scale) + 1
  • max = 10^(precision-scale) - 1
Additional properties
  • precision (required) min value is 1
  • precision (required) max value is 38
  • scale (optional) default value is 0, must be less than or equal to precision

There is no validation to ensure that the required precision property is present in the schema.

The Iceberg decimal type can map either to the Avro bytes or fixed logical types.

Example Avro schema
{
  "name": "column0",
  "type": {
    "type": "bytes",
    "logicalType": "decimal",
    "precision": 12,
    "scale": 10
  }
}
{
  "name": "column0",
  "type": {
    "type": "fixed",
    "name": "fixedColumn",
    "namespace": "",
    "size": 5,
    "logicalType": "decimal",
    "precision": 31,
    "scale": 18
  }
}

duration

  • Iceberg type: fixed
  • Support status: Supported

Even though duration is part of the Avro spec, it is not implemented in the Avro Java libraries, so Tableflow interprets it as a fixed type.

Example Avro schema
{
  "name": "column",
  "type": {
    "type": "fixed",
    "logicalType": "duration",
    "name" : "fixedColumn",
    "size": 12
  }
}

local-timestamp-micros

  • Iceberg type: timestamp (adjustToUTC = false)
  • Support status: Supported

The local-timestamp-micros type represents a timestamp in a local time zone, with microsecond precision. The long value stores the number of microseconds from 1 January 1970 00:00:00.000000.

Value range
  • min = 0
  • max = 9223372036854775807 (Long.MAX_VALUE)
Additional properties
  • flink.precision default value is 6
  • flink.precision minimum value is 0
  • flink.precision maximum value is 6

If flink.precision <= 3, the value is interpreted as milliseconds instead of microseconds during Avro-to-row-data conversion.

If flink.precision >=4 and <=6, the value is interpreted as microseconds.

flink.precision > 6 is not supported.

Example Avro schema
{
  "name": "column0",
  "type": {
    "type": "long",
    "logicalType": "local-timestamp-micros",
    "flink.precision": 5,
    "flink.version": 1,
    "arg.properties": {
      "range": {
        "min": 0,
        "max": 9223372036854775807
      }
    }
  }
}

local-timestamp-millis

  • Iceberg type: timestamp (adjustToUTC = false, timestamp without time zone)
  • Support status: Supported (millisecond precision is not preserved; values are stored with microsecond precision)
Value range
  • min = -9223372036854775
  • max = 9223372036854775 (Long.MAX_VALUE/1000)
Additional properties
  • flink.precision default value is 3
  • flink.precision minimum value is 0
  • flink.precision maximum value is 3

flink.precision > 3 is not supported.

Because this is a millisecond value, a large value can overflow a long and become negative, so only values within the specified range are accepted.

Negative values are supported.

Example Avro schema
{
  "name": "column0",
  "type": {
    "type": "long",
    "logicalType": "local-timestamp-millis",
    "flink.precision": 2,
    "flink.version": 1,
    "arg.properties": {
      "range": {
        "min": -9223372036854775,
        "max": 9223372036854775
      }
    }
  }
}

time-micros

  • Iceberg type: long
  • Support status: Supported
Value range
  • min = -9223372036854775807
  • max = 9223372036854775807 (Long.MAX_VALUE)

Flink doesn’t support reading Avro time-micros as a TIME type. Flink supports TIME with precision up to 3. time-micros is read and written as BIGINT.

The flink.precision value has no significance, because the value is interpreted as BIGINT and not TIME.

Example Avro schema
{
  "name": "column0",
  "type": {
    "type": "long",
    "logicalType": "time-micros",
    "flink.precision": 1,
    "flink.version": 1,
    "arg.properties": {
      "range": {
        "min": -9223372036854775807,
        "max": 9223372036854775807
      }
    }
  }
}

time-millis

  • Iceberg type: time (time of day without date, timezone)
  • Support status: Supported (millisecond precision is not preserved; values are stored with microsecond precision)
Value range
  • min = 0
  • max = 86400000 (number of milliseconds in a day)
Additional properties
  • flink.precision default value is 3
  • flink.precision minimum value is 0
  • flink.precision maximum value is 3

flink.precision > 3 is not supported.

Important

Materialization occurs even if the value is outside the range, but reading the Iceberg table will fail and the table will become unusable.

Example Avro schema
{
  "name": "column0",
  "type": {
    "type": "int",
    "logicalType": "time-milllis",
    "flink.precision": 1,
    "flink.version": 1,
    "arg.properties": {
      "range": {
        "min": 0,
        "max": 86400000
      }
    }
  }
}

timestamp-micros

  • Iceberg type: timestamptz (adjustToUTC = true, microsecond precision, with time zone)
  • Support status: Supported

Represents a timestamp, with microsecond precision, independent of a particular time zone or calendar. The long stores the number of microseconds from the unix epoch, 1 January 1970 00:00:00.000000 UTC.

Value range
  • min = 0
  • max = 9223372036854775807 (Long.MAX_VALUE)
Additional properties
  • flink.precision default value is 6
  • flink.precision minimum value is 0
  • flink.precision maximum value is 6

If flink.precision <= 3, the value is interpreted as milliseconds instead of microseconds during Avro-to-row-data conversion.

If flink.precision >=4 and <=6, the value is interpreted as microseconds.

flink.precision > 6 is not supported.

Negative values are not supported.

Example Avro schema
{
  "name": "column0",
  "type": {
    "type": "long",
    "logicalType": "timestamp-micros",
    "flink.precision": 5,
    "flink.version": 1,
    "arg.properties": {
      "range": {
        "min": 0,
        "max": 9223372036854775807
      }
    }
  }
}

timestamp-millis

  • Iceberg type: timestamptz (microsecond precision, with time zone)
  • Support status: Supported (millisecond precision not supported)

Represents a timestamp with millisecond precision, independent of a particular time zone or calendar. The long stores the number of milliseconds from the unix epoch, 1 January 1970 00:00:00.000 UTC.

Value range
  • min = -9223372036854775
  • max = 9223372036854775 (Long.MAX_VALUE/1000)
Additional properties
  • flink.precision default value is 3
  • flink.precision minimum value is 0
  • flink.precision maximum value is 3

flink.precision > 3 is not supported.

Because this is a millisecond value, a large value can overflow a long and become negative, so only values within the specified range are accepted.

Negative values are supported.

Example Avro schema
{
  "name": "column0",
  "type": {
    "type": "long",
    "logicalType": "timestamp-millis",
    "flink.precision": 2,
    "flink.version": 1,
    "arg.properties": {
      "range": {
        "min": -9223372036854775,
        "max": 9223372036854775
      }
    }
  }
}

uuid

  • Iceberg type: string
  • Support status: Supported
Additional properties
  • flink.maxLength
  • flink.minLength

flink.maxLength and flink.minLength have no significance, because there is no validation that the string length is within the range.

According to the Avro spec, the string must conform to RFC 4122, but there is no validation to enforce this. Any string value is accepted, even if the logical type is uuid.

Example Avro schema
{
  "name": "column0",
  "type": {
    "type": "string",
    "arg.properties": {
      "regex": "[a-zA-Z]*",
      "length": {
        "min": 15,
        "max": 20
      }
    },
    "logicalType": "uuid",
    "flink.maxLength": 10,
    "flink.minLength": 5,
    "flink.version": "1"
  }
}

Avro complex types

The following section describes Tableflow support for materializing Avro complex types.

Avro types are shown with corresponding Iceberg types and example Avro schemas.

array, enum, fixed, map, record, union, union (general)

array

  • Iceberg type: list
  • Support status: Supported
Example Avro schema
{
  "type": "array",
  "items" : "string",
  "default": []
}

enum

  • Iceberg type: string
  • Support status: Supported
Example Avro schema
{
  "type": "enum",
  "name": "Suit",
  "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
}

fixed

  • Iceberg type: binary
  • Support status: Supported
Additional properties
  • size: number of bytes per value
Example Avro schema
{
  "type": "fixed",
  "size": 16,
  "name": "md5"
}

map

  • Iceberg type: map
  • Support status: Supported
Example Avro schema
{
  "type": "map",
  "values" : "long",
  "default": {}
}

record

  • Iceberg type: struct
  • Support status: Supported
Example Avro schema
{
  "type": "record",
  "name": "LongList",
  "aliases": ["LinkedLongs"],                      // old name for this
  "fields" : [
    {"name": "value", "type": "long"},             // each element has a long
    {"name": "next", "type": ["null", "LongList"]} // optional next element
  ]
}

union

  • Iceberg type: union
  • Support status: Supported
Example Avro schema
["null", "string"]

union (general)

  • Iceberg type: struct
  • Support status: Supported
Example Avro schema
[
  "long",
  "string"
]

Confluent-specific types

The following section describes Tableflow support for materializing Connect and Confluent Cloud for Apache Flink types.

Confluent-specific types are shown with corresponding Iceberg types and example Avro schemas.

array

  • Iceberg type: map
  • Support status: Supported
Example schema
{
  "type" : "array",
  "items" : {
    "type" : "record",
    "name" : "MapEntry",
    "namespace" : "io.confluent.connect.avro",
    "fields" : [ {
      "name" : "key",
      "type" : "int"
    }, {
      "name" : "value",
      "type" : "bytes"
    } ]
  }
}

int8

  • Iceberg type: int
  • Support status: Supported
Additional properties
  • connect.type

The int8 type is represented as TINYINT in Flink.

Example schema
{
  "name": "column0",
  "type": {
    "type": "int",
    "connect.type": "int8"
  }
}

int16

  • Iceberg type: int
  • Support status: Not supported
Additional properties
  • connect.type

The int16 type is represented as SMALLINT in Flink.

Example schema
{
  "name": "column0",
  "type": {
    "type": "int",
    "connect.type": "int16"
  }
}

multiset

  • Iceberg type: map
  • Support status: Not supported
Example schema
{
  "type" : "map",
  "values" : "int",
  "flink.type" : "multiset",
  "flink.version" : "1"
}
{
  "type" : "array",
  "items" : {
    "type" : "record",
    "name" : "MapEntry",
    "namespace" : "io.confluent.connect.avro",
    "fields" : [ {
      "name" : "key",
      "type" : "long"
    }, {
      "name" : "value",
      "type" : "int"
    } ]
  },
  "flink.type" : "multiset",
  "flink.version" : "1"
}

JSON schema type mapping

The following section describes Tableflow support for materializing JSON schema types.

JSON schema types are shown with corresponding Iceberg types and example JSON schemas.

ARRAY, BIGINT, BINARY, BOOLEAN, CHAR, DATE, DECIMAL, DOUBLE, FLOAT, INT, MAP_K_V, MAP_VARCHAR_V, MULTISET[K], MULTISET[VARCHAR], NUMBER, ROW, SMALLINT, TIME, TIMESTAMP, TIMESTAMP_LTZ, TINYINT, VARBINARY, VARCHAR

ARRAY

  • Iceberg type: required list<time>
  • Support status: Supported
JSON input schema
"ARRAY": {
  "type": "array",
  "items": {
    "type": "number",
    "title": "org.apache.kafka.connect.data.Time",
    "flink.precision": 2,
    "connect.type": "int32",
    "flink.version": "1"
  }
}

BIGINT

  • Iceberg type: required long
  • Support status: Supported
JSON input schema
"BIGINT": {
  "type": "number",
  "connect.type": "int64"
}

BINARY

  • Iceberg type: required string
  • Support status: Supported
JSON input schema
"BINARY": {
  "type": "string",
  "connect.type": "bytes",
  "flink.minLength": 123,
  "flink.maxLength": 123,
  "flink.version": "1"
}

BOOLEAN

  • Iceberg type: required boolean
  • Support status: Supported
JSON input schema
"BOOLEAN": {
  "type": "boolean"
}

CHAR

  • Iceberg type: required string
  • Support status: Supported
JSON input schema
"CHAR": {
  "type": "string",
  "minLength": 123,
  "maxLength": 123
}

DATE

  • Iceberg type: optional date
  • Support status: Supported
JSON input schema
"DATE": {
  "type": "number",
  "connect.type": "int32",
  "title": "org.apache.kafka.connect.data.Date"
}

DECIMAL

  • Iceberg type: optional decimal(10, 2)
  • Support status: Supported
JSON input schema
"DECIMAL": {
  "type": "number",
  "connect.type": "bytes",
  "title": "org.apache.kafka.connect.data.Decimal",
  "connect.parameters": {
    "scale": "2"
  }
}

DOUBLE

  • Iceberg type: required double
  • Support status: Supported
JSON input schema
"DOUBLE": {
  "type": "number",
  "connect.type": "float64"
}

FLOAT

  • Iceberg type: required float
  • Support status: Supported
JSON input schema
"FLOAT": {
  "type": "number",
  "connect.type": "float32"
}

INT

  • Iceberg type: required int
  • Support status: Supported
JSON input schema
"INT": {
  "type": "number",
  "connect.type": "int32"
}

MAP_K_V

  • Iceberg type: required map<int, long>
  • Support status: Supported
JSON input schema
"MAP_K_V": {
  "type": "array",
  "connect.type": "map",
  "items": {
    "type": "object",
    "properties": {
      "key": {
        "type": "number",
        "connect.type": "int32"
      },
      "value": {
        "type": "number",
        "connect.type": "int64"
      }
    }
  }
}

MAP_VARCHAR_V

  • Iceberg type: required map<string, long>
  • Support status: Supported
JSON input schema
"MAP_VARCHAR_V": {
  "type": "object",
  "connect.type": "map",
  "additionalProperties": {
    "type": "number",
    "connect.type": "int64"
  }
}

MULTISET[K]

  • Iceberg type: n/a
  • Support status: Not supported
JSON input schema
"MULTISET[K]": {
  "type": "array",
  "connect.type": "map",
  "flink.type": "multiset",
  "items": {
    "type": "object",
    "properties": {
      "value": {
        "type": "number",
        "connect.type": "int64"
      },
      "key": {
        "type": "number",
        "connect.type": "int32"
      }
    }
  }
}

MULTISET[VARCHAR]

  • Iceberg type: n/a
  • Support status: Not supported
JSON input schema
"MULTISET[VARCHAR]": {
{
  "type": "object",
  "connect.type": "map",
  "flink.type": "multiset",
  "additionalProperties": {
    "type": "number",
    "connect.type": "int64"
  }
}

NUMBER

  • Iceberg type: required double
  • Support status: Supported
JSON input schema
"NUMBER" : {
  "type" : "number",
  "nullable" : true
}

ROW

  • Iceberg type: required struct<36: field1: optional string, 37: field2: optional int, 38: field3: optional boolean>
  • Support status: Supported
JSON input schema
"ROW": {
  "type": "object",
  "properties": {
    "field1": {
      "type": "string"
    },
    "field2": {
      "type": "number",
      "connect.type": "int32"
    },
    "field3": {
      "type": "boolean"
    }
  }
}

SMALLINT

  • Iceberg type: required int
  • Support status: Supported
JSON input schema
"SMALLINT": {
  "type": "number",
  "connect.type": "int16"
}

TIME

  • Iceberg type: optional time
  • Support status: Supported
JSON input schema
"TIME": {
  "type": "number",
  "title": "org.apache.kafka.connect.data.Time",
  "flink.precision": 2,
  "connect.type": "int32",
  "flink.version": "1"
}

TIMESTAMP

  • Iceberg type: optional timestamp
  • Support status: Supported
JSON input schema
"TIMESTAMP": {
  "type":"number",
  "flink.precision":2,
  "flink.type":"timestamp",
  "connect.type":"int64",
  "flink.version":"1"
}

TIMESTAMP_LTZ

  • Iceberg type: required timestamptz
  • Support status: Supported
JSON input schema
"TIMESTAMP_LTZ": {
  "type": "number",
  "title": "org.apache.kafka.connect.data.Timestamp",
  "flink.precision": 2,
  "connect.type": "int64",
  "flink.version": "1"
}

TINYINT

  • Iceberg type: required int
  • Support status: Supported
JSON input schema
"TINYINT": {
  "type": "number",
  "connect.type": "int8"
}

VARBINARY

  • Iceberg type: required binary
  • Support status: Supported
JSON input schema
"VARBINARY": {
  "type": "string",
  "connect.type": "bytes",
  "flink.maxLength": 123,
  "flink.version": "1"
}

VARCHAR

  • Iceberg type: required string
  • Support status: Supported
JSON input schema
"VARCHAR": {
  "type": "string",
  "maxLength": 123
}

Protobuf schema type mapping

The following section describes Tableflow support for materializing Protobuf schema types.

Protobuf schema types are shown with corresponding Iceberg types and example Protobuf schemas.

ARRAY, BOOLEAN, BIGINT, BINARY, CHAR, DATE, DECIMAL, DOUBLE, FLOAT, INT, MAP_K_V, ROW, SMALLINT, TIMESTAMP, TIMESTAMP_LTZ, TIME_WITHOUT_TIME_ZONE, TINYINT, VARCHAR

ARRAY

  • Iceberg type: required list<long>
  • Support status: Supported
Protobuf schema
repeated int64 ARRAY = 19;

BOOLEAN

  • Iceberg type: optional boolean
  • Support status: Supported
Protobuf schema
optional bool BOOLEAN = 3;

BIGINT

  • Iceberg type: optional long
  • Support status: Supported
Protobuf schema
optional int64 BIGINT = 1;

BINARY

  • Iceberg type: optional fixed[6]
  • Support status: Supported
Protobuf schema
optional bytes BINARY = 2 [(confluent.field_meta) = {
    doc: "Example field documentation",
    params: [
      {
        key: "flink.maxLength",
        value: "6"
      },
      {
        key: "flink.minLength",
        value: "6"
      },
      {
        key: "flink.version",
        value: "1"
      }
    ]
  }];

CHAR

  • Iceberg type: optional string
  • Support status: Supported
Protobuf schema
optional string CHAR = 4 [(confluent.field_meta) = {
  params: [
    {
      key: "flink.version",
      value: "1"
    },
    {
      key: "flink.minLength",
      value: "123"
    },
    {
      key: "flink.maxLength",
      value: "123"
    }
  ]
}];

DATE

  • Iceberg type: optional date
  • Support status: Supported
Protobuf schema
optional .google.type.Date DATE = 5;

DECIMAL

  • Iceberg type: optional decimal(5, 1)
  • Support status: Supported
Protobuf schema
optional .confluent.type.Decimal DECIMAL = 7 [(confluent.field_meta) = {
  params: [
    {
      key: "flink.version",
      value: "1"
    },
    {
      value: "1",
      key: "scale"
    },
    {
      value: "5",
      key: "precision"
    }
  ]
}];

DOUBLE

  • Iceberg type: optional double
  • Support status: Supported
Protobuf schema
optional double DOUBLE = 6;

FLOAT

  • Iceberg type: optional float
  • Support status: Supported
Protobuf schema
optional float FLOAT = 8;

INT

  • Iceberg type: optional int
  • Support status: Supported
Protobuf schema
optional int32 INT = 9;

MAP_K_V

  • Iceberg type: required map<string, long>
  • Support status: Supported
Protobuf schema
repeated MapEntry MAP_K_V = 10;

message MapEntry {
  option map_entry = true;

  optional string key = 1;
  optional int64 value = 2;
}

ROW

  • Iceberg type: optional struct<32: a: optional string, 33: b: optional double>
  • Support status: Supported
Protobuf schema
optional meta_Row ROW = 11;

message meta_Row {
  optional string a = 1;
  optional double b = 2;
}

SMALLINT

  • Iceberg type: optional int
  • Support status: Supported
Protobuf schema
optional int32 SMALLINT = 12 [(confluent.field_meta) = {
    params: [
      {
        key: "flink.version",
        value: "1"
      },
      {
        key: "connect.type",
        value: "int16"
      }
    ]
  }];

TIMESTAMP

  • Iceberg type: optional timestamp
  • Support status: Supported
Protobuf schema
optional .google.protobuf.Timestamp TIMESTAMP = 13 [(confluent.field_meta) = {
  params: [
    {
      key: "flink.version",
      value: "1"
    },
    {
      key: "flink.type",
      value: "timestamp"
    },
    {
      key: "flink.precision",
      value: "3"
    }
  ]
}];

TIMESTAMP_LTZ

  • Iceberg type: optional timestamptz
  • Support status: Supported
Protobuf schema
optional .google.protobuf.Timestamp TIMESTAMP_LTZ = 14 [(confluent.field_meta) = {
  params: [
    {
      key: "flink.precision",
      value: "3"
    },
    {
      key: "flink.version",
      value: "1"
    }
  ]
}];

TIME_WITHOUT_TIME_ZONE

  • Iceberg type: optional time
  • Support status: Supported
Protobuf schema
optional .google.type.TimeOfDay TIME_WITHOUT_TIME_ZONE = 15 [(confluent.field_meta) = {
  params: [
    {
      key: "flink.precision",
      value: "3"
    },
    {
      key: "flink.version",
      value: "1"
    }
  ]
}];

TINYINT

  • Iceberg type: optional int
  • Support status: Supported
Protobuf schema
optional int32 TINYINT = 16 [(confluent.field_meta) = {
    params: [
      {
        key: "flink.version",
        value: "1"
      },
      {
        key: "connect.type",
        value: "int8"
      }
    ]
  }];

VARCHAR

  • Iceberg type: optional string
  • Support status: Supported
Protobuf schema
optional string VARCHAR = 18 [(confluent.field_meta) = {
  params: [
    {
      key: "flink.maxLength",
      value: "123"
    },
    {
      key: "flink.version",
      value: "1"
    }
  ]
}];