Data Type Mappings in Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink® supports records in the Avro Schema Registry, JSON Schema Registry (JSON_SR), and Protobuf Schema Registry formats.
Avro schemas¶
Known limitations¶
- Avro enums have limited support. Flink supports reading and writing enums but treats them as a STRING type, so from Flink's perspective an enum is not distinguishable from STRING. You can't create an Avro schema from Flink that has an enum field. For an illustration, see the sketch after this list.
- Flink doesn't support reading Avro time-micros as a TIME type. Flink supports TIME with a precision up to 3. time-micros is read and written as BIGINT.
- Field names must match Avro criteria. Avro expects field names to start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_].
- These Flink types are not supported:
  - INTERVAL_DAY_TIME
  - INTERVAL_YEAR_MONTH
  - TIMESTAMP_WITH_TIMEZONE
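For example, because an Avro enum surfaces in Flink as a STRING column, you filter on the symbol text directly. This is a minimal sketch; the orders table and its status enum field are hypothetical names.

-- Hypothetical names: "orders" and its Avro enum field "status".
-- The enum is read as STRING, so its symbols are compared as plain strings.
SELECT order_id, status
FROM orders
WHERE status = 'SHIPPED';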
Flink SQL types to Avro types¶
The following table shows the mapping of Flink SQL types to Avro physical types.
This mapping is important for creating tables, because it defines the Avro schema that’s produced by a CREATE TABLE statement.
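For example, the following statement (a minimal sketch; the table and column names are hypothetical, and it assumes the Avro Schema Registry value format, 'avro-registry') produces an Avro record whose fields follow the mappings in the table below:

-- Hypothetical table; nullable columns are written as a union of the Avro type and null.
CREATE TABLE orders_avro (
  order_id   BIGINT NOT NULL,  -- Avro: long
  amount     DECIMAL(6, 3),    -- Avro: bytes with logicalType decimal (precision 6, scale 3)
  created_at TIMESTAMP(3)      -- Avro: long with logicalType local-timestamp-millis
) WITH (
  'value.format' = 'avro-registry'
);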
Flink SQL type | Avro type | Avro logical type | Example | Additional properties |
---|---|---|---|---|
ARRAY | array | – | {
"type" : "array",
"items" : "long"
}
|
– |
BIGINT | long | – | long |
– |
BINARY | fixed | – | {
"type" : "fixed",
"name" : "row",
"namespace" : "io.confluent",
"size" : 123
}
|
flink.maxLength (MAX_LENGTH if not set) |
BOOLEAN | boolean | – | boolean |
– |
CHAR | string | – | {
"type" : "string",
"flink.maxLength" : 123,
"flink.minLength" : 123,
"flink.version" : "1"
}
|
flink.minLength = flink.maxLength (MAX_LENGTH if not set) |
DATE | int | date | {
"type" : "int",
"logicalType" : "date"
}
|
– |
DECIMAL | bytes | decimal | {
"type" : "bytes",
"logicalType" : "decimal",
"precision" : 6,
"scale" : 3
}
|
– |
DOUBLE | double | – | double |
– |
FLOAT | float | – | float |
– |
INT | int | – | int |
– |
MAP (character key) | map | – | {
"type" : "map",
"values" : "boolean"
}
|
– |
MAP (non-character key) | array | – | {
"type" : "array",
"items" : {
"type" : "record",
"name" : "MapEntry",
"namespace" : "io.confluent.connect.avro",
"fields" : [ {
"name" : "key",
"type" : "int"
}, {
"name" : "value",
"type" : "bytes"
} ]
}
}
|
array of io.confluent.connect.avro.MapEntry(key, value) |
MULTISET (character element) | map | – | {
"type" : "map",
"values" : "int",
"flink.type" : "multiset",
"flink.version" : "1"
}
|
flink.type : multiset |
MULTISET (non-character element) | array | – | {
"type" : "array",
"items" : {
"type" : "record",
"name" : "MapEntry",
"namespace" : "io.confluent.connect.avro",
"fields" : [ {
"name" : "key",
"type" : "long"
}, {
"name" : "value",
"type" : "int"
} ]
},
"flink.type" : "multiset",
"flink.version" : "1"
}
|
array of io.confluent.connect.avro.MapEntry(key, value) flink.type : multiset |
ROW | record 1 | – | {
"type" : "record",
"name" : "row",
"namespace" : "io.confluent",
"fields" : [ {
"name" : "f0",
"type" : "long",
"doc" : "field comment"
} ]
}
|
– |
SMALLINT | int | – | {
"type" : "int",
"connect.type" : "int16"
}
|
connect.type=int16 |
STRING / VARCHAR | string | – | {
"type" : "string",
"flink.maxLength" : 123,
"flink.version" : "1"
}
|
flink.maxLength (MAX_LENGTH if not set) |
TIME | int | time-millis | {
"type" : "int",
"flink.precision" : 2,
"flink.version" : "1",
"logicalType" : "time-millis"
}
|
flink.precision (default: 3, max supported: 3) |
TIMESTAMP | long | local-timestamp-millis/local-timestamp-micros | {
"type" : "long",
"flink.precision" : 2,
"flink.version" : "1",
"logicalType" : "local-timestamp-millis"
}
|
flink.precision (default: 3/6, max supported: 3/9) |
TIMESTAMP_LTZ | long | timestamp-millis / timestamp-micros | {
"type" : "long",
"flink.precision" : 2,
"flink.version" : "1",
"logicalType" : "timestamp-millis"
}
|
flink.precision (default: 3/6, max supported: 3/9) |
TINYINT | int | – | {
"type" : "int",
"connect.type" : "int8"
}
|
connect.type=int8 |
VARBINARY | bytes | – | {
"type" : "bytes",
"flink.maxLength" : 123,
"flink.version" : "1"
}
|
flink.maxLength (MAX_LENGTH if not set) |
1 For the record type:
- name: org.apache.flink.avro.generated.record
- nested record names: org.apache.flink.avro.generated.record_$fieldName
Avro types to Flink SQL types¶
The following table shows the mapping of Avro types to Flink SQL types. It shows only mappings that are not covered by the previous table. These types can’t originate from Flink SQL.
This mapping is important when consuming/reading records with a schema that was created outside of Flink. The mapping defines the Flink table’s schema inferred from an Avro schema.
Flink SQL supports reading and writing nullable types. A nullable type is mapped to an Avro union(avro_type, null), with avro_type converted from the corresponding Flink type.
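For example (hypothetical names), an Avro field declared as the union ["null", "long"] is inferred as a nullable BIGINT column, so the usual NULL handling applies:

-- Hypothetical topic "payments" whose Avro value schema declares
--   {"name": "discount", "type": ["null", "long"], "default": null}
-- Flink infers discount as a nullable BIGINT column.
SELECT payment_id, COALESCE(discount, 0) AS discount
FROM payments;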
Avro type | Avro logical type | Flink SQL type | Example |
---|---|---|---|
long | time-micros | BIGINT | – |
enum | – | STRING | – |
union with null type (null + one other type) | – | NULLABLE(type) | – |
union (other unions) | – | ROW(type_name Type0, …) | [
"long",
"string",
{
"type": "record",
"name": "User",
"namespace": "io.test1",
"fields": [
{
"name": "f0",
"type": "long"
}
]
}
]
|
string | uuid | STRING | – |
fixed | duration | BINARY(size) | – |
JSON schemas¶
Flink SQL types to JSON types¶
The following table shows the mapping of Flink SQL types to JSON types.
This mapping is important for creating tables, because it defines the JSON schema that’s produced by a CREATE TABLE statement.
- Nullable types are expressed as oneOf(Null, T).
- The Object representing a MAP or MULTISET entry must have exactly two fields: key and value.
- MULTISET is equivalent to MAP[K, INT] and is serialized accordingly.
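For example, the following statement (a minimal sketch; names are hypothetical, and it assumes the JSON Schema Registry value format, 'json-registry') produces a JSON schema whose properties follow the mappings in the table below:

-- Hypothetical table; nullable columns are expressed as oneOf(Null, T) in the produced schema.
CREATE TABLE clicks_json (
  user_id    BIGINT,           -- number with connect.type=int64
  tags       MAP<STRING, INT>, -- object with connect.type=map and additionalProperties of connect.type=int32
  clicked_at TIMESTAMP(3)      -- number titled org.apache.kafka.connect.data.Timestamp
) WITH (
  'value.format' = 'json-registry'
);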
Flink SQL type | JSON type | Example | Additional properties | JSON type title |
---|---|---|---|---|
ARRAY | Array | {
"type": "array",
"items": {
"type": "number",
"title": "org.apache.kafka.connect.data.Time",
"flink.precision": 2,
"connect.type": "int32",
"flink.version": "1"
}
}
|
– | – |
BIGINT | Number | {
"type": "number",
"connect.type": "int64"
}
|
connect.type=int64 | – |
BINARY | String | {
"type": "string",
"flink.maxLength": 123,
"flink.minLength": 123,
"flink.version": "1",
"connect.type": "bytes"
}
|
connect.type=bytes flink.minLength=flink.maxLength 1 | – |
BOOLEAN | Boolean | {
"type": "boolean"
}
|
– | – |
CHAR | String | {
"type": "string",
"minLength": 123,
"maxLength": 123
}
|
minLength=maxLength | – |
DATE | Number | – | connect.type=int32 | org.apache.kafka.connect.data.Date |
DECIMAL | Number | – | connect.type=bytes | org.apache.kafka.connect.data.Decimal |
DOUBLE | Number | {
"type": "number",
"connect.type": "float64"
}
|
connect.type=float64 | – |
FLOAT | Number | {
"type": "number",
"connect.type": "float32"
}
|
connect.type=float32 | – |
INT | Number | {
"type": "number",
"connect.type": "int32"
}
|
connect.type=int32 | – |
MAP[K, V] | Array[Object] | {
"type": "array",
"connect.type": "map",
"items": {
"type": "object",
"properties": {
"value": {
"type": "number",
"connect.type": "int64"
},
"key": {
"type": "number",
"connect.type": "int32"
}
}
}
}
|
connect.type=map | – |
MAP[VARCHAR, V] | Object | {
"type":"object",
"connect.type":"map",
"additionalProperties":
{
"type":"number",
"connect.type":"int64"
}
}
|
connect.type=map | – |
MULTISET[K] | Array[Object] | {
"type": "array",
"connect.type": "map",
"flink.type": "multiset",
"items": {
"type": "object",
"properties": {
"value": {
"type": "number",
"connect.type": "int64"
},
"key": {
"type": "number",
"connect.type": "int32"
}
}
}
}
|
connect.type=map flink.type=multiset | – |
MULTISET[VARCHAR] | Object | {
"type": "object",
"connect.type": "map",
"flink.type": "multiset",
"additionalProperties": {
"type": "number",
"connect.type": "int64"
}
}
|
connect.type=map flink.type=multiset | – |
ROW | Object | – | – | – |
SMALLINT | Number | {
"type": "number",
"connect.type": "int16"
}
|
connect.type=int16 | – |
TIME | Number | {
"type":"number",
"title":"org.apache.kafka.connect.data.Time",
"flink.precision":2,
"connect.type":"int32",
"flink.version":"1"
}
|
connect.type=int32 flink.precision | org.apache.kafka.connect.data.Time |
TIMESTAMP | Number | {
"type":"number",
"title":"org.apache.kafka.connect.data.Timestamp",
"flink.precision":2,
"flink.type":"timestamp",
"connect.type":"int64",
"flink.version":"1"
}
|
connect.type=int64 flink.precision flink.type=timestamp | org.apache.kafka.connect.data.Timestamp |
TIMESTAMP_LTZ | Number | {
"type":"number",
"title":"org.apache.kafka.connect.data.Timestamp",
"flink.precision":2,
"connect.type":"int64",
"flink.version":"1"
}
|
connect.type=int64 flink.precision | org.apache.kafka.connect.data.Timestamp |
TINYINT | Number | {
"type": "number",
"connect.type": "int8"
}
|
connect.type=int8 | – |
VARBINARY | String | {
"type": "string",
"flink.maxLength": 123,
"flink.version": "1",
"connect.type": "bytes"
}
|
connect.type=bytes flink.maxLength 1 | – |
VARCHAR | String | {
"type": "string",
"maxLength": 123
}
|
maxLength | – |
1 Different from JSON’s minLength and maxLength, because this property describes the length in bytes, not the string length.
JSON types to Flink SQL types¶
The following table shows the mapping of JSON types to Flink SQL types. It shows only mappings that are not covered by the previous table. These types can’t originate from Flink SQL.
This mapping is important when consuming/reading records with a schema that was created outside of Flink. The mapping defines the Flink table’s schema inferred from a JSON schema.
JSON type | Flink SQL type |
---|---|
Combined | ROW |
Enum | VARCHAR |
Number(requiresInteger=true) | BIGINT |
Number(requiresInteger=false) | DOUBLE |
Protobuf schemas¶
Flink SQL types to Protobuf types¶
The following table shows the mapping of Flink SQL types to Protobuf types.
This mapping is important for creating tables, because it defines the Protobuf schema that’s produced by a CREATE TABLE statement.
When converting to a Protobuf schema, Flink SQL marks all NULLABLE fields as optional.
In Protobuf, expressing something as NULLABLE or NOT NULL is not straightforward.
- All non-MESSAGE types are NOT NULL. If a value is not set explicitly, the default value is assigned.
- Non-MESSAGE types marked as optional can be checked to determine whether they were set. If a field was not set, Flink SQL assumes NULL.
- MESSAGE types are all NULLABLE, which means that all fields of MESSAGE type are optional, and there is no way to ensure at the format level that they are NOT NULL. To store this information, Flink uses the flink.notNull property, for example:

message Row {
  .google.type.Date date = 1 [(confluent.field_meta) = {
    params: [
      { key: "flink.version", value: "1" },
      { key: "flink.notNull", value: "true" }
    ]
  }];
}
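For example, the following statement (a minimal sketch; names are hypothetical, and it assumes the Protobuf Schema Registry value format, 'proto-registry') shows how nullability is carried for MESSAGE and non-MESSAGE types, following the mappings in the table below:

-- Hypothetical table.
CREATE TABLE events_proto (
  event_date DATE NOT NULL,    -- google.type.Date (a MESSAGE type); NOT NULL is recorded via flink.notNull
  attempts   INT,              -- optional int32; an unset value is read as NULL
  payload    VARBINARY(1000)   -- optional bytes with flink.maxLength
) WITH (
  'value.format' = 'proto-registry'
);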
Flink SQL type | Protobuf type | Message type | Example | Comment |
---|---|---|---|---|
ARRAY[T] | repeated T | – | repeated int64 value = 1;
nullable array: arrayNullableRepeatedWrapper arrayNullable = 1 [(confluent.field_meta) = {
params: [
{
key: "flink.wrapped",
value: "true"
},
{
key: "flink.version",
value: "1"
}
]
}];
message arrayNullableRepeatedWrapper {
repeated int64 value = 1;
}
nullable elements: repeated elementNullableElementWrapper elementNullable = 2 [(confluent.field_meta) = {
params: [
{
key: "flink.wrapped",
value: "true"
},
{
key: "flink.version",
value: "1"
}
]
}];
message elementNullableElementWrapper {
optional int64 value = 1;
}
|
flink.wrapped 1 |
BIGINT | INT64 | – | optional int64 bigint = 8;
|
– |
BINARY | BYTES | – | optional bytes binary = 13 [(confluent.field_meta) = {
params: [
{
key: "flink.maxLength",
value: "123"
},
{
key: "flink.minLength",
value: "123"
},
{
key: "flink.version",
value: "1"
}
]
}];
|
flink.maxLength=flink.minLength |
BOOLEAN | BOOL | – | optional bool boolean = 2;
|
– |
CHAR | STRING | – | optional string char = 11 [(confluent.field_meta) = {
params: [
{
key: "flink.maxLength",
value: "123"
},
{
key: "flink.minLength",
value: "123"
},
{
key: "flink.version",
value: "1"
}
]
}];
|
flink.maxLength=flink.minLength |
DATE | MESSAGE | google.type.Date | optional .google.type.Date date = 17;
|
– |
DOUBLE | DOUBLE | – | optional double double = 10;
|
– |
DECIMAL | MESSAGE | confluent.type.Decimal | optional .confluent.type.Decimal decimal = 19 [(confluent.field_meta) = {
params: [
{
value: "5",
key: "precision"
},
{
value: "1",
key: "scale"
},
{
key: "flink.version",
value: "1"
}
]
}];
|
– |
FLOAT | FLOAT | – | optional float float = 9;
|
– |
INT | INT32 | – | optional int32 int = 7;
|
– |
MAP[K, V] | repeated MESSAGE | XXEntry(K key, V value) | repeated MapEntry map = 20;
message MapEntry {
optional string key = 1;
optional int64 value = 2;
}
|
flink.wrapped 1 |
MULTISET[V] | repeated MESSAGE | XXEntry(V key, int32 value) | repeated MultisetEntry multiset = 1 [(confluent.field_meta) = {
params: [
{
key: "flink.type",
value: "multiset"
},
{
key: "flink.version",
value: "1"
}
]
}];
message MultisetEntry {
optional string key = 1;
int32 value = 2;
}
|
flink.wrapped 1 flink.type=multiset |
ROW | MESSAGE | fieldName | meta_Row meta = 1;
message meta_Row {
float a = 1;
float b = 2;
}
|
– |
SMALLINT | INT32 | – | optional int32 smallInt = 6 [(confluent.field_meta) = {
doc: "smallInt comment",
params: [
{
key: "flink.version",
value: "1"
},
{
key: "connect.type",
value: "int16"
}
]
}];
|
MetaProto extension: connect.type = int16 |
TIMESTAMP | MESSAGE | google.protobuf.Timestamp | optional .google.protobuf.Timestamp timestamp_ltz_3 = 16 [(confluent.field_meta) = {
params: [
{
key: "flink.type",
value: "timestamp"
},
{
key: "flink.precision",
value: "3"
},
{
key: "flink.version",
value: "1"
}
]
}];
|
flink.precision flink.type=timestamp |
TIMESTAMP_LTZ | MESSAGE | google.protobuf.Timestamp | optional .google.protobuf.Timestamp timestamp_ltz_3 = 15 [(confluent.field_meta) = {
params: [
{
key: "flink.precision",
value: "3"
},
{
key: "flink.version",
value: "1"
}
]
}];
|
flink.precision |
TIME_WITHOUT_TIME_ZONE | MESSAGE | google.type.TimeOfDay | optional .google.type.TimeOfDay time = 18 [(confluent.field_meta) = {
params: [
{
key: "flink.precision",
value: "3"
},
{
key: "flink.version",
value: "1"
}
]
}];
|
– |
TINYINT | INT32 | – | optional int32 tinyInt = 4 [(confluent.field_meta) = {
doc: "tinyInt comment",
params: [
{
key: "flink.version",
value: "1"
},
{
key: "connect.type",
value: "int8"
}
]
}];
|
MetaProto extension: connect.type = int8 |
VARBINARY | BYTES | – | optional bytes varbinary = 14 [(confluent.field_meta) = {
params: [
{
key: "flink.maxLength",
value: "123"
},
{
key: "flink.version",
value: "1"
}
]
}];
|
flink.maxLength (default = MAX_LENGTH) |
VARCHAR | STRING | – | optional string varchar = 12 [(confluent.field_meta) = {
params: [
{
key: "flink.maxLength",
value: "123"
},
{
key: "flink.version",
value: "1"
}
]
}];
|
flink.maxLength (default = MAX_LENGTH) |
1 flink.wrapped indicates that Flink wrappers are used to represent nullability, because Protobuf doesn’t natively support nullable repeated fields. For examples, see the ARRAY type.
Protobuf types to Flink SQL types¶
The following table shows the mapping of Protobuf types to Flink SQL and Connect types. It shows only mappings that are not covered by the previous table. These types can’t originate from Flink SQL.
This mapping is important when consuming/reading records with a schema that was created outside of Flink. The mapping defines the Flink table’s schema inferred from a Protobuf schema.
Protobuf type | Flink SQL type | Message type | Connect type annotation |
---|---|---|---|
FIXED32 / FIXED64 / SFIXED64 | BIGINT | – | – |
INT32 / SINT32 / SFIXED32 | INT | – | – |
INT32 / SINT32 / SFIXED32 | SMALLINT | – | int16 |
INT32 / SINT32 / SFIXED32 | TINYINT | – | int8 |
INT64 / SINT64 | BIGINT | – | – |
UINT32 / UINT64 | BIGINT | – | – |
MESSAGE | BIGINT | google.protobuf.Int64Value | – |
MESSAGE | BIGINT | google.protobuf.UInt64Value | – |
MESSAGE | BIGINT | google.protobuf.UInt32Value | – |
MESSAGE | BOOLEAN | google.protobuf.BoolValue | – |
MESSAGE | DOUBLE | google.protobuf.DoubleValue | – |
MESSAGE | FLOAT | google.protobuf.FloatValue | – |
MESSAGE | INT | google.protobuf.Int32Value | – |
MESSAGE | VARBINARY | google.protobuf.BytesValue | – |
MESSAGE | VARCHAR | google.protobuf.StringValue | – |
oneOf | ROW | – | – |