Data Type Mappings in Confluent Cloud for Apache Flink
Confluent Cloud for Apache Flink® supports records in the Avro Schema Registry, JSON Schema Registry (JSON_SR), and Protobuf Schema Registry formats.
Avro schemas
Known limitations
Avro enums have limited support. Flink supports reading and writing enums but treats them as a STRING type. From Flink’s perspective, enums are not distinguishable from the STRING type. You can’t create an Avro schema from Flink that has an enum field.
Flink doesn’t support reading Avro time-micros as a TIME type. Flink supports TIME with precision up to 3. time-micros is read and written as BIGINT.
Field names must match Avro criteria. Avro expects field names to start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_].
These Flink types are not supported:
INTERVAL_DAY_TIME
INTERVAL_YEAR_MONTH
TIMESTAMP_WITH_TIMEZONE
Flink SQL types to Avro types
The following table shows the mapping of Flink SQL types to Avro physical types.
This mapping is important for creating tables, because it defines the Avro schema that’s produced by a CREATE TABLE statement.
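For example, a CREATE TABLE statement like the following (a minimal sketch; the table name, column names, and connection options are hypothetical) produces an Avro value schema whose fields follow the mappings in the table below:

CREATE TABLE orders_avro (
  order_id BIGINT NOT NULL,             -- Avro: "long"
  amount DECIMAL(6, 3) NOT NULL,        -- Avro: bytes with logical type "decimal" (precision 6, scale 3)
  created_at TIMESTAMP(3) NOT NULL,     -- Avro: long with logical type "local-timestamp-millis"
  tags ARRAY<STRING NOT NULL> NOT NULL  -- Avro: array of string
) WITH (
  'value.format' = 'avro-registry'
);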
ARRAY
Avro type: array
Avro logical type: –
Additional properties: –
Example:
{ "type" : "array", "items" : "long" }
BIGINT
Avro type: long
Avro logical type: –
Additional properties: –
Example:
long
BINARY
Avro type: fixed
Avro logical type: –
Additional properties: flink.maxLength (MAX_LENGTH if not set)
Example:
{ "type" : "fixed", "name" : "row", "namespace" : "io.confluent", "size" : 123 }
BOOLEAN
Avro type: boolean
Avro logical type: –
Additional properties: –
Example:
boolean
CHAR
Avro type: string
Avro logical type: –
Additional properties: flink.maxLength = flink.minLength (MAX_LENGTH if not set)
Example:
{ "type" : "string", "flink.maxLength" : 123, "flink.minLength" : 123, "flink.version" : "1" }
DATE
Avro type: int
Avro logical type: date
Additional properties: –
Example:
{ "type" : "int", "logicalType" : "date" }
DECIMAL
Avro type: bytes
Avro logical type: decimal
Additional properties: –
Example:
{ "type" : "bytes", "logicalType" : "decimal", "precision" : 6, "scale" : 3 }
DOUBLE
Avro type: double
Avro logical type: –
Additional properties: –
Example:
double
FLOAT
Avro type: float
Avro logical type: –
Additional properties: –
Example:
float
INT
Avro type: int
Avro logical type: –
Additional properties: –
Example:
int
MAP (character key)
Avro type: map
Avro logical type: –
Additional properties: –
Example:
{ "type" : "map", "values" : "boolean" }
MAP (non-character key)
Avro type: array
Avro logical type: –
Additional properties: array of io.confluent.connect.avro.MapEntry(key, value)
Example:
{ "type" : "array", "items" : { "type" : "record", "name" : "MapEntry", "namespace" : "io.confluent.connect.avro", "fields" : [ { "name" : "key", "type" : "int" }, { "name" : "value", "type" : "bytes" } ] } }
MULTISET (character element)
Avro type: map
Avro logical type: –
Additional properties: flink.type : multiset
Example:
{ "type" : "map", "values" : "int", "flink.type" : "multiset", "flink.version" : "1" }
MULTISET (non-character key)
Avro type: array
Avro logical type: –
Additional properties: array of io.confluent.connect.avro.MapEntry(key, value), flink.type : multiset
Example:
{ "type" : "array", "items" : { "type" : "record", "name" : "MapEntry", "namespace" : "io.confluent.connect.avro", "fields" : [ { "name" : "key", "type" : "long" }, { "name" : "value", "type" : "int" } ] }, "flink.type" : "multiset", "flink.version" : "1" }
ROW
Avro type: record
Avro logical type: –
Additional properties: –
Name: org.apache.flink.avro.generated.record
Nested records name: org.apache.flink.avro.generated.record_$fieldName
Example:
{ "type" : "record", "name" : "row", "namespace" : "io.confluent", "fields" : [ { "name" : "f0", "type" : "long", "doc" : "field comment" } ] }
SMALLINT
Avro type: int
Avro logical type: –
Additional properties: connect.type=int16
Example:
{ "type" : "int", "connect.type" : "int16" }
STRING / VARCHAR
Avro type: string
Avro logical type: –
Additional properties: flink.maxLength (MAX_LENGTH if not set)
Example:
{ "type" : "string", "flink.maxLength" : 123, "flink.version" : "1" }
TIME
Avro type: int
Avro logical type: time-millis
Additional properties: flink.precision (default: 3, max supported: 3)
Example:
{ "type" : "int", "flink.precision" : 2, "flink.version" : "1", "logicalType" : "time-millis" }
TIMESTAMP
Avro type: long
Avro logical type: local-timestamp-millis / local-timestamp-micros
Additional properties: flink.precision (default: 3/6, max supported: 3/9)
Example:
{ "type" : "long", "flink.precision" : 2, "flink.version" : "1", "logicalType" : "local-timestamp-millis" }
TIMESTAMP_LTZ
Avro type: long
Avro logical type: timestamp-millis / timestamp-micros
Additional properties: flink.precision (default: 3/6, max supported: 3/9)
Example:
{ "type" : "long", "flink.precision" : 2, "flink.version" : "1", "logicalType" : "timestamp-millis" }
TINYINT
Avro type: int
Avro logical type: –
Additional properties: connect.type=int8
Example:
{ "type" : "int", "connect.type" : "int8" }
VARBINARY
Avro type: bytes
Avro logical type: –
Additional properties: flink.maxLength (MAX_LENGTH if not set)
Example:
{ "type" : "bytes", "flink.maxLength" : 123, "flink.version" : "1" }
Avro types to Flink SQL types
The following table shows the mapping of Avro types to Flink SQL types. It shows only mappings that are not covered by the previous table. These types can’t originate from Flink SQL.
This mapping is important when consuming/reading records with a schema that was created outside of Flink. The mapping defines the Flink table’s schema inferred from an Avro schema.
Flink SQL supports reading and writing nullable types. A nullable type is mapped to an Avro union(avro_type, null), with the avro_type converted from the corresponding Flink type.
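For example, in this minimal sketch (hypothetical table and column names, assuming the avro-registry value format), the nullable column is written as a union with null, while the NOT NULL column maps directly to the Avro type:

CREATE TABLE inventory_avro (
  sku STRING NOT NULL,   -- written as Avro "string"
  quantity BIGINT        -- nullable: written as the Avro union [ "long", "null" ]
) WITH (
  'value.format' = 'avro-registry'
);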
| Avro type | Avro logical type | Flink SQL type | Example |
|---|---|---|---|
| long | time-micros | BIGINT | – |
| enum | – | STRING | – |
| union with null type (null + one other type) | – | NULLABLE(type) | – |
| union (other unions) | – | ROW(type_name Type0, …) | [ "long", "string", { "type": "record", "name": "User", "namespace": "io.test1", "fields": [ { "name": "f0", "type": "long" } ] } ] |
| string | uuid | STRING | – |
| fixed | duration | BINARY(size) | – |
JSON Schema
Flink SQL types to JSON Schema types
The following table shows the mapping of Flink SQL types to JSON Schema types.
This mapping is important for creating tables, because it defines the JSON Schema that’s produced by a CREATE TABLE statement.
Nullable types are expressed as oneOf(Null, T).
The Object for a MAP or MULTISET must have two fields: key and value.
MULTISET is equivalent to MAP[K, INT] and is serialized accordingly.
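For example, a CREATE TABLE statement like the following (a minimal sketch; the table name, column names, and connection options are hypothetical) produces a JSON Schema whose properties follow the mappings below:

CREATE TABLE clicks_json (
  user_id BIGINT NOT NULL,           -- { "type": "number", "connect.type": "int64" }
  active BOOLEAN NOT NULL,           -- { "type": "boolean" }
  event_time TIMESTAMP(3) NOT NULL   -- number titled org.apache.kafka.connect.data.Timestamp with connect.type=int64
) WITH (
  'value.format' = 'json-registry'
);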
ARRAY
JSON Schema type: Array
Additional properties: –
JSON type title: –
Example:
{ "type": "array", "items": { "type": "number", "title": "org.apache.kafka.connect.data.Time", "flink.precision": 2, "connect.type": "int32", "flink.version": "1" } }
BIGINT
JSON Schema type: Number
Additional properties: connect.type=int64
JSON type title: –
Example:
{ "type": "number", "connect.type": "int64" }
BINARY
JSON Schema type: String
Additional properties: connect.type=bytes, flink.minLength=flink.maxLength: Different from JSON’s minLength/maxLength, because this property describes bytes length, not string length.
JSON type title: –
Example:
{ "type": "string", "flink.maxLength": 123, "flink.minLength": 123, "flink.version": "1", "connect.type": "bytes" }
BOOLEAN
JSON Schema type: Boolean
Additional properties: –
JSON type title: –
Example:
{ "type": "array", "items": { "type": "number", "title": "org.apache.kafka.connect.data.Time", "flink.precision": 2, "connect.type": "int32", "flink.version": "1" } }
CHAR
JSON Schema type: String
Additional properties: minLength=maxLength
JSON type title: –
Example:
{ "type": "string", "minLength": 123, "maxLength": 123 }
DATE
JSON Schema type: Number
Additional properties: connect.type=int32
JSON type title: org.apache.kafka.connect.data.Date
Example: –
DECIMAL
JSON Schema type: Number
Additional properties: connect.type=bytes
JSON type title: org.apache.kafka.connect.data.Decimal
Example: –
DOUBLE
JSON Schema type: Number
Additional properties: connect.type=float64
JSON type title: –
Example:
{ "type": "number", "connect.type": "float64" }
FLOAT
JSON Schema type: Number
Additional properties: connect.type=float32
JSON type title: –
Example:
{ "type": "number", "connect.type": "float32" }
INT
JSON Schema type: Number
Additional properties: connect.type=int32
JSON type title: –
Example:
{ "type": "number", "connect.type": "int32" }
MAP[K, V]
JSON Schema type: Array[Object]
Additional properties: connect.type=map
JSON type title: –
Example:
{ "type": "array", "connect.type": "map", "items": { "type": "object", "properties": { "value": { "type": "number", "connect.type": "int64" }, "key": { "type": "number", "connect.type": "int32" } } } }
MAP[VARCHAR, V]
JSON Schema type: Object
Additional properties: connect.type=map
JSON type title: –
Example:
{ "type":"object", "connect.type":"map", "additionalProperties": { "type":"number", "connect.type":"int64" } }
MULTISET[K]
JSON Schema type: Array[Object]
Additional properties: connect.type=map, flink.type=multiset
JSON type title: –
For MULTISET types, the count (value) in the JSON schema must map to a Flink INT type, which corresponds to connect.type: int32 in the JSON Schema. Using connect.type: int64 causes a validation error.
Example:
{ "type": "array", "connect.type": "map", "flink.type": "multiset", "items": { "type": "object", "properties": { "value": { "type": "number", "connect.type": "int32" }, "key": { "type": "number", "connect.type": "int32" } } } }
MULTISET[VARCHAR]
JSON Schema type: Object
Additional properties: connect.type=map, flink.type=multiset
JSON type title: –
For MULTISET types, the count (value) in the JSON schema must map to a Flink INT type, which corresponds to connect.type: int32 in the JSON Schema. Using connect.type: int64 causes a validation error.
Example:
{ "type": "object", "connect.type": "map", "flink.type": "multiset", "additionalProperties": { "type": "number", "connect.type": "int32" } }
ROW
JSON Schema type: Object
Additional properties: –
JSON type title: –
Example: –
SMALLINT
JSON Schema type: Number
Additional properties: connect.type=int16
JSON type title: –
Example:
{ "type": "number", "connect.type": "int16" }
TIME
JSON Schema type: Number
Additional properties: connect.type=int32, flink.precision
JSON type title: org.apache.kafka.connect.data.Time
Example:
{ "type":"number", "title":"org.apache.kafka.connect.data.Time", "flink.precision":2, "connect.type":"int32", "flink.version":"1" }
TIMESTAMP
JSON Schema type: Number
Additional properties: connect.type=int64, flink.precision, flink.type=timestamp
JSON type title: org.apache.kafka.connect.data.Timestamp
Example:
{ "type":"number", "title":"org.apache.kafka.connect.data.Timestamp", "flink.precision":2, "flink.type":"timestamp", "connect.type":"int64", "flink.version":"1" }
TIMESTAMP_LTZ
JSON Schema type: Number
Additional properties: connect.type=int64, flink.precision
JSON type title: org.apache.kafka.connect.data.Timestamp
Example:
{ "type":"number", "title":"org.apache.kafka.connect.data.Timestamp", "flink.precision":2, "connect.type":"int64", "flink.version":"1" }
TINYINT
JSON Schema type: Number
Additional properties: connect.type=int8
JSON type title: –
Example:
{ "type": "number", "connect.type": "int8" }
VARBINARY
JSON Schema type: String
Additional properties: connect.type=bytes, flink.maxLength: Different from JSON’s maxLength, because this property describes bytes length, not string length.
JSON type title: –
Example:
{ "type": "string", "flink.maxLength": 123, "flink.version": "1", "connect.type": "bytes" }
VARCHAR
JSON Schema type: String
Additional properties: maxLength
JSON type title: –
Example:
{ "type": "string", "maxLength": 123 }
JSON types to Flink SQL types
The following table shows the mapping of JSON types to Flink SQL types. It shows only mappings that are not covered by the previous table. These types can’t originate from Flink SQL.
This mapping is important when consuming/reading records with a schema that was created outside of Flink. The mapping defines the Flink table’s schema inferred from JSON Schema.
JSON type | Flink SQL type |
|---|---|
Combined | ROW |
Enum | VARCHAR |
Number(requiresInteger=true) | BIGINT |
Number(requiresInteger=false) | DOUBLE |
JSON deserialization conversion behavior
When deserializing JSON data, Confluent Cloud for Apache Flink® enforces specific conversion rules when an incoming data type does not exactly match the target type defined in the Flink SQL table schema.
If an incoming JSON type cannot be safely converted to the target SQL type, the statement will fail with a deserialization error.
The following matrix shows the allowed conversions. The “From” type represents the data type in the incoming JSON record, and the “To” type represents the target data type defined in your Flink SQL table.
From/To | ARRAY | BOOLEAN | NUMBER | OBJECT | STRING |
|---|---|---|---|---|---|
ARRAY | Allowed | Fail | Fail | Fail | Allowed [1] |
BOOLEAN | Fail | Allowed | Fail | Fail | Allowed [1] |
NUMBER | Fail | Fail | Allowed | Fail | Allowed [1] |
OBJECT | Fail | Fail | Fail | Allowed | Allowed [1] |
STRING | Fail | Allowed [2] | Allowed [3] | Fail | Allowed [1] |
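For example, consider a table with a DOUBLE column (a sketch; the table, column, and field names are hypothetical, and the value format option is illustrative):

CREATE TABLE prices_json (
  price DOUBLE
) WITH (
  'value.format' = 'json-registry'
);
-- Incoming {"price": 19.99}   : NUMBER -> NUMBER, deserialized normally
-- Incoming {"price": "19.99"} : STRING -> NUMBER, allowed per the matrix
-- Incoming {"price": [19.99]} : ARRAY -> NUMBER, the statement fails with a deserialization error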
Conversion Rule Details:
Protobuf schema
Flink SQL types to Protobuf types
The following table shows the mapping of Flink SQL types to Protobuf types.
This mapping is important for creating tables, because it defines the Protobuf schema that’s produced by a CREATE TABLE statement.
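For example, a CREATE TABLE statement like the following (a minimal sketch; the table name, column names, and connection options are hypothetical) produces a Protobuf message whose fields follow the mappings below:

CREATE TABLE sensors_proto (
  sensor_id BIGINT NOT NULL,     -- Protobuf: int64
  reading DOUBLE NOT NULL,       -- Protobuf: double
  label STRING,                  -- Protobuf: optional string (nullable columns are marked optional)
  measured_at TIMESTAMP_LTZ(3)   -- Protobuf: optional google.protobuf.Timestamp with flink.precision 3
) WITH (
  'value.format' = 'proto-registry'
);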
ARRAY[T]
Protobuf type: repeated T
Message type: –
Additional properties: flink.wrapped, which indicates that Flink wrappers are used to represent nullability, because Protobuf doesn’t support nullable repeated natively.
Example:
repeated int64 value = 1;
Nullable array:
arrayNullableRepeatedWrapper arrayNullable = 1 [(confluent.field_meta) = { params: [ { key: "flink.wrapped", value: "true" }, { key: "flink.version", value: "1" } ] }];
message arrayNullableRepeatedWrapper {
  repeated int64 value = 1;
}
Nullable elements:
repeated elementNullableElementWrapper elementNullable = 2 [(confluent.field_meta) = { params: [ { key: "flink.wrapped", value: "true" }, { key: "flink.version", value: "1" } ] }];
message elementNullableElementWrapper {
  optional int64 value = 1;
}
BIGINT
Protobuf type: INT64
Message type: –
Additional properties: –
Example:
optional int64 bigint = 8;
BINARY
Protobuf type: BYTES
Message type: –
Additional properties: flink.maxLength=flink.minLength
Example:
optional bytes binary = 13 [(confluent.field_meta) = { params: [ { key: "flink.maxLength", value: "123" }, { key: "flink.minLength", value: "123" }, { key: "flink.version", value: "1" } ] }];
BOOLEAN
Protobuf type: BOOL
Message type: –
Additional properties: –
Example:
optional bool boolean = 2;
CHAR
Protobuf type: STRING
Message type: –
Additional properties: flink.maxLength=flink.minLength
Example:
optional string char = 11 [(confluent.field_meta) = { params: [ { key: "flink.maxLength", value: "123" }, { key: "flink.minLength", value: "123" }, { key: "flink.version", value: "1" } ] }];
DATE
Protobuf type: MESSAGE
Message type: google.type.Date
Additional properties: –
Example:
optional .google.type.Date date = 17;
DECIMAL
Protobuf type: MESSAGE
Message type: confluent.type.Decimal
Additional properties: –
Example:
optional .confluent.type.Decimal decimal = 19 [(confluent.field_meta) = { params: [ { value: "5", key: "precision" }, { value: "1", key: "scale" }, { key: "flink.version", value: "1" } ] }];
DOUBLE
Protobuf type: DOUBLE
Message type: –
Additional properties: –
Example:
optional double double = 10;
FLOAT
Protobuf type: FLOAT
Message type: –
Additional properties: –
Example:
optional float float = 9;
INT
Protobuf type: INT32
Message type: –
Additional properties: –
Example:
optional int32 int = 7;
MAP[K, V]
Protobuf type: repeated MESSAGE
Message type: XXEntry(K key, V value)
Additional properties: flink.wrapped, which indicates that Flink wrappers are used to represent nullability, because Protobuf doesn’t support nullable repeated natively. For examples, see the ARRAY type.
Example:
repeated MapEntry map = 20;
message MapEntry {
  optional string key = 1;
  optional int64 value = 2;
}
MULTISET[V]
Protobuf type: repeated MESSAGE
Message type: XXEntry(V key, int32 value)
Additional properties: flink.wrapped, which indicates that Flink wrappers are used to represent nullability, because Protobuf doesn’t support nullable repeated natively. For examples, see the ARRAY type. flink.type=multiset
Example:
repeated MultisetEntry multiset = 1 [(confluent.field_meta) = { params: [ { key: "flink.type", value: "multiset" }, { key: "flink.version", value: "1" } ] }];
message MultisetEntry {
  optional string key = 1;
  int32 value = 2;
}
ROW
Protobuf type: MESSAGE
Message type: fieldName
Additional properties: –
Example:
meta_Row meta = 1;
message meta_Row {
  float a = 1;
  float b = 2;
}
SMALLINT
Protobuf type: INT32
Message type: –
Additional properties: MetaProto extension: connect.type = int16
Example:
optional int32 smallInt = 6 [(confluent.field_meta) = { doc: "smallInt comment", params: [ { key: "flink.version", value: "1" }, { key: "connect.type", value: "int16" } ] }];
TIMESTAMP
Protobuf type: MESSAGE
Message type: google.protobuf.Timestamp
Additional properties: flink.precision, flink.type=timestamp
Example:
optional .google.protobuf.Timestamp timestamp_ltz_3 = 16 [(confluent.field_meta) = { params: [ { key: "flink.type", value: "timestamp" }, { key: "flink.precision", value: "3" }, { key: "flink.version", value: "1" } ] }];
TIMESTAMP_LTZ
Protobuf type: MESSAGE
Message type: google.protobuf.Timestamp
Additional properties: flink.precision
Example:
optional .google.protobuf.Timestamp timestamp_ltz_3 = 15 [(confluent.field_meta) = { params: [ { key: "flink.precision", value: "3" }, { key: "flink.version", value: "1" } ] }];
TIME_WITHOUT_TIME_ZONE
Protobuf type: MESSAGE
Message type: google.type.TimeOfDay
Additional properties: –
Example:
optional .google.type.TimeOfDay time = 18 [(confluent.field_meta) = { params: [ { key: "flink.precision", value: "3" }, { key: "flink.version", value: "1" } ] }];
TINYINT
Protobuf type: INT32
Message type: –
Additional properties: MetaProto extension: connect.type = int8
Example:
optional int32 tinyInt = 4 [(confluent.field_meta) = { doc: "tinyInt comment", params: [ { key: "flink.version", value: "1" }, { key: "connect.type", value: "int8" } ] }];
VARBINARY
Protobuf type: BYTES
Message type: –
Additional properties: flink.maxLength (default = MAX_LENGTH)
Example:
optional bytes varbinary = 14 [(confluent.field_meta) = { params: [ { key: "flink.maxLength", value: "123" }, { key: "flink.version", value: "1" } ] }];
VARCHAR
Protobuf type: STRING
Message type: –
Additional properties: flink.maxLength (default = MAX_LENGTH)
Example:
optional string varchar = 12 [(confluent.field_meta) = { params: [ { key: "flink.maxLength", value: "123" }, { key: "flink.version", value: "1" } ] }];
Protobuf types to Flink SQL types
The following table shows the mapping of Protobuf types to Flink SQL and Connect types. It shows only mappings that are not covered by the previous table. These types can’t originate from Flink SQL.
This mapping is important when consuming/reading records with a schema that was created outside of Flink. The mapping defines the Flink table’s schema inferred from a Protobuf schema.
| Protobuf type | Flink SQL type | Message type | Connect type annotation |
|---|---|---|---|
| FIXED32 / FIXED64 / SFIXED64 | BIGINT | – | – |
| INT32 / SINT32 / SFIXED32 | INT | – | – |
| INT32 / SINT32 / SFIXED32 | SMALLINT | – | int16 |
| INT32 / SINT32 / SFIXED32 | TINYINT | – | int8 |
| INT64 / SINT64 | BIGINT | – | – |
| UINT32 / UINT64 | BIGINT | – | – |
| MESSAGE | BIGINT | google.protobuf.Int64Value | – |
| MESSAGE | BIGINT | google.protobuf.UInt64Value | – |
| MESSAGE | BIGINT | google.protobuf.UInt32Value | – |
| MESSAGE | BOOLEAN | google.protobuf.BoolValue | – |
| MESSAGE | DOUBLE | google.protobuf.DoubleValue | – |
| MESSAGE | FLOAT | google.protobuf.FloatValue | – |
| MESSAGE | INT | google.protobuf.Int32Value | – |
| MESSAGE | VARBINARY | google.protobuf.BytesValue | – |
| MESSAGE | VARCHAR | google.protobuf.StringValue | – |
| oneOf | ROW | – | – |
Protobuf 3 nullable field behavior
When working with Protobuf 3 schemas in Confluent Cloud for Apache Flink, it’s important to understand how nullable fields are handled.
When converting to a Protobuf schema, Flink marks all NULLABLE fields as optional.
In Protobuf, expressing something as NULLABLE or NOT NULL is not straightforward.
All non-MESSAGE types are NOT NULL. If not set explicitly, the default value is assigned.
Non-MESSAGE types marked with optional can be checked if they were set. If not set, Flink assumes NULL.
MESSAGE types are all NULLABLE, which means that all fields of MESSAGE type are optional, and there is no way to ensure on a format level they are NOT NULL. To store this information, Flink uses the flink.notNull property, for example:
message Row {
  .google.type.Date date = 1 [(confluent.field_meta) = { params: [ { key: "flink.version", value: "1" }, { key: "flink.notNull", value: "true" } ] }];
}
- Fields without the optional keyword: In Protobuf 3, fields without the optional keyword are treated as NOT NULL by Flink. This is because Protobuf 3 doesn’t support nullable getters/setters by default. If a field is omitted in the data, Protobuf 3 assigns the default value, which is 0 for numbers, the empty string for strings, and false for booleans.
- Fields with the optional keyword: Fields marked with optional in Protobuf 3 are treated as nullable by Flink. When such a field is not set in the data, Flink interprets it as NULL.
- Fields with the repeated keyword: Fields marked with repeated in Protobuf 3 are treated as arrays by Flink. The array itself is NOT NULL, but individual elements within the array can be nullable depending on their type. For MESSAGE types, elements are nullable by default. For primitive types, elements are NOT NULL.
This behavior is consistent across all streaming platforms that work with Protobuf 3, including Kafka Streams and other Confluent products, and is not specific to Flink. It’s a fundamental characteristic of the Protobuf 3 specification itself.
In a Protobuf 3 schema, if you want a field to be nullable in Flink, you must explicitly mark it as optional, for example:
message Example {
string required_field = 1; // NOT NULL in Flink
optional string nullable_field = 2; // NULLABLE in Flink
repeated string array_field = 3; // NOT NULL array in Flink
repeated google.protobuf.StringValue nullable_array_field = 4; // NOT NULL array with nullable elements (wrapper MESSAGE element type)
}