Serialize and Deserialize data
Confluent Cloud for Apache Flink®️ supports records in the Avro, JSON_SR, and Protobuf formats.
Important
Confluent Cloud for Apache Flink®️ is currently available for Preview. A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing Preview releases of the Preview features at any time in Confluent’s sole discretion. Check out Getting Help for questions, feedback and requests.
For SQL features and limitations in the preview program, see Notable Limitations in Public Preview.
Avro schemas
Known limitations
- Avro unions are generally not supported. Apache Flink® supports only unions of two elements where one of the elements is NULL. This is how Flink represents a nullable type.
- Avro enums have limited support. Flink supports reading and writing enums but treats them as a STRING type. From Flink’s perspective, enums are not distinguishable from the STRING type. You can’t create an Avro schema from Flink that has an enum field.
- Flink doesn’t support reading Avro time-micros as a TIME type, because Flink supports TIME with precision up to 3. Instead, time-micros values are read and written as BIGINT.
- These Flink types are not supported:
- INTERVAL_DAY_TIME
- INTERVAL_YEAR_MONTH
- TIMESTAMP_WITH_TIMEZONE
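For instance, because Flink treats Avro enums as STRING, an enum field is queried with ordinary string predicates. A minimal sketch (the table and column names are invented):

```sql
-- `orders` is a hypothetical table over a topic whose Avro value schema
-- declares `status` as an enum; Flink infers the column as STRING.
SELECT order_id, status
FROM orders
WHERE status = 'SHIPPED';  -- compared as a plain string, not as an enum symbol
```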
SQL types to Avro types
The following table shows the mapping of SQL types to Avro physical types.
This mapping is important for creating tables, because it defines the Avro schema that’s produced by a CREATE TABLE statement.
SQL type | Avro type | Avro logical type | Connect type annotation |
---|---|---|---|
ARRAY | array | – | – |
BIGINT | long | – | – |
BINARY / VARBINARY | bytes | – | – |
BOOLEAN | boolean | – | – |
CHAR / VARCHAR / STRING | string | – | – |
DATE | int | date | – |
DECIMAL | bytes | decimal | – |
DOUBLE | double | – | – |
FLOAT | float | – | – |
INT | int | – | – |
INTERVAL_YEAR_MONTH | – | – | – |
INTERVAL_DAY_TIME | – | – | – |
MAP (string/char/varchar key) | map | – | – |
MAP (non-character key) | array | – | array of io.confluent.connect.avro.MapEntry(key, value) |
MULTISET (string/char/varchar element) | map | – | – |
MULTISET (non-character element) | array | – | array of io.confluent.connect.avro.MapEntry(key, value) |
ROW | record 1 | – | – |
SMALLINT | int | – | int16 |
TIME | int | time-millis | – |
TIMESTAMP | long | timestamp-millis / timestamp-micros | – |
TIMESTAMP_LTZ | long | local-timestamp-millis / local-timestamp-micros | – |
TIMESTAMP_WITH_TIMEZONE | – | – | – |
TINYINT | int | – | int8 |
1 For the record type:
- name: org.apache.flink.avro.generated.record
- nested record name: org.apache.flink.avro.generated.record_$fieldName
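To illustrate the mapping, the statement below is a hypothetical sketch (invented table and column names, assuming the Schema Registry based `avro-registry` value format); the inline comments show the Avro type each column would produce, with nullable columns becoming unions per the rule described in the next section.

```sql
CREATE TABLE orders (
  order_id   BIGINT NOT NULL,   -- Avro: long
  customer   STRING,            -- Avro: union(string, null)
  amount     DECIMAL(10, 2),    -- Avro: union(bytes, null), logical type decimal
  created_at TIMESTAMP(3),      -- Avro: union(long, null), logical type timestamp-millis
  tags       ARRAY<STRING>      -- Avro: union(array of string, null)
) WITH (
  'value.format' = 'avro-registry'
);
```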
Avro types to SQL types
The following table shows the mapping of Avro types to SQL and Connect types.
This mapping is important when consuming/reading records with a schema that was created outside of Flink. The mapping defines the Flink table’s schema inferred from an Avro schema.
SQL supports reading and writing nullable types. A nullable type is mapped to an Avro union(avro_type, null), with the avro_type converted from the corresponding Flink type.
Avro type | Avro logical type | SQL type | Connect type annotation |
---|---|---|---|
array | – | ARRAY | – |
array (io.confluent.connect.avro.MapEntry) | – | MAP | – |
boolean | – | BOOLEAN | – |
bytes | – | BYTES | – |
bytes | decimal | DECIMAL | – |
double | – | DOUBLE | – |
enum | – | STRING | – |
fixed | – | VARBINARY(size) | – |
fixed | decimal | DECIMAL | – |
fixed | duration | VARBINARY(size) | – |
float | – | FLOAT | – |
int | – | TINYINT | int8 |
int | – | SMALLINT | int16 |
int | – | INT | – |
int | date | DATE | – |
int | time-millis | TIME(3) | – |
long | – | BIGINT | – |
long | time-micros | BIGINT | – |
long | timestamp-millis | TIMESTAMP(3) | – |
long | timestamp-micros | TIMESTAMP(6) | – |
long | local-timestamp-millis | TIMESTAMP_LTZ(3) | – |
long | local-timestamp-micros | TIMESTAMP_LTZ(6) | – |
map | – | MAP (varchar key) | – |
record | – | ROW | – |
string | – | STRING | – |
string | uuid | STRING | – |
union with null type (null + one other type) | – | NULLABLE(type) | – |
union (other unions) | – | – | – |
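As a sketch of inference in the other direction (the topic and field names are invented), suppose the value schema registered for a topic is the Avro record shown in the comment; per the table above, Flink infers the column types listed below it.

```sql
-- Registered Avro value schema (abridged):
--   {"type": "record", "name": "sensor", "fields": [
--     {"name": "id",       "type": "long"},
--     {"name": "reading",  "type": ["null", "double"], "default": null},
--     {"name": "taken_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}]}
--
-- Inferred Flink columns:
--   id       BIGINT NOT NULL        (long)
--   reading  DOUBLE                 (nullable, from the union with null)
--   taken_at TIMESTAMP(3) NOT NULL  (timestamp-millis)
DESCRIBE sensor;
```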
JSON schemas
SQL types to JSON types
The following table shows the mapping of SQL types to JSON types.
This mapping is important for creating tables, because it defines the JSON schema that’s produced by a CREATE TABLE statement.
SQL type | JSON type | JSON type title | Connect type annotation |
---|---|---|---|
ARRAY | ArraySchema | – | – |
BIGINT | NumberSchema | – | int64 |
BINARY | StringSchema | – | bytes |
BOOLEAN | BooleanSchema | – | – |
CHAR | StringSchema | – | – |
DATE | NumberSchema | org.apache.kafka.connect.data.Date | int32 |
DECIMAL | NumberSchema | org.apache.kafka.connect.data.Decimal | bytes |
DOUBLE | NumberSchema | – | float64 |
FLOAT | NumberSchema | – | float32 |
INT | NumberSchema | – | int32 |
MAP[VARCHAR, V] | ObjectSchema | – | map |
MULTISET[K] | ArraySchema[ObjectSchema] | – | map |
MULTISET[VARCHAR] | ObjectSchema | – | map |
ROW | ObjectSchema | – | – |
SMALLINT | NumberSchema | – | int16 |
TIME | NumberSchema | org.apache.kafka.connect.data.Time | int32 |
TIMESTAMP_LTZ | NumberSchema | org.apache.kafka.connect.data.Timestamp | int64 |
TINYINT | NumberSchema | – | int8 |
VARBINARY | StringSchema | – | bytes |
VARCHAR | StringSchema | – | – |
Notes
- Nullable types are expressed as oneOf(NullSchema, T).
- ObjectSchema for a MAP and MULTISET must have two fields [key, value].
- MULTISET is equivalent to MAP[K, INT] and is serialized accordingly.
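The MULTISET notes above can be sketched in one hypothetical table definition (invented names, assuming the Schema Registry based `json-registry` value format):

```sql
CREATE TABLE events (
  id     BIGINT,           -- NumberSchema (int64)
  labels MULTISET<STRING>, -- ObjectSchema: serialized like MAP[VARCHAR, INT] (element -> count)
  codes  MULTISET<INT>     -- ArraySchema of ObjectSchema with [key, value] fields
) WITH (
  'value.format' = 'json-registry'
);
```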
JSON types to SQL types
The following table shows the mapping of JSON types to SQL and Connect types.
This mapping is important when consuming/reading records with a schema that was created outside of Flink. The mapping defines the Flink table’s schema inferred from a JSON schema.
JSON type | SQL type | JSON type title | Connect type annotation |
---|---|---|---|
ArraySchema | ARRAY | – | – |
ArraySchema | MAP[K, V] | – | map |
BooleanSchema | BOOLEAN | – | – |
CombinedSchema | ROW | – | – |
EnumSchema | VARCHAR | – | – |
NumberSchema | BIGINT | – | int64 |
NumberSchema(requiresInteger=true) | BIGINT | – | – |
NumberSchema | DATE | org.apache.kafka.connect.data.Date | int32 |
NumberSchema | DECIMAL | org.apache.kafka.connect.data.Decimal | bytes |
NumberSchema | DOUBLE | – | float64 |
NumberSchema(requiresInteger=false) | DOUBLE | – | – |
NumberSchema | FLOAT | – | float32 |
NumberSchema | INT | – | int32 |
NumberSchema | SMALLINT | – | int16 |
NumberSchema | TIME(3) | org.apache.kafka.connect.data.Time | int32 |
NumberSchema | TIMESTAMP_LTZ(3) | org.apache.kafka.connect.data.Timestamp | int64 |
NumberSchema | TINYINT | – | int8 |
StringSchema | VARBINARY | – | bytes |
StringSchema | VARCHAR | – | – |
ObjectSchema | MAP[VARCHAR, V] | – | map |
ObjectSchema | ROW | – | – |
Notes
- Nullable types are expressed as oneOf(NullSchema, T).
- ObjectSchema for a MAP and MULTISET must have two fields [key, value].
- CombinedSchema (oneOf, allOf, anyOf) is expressed as a ROW, unless it can be simplified to a single type, for example a combination of a StringSchema and a ConstantSchema.
Protobuf schemas
SQL types to Protobuf types
The following table shows the mapping of SQL types to Protobuf types.
This mapping is important for creating tables, because it defines the Protobuf schema that’s produced by a CREATE TABLE statement.
When converting to a Protobuf schema, SQL marks all NULLABLE fields as optional.
SQL type | Protobuf type | Message type | Comment |
---|---|---|---|
BOOLEAN | BOOL | – | – |
TINYINT | INT32 | – | MetaProto extension: connect.type = int8 |
SMALLINT | INT32 | – | MetaProto extension: connect.type = int16 |
INT | INT32 | – | – |
BIGINT | INT64 | – | – |
FLOAT | FLOAT | – | – |
DOUBLE | DOUBLE | – | – |
CHAR | STRING | – | – |
VARCHAR | STRING | – | – |
BINARY | BYTES | – | – |
VARBINARY | BYTES | – | – |
TIMESTAMP_LTZ | MESSAGE | google.protobuf.Timestamp | – |
DATE | MESSAGE | google.type.Date | – |
TIME_WITHOUT_TIME_ZONE | MESSAGE | google.type.TimeOfDay | – |
DECIMAL | MESSAGE | confluent.type.Decimal | – |
MAP[K, V] | repeated MESSAGE | XXEntry(K key, V value) | – |
ARRAY[T] | repeated T | – | – |
ROW | MESSAGE | fieldName | – |
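As a sketch of the mapping (invented names, assuming the Schema Registry based `proto-registry` value format), the inline comments show the Protobuf type each column maps to per the table above; every NULLABLE column becomes an optional field.

```sql
CREATE TABLE payments (
  id      BIGINT NOT NULL,      -- int64
  note    STRING,               -- optional string
  amount  DECIMAL(10, 2),       -- confluent.type.Decimal message
  paid_at TIMESTAMP_LTZ(3),     -- google.protobuf.Timestamp message
  details ROW<k STRING, v INT>  -- nested message named after the field
) WITH (
  'value.format' = 'proto-registry'
);
```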
Protobuf types to SQL types
The following table shows the mapping of Protobuf types to SQL and Connect types.
This mapping is important when consuming/reading records with a schema that was created outside of Flink. The mapping defines the Flink table’s schema inferred from a Protobuf schema.
Protobuf type | SQL type | Message type | Connect type annotation |
---|---|---|---|
BOOL | BOOLEAN | – | – |
BYTES | VARBINARY | – | – |
DOUBLE | DOUBLE | – | – |
ENUM | VARCHAR | – | – |
FIXED32 / FIXED64 / SFIXED64 | BIGINT | – | – |
FLOAT | FLOAT | – | – |
INT32 / SINT32 / SFIXED32 | INT | – | – |
INT32 / SINT32 / SFIXED32 | SMALLINT | – | int16 |
INT32 / SINT32 / SFIXED32 | TINYINT | – | int8 |
INT64 / SINT64 / SFIXED64 | BIGINT | – | – |
MESSAGE | BIGINT | google.protobuf.Int64Value | – |
MESSAGE | BIGINT | google.protobuf.UInt64Value | – |
MESSAGE | BIGINT | google.protobuf.UInt32Value | – |
MESSAGE | BOOLEAN | google.protobuf.BoolValue | – |
MESSAGE | DATE | google.type.Date | – |
MESSAGE | DECIMAL | confluent.type.Decimal | – |
MESSAGE | DOUBLE | google.protobuf.DoubleValue | – |
MESSAGE | FLOAT | google.protobuf.FloatValue | – |
MESSAGE | INT | google.protobuf.Int32Value | – |
MESSAGE | MAP[type1, type2] | repeated xx.xx.XXEntry([type1] key, [type2] value) | – |
MESSAGE | TIME(3) | google.type.TimeOfDay | – |
MESSAGE | TIMESTAMP_LTZ(9) | google.protobuf.Timestamp | – |
MESSAGE | VARBINARY | google.protobuf.BytesValue | – |
MESSAGE | VARCHAR | google.protobuf.StringValue | – |
MESSAGE | ROW | – | – |
oneOf | ROW | – | – |
STRING | VARCHAR | – | – |
UINT32 / UINT64 | BIGINT | – | – |
Notes
Expressing a field as NULLABLE or NOT NULL is not straightforward in Protobuf.
- All non-MESSAGE types are NOT NULL. If a field is not set explicitly, its default value is assigned.
- Non-MESSAGE types marked as optional can be checked for presence. If such a field is not set, SQL assumes NULL.
- MESSAGE types are always NULLABLE, which means that all fields of MESSAGE type are optional, and there is no way to ensure at the format level that they are NOT NULL.
- Arrays can’t be NULL. A repeated field that is not set is presented as an empty list, so there is no way to differentiate an empty array from NULL.
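These nullability rules can be sketched in one hypothetical table definition (invented names, again assuming the `proto-registry` value format):

```sql
CREATE TABLE t (
  a INT NOT NULL,  -- plain int32: an unset field reads back as the default value (0)
  b INT,           -- emitted as optional int32, so an unset field reads back as NULL
  c ARRAY<INT>     -- repeated int32: an unset field reads back as an empty array, never NULL
) WITH (
  'value.format' = 'proto-registry'
);
```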