Schemas with Tableflow in Confluent Cloud¶
Schemas play a crucial role in defining the structure and format of materialized tables in Tableflow.
With Tableflow, schemas are central to the process of tiering Kafka topics into Apache Iceberg™ tables. When a Kafka topic is enabled for Tableflow, the data in the topic is transformed into an Iceberg table by using the schema associated with the topic. If no schema is configured, the default binary key-value schema is applied. Topics that have multiple schemas are materialized into a binary key-value format. This schema-driven tiering ensures that the data is correctly formatted and can be efficiently queried using the Iceberg REST Catalog.
- Table representation of a Kafka topic
- Key schemas
- Schema compatibility and evolution
- Using schemas without Schema Registry serializers
- Schema limitations with Tableflow
- Type mappings
Table representation of a Kafka topic¶
It’s important to recognize that Tableflow creates a table representation of a Kafka topic. Tableflow automatically uses the latest schema in Schema Registry to create the associated table schema. Topics and their schemas are the source of truth for the table representation of the underlying data.
For example, given the following Avro schema in Schema Registry:
{
"fields": [
{
"name": "side",
"type": "string"
},
{
"name": "quantity",
"type": "int"
},
{
"name": "symbol",
"type": "string"
},
{
"name": "price",
"type": "int"
},
{
"name": "account",
"type": "string"
},
{
"name": "userid",
"type": "string"
}
],
"name": "StockTrade",
"type": "record"
}
Tableflow generates the following table DDL:
CREATE TABLE cluster-id.topic_name (
key binary,
side string,
quantity int,
symbol string,
price int,
account string,
userid string
);
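Tableflow uses the latest registered schema for the topic's value subject to derive the table columns, so it can help to confirm which schema version that is before you enable Tableflow. The following sketch is illustrative and not part of the Tableflow documentation; it assumes the Confluent Schema Registry Java client is on the classpath and uses placeholder endpoint, credential, and subject values (stock-trades-value).
import java.util.Map;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

public class ShowLatestSchema {
    public static void main(String[] args) throws Exception {
        // Placeholder Schema Registry endpoint and credentials; replace with your own values.
        String registryUrl = "https://psrc-xxxxx.us-east-2.aws.confluent.cloud";
        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(
            registryUrl, 10,
            Map.of("basic.auth.credentials.source", "USER_INFO",
                   "basic.auth.user.info", "SR_API_KEY:SR_API_SECRET"));

        // With TopicNameStrategy, the value schema is registered under "<topic>-value".
        SchemaMetadata latest = client.getLatestSchemaMetadata("stock-trades-value");

        // This is the schema version Tableflow uses to derive the table columns.
        System.out.println("Version " + latest.getVersion() + ": " + latest.getSchema());
    }
}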
You can use CREATE TABLE statements in Confluent Cloud for Apache Flink® to generate a table-compatible schema in Schema Registry. For example, the following Flink SQL DDL results in the same Avro schema and table schema shown previously:
CREATE TABLE `environment`.`cluster`.`topicname` (
`key` VARBINARY(2147483647),
`side` VARCHAR(2147483647),
`quantity` INT,
`symbol` VARCHAR(2147483647),
`price` INT,
`account` VARCHAR(2147483647),
`userid` VARCHAR(2147483647)
);
Key schemas¶
In the default append mode for Tableflow, key schemas act as additional fields in the table definition.
For example, given the following KEY and VALUE schema:
{
"fields": [
{
"name": "userid",
"type": "int"
}
],
"name": "tableflowschema_key",
"namespace": "org.apache.flink.avro.generated.record",
"type": "record"
}
{
"fields": [
{
"default": null,
"name": "textbox",
"type": [
"null",
"string"
]
}
],
"name": "tableflowschema_value",
"namespace": "org.apache.flink.avro.generated.record",
"type": "record"
}
Tableflow generates the following table DDL:
CREATE TABLE cluster-id.topicname (
userid int,
textbox string
);
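As an illustrative sketch (not taken from the Tableflow documentation), the following Java producer publishes records that match the key and value schemas above using the Confluent Avro serializer, so the userid key field materializes as a column next to textbox. The topic name, bootstrap servers, and Schema Registry settings are placeholders, and security configuration is omitted.
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyValueAvroProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "pkc-xxxxx.us-east-2.aws.confluent.cloud:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "https://psrc-xxxxx.us-east-2.aws.confluent.cloud"); // placeholder
        // Broker and Schema Registry security settings (SASL, API keys) are omitted for brevity.

        Schema keySchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"tableflowschema_key\",\"namespace\":\"org.apache.flink.avro.generated.record\","
            + "\"fields\":[{\"name\":\"userid\",\"type\":\"int\"}]}");
        Schema valueSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"tableflowschema_value\",\"namespace\":\"org.apache.flink.avro.generated.record\","
            + "\"fields\":[{\"name\":\"textbox\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        GenericRecord key = new GenericData.Record(keySchema);
        key.put("userid", 42);
        GenericRecord value = new GenericData.Record(valueSchema);
        value.put("textbox", "hello");

        // Both schemas are registered under TopicNameStrategy subjects; Tableflow flattens key and value into one row.
        try (KafkaProducer<GenericRecord, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("topicname", key, value));
            producer.flush();
        }
    }
}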
Schema compatibility and evolution¶
Tableflow supports schema evolution by using Confluent Schema Registry as the source of truth. You can define schemas and set compatibility modes within the Schema Registry. When evolving event schemas for Kafka topics, schemas must adhere to the defined compatibility rules.
Consumers of tables materialized by Tableflow can access them only in read-only mode, meaning consumers can query the data but can’t modify the schema. Schema evolution is managed at the Kafka event level and validated through Schema Registry before being applied during materialization. Tableflow enforces schema consistency, ensuring that any changes originate from the data source and adhere to predefined evolution rules.
Tableflow validates schema changes against the Schema Registry during table materialization to ensure compliance. If Tableflow encounters data that doesn’t conform to the specified schema evolution rules, it suspends materialization for the affected topic.
When Tableflow is enabled on an existing topic, it uses the current schema and materializes the data from the beginning of the log, so Tableflow must be able to read the source topic from the beginning. For this reason, you should use the FULL_TRANSITIVE schema compatibility mode in Schema Registry.
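The compatibility mode is a per-subject setting in Schema Registry. The following sketch shows one hedged way to set it: it calls the Schema Registry REST API (PUT /config/<subject>) with the standard Java HTTP client, and the endpoint, credentials, and subject name are placeholders.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

public class SetCompatibility {
    public static void main(String[] args) throws Exception {
        String registryUrl = "https://psrc-xxxxx.us-east-2.aws.confluent.cloud"; // placeholder
        String subject = "stock-trades-value"; // TopicNameStrategy value subject for the example topic
        String auth = Base64.getEncoder().encodeToString("SR_API_KEY:SR_API_SECRET".getBytes()); // placeholders

        // PUT /config/<subject> sets the compatibility mode for that subject only.
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(registryUrl + "/config/" + subject))
            .header("Authorization", "Basic " + auth)
            .header("Content-Type", "application/vnd.schemaregistry.v1+json")
            .PUT(HttpRequest.BodyPublishers.ofString("{\"compatibility\": \"FULL_TRANSITIVE\"}"))
            .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}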
Tableflow supports the following schema changes.
Add optional fields without default values. When you add a new column, you must provide an optional field to ensure backward compatibility with data corresponding to older versions of the schema. Iceberg does not fully support default values.
The code example shows an optional field in Avro format.
{ "name": "firstName", "type": [ "null", "string" ] }
Type widening for the following types:
- int to long
- float to double
- decimal precision increase
Other schema changes are not supported in backward compatibility mode, and compatibility types other than backward are not supported. For a way to check a proposed change before registering it, see the sketch below.
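The sketch checks a proposed change, here the optional firstName field from the example above added to the StockTrade schema, against the subject's compatibility setting before registering it. It assumes the Confluent Schema Registry Java client (version 5.5 or later) and uses placeholder connection details; it is illustrative rather than part of the Tableflow product.
import java.util.Map;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class CheckEvolution {
    public static void main(String[] args) throws Exception {
        String registryUrl = "https://psrc-xxxxx.us-east-2.aws.confluent.cloud"; // placeholder
        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(
            registryUrl, 10,
            Map.of("basic.auth.credentials.source", "USER_INFO",
                   "basic.auth.user.info", "SR_API_KEY:SR_API_SECRET")); // placeholders

        // Proposed new version: the original StockTrade fields plus an optional (nullable) field,
        // mirroring the example above (no default value).
        String evolved = "{\"type\":\"record\",\"name\":\"StockTrade\",\"fields\":["
            + "{\"name\":\"side\",\"type\":\"string\"},"
            + "{\"name\":\"quantity\",\"type\":\"int\"},"
            + "{\"name\":\"symbol\",\"type\":\"string\"},"
            + "{\"name\":\"price\",\"type\":\"int\"},"
            + "{\"name\":\"account\",\"type\":\"string\"},"
            + "{\"name\":\"userid\",\"type\":\"string\"},"
            + "{\"name\":\"firstName\",\"type\":[\"null\",\"string\"]}]}";

        String subject = "stock-trades-value"; // placeholder TopicNameStrategy subject
        boolean compatible = client.testCompatibility(subject, new AvroSchema(evolved));
        System.out.println("Compatible with " + subject + ": " + compatible);

        if (compatible) {
            // Register the new version only if it passes the subject's compatibility rules.
            int id = client.register(subject, new AvroSchema(evolved));
            System.out.println("Registered schema id " + id);
        }
    }
}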
Using schemas without Schema Registry serializers¶
Tableflow supports cases in which there is a schema in Schema Registry using a TopicNameStrategy but the data has not been serialized with a Schema Registry serializer. This means there is no magic byte present in the Kafka message. This enables retroactively applying schema to a topic without requiring application changes. The data must still be serialized in the same format as the schema in Schema Registry, which means that if the Kafka data is serialized in Avro format, you must use an Avro schema in Schema Registry.
The following example shows how to accomplish this.
Example: Enable Tableflow without using a Schema Registry Serializer¶
Suppose you have published data that is serialized using the following Avro schema.
// Product.avsc
{
"type": "record",
"name": "Product",
"namespace": "com.example",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "name", "type": "string" },
{ "name": "description", "type": "string" }
]
}
The following example Java producer code publishes Avro-serialized data using the provided schema but does not rely on Schema Registry.
Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
...
// Create and serialize an Avro Product instance
Product product = new Product(1, "Product-1", "Description for Product-1");
byte[] avroBytes;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
DatumWriter<Product> writer = new SpecificDatumWriter<>(Product.class);
writer.write(product, encoder);
encoder.flush();
avroBytes = outputStream.toByteArray();
} catch (IOException e) {
throw new RuntimeException("Serialization error: " + e.getMessage(), e);
}
// Publish the message
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, String.valueOf(product.getId()), avroBytes);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Sent: " + product.getName() + " -> Topic: " + metadata.topic() + " Offset: " + metadata.offset());
} else {
exception.printStackTrace();
}
});
producer.flush();
producer.close();
After you publish data to the topic, follow these steps to enable Tableflow.
- In Confluent Cloud Console, navigate to the topic that has data serialized using the previous Avro schema. Click Enable Tableflow, and you are prompted to create a schema.
- Create a new schema using the same format as the data serialized in this topic, ensuring the schema type matches, which in this example is Avro. For one programmatic way to register the schema, see the sketch after these steps.
- Continue with the next steps and enable Tableflow on the topic. Tableflow can now materialize the data using the configured schema.
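As one programmatic alternative to creating the schema in Cloud Console (step 2), the following hedged sketch registers the Product schema under the topic's TopicNameStrategy subject through the Schema Registry REST API (POST /subjects/<subject>/versions). The topic name products, endpoint, and credentials are placeholders.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

public class RegisterProductSchema {
    public static void main(String[] args) throws Exception {
        String registryUrl = "https://psrc-xxxxx.us-east-2.aws.confluent.cloud"; // placeholder
        String subject = "products-value"; // TopicNameStrategy: "<topic>-value" for a hypothetical "products" topic
        String auth = Base64.getEncoder().encodeToString("SR_API_KEY:SR_API_SECRET".getBytes()); // placeholders

        // The Product.avsc definition shown above, as a Java string.
        String productAvsc = "{\"type\":\"record\",\"name\":\"Product\",\"namespace\":\"com.example\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"int\"},"
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"description\",\"type\":\"string\"}]}";

        // Schema Registry expects the schema as a JSON-escaped string inside the request body.
        String body = "{\"schemaType\":\"AVRO\",\"schema\":\"" + productAvsc.replace("\"", "\\\"") + "\"}";

        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(registryUrl + "/subjects/" + subject + "/versions"))
            .header("Authorization", "Basic " + auth)
            .header("Content-Type", "application/vnd.schemaregistry.v1+json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body()); // returns the schema id on success
    }
}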
Schema limitations with Tableflow¶
- Schema strategy: Only TopicNameStrategy is supported.
- No schema changes are allowed for keys.
- Dropping columns is not supported.
- Conditional schemas are not supported.
- Cyclic schemas are not supported.
- Adding new nested fields to an existing schema is not supported.
Avro specific limitations:
- Tableflow does not support the enum type. An enum is parsed as a string.
- Tableflow does not support timestamp-micros. This type is parsed as bigint/long.
- Tableflow does not support uuid. This type is parsed as string.
- Tableflow does not support duration. This type is parsed as binary/fixed.
- union(null, type) becomes nullable(type).
- union(type1, type2, ...) becomes row(type1, type2, ...).
- Tableflow does not support raw union schemas, for example, {["string", "int"]}.
- Tableflow does not support multiset schemas.
- Tableflow does not support the
JSON specific limitations:
- Tableflow does not support AllOf, AnyOf, and OneOf schemas.
- If then else schemas are not supported.
- NullSchema is not supported.
- JSON schemas with extra records outside of the schema may throw a null pointer exception or a cast exception in the deserializer. For example, if you have fields a, b, and c, and you submit a record with a = 1, random field = null, the random field is interpreted as b, and if b isn't nullable, an exception is thrown. The Kafka serializer permits this record to be written.
Avro schema type mapping¶
The following sections describe all of the Avro types that are supported by Tableflow in Confluent Cloud. They include primitives, complex types, logical types, Connect types and types supported by Confluent Cloud for Apache Flink.
For an Avro schema registered in Schema Registry, the root type must always be a record.
Avro primitive types¶
The following section describes Tableflow support for materializing primitive types.
Avro types are shown with corresponding Iceberg types and example Avro schemas.
boolean, bytes, double, fixed, float, int, string
boolean¶
- Iceberg type: boolean
- Support status: Supported
- Example Avro schema
{ "name" : "column0", "type" : "boolean" }
bytes¶
- Iceberg type: binary
- Support status: Supported
- Example Avro schema
{ "name" : "column0", "type" : "bytes" }
double¶
- Iceberg type: double
- Support status: Supported
- Example Avro schema
{ "name" : "column0", "type" : "double" }
fixed¶
- Iceberg type: fixed
- Support status: Supported
- Example Avro schema
{ "name" : "column0", "type" : "fixed" }
float¶
- Iceberg type: float
- Support status: Supported
- Example Avro schema
{ "name" : "column0", "type" : "float" }
int¶
- Iceberg type: int
- Support status: Supported
- Example Avro schema
{ "name" : "column0", "type" : "int" }
string¶
- Iceberg type: string
- Support status: Supported
- Example Avro schema
{ "name" : "column0", "type" : "string" }
Avro logical types¶
The following section describes Tableflow support for materializing Avro logical types.
Avro types are shown with corresponding Iceberg types and example Avro schemas.
date, decimal, duration, local-timestamp-micros, local-timestamp-millis, time-micros, time-millis, timestamp-micros, timestamp-millis, uuid
date¶
- Iceberg type: date
- Support status: Supported
- Value range
- min = -2147483648 (Integer.MIN_VALUE)
- max = 2147483647 (Integer.MAX_VALUE)
The date type represents the number of days from the Unix epoch, 1 January 1970.
- Example Avro schema
{ "name": "column0", "type": { "type": "int", "logicalType": "date" } }
decimal¶
- Iceberg type: decimal
- Support status: Supported
- Value range
- min = -10^(precision - scale) + 1
- max = 10^(precision - scale) - 1
- Additional properties
- precision (required) min value is 1
- precision (required) max value is 38
- scale (optional) default value is 0, must be less than or equal to precision
There is no validation to ensure that the required precision property is present in the schema.
The Iceberg decimal type can map to either the Avro bytes or fixed logical types.
- Example Avro schema
{ "name": "column0", "type": { "type": "bytes", "logicalType": "decimal", "precision": 12, "scale": 10 } }
{ "name": "column0", "type": { "type": "fixed", "name": "fixedColumn", "namespace": "", "size": 5, "logicalType": "decimal", "precision": 31, "scale": 18 } }
duration¶
- Iceberg type: fixed
- Support status: Supported
Even though duration is part of the Avro spec, it is not implemented in the Avro Java libraries, so Tableflow interprets it as a fixed type.
- Example Avro schema
{ "name": "column", "type": { "type": "fixed", "logicalType": "duration", "name" : "fixedColumn", "size": 12 } }
local-timestamp-micros¶
- Iceberg type: timestamp (adjustToUTC = false)
- Support status: Supported
A local timestamp (microsecond precision) represents a timestamp in a local time zone. The long value stores the number of microseconds from 1 January 1970 00:00:00.000000.
- Value range
- min = 0
- max = 9223372036854775807 (Long.MAX_VALUE)
- Additional properties
- flink.precision default value is 6
- flink.precision minimum value is 0
- flink.precision maximum value is 6
If flink.precision <= 3, the value is interpreted as milliseconds instead of microseconds during Avro-to-row-data conversion.
If flink.precision >=4 and <=6, the value is interpreted as microseconds.
flink.precision > 6 is not supported.
- Example Avro schema
{ "name": "column0", "type": { "type": "long", "logicalType": "local-timestamp-micros", "flink.precision": 5, "flink.version": 1, "arg.properties": { "range": { "min": 0, "max": 9223372036854775807 } } } }
local-timestamp-millis¶
- Iceberg type: timestamp (adjustToUTC = false, timestamp without time zone)
- Support status: Supported (millisecond precision not supported)
- Value range
- min = -9223372036854775
- max = 9223372036854775 (Long.MAX_VALUE/1000)
- Additional properties
- flink.precision default value is 3
- flink.precision minimum value is 0
- flink.precision maximum value is 3
flink.precision > 3 is not supported.
Because this is a millisecond value, there is a chance of long overflow and the value becoming negative, so only values within the specified range are accepted.
Negative values are supported.
- Example Avro schema
{ "name": "column0", "type": { "type": "long", "logicalType": "local-timestamp-millis", "flink.precision": 2, "flink.version": 1, "arg.properties": { "range": { "min": -9223372036854775, "max": 9223372036854775 } } } }
time-micros¶
- Iceberg type: long
- Support status: Supported
- Value range
- min = -9223372036854775807
- max = 9223372036854775807 (Long.MAX_VALUE)
Flink doesn’t support reading Avro time-micros as a TIME type. Flink supports TIME with precision up to 3. time-micros is read and written as BIGINT.
The flink.precision value has no significance, because the value is interpreted as BIGINT and not TIME.
- Example Avro schema
{ "name": "column0", "type": { "type": "long", "logicalType": "time-micros", "flink.precision": 1, "flink.version": 1, "arg.properties": { "range": { "min": -9223372036854775807, "max": 9223372036854775807 } } } }
time-millis¶
- Iceberg type: time (time of day, without date or time zone)
- Support status: Supported (millisecond precision is not preserved; values are materialized with microsecond precision)
- Value range
- min = 0
- max = 86400000 (number of milliseconds in a day)
- Additional properties
- flink.precision default value is 3
- flink.precision minimum value is 0
- flink.precision maximum value is 3
flink.precision > 3 is not supported.
Important
Materialization occurs even if the value is outside the range, but Iceberg reading will fail and the table will become unusable.
- Example Avro schema
{ "name": "column0", "type": { "type": "int", "logicalType": "time-milllis", "flink.precision": 1, "flink.version": 1, "arg.properties": { "range": { "min": 0, "max": 86400000 } } } }
timestamp-micros¶
- Iceberg type: timestamptz (adjustToUTC = true, microsecond precision, with time zone)
- Support status: Supported
Represents a timestamp with microsecond precision, independent of a particular time zone or calendar. The long value stores the number of microseconds from the Unix epoch, 1 January 1970 00:00:00.000000 UTC.
- Value range
- min = 0
- max = 9223372036854775807 (Long.MAX_VALUE)
- Additional properties
- flink.precision default value is 6
- flink.precision minimum value is 0
- flink.precision maximum value is 6
If flink.precision <= 3, the value is interpreted as milliseconds instead of microseconds during Avro-to-row-data conversion.
If flink.precision >=4 and <=6, the value is interpreted as microseconds.
flink.precision > 6 is not supported.
Negative values are not supported.
- Example Avro schema
{ "name": "column0", "type": { "type": "long", "logicalType": "timestamp-micros", "flink.precision": 5, "flink.version": 1, "arg.properties": { "range": { "min": 0, "max": 9223372036854775807 } } } }
timestamp-millis¶
- Iceberg type: timestamptz (microsecond precision, with time zone)
- Support status: Supported (millisecond precision not supported)
Represents a timestamp with millisecond precision, independent of a particular time zone or calendar. The long value stores the number of milliseconds from the Unix epoch, 1 January 1970 00:00:00.000 UTC.
- Value range
- min = -9223372036854775
- max = 9223372036854775 (Long.MAX_VALUE/1000)
- Additional properties
- flink.precision default value is 3
- flink.precision minimum value is 0
- flink.precision maximum value is 3
flink.precision > 3 is not supported.
Because this is a millisecond value, there is a chance of long overflow and the value becoming negative, so only values within the specified range are accepted.
Negative values are supported.
- Example Avro schema
{ "name": "column0", "type": { "type": "long", "logicalType": "timestamp-millis", "flink.precision": 2, "flink.version": 1, "arg.properties": { "range": { "min": -9223372036854775, "max": 9223372036854775 } } } }
uuid¶
- Iceberg type: string
- Support status: Supported
- Additional properties
- flink.maxLength
- flink.minLength
flink.maxLength and flink.minLength have no significance, because there is no validation that the string length is within the range.
According to the Avro spec, the string must conform to RFC 4122, but there is no validation to ensure this. Any string value is accepted even if the logical type is uuid.
- Example Avro schema
{ "name": "column0", "type": { "type": "string", "arg.properties": { "regex": "[a-zA-Z]*", "length": { "min": 15, "max": 20 } }, "logicalType": "uuid", "flink.maxLength": 10, "flink.minLength": 5, "flink.version": "1" } }
Avro complex types¶
The following section describes Tableflow support for materializing Avro complex types.
Avro types are shown with corresponding Iceberg types and example Avro schemas.
array, enum, fixed, map, record, union (general)
array¶
- Iceberg type: list
- Support status: Supported
- Example Avro schema
{ "type": "array", "items" : "string", "default": [] }
enum¶
- Iceberg type: string
- Support status: Supported
- Example Avro schema
{ "type": "enum", "name": "Suit", "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] }
fixed¶
- Iceberg type: binary
- Support status: Supported
- Additional properties
- size: number of bytes per value
- Example Avro schema
{ "type": "fixed", "size": 16, "name": "md5" }
map¶
- Iceberg type: map
- Support status: Supported
- Example Avro schema
{ "type": "map", "values" : "long", "default": {} }
record¶
- Iceberg type: struct
- Support status: Supported
- Example Avro schema
{ "type": "record", "name": "LongList", "aliases": ["LinkedLongs"], // old name for this "fields" : [ {"name": "value", "type": "long"}, // each element has a long {"name": "next", "type": ["null", "LongList"]} // optional next element ] }
union (general)¶
- Iceberg type: struct
- Support status: Supported
- Example Avro schema
[ "long", "string" ]
Confluent-specific types¶
The following section describes Tableflow support for materializing Connect and Confluent Cloud for Apache Flink types.
Confluent-specific types are shown with corresponding Iceberg types and example Avro schemas.
array¶
- Iceberg type: map
- Support status: Supported
- Example schema
{ "type" : "array", "items" : { "type" : "record", "name" : "MapEntry", "namespace" : "io.confluent.connect.avro", "fields" : [ { "name" : "key", "type" : "int" }, { "name" : "value", "type" : "bytes" } ] } }
int8¶
- Iceberg type: int
- Support status: Supported
- Additional properties
- connect.type
The int8 type is represented as TINYINT in Flink.
- Example schema
{ "name": "column0", "type": { "type": "int", "connect.type": "int8" }
int16¶
- Iceberg type: int
- Support status: Not supported
- Additional properties
- connect.type
The int16 type is represented as SMALLINT in Flink.
- Example schema
{ "name": "column0", "type": { "type": "int", "connect.type": "int16" }
multiset¶
- Iceberg type: map
- Support status: Not supported
- Example schema
{ "type" : "map", "values" : "int", "flink.type" : "multiset", "flink.version" : "1" }
{ "type" : "array", "items" : { "type" : "record", "name" : "MapEntry", "namespace" : "io.confluent.connect.avro", "fields" : [ { "name" : "key", "type" : "long" }, { "name" : "value", "type" : "int" } ] }, "flink.type" : "multiset", "flink.version" : "1" }
JSON schema type mapping¶
The following section describes Tableflow support for materializing JSON schema types.
JSON schema types are shown with corresponding Iceberg types and example JSON schemas.
ARRAY, BIGINT, BINARY, BOOLEAN, CHAR, DATE, DECIMAL, DOUBLE, FLOAT, INT, MAP_K_V, MAP_VARCHAR_V, MULTISET[K], MULTISET[VARCHAR], NUMBER, ROW, SMALLINT, TIME, TIMESTAMP, TIMESTAMP_LTZ, TINYINT, VARBINARY, VARCHAR
ARRAY¶
- Iceberg type: required list<time>
- Support status: Supported
- JSON input schema
"ARRAY": { "type": "array", "items": { "type": "number", "title": "org.apache.kafka.connect.data.Time", "flink.precision": 2, "connect.type": "int32", "flink.version": "1" } }
BIGINT¶
- Iceberg type: required long
- Support status: Supported
- JSON input schema
"BIGINT": { "type": "number", "connect.type": "int64" }
BINARY¶
- Iceberg type: required string
- Support status: Supported
- JSON input schema
"BINARY": { "type": "string", "connect.type": "bytes", "flink.minLength": 123, "flink.maxLength": 123, "flink.version": "1" }
BOOLEAN¶
- Iceberg type: required boolean
- Support status: Supported
- JSON input schema
"BOOLEAN": { "type": "boolean" }
CHAR¶
- Iceberg type: required string
- Support status: Supported
- JSON input schema
"CHAR": { "type": "string", "minLength": 123, "maxLength": 123 }
DATE¶
- Iceberg type: optional date
- Support status: Supported
- JSON input schema
"DATE": { "type": "number", "connect.type": "int32", "title": "org.apache.kafka.connect.data.Date" }
DECIMAL¶
- Iceberg type: optional decimal(10, 2)
- Support status: Supported
- JSON input schema
"DECIMAL": { "type": "number", "connect.type": "bytes", "title": "org.apache.kafka.connect.data.Decimal", "connect.parameters": { "scale": "2" } }
DOUBLE¶
- Iceberg type: required double
- Support status: Supported
- JSON input schema
"DOUBLE": { "type": "number", "connect.type": "float64" }
FLOAT¶
- Iceberg type: required float
- Support status: Supported
- JSON input schema
"FLOAT": { "type": "number", "connect.type": "float32" }
INT¶
- Iceberg type: required int
- Support status: Supported
- JSON input schema
"INT": { "type": "number", "connect.type": "int32" }
MAP_K_V¶
- Iceberg type: required map<int, long>
- Support status: Supported
- JSON input schema
"MAP_K_V": { "type": "array", "connect.type": "map", "items": { "type": "object", "properties": { "key": { "type": "number", "connect.type": "int32" }, "value": { "type": "number", "connect.type": "int64" } } } }
MAP_VARCHAR_V¶
- Iceberg type: required map<string, long>
- Support status: Supported
- JSON input schema
"MAP_VARCHAR_V": { "type": "object", "connect.type": "map", "additionalProperties": { "type": "number", "connect.type": "int64" } }
MULTISET[K]¶
- Iceberg type: n/a
- Support status: Not supported
- JSON input schema
"MULTISET[K]": { "type": "array", "connect.type": "map", "flink.type": "multiset", "items": { "type": "object", "properties": { "value": { "type": "number", "connect.type": "int64" }, "key": { "type": "number", "connect.type": "int32" } } } }
MULTISET[VARCHAR]¶
- Iceberg type: n/a
- Support status: Not supported
- JSON input schema
"MULTISET[VARCHAR]": { { "type": "object", "connect.type": "map", "flink.type": "multiset", "additionalProperties": { "type": "number", "connect.type": "int64" } }
NUMBER¶
- Iceberg type: required double
- Support status: Supported
- JSON input schema
"NUMBER" : { "type" : "number", "nullable" : true }
ROW¶
- Iceberg type: required struct<36: field1: optional string, 37: field2: optional int, 38: field3: optional boolean>
- Support status: Supported
- JSON input schema
"ROW": { "type": "object", "properties": { "field1": { "type": "string" }, "field2": { "type": "number", "connect.type": "int32" }, "field3": { "type": "boolean" } } }
SMALLINT¶
- Iceberg type: required int
- Support status: Supported
- JSON input schema
"SMALLINT": { "type": "number", "connect.type": "int16" }
TIME¶
- Iceberg type: optional time
- Support status: Supported
- JSON input schema
"TIME": { "type": "number", "title": "org.apache.kafka.connect.data.Time", "flink.precision": 2, "connect.type": "int32", "flink.version": "1" }
TIMESTAMP¶
- Iceberg type: optional timestamp
- Support status: Supported
- JSON input schema
"TIMESTAMP": { "type":"number", "flink.precision":2, "flink.type":"timestamp", "connect.type":"int64", "flink.version":"1" }
TIMESTAMP_LTZ¶
- Iceberg type: required timestamptz
- Support status: Supported
- JSON input schema
"TIMESTAMP_LTZ": { "type": "number", "title": "org.apache.kafka.connect.data.Timestamp", "flink.precision": 2, "connect.type": "int64", "flink.version": "1" }
TINYINT¶
- Iceberg type: required int
- Support status: Supported
- JSON input schema
"TINYINT": { "type": "number", "connect.type": "int8" }
VARBINARY¶
- Iceberg type: required binary
- Support status: Supported
- JSON input schema
"VARBINARY": { "type": "string", "connect.type": "bytes", "flink.maxLength": 123, "flink.version": "1" }
VARCHAR¶
- Iceberg type: required string
- Support status: Supported
- JSON input schema
"VARCHAR": { "type": "string", "maxLength": 123 }
Protobuf schema type mapping¶
The following section describes Tableflow support for materializing Protobuf schema types.
Protobuf schema types are shown with corresponding Iceberg types and example Protobuf schemas.
ARRAY, BOOLEAN, BIGINT, BINARY, CHAR, DATE, DECIMAL, DOUBLE, FLOAT, INT, MAP_K_V, ROW, SMALLINT, TIMESTAMP, TIMESTAMP_LTZ, TIME_WITHOUT_TIME_ZONE, TINYINT, VARCHAR
ARRAY¶
- Iceberg type: required list<long>
- Support status: Supported
- Protobuf schema
repeated int64 ARRAY = 19;
BOOLEAN¶
- Iceberg type: optional boolean
- Support status: Supported
- Protobuf schema
optional bool BOOLEAN = 3;
BIGINT¶
- Iceberg type: optional long
- Support status: Supported
- Protobuf schema
optional int64 BIGINT = 1;
BINARY¶
- Iceberg type: optional fixed[6]
- Support status: Supported
- Protobuf schema
optional bytes BINARY = 2 [(confluent.field_meta) = { doc: "Example field documentation", params: [ { key: "flink.maxLength", value: "6" }, { key: "flink.minLength", value: "6" }, { key: "flink.version", value: "1" } ] }];
CHAR¶
- Iceberg type: optional string
- Support status: Supported
- Protobuf schema
optional string CHAR = 4 [(confluent.field_meta) = { params: [ { key: "flink.version", value: "1" }, { key: "flink.minLength", value: "123" }, { key: "flink.maxLength", value: "123" } ] }];
DATE¶
- Iceberg type: optional date
- Support status: Supported
- Protobuf schema
optional .google.type.Date DATE = 5;
DECIMAL¶
- Iceberg type: optional decimal(5, 1)
- Support status: Supported
- Protobuf schema
optional .confluent.type.Decimal DECIMAL = 7 [(confluent.field_meta) = { params: [ { key: "flink.version", value: "1" }, { value: "1", key: "scale" }, { value: "5", key: "precision" } ] }];
DOUBLE¶
- Iceberg type: optional double
- Support status: Supported
- Protobuf schema
optional double DOUBLE = 6;
FLOAT¶
- Iceberg type: optional float
- Support status: Supported
- Protobuf schema
optional float FLOAT = 8;
MAP_K_V¶
- Iceberg type: required map<string, long>
- Support status: Supported
- Protobuf schema
repeated MapEntry MAP_K_V = 10; message MapEntry { option map_entry = true; optional string key = 1; optional int64 value = 2; }
ROW¶
- Iceberg type: optional struct<32: a: optional string, 33: b: optional double>
- Support status: Supported
- Protobuf schema
optional meta_Row ROW = 11; message meta_Row { optional string a = 1; optional double b = 2; }
SMALLINT¶
- Iceberg type: optional int
- Support status: Supported
- Protobuf schema
optional int32 SMALLINT = 12 [(confluent.field_meta) = { params: [ { key: "flink.version", value: "1" }, { key: "connect.type", value: "int16" } ] }];
TIMESTAMP¶
- Iceberg type: optional timestamp
- Support status: Supported
- Protobuf schema
optional .google.protobuf.Timestamp TIMESTAMP = 13 [(confluent.field_meta) = { params: [ { key: "flink.version", value: "1" }, { key: "flink.type", value: "timestamp" }, { key: "flink.precision", value: "3" } ] }];
TIMESTAMP_LTZ¶
- Iceberg type: optional timestamptz
- Support status: Supported
- Protobuf schema
optional .google.protobuf.Timestamp TIMESTAMP_LTZ = 14 [(confluent.field_meta) = { params: [ { key: "flink.precision", value: "3" }, { key: "flink.version", value: "1" } ] }];
TIME_WITHOUT_TIME_ZONE¶
- Iceberg type: optional time
- Support status: Supported
- Protobuf schema
optional .google.type.TimeOfDay TIME_WITHOUT_TIME_ZONE = 15 [(confluent.field_meta) = { params: [ { key: "flink.precision", value: "3" }, { key: "flink.version", value: "1" } ] }];
TINYINT¶
- Iceberg type: optional binary
- Support status: Supported
- Protobuf schema
optional int32 TINYINT = 16 [(confluent.field_meta) = { params: [ { key: "flink.version", value: "1" }, { key: "connect.type", value: "int8" } ] }];
VARCHAR¶
- Iceberg type: optional string
- Support status: Supported
- Protobuf schema
optional string VARCHAR = 18 [(confluent.field_meta) = { params: [ { key: "flink.maxLength", value: "123" }, { key: "flink.version", value: "1" } ] }];