Data Type Mappings¶
Confluent Cloud for Apache Flink®️ supports records in the Avro Schema Registry, JSON_SR, and Protobuf Schema Registry formats.
Avro schemas¶
Known limitations¶
- Avro unions are generally not supported. Apache Flink® supports only unions of two elements where one of the elements is NULL. This is how Flink represents a nullable type.
- Avro enums have limited support. Flink supports reading and writing enums but treats them as a STRING type. From Flink’s perspective, enums are not distinguishable from the STRING type. You can’t create an Avro schema from Flink that has an enum field.
- Flink doesn’t support reading Avro time-micros as a TIME type, because Flink supports TIME with a precision of up to 3. time-micros values are read and written as BIGINT.
- These Flink types are not supported:
- INTERVAL_DAY_TIME
- INTERVAL_YEAR_MONTH
- TIMESTAMP_WITH_TIMEZONE
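As an illustrative sketch of the union rule above (record and field names are hypothetical), a two-element union that includes null is the only union form Flink accepts. The following field is read as a nullable BIGINT, whereas a union like ["string", "long"] is not supported:

```json
{
  "type": "record",
  "name": "Payment",
  "fields": [
    {"name": "amount", "type": ["null", "long"], "default": null}
  ]
}
```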
Flink SQL types to Avro types¶
The following table shows the mapping of Flink SQL types to Avro physical types.
This mapping is important for creating tables, because it defines the Avro schema that’s produced by a CREATE TABLE statement.
Flink SQL type | Avro type | Avro logical type | Connect type annotation |
---|---|---|---|
ARRAY | array | – | – |
BIGINT | long | – | – |
BINARY / VARBINARY | bytes | – | – |
BOOLEAN | boolean | – | – |
CHAR / VARCHAR / STRING | string | – | – |
DATE | int | date | – |
DECIMAL | bytes | decimal | – |
DOUBLE | double | – | – |
FLOAT | float | – | – |
INT | int | – | – |
INTERVAL_YEAR_MONTH | – | – | – |
INTERVAL_DAY_TIME | – | – | – |
MAP (string/char/varchar key) | map | – | – |
MAP (non-character key) | array | – | array of io.confluent.connect.avro.MapEntry(key, value) |
MULTISET (string/char/varchar element) | map | – | – |
MULTISET (non-character element) | array | – | array of io.confluent.connect.avro.MapEntry(key, value) |
ROW | record 1 | – | – |
SMALLINT | int | – | int16 |
TIME | int | time-millis | – |
TIMESTAMP | long | timestamp-millis / timestamp-micros | – |
TIMESTAMP_LTZ | long | local-timestamp-millis / local-timestamp-micros | – |
TIMESTAMP_WITH_TIMEZONE | – | – | – |
TINYINT | int | – | int8 |
1 For the record type:
- name: org.apache.flink.avro.generated.record
- nested record names: org.apache.flink.avro.generated.record_$fieldName
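As a sketch of the naming rule above (column and field names are hypothetical, and the exact schema layout may differ), a table with a ROW column named address would produce an Avro schema along these lines:

```json
{
  "type": "record",
  "name": "record",
  "namespace": "org.apache.flink.avro.generated",
  "fields": [
    {
      "name": "address",
      "type": {
        "type": "record",
        "name": "record_address",
        "namespace": "org.apache.flink.avro.generated",
        "fields": [
          {"name": "city", "type": "string"}
        ]
      }
    }
  ]
}
```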
Avro types to Flink SQL types¶
The following table shows the mapping of Avro types to Flink SQL and Connect types.
This mapping is important when consuming/reading records with a schema that was created outside of Flink. The mapping defines the Flink table’s schema inferred from an Avro schema.
Flink SQL supports reading and writing nullable types. A nullable type is mapped to an Avro union(avro_type, null), with the avro_type converted from the corresponding Flink type.
Avro type | Avro logical type | Flink SQL type | Connect type annotation |
---|---|---|---|
array | – | ARRAY | – |
array (io.confluent.connect.avro.MapEntry) | – | MAP | – |
boolean | – | BOOLEAN | – |
bytes | – | BYTES | – |
bytes | decimal | DECIMAL | – |
double | – | DOUBLE | – |
enum | – | STRING | – |
fixed | – | VARBINARY(size) | – |
fixed | decimal | DECIMAL | – |
fixed | duration | VARBINARY(size) | – |
float | – | FLOAT | – |
int | – | TINYINT | int8 |
int | – | SMALLINT | int16 |
int | – | INT | – |
int | date | DATE | – |
int | time-millis | TIME(3) | – |
long | – | BIGINT | – |
long | time-micros | BIGINT | – |
long | timestamp-millis | TIMESTAMP(3) | – |
long | timestamp-micros | TIMESTAMP(6) | – |
long | local-timestamp-millis | TIMESTAMP_LTZ(3) | – |
long | local-timestamp-micros | TIMESTAMP_LTZ(6) | – |
map | – | MAP (varchar key) | – |
record | – | ROW | – |
string | – | STRING | – |
string (uuid) | – | STRING | – |
union with null type (null + one other type) | – | NULLABLE(type) | – |
union (other unions) | – | – | – |
JSON schemas¶
Flink SQL types to JSON types¶
The following table shows the mapping of Flink SQL types to JSON types.
This mapping is important for creating tables, because it defines the JSON schema that’s produced by a CREATE TABLE statement.
Flink SQL type | JSON type | JSON type title | Connect type annotation |
---|---|---|---|
ARRAY | ArraySchema | – | – |
BIGINT | NumberSchema | – | int64 |
BINARY | StringSchema | – | bytes |
BOOLEAN | BooleanSchema | – | – |
CHAR | StringSchema | – | – |
DATE | NumberSchema | org.apache.kafka.connect.data.Date | int32 |
DECIMAL | NumberSchema | org.apache.kafka.connect.data.Decimal | bytes |
DOUBLE | NumberSchema | – | float64 |
FLOAT | NumberSchema | – | float32 |
INT | NumberSchema | – | int32 |
MAP[VARCHAR, V] | ObjectSchema | – | map |
MULTISET[K] | ArraySchema[ObjectSchema] | – | map |
MULTISET[VARCHAR] | ObjectSchema | – | map |
ROW | ObjectSchema | – | – |
SMALLINT | NumberSchema | – | int16 |
TIME | NumberSchema | org.apache.kafka.connect.data.Time | int32 |
TIMESTAMP_LTZ | NumberSchema | org.apache.kafka.connect.data.Timestamp | int64 |
TINYINT | NumberSchema | – | int8 |
VARBINARY | StringSchema | – | bytes |
VARCHAR | StringSchema | – | – |
Notes¶
- Nullable types are expressed as oneOf(NullSchema, T).
- ObjectSchema for a MAP and MULTISET must have two fields [key, value].
- MULTISET is equivalent to MAP[K, INT] and is serialized accordingly.
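To illustrate the first note (the property name is hypothetical, and the exact schema the serializer emits may differ), a nullable BIGINT column might be expressed in the generated JSON schema roughly as:

```json
{
  "type": "object",
  "properties": {
    "amount": {
      "oneOf": [
        {"type": "null"},
        {"type": "number", "connect.type": "int64"}
      ]
    }
  }
}
```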
JSON types to Flink SQL types¶
The following table shows the mapping of JSON types to Flink SQL and Connect types.
This mapping is important when consuming/reading records with a schema that was created outside of Flink. The mapping defines the Flink table’s schema inferred from a JSON schema.
JSON type | Flink SQL type | JSON type title | Connect type annotation |
---|---|---|---|
ArraySchema | ARRAY | – | – |
ArraySchema | MAP[K, V] | – | map |
BooleanSchema | BOOLEAN | – | – |
CombinedSchema | ROW | – | – |
EnumSchema | VARCHAR | – | – |
NumberSchema | BIGINT | – | int64 |
NumberSchema(requiresInteger=true) | BIGINT | – | – |
NumberSchema | DATE | org.apache.kafka.connect.data.Date | int32 |
NumberSchema | DECIMAL | org.apache.kafka.connect.data.Decimal | bytes |
NumberSchema | DOUBLE | – | float64 |
NumberSchema(requiresInteger=false) | DOUBLE | – | – |
NumberSchema | FLOAT | – | float32 |
NumberSchema | INT | – | int32 |
NumberSchema | SMALLINT | – | int16 |
NumberSchema | TIME(3) | org.apache.kafka.connect.data.Time | int32 |
NumberSchema | TIMESTAMP_LTZ(3) | org.apache.kafka.connect.data.Timestamp | int64 |
NumberSchema | TINYINT | – | int8 |
StringSchema | VARBINARY | – | bytes |
StringSchema | VARCHAR | – | – |
ObjectSchema | MAP[VARCHAR, V] | – | map |
ObjectSchema | ROW | – | – |
Notes¶
- Nullable types are expressed as oneOf(NullSchema, T).
- ObjectSchema for a MAP and MULTISET must have two fields [key, value].
- CombinedSchema (oneOf, allOf, anyOf) is expressed as a ROW, unless it can be simplified, for example, StringSchema and ConstantSchema.
Protobuf schemas¶
Flink SQL types to Protobuf types¶
The following table shows the mapping of Flink SQL types to Protobuf types.
This mapping is important for creating tables, because it defines the Protobuf schema that’s produced by a CREATE TABLE statement.
When converting to a Protobuf schema, Flink SQL marks all NULLABLE fields as optional.
Flink SQL type | Protobuf type | Message type | Comment |
---|---|---|---|
BOOLEAN | BOOL | – | – |
TINYINT | INT32 | – | MetaProto extension: connect.type = int8 |
SMALLINT | INT32 | – | MetaProto extension: connect.type = int16 |
INT | INT32 | – | – |
BIGINT | INT64 | – | – |
FLOAT | FLOAT | – | – |
DOUBLE | DOUBLE | – | – |
CHAR | STRING | – | – |
VARCHAR | STRING | – | – |
BINARY | BYTES | – | – |
VARBINARY | BYTES | – | – |
TIMESTAMP_LTZ | MESSAGE | google.protobuf.Timestamp | – |
DATE | MESSAGE | google.type.Date | – |
TIME_WITHOUT_TIME_ZONE | MESSAGE | google.type.TimeOfDay | – |
DECIMAL | MESSAGE | confluent.type.Decimal | – |
MAP[K, V] | repeated MESSAGE | XXEntry(K key, V value) | – |
ARRAY[T] | repeated T | – | – |
ROW | MESSAGE | fieldName | – |
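As an illustrative sketch (message and field names are hypothetical), a table with a nullable INT column and a MAP[VARCHAR, BIGINT] column might produce a Protobuf schema along these lines, with the nullable field marked optional as described above:

```protobuf
syntax = "proto3";

message Order {
  // NULLABLE INT column: marked optional so an unset value can be detected
  optional int32 quantity = 1;

  // MAP[VARCHAR, BIGINT] column: a repeated entry message with key and value fields
  message TagsEntry {
    string key = 1;
    int64 value = 2;
  }
  repeated TagsEntry tags = 2;
}
```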
Protobuf types to Flink SQL types¶
The following table shows the mapping of Protobuf types to Flink SQL and Connect types.
This mapping is important when consuming/reading records with a schema that was created outside of Flink. The mapping defines the Flink table’s schema inferred from a Protobuf schema.
Protobuf type | Flink SQL type | Message type | Connect type annotation |
---|---|---|---|
BOOL | BOOLEAN | – | – |
BYTES | VARBINARY | – | – |
DOUBLE | DOUBLE | – | – |
ENUM | VARCHAR | – | – |
FIXED32 / FIXED64 / SFIXED64 | BIGINT | – | – |
FLOAT | FLOAT | – | – |
INT32 / SINT32 / SFIXED32 | INT | – | – |
INT32 / SINT32 / SFIXED32 | SMALLINT | – | int16 |
INT32 / SINT32 / SFIXED32 | TINYINT | – | int8 |
INT64 / SINT64 / SFIXED64 | BIGINT | – | – |
MESSAGE | BIGINT | google.protobuf.Int64Value | – |
MESSAGE | BIGINT | google.protobuf.UInt64Value | – |
MESSAGE | BIGINT | google.protobuf.UInt32Value | – |
MESSAGE | BOOLEAN | google.protobuf.BoolValue | – |
MESSAGE | DATE | google.type.Date | – |
MESSAGE | DECIMAL | confluent.type.Decimal | – |
MESSAGE | DOUBLE | google.protobuf.DoubleValue | – |
MESSAGE | FLOAT | google.protobuf.FloatValue | – |
MESSAGE | INT | google.protobuf.Int32Value | – |
MESSAGE | MAP[type1, type2] | repeated xx.xx.XXEntry([type1] key, [type2] value) | – |
MESSAGE | TIME(3) | google.type.TimeOfDay | – |
MESSAGE | TIMESTAMP_LTZ(9) | google.protobuf.Timestamp | – |
MESSAGE | VARBINARY | google.protobuf.BytesValue | – |
MESSAGE | VARCHAR | google.protobuf.StringValue | – |
MESSAGE | ROW | – | – |
oneOf | ROW | – | – |
STRING | VARCHAR | – | – |
UINT32 / UINT64 | BIGINT | – | – |
Notes¶
Expressing something as NULLABLE or NOT NULL is not straightforward in Protobuf.
- All non-MESSAGE types are NOT NULL. If a field is not set explicitly, its default value is assigned.
- Non-MESSAGE types marked as optional can be checked to determine whether they were set. If not set, Flink SQL assumes NULL.
- MESSAGE types are all NULLABLE, which means that all fields of MESSAGE type are optional, and there is no way to ensure at the format level that they are NOT NULL.
- Arrays can’t be NULL, and a not-set repeated field is presented as an empty list. There is no way to differentiate an empty array from NULL.
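The rules above can be sketched in a single message (field names are hypothetical):

```protobuf
syntax = "proto3";

import "google/protobuf/wrappers.proto";

message Example {
  // Non-MESSAGE type: NOT NULL; an unset field reads as the default value (0)
  int32 count = 1;

  // optional scalar: presence can be checked; if unset, Flink SQL assumes NULL
  optional int32 rating = 2;

  // MESSAGE type: always NULLABLE at the format level
  google.protobuf.Int64Value total = 3;

  // repeated field: never NULL; an unset field reads as an empty list
  repeated string tags = 4;
}
```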