Data Type Mappings in Confluent Cloud for Apache Flink
Confluent Cloud for Apache Flink® supports records in the Avro Schema Registry, JSON_SR, and Protobuf Schema Registry formats.
The following table shows the mapping of Flink SQL types to JSON Schema, Protobuf, and Avro types. For the mapping of Flink SQL types to Java and Python types, see Data Types in Confluent Cloud for Apache Flink.
| Flink SQL type | JSON Schema type | Protobuf type | Avro type | Avro logical type |
|---|---|---|---|---|
| ARRAY | Array | repeated T | array | – |
| BIGINT | Number | INT64 | long | – |
| BINARY | String | BYTES | fixed | – |
| BOOLEAN | Boolean | BOOL | boolean | – |
| VARBINARY | String | BYTES | bytes | – |
| CHAR | String | STRING | string | – |
| DATE | Number | MESSAGE | int | date |
| DECIMAL | Number | MESSAGE | bytes | decimal |
| DOUBLE | Number | DOUBLE | double | – |
| FLOAT | Number | FLOAT | float | – |
| INT | Number | INT32 | int | – |
| INTERVAL_DAY_TIME | Not supported | Not supported | Not supported | – |
| INTERVAL_YEAR_MONTH | Not supported | Not supported | Not supported | – |
| MAP | Array[Object] / Object | repeated MESSAGE | map / array | – |
| MULTISET | Array[Object] / Object | repeated MESSAGE | map / array | – |
| Nullable type | oneOf(Null, T) | – | union(avro_type, null) | – |
| ROW | Object | MESSAGE | record | – |
| SMALLINT | Number | INT32 | int | – |
| TIME | Number | – | int | time-millis |
| TIMESTAMP | Number | MESSAGE | long | local-timestamp-millis / local-timestamp-micros |
| TIMESTAMP_LTZ | Number | MESSAGE | long | timestamp-millis / timestamp-micros |
| TINYINT | Number | INT32 | int | – |
| STRING / VARCHAR | String | STRING | string | – |
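As a concrete illustration, a CREATE TABLE statement declaring columns of these types (the table and column names below are hypothetical) registers a value schema whose fields use the corresponding physical types in whichever format the table is configured with:

```sql
-- Hypothetical table; comments show the physical type each column maps to.
CREATE TABLE type_mapping_demo (
  id BIGINT,            -- JSON Schema: Number, Protobuf: INT64, Avro: long
  name STRING,          -- JSON Schema: String, Protobuf: STRING, Avro: string
  active BOOLEAN,       -- JSON Schema: Boolean, Protobuf: BOOL, Avro: boolean
  birth_date DATE,      -- JSON Schema: Number, Protobuf: MESSAGE, Avro: int (date)
  price DECIMAL(6, 3),  -- JSON Schema: Number, Protobuf: MESSAGE, Avro: bytes (decimal)
  tags ARRAY<STRING>    -- JSON Schema: Array, Protobuf: repeated, Avro: array
);
```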
Avro schemas
Known limitations
- Avro enums have limited support. Flink supports reading and writing enums but treats them as a STRING type. From Flink’s perspective, enums are not distinguishable from the STRING type. You can’t create an Avro schema from Flink that has an enum field.
- Flink doesn’t support reading Avro time-micros as a TIME type. Flink supports TIME with precision up to 3. time-micros is read and written as BIGINT.
- Field names must match Avro criteria. Avro expects field names to start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_].
- These Flink types are not supported:
INTERVAL_DAY_TIME
INTERVAL_YEAR_MONTH
TIMESTAMP_WITH_TIMEZONE
Flink SQL types to Avro types
The following table shows the mapping of Flink SQL types to Avro physical types.
This mapping is important for creating tables, because it defines the Avro schema that’s produced by a CREATE TABLE statement.
ARRAY
Avro type: array
Avro logical type: –
Additional properties: –
Example:
{ "type" : "array", "items" : "long" }
BIGINT
Avro type: long
Avro logical type: –
Additional properties: –
Example:
long
BINARY
Avro type: fixed
Avro logical type: –
Additional properties: flink.maxLength (MAX_LENGTH if not set)
Example:
{ "type" : "fixed", "name" : "row", "namespace" : "io.confluent", "size" : 123 }
BOOLEAN
Avro type: boolean
Avro logical type: –
Additional properties: –
Example:
boolean
CHAR
Avro type: string
Avro logical type: –
Additional properties: flink.maxLength = flink.minLength (MAX_LENGTH if not set)
Example:
{ "type" : "string", "flink.maxLength" : 123, "flink.minLength" : 123, "flink.version" : "1" }
DATE
Avro type: int
Avro logical type: date
Additional properties: –
Example:
{ "type" : "int", "logicalType" : "date" }
DECIMAL
Avro type: bytes
Avro logical type: decimal
Additional properties: –
Example:
{ "type" : "bytes", "logicalType" : "decimal", "precision" : 6, "scale" : 3 }
DOUBLE
Avro type: double
Avro logical type: –
Additional properties: –
Example:
double
FLOAT
Avro type: float
Avro logical type: –
Additional properties: –
Example:
float
INT
Avro type: int
Avro logical type: –
Additional properties: –
Example:
int
MAP (character key)
Avro type: map
Avro logical type: –
Additional properties: –
Example:
{ "type" : "map", "values" : "boolean" }
MAP (non-character key)
Avro type: array
Avro logical type: –
Additional properties: array of io.confluent.connect.avro.MapEntry(key, value)
Example:
{ "type" : "array", "items" : { "type" : "record", "name" : "MapEntry", "namespace" : "io.confluent.connect.avro", "fields" : [ { "name" : "key", "type" : "int" }, { "name" : "value", "type" : "bytes" } ] } }
MULTISET (character element)
Avro type: map
Avro logical type: –
Additional properties: flink.type : multiset
Example:
{ "type" : "map", "values" : "int", "flink.type" : "multiset", "flink.version" : "1" }
MULTISET (non-character element)
Avro type: array
Avro logical type: –
Additional properties: array of io.confluent.connect.avro.MapEntry(key, value), flink.type : multiset
Example:
{ "type" : "array", "items" : { "type" : "record", "name" : "MapEntry", "namespace" : "io.confluent.connect.avro", "fields" : [ { "name" : "key", "type" : "long" }, { "name" : "value", "type" : "int" } ] }, "flink.type" : "multiset", "flink.version" : "1" }
ROW
Avro type: record
Avro logical type: –
Additional properties: –
Name: org.apache.flink.avro.generated.record
Nested records name: org.apache.flink.avro.generated.record_$fieldName
Example:
{ "type" : "record", "name" : "row", "namespace" : "io.confluent", "fields" : [ { "name" : "f0", "type" : "long", "doc" : "field comment" } ] }
SMALLINT
Avro type: int
Avro logical type: –
Additional properties: connect.type=int16
Example:
{ "type" : "int", "connect.type" : "int16" }
STRING / VARCHAR
Avro type: string
Avro logical type: –
Additional properties: flink.maxLength (MAX_LENGTH if not set)
Example:
{ "type" : "string", "flink.maxLength" : 123, "flink.version" : "1" }
TIME
Avro type: int
Avro logical type: time-millis
Additional properties: flink.precision (default: 3, max supported: 3)
Example:
{ "type" : "int", "flink.precision" : 2, "flink.version" : "1", "logicalType" : "time-millis" }
TIMESTAMP
Avro type: long
Avro logical type: local-timestamp-millis / local-timestamp-micros
Additional properties: flink.precision (default: 3/6, max supported: 3/9)
Example:
{ "type" : "long", "flink.precision" : 2, "flink.version" : "1", "logicalType" : "local-timestamp-millis" }
TIMESTAMP_LTZ
Avro type: long
Avro logical type: timestamp-millis / timestamp-micros
Additional properties: flink.precision (default: 3/6, max supported: 3/9)
Example:
{ "type" : "long", "flink.precision" : 2, "flink.version" : "1", "logicalType" : "timestamp-millis" }
TINYINT
Avro type: int
Avro logical type: –
Additional properties: connect.type=int8
Example:
{ "type" : "int", "connect.type" : "int8" }
VARBINARY
Avro type: bytes
Avro logical type: –
Additional properties: flink.maxLength (MAX_LENGTH if not set)
Example:
{ "type" : "bytes", "flink.maxLength" : 123, "flink.version" : "1" }
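Putting several of these mappings together: a CREATE TABLE statement like the following sketch (the table and column names are hypothetical) produces an Avro record whose fields use the physical and logical types listed above.

```sql
CREATE TABLE orders (
  order_id BIGINT,       -- Avro: long
  amount DECIMAL(6, 3),  -- Avro: bytes with logicalType "decimal"
  order_date DATE,       -- Avro: int with logicalType "date"
  created TIMESTAMP(3)   -- Avro: long with logicalType "local-timestamp-millis"
);
```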
Avro types to Flink SQL types
The following table shows the mapping of Avro types to Flink SQL types. It shows only mappings that are not covered by the previous table. These types can’t originate from Flink SQL.
This mapping is important when consuming/reading records with a schema that was created outside of Flink. The mapping defines the Flink table’s schema inferred from an Avro schema.
Flink SQL supports reading and writing nullable types. A nullable type is mapped to an Avro union(avro_type, null), with the avro_type converted from the corresponding Flink type.
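For example, under this rule a nullable BIGINT column would correspond to an Avro field whose type is a union of the physical type and null (the field name here is hypothetical):

```
{ "name" : "f0", "type" : [ "long", "null" ] }
```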
Avro type | Avro logical type | Flink SQL type | Example |
|---|---|---|---|
long | time-micros | BIGINT | – |
enum | – | STRING | – |
union with null type (null + one other type) | – | NULLABLE(type) | – |
union (other unions) | – | ROW(type_name Type0, …) | ["long", "string", {"type": "record", "name": "User", "namespace": "io.test1", "fields": [{"name": "f0", "type": "long"}]}] |
string (uuid) | – | STRING | – |
fixed (duration) | – | BINARY(size) | – |
JSON Schema
Flink SQL types to JSON Schema types
The following table shows the mapping of Flink SQL types to JSON Schema types.
This mapping is important for creating tables, because it defines the JSON Schema that’s produced by a CREATE TABLE statement.
Nullable types are expressed as oneOf(Null, T).
Object for a MAP and MULTISET must have two fields [key, value].
MULTISET is equivalent to MAP[K, INT] and is serialized accordingly.
ARRAY
JSON Schema type: Array
Additional properties: –
JSON type title: –
Example:
{ "type": "array", "items": { "type": "number", "title": "org.apache.kafka.connect.data.Time", "flink.precision": 2, "connect.type": "int32", "flink.version": "1" } }
BIGINT
JSON Schema type: Number
Additional properties: connect.type=int64
JSON type title: –
Example:
{ "type": "number", "connect.type": "int64" }
BINARY
JSON Schema type: String
Additional properties: connect.type=bytes, flink.minLength=flink.maxLength (different from JSON’s minLength/maxLength, because this property describes byte length, not string length)
JSON type title: –
Example:
{ "type": "string", "flink.maxLength": 123, "flink.minLength": 123, "flink.version": "1", "connect.type": "bytes" }
BOOLEAN
JSON Schema type: Boolean
Additional properties: –
JSON type title: –
Example:
{ "type": "boolean" }
CHAR
JSON Schema type: String
Additional properties: minLength=maxLength
JSON type title: –
Example:
{ "type": "string", "minLength": 123, "maxLength": 123 }
DATE
JSON Schema type: Number
Additional properties: connect.type=int32
JSON type title: org.apache.kafka.connect.data.Date
Example: –
DECIMAL
JSON Schema type: Number
Additional properties: connect.type=bytes
JSON type title: org.apache.kafka.connect.data.Decimal
Example: –
DOUBLE
JSON Schema type: Number
Additional properties: connect.type=float64
JSON type title: –
Example:
{ "type": "number", "connect.type": "float64" }
FLOAT
JSON Schema type: Number
Additional properties: connect.type=float32
JSON type title: –
Example:
{ "type": "number", "connect.type": "float32" }
INT
JSON Schema type: Number
Additional properties: connect.type=int32
JSON type title: –
Example:
{ "type": "number", "connect.type": "int32" }
MAP[K, V]
JSON Schema type: Array[Object]
Additional properties: connect.type=map
JSON type title: –
Example:
{ "type": "array", "connect.type": "map", "items": { "type": "object", "properties": { "value": { "type": "number", "connect.type": "int64" }, "key": { "type": "number", "connect.type": "int32" } } } }
MAP[VARCHAR, V]
JSON Schema type: Object
Additional properties: connect.type=map
JSON type title: –
Example:
{ "type":"object", "connect.type":"map", "additionalProperties": { "type":"number", "connect.type":"int64" } }
MULTISET[K]
JSON Schema type: Array[Object]
Additional properties: connect.type=map, flink.type=multiset
JSON type title: –
Note: The count (value) in the JSON Schema must map to a Flink INT type, which corresponds to connect.type: int32 in the JSON Schema. Using connect.type: int64 causes a validation error.
Example:
{ "type": "array", "connect.type": "map", "flink.type": "multiset", "items": { "type": "object", "properties": { "value": { "type": "number", "connect.type": "int32" }, "key": { "type": "number", "connect.type": "int32" } } } }
MULTISET[VARCHAR]
JSON Schema type: Object
Additional properties: connect.type=map, flink.type=multiset
JSON type title: –
Note: The count (value) in the JSON Schema must map to a Flink INT type, which corresponds to connect.type: int32 in the JSON Schema. Using connect.type: int64 causes a validation error.
Example:
{ "type": "object", "connect.type": "map", "flink.type": "multiset", "additionalProperties": { "type": "number", "connect.type": "int32" } }
ROW
JSON Schema type: Object
Additional properties: –
JSON type title: –
Example: –
SMALLINT
JSON Schema type: Number
Additional properties: connect.type=int16
JSON type title: –
Example:
{ "type": "number", "connect.type": "int16" }
TIME
JSON Schema type: Number
Additional properties: connect.type=int32, flink.precision
JSON type title: org.apache.kafka.connect.data.Time
Example:
{ "type":"number", "title":"org.apache.kafka.connect.data.Time", "flink.precision":2, "connect.type":"int32", "flink.version":"1" }
TIMESTAMP
JSON Schema type: Number
Additional properties: connect.type=int64, flink.precision, flink.type=timestamp
JSON type title: org.apache.kafka.connect.data.Timestamp
Example:
{ "type":"number", "title":"org.apache.kafka.connect.data.Timestamp", "flink.precision":2, "flink.type":"timestamp", "connect.type":"int64", "flink.version":"1" }
TIMESTAMP_LTZ
JSON Schema type: Number
Additional properties: connect.type=int64, flink.precision
JSON type title: org.apache.kafka.connect.data.Timestamp
Example:
{ "type":"number", "title":"org.apache.kafka.connect.data.Timestamp", "flink.precision":2, "connect.type":"int64", "flink.version":"1" }
TINYINT
JSON Schema type: Number
Additional properties: connect.type=int8
JSON type title: –
Example:
{ "type": "number", "connect.type": "int8" }
VARBINARY
JSON Schema type: String
Additional properties: connect.type=bytes, flink.maxLength (different from JSON’s maxLength, because this property describes byte length, not string length)
JSON type title: –
Example:
{ "type": "string", "flink.maxLength": 123, "flink.version": "1", "connect.type": "bytes" }
VARCHAR
JSON Schema type: String
Additional properties: maxLength
JSON type title: –
Example:
{ "type": "string", "maxLength": 123 }
JSON types to Flink SQL types
The following table shows the mapping of JSON types to Flink SQL types. It shows only mappings that are not covered by the previous table. These types can’t originate from Flink SQL.
This mapping is important when consuming/reading records with a schema that was created outside of Flink. The mapping defines the Flink table’s schema inferred from JSON Schema.
JSON type | Flink SQL type |
|---|---|
Combined | ROW |
Enum | VARCHAR |
Number (requiresInteger=true) | BIGINT |
Number (requiresInteger=false) | DOUBLE |
JSON deserialization conversion behavior
When deserializing JSON data, Confluent Cloud for Apache Flink® enforces specific conversion rules when an incoming data type does not exactly match the target type defined in the Flink SQL table schema.
If an incoming JSON type cannot be safely converted to the target SQL type, the statement will fail with a deserialization error.
The following matrix shows the allowed conversions. The “From” type represents the data type in the incoming JSON record, and the “To” type represents the target data type defined in your Flink SQL table.
From/To | ARRAY | BOOLEAN | NUMBER | OBJECT | STRING |
|---|---|---|---|---|---|
ARRAY | Allowed | Fail | Fail | Fail | Allowed [3] |
BOOLEAN | Fail | Allowed | Fail | Fail | Allowed [3] |
NUMBER | Fail | Fail | Allowed | Fail | Allowed [3] |
OBJECT | Fail | Fail | Fail | Allowed | Allowed [3] |
STRING | Fail | Allowed [4] | Allowed [5] | Fail | Allowed [3] |
Conversion rule details:

- [3] To STRING: All incoming JSON types (ARRAY, BOOLEAN, NUMBER, OBJECT) can be interpreted as a STRING. The system serializes the original value into its JSON string representation.
- [4] STRING to BOOLEAN: The system inspects the incoming string and attempts to convert it to a BOOLEAN. For example, the string "false" is converted to the boolean false.
- [5] STRING to NUMBER: The system inspects the incoming string and attempts to convert it to a NUMBER. For example, the string "111" is converted to the number 111. A non-numeric string (for example, "test") fails the conversion and raises an error.
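As an illustration, for a column declared as INT in the Flink table (the record shape and field name here are hypothetical), incoming JSON values behave as follows:

```
{"quantity": 111}     -- NUMBER to NUMBER: allowed
{"quantity": "111"}   -- STRING to NUMBER: converted to the number 111
{"quantity": "test"}  -- STRING to NUMBER: fails with a deserialization error
```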
Protobuf schema
Flink SQL types to Protobuf types
The following table shows the mapping of Flink SQL types to Protobuf types.
This mapping is important for creating tables, because it defines the Protobuf schema that’s produced by a CREATE TABLE statement.
ARRAY[T]
Protobuf type: repeated T
Message type: –
Additional properties: flink.wrapped, which indicates that Flink wrappers are used to represent nullability, because Protobuf doesn’t support nullable repeated fields natively.
Example:
repeated int64 value = 1;
Nullable array:
arrayNullableRepeatedWrapper arrayNullable = 1 [(confluent.field_meta) = { params: [ { key: "flink.wrapped", value: "true" }, { key: "flink.version", value: "1" } ] }]; message arrayNullableRepeatedWrapper { repeated int64 value = 1; }
Nullable elements:
repeated elementNullableElementWrapper elementNullable = 2 [(confluent.field_meta) = { params: [ { key: "flink.wrapped", value: "true" }, { key: "flink.version", value: "1" } ] }]; message elementNullableElementWrapper { optional int64 value = 1; }
BIGINT
Protobuf type: INT64
Message type: –
Additional properties: –
Example:
optional int64 bigint = 8;
BINARY
Protobuf type: BYTES
Message type: –
Additional properties: flink.maxLength=flink.minLength
Example:
optional bytes binary = 13 [(confluent.field_meta) = { params: [ { key: "flink.maxLength", value: "123" }, { key: "flink.minLength", value: "123" }, { key: "flink.version", value: "1" } ] }];
BOOLEAN
Protobuf type: BOOL
Message type: –
Additional properties: –
Example:
optional bool boolean = 2;
CHAR
Protobuf type: STRING
Message type: –
Additional properties: flink.maxLength=flink.minLength
Example:
optional string char = 11 [(confluent.field_meta) = { params: [ { key: "flink.maxLength", value: "123" }, { key: "flink.minLength", value: "123" }, { key: "flink.version", value: "1" } ] }];
DATE
Protobuf type: MESSAGE
Message type: google.type.Date
Additional properties: –
Example:
optional .google.type.Date date = 17;
DECIMAL
Protobuf type: MESSAGE
Message type: confluent.type.Decimal
Additional properties: –
Example:
optional .confluent.type.Decimal decimal = 19 [(confluent.field_meta) = { params: [ { value: "5", key: "precision" }, { value: "1", key: "scale" }, { key: "flink.version", value: "1" } ] }];
DOUBLE
Protobuf type: DOUBLE
Message type: –
Additional properties: –
Example:
optional double double = 10;
FLOAT
Protobuf type: FLOAT
Message type: –
Additional properties: –
Example:
optional float float = 9;
INT
Protobuf type: INT32
Message type: –
Additional properties: –
Example:
optional int32 int = 7;
MAP[K, V]
Protobuf type: repeated MESSAGE
Message type: XXEntry(K key, V value)
Additional properties: flink.wrapped, which indicates that Flink wrappers are used to represent nullability, because Protobuf doesn’t support nullable repeated fields natively. For examples, see the ARRAY type.
Example:
repeated MapEntry map = 20; message MapEntry { optional string key = 1; optional int64 value = 2; }
MULTISET[V]
Protobuf type: repeated MESSAGE
Message type: XXEntry(V key, int32 value)
Additional properties: flink.wrapped, which indicates that Flink wrappers are used to represent nullability, because Protobuf doesn’t support nullable repeated fields natively (for examples, see the ARRAY type); flink.type=multiset
Example:
repeated MultisetEntry multiset = 1 [(confluent.field_meta) = { params: [ { key: "flink.type", value: "multiset" }, { key: "flink.version", value: "1" } ] }]; message MultisetEntry { optional string key = 1; int32 value = 2; }
ROW
Protobuf type: MESSAGE
Message type: fieldName
Additional properties: –
Example:
meta_Row meta = 1; message meta_Row { float a = 1; float b = 2; }
SMALLINT
Protobuf type: INT32
Message type: –
Additional properties: MetaProto extension: connect.type = int16
Example:
optional int32 smallInt = 6 [(confluent.field_meta) = { doc: "smallInt comment", params: [ { key: "flink.version", value: "1" }, { key: "connect.type", value: "int16" } ] }];
TIMESTAMP
Protobuf type: MESSAGE
Message type: google.protobuf.Timestamp
Additional properties: flink.precision, flink.type=timestamp
Example:
optional .google.protobuf.Timestamp timestamp_ltz_3 = 16 [(confluent.field_meta) = { params: [ { key: "flink.type", value: "timestamp" }, { key: "flink.precision", value: "3" }, { key: "flink.version", value: "1" } ] }];
TIMESTAMP_LTZ
Protobuf type: MESSAGE
Message type: google.protobuf.Timestamp
Additional properties: flink.precision
Example:
optional .google.protobuf.Timestamp timestamp_ltz_3 = 15 [(confluent.field_meta) = { params: [ { key: "flink.precision", value: "3" }, { key: "flink.version", value: "1" } ] }];
TIME_WITHOUT_TIME_ZONE
Protobuf type: MESSAGE
Message type: google.type.TimeOfDay
Additional properties: –
Example:
optional .google.type.TimeOfDay time = 18 [(confluent.field_meta) = { params: [ { key: "flink.precision", value: "3" }, { key: "flink.version", value: "1" } ] }];
TINYINT
Protobuf type: INT32
Message type: –
Additional properties: MetaProto extension: connect.type = int8
Example:
optional int32 tinyInt = 4 [(confluent.field_meta) = { doc: "tinyInt comment", params: [ { key: "flink.version", value: "1" }, { key: "connect.type", value: "int8" } ] }];
VARBINARY
Protobuf type: BYTES
Message type: –
Additional properties: flink.maxLength (default = MAX_LENGTH)
Example:
optional bytes varbinary = 14 [(confluent.field_meta) = { params: [ { key: "flink.maxLength", value: "123" }, { key: "flink.version", value: "1" } ] }];
VARCHAR
Protobuf type: STRING
Message type: –
Additional properties: flink.maxLength (default = MAX_LENGTH)
Example:
optional string varchar = 12 [(confluent.field_meta) = { params: [ { key: "flink.maxLength", value: "123" }, { key: "flink.version", value: "1" } ] }];
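As a combined sketch (the table and column names are hypothetical), a CREATE TABLE statement with the following columns produces a Protobuf message whose fields use the types listed above:

```sql
CREATE TABLE sensor_readings (
  device_id STRING,         -- optional string, with flink.maxLength metadata if bounded
  reading DOUBLE,           -- optional double
  observed_at TIMESTAMP(3)  -- optional .google.protobuf.Timestamp with flink.precision = 3
);
```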
Protobuf types to Flink SQL types
The following table shows the mapping of Protobuf types to Flink SQL and Connect types. It shows only mappings that are not covered by the previous table. These types can’t originate from Flink SQL.
This mapping is important when consuming/reading records with a schema that was created outside of Flink. The mapping defines the Flink table’s schema inferred from a Protobuf schema.
Protobuf type | Flink SQL type | Message type | Connect type annotation |
|---|---|---|---|
FIXED32 / FIXED64 / SFIXED64 | BIGINT | – | – |
INT32 / SINT32 / SFIXED32 | INT | – | – |
INT32 / SINT32 / SFIXED32 | SMALLINT | – | int16 |
INT32 / SINT32 / SFIXED32 | TINYINT | – | int8 |
INT64 / SINT64 | BIGINT | – | – |
UINT32 / UINT64 | BIGINT | – | – |
MESSAGE | BIGINT | google.protobuf.Int64Value | – |
MESSAGE | BIGINT | google.protobuf.UInt64Value | – |
MESSAGE | BIGINT | google.protobuf.UInt32Value | – |
MESSAGE | BOOLEAN | google.protobuf.BoolValue | – |
MESSAGE | DOUBLE | google.protobuf.DoubleValue | – |
MESSAGE | FLOAT | google.protobuf.FloatValue | – |
MESSAGE | INT | google.protobuf.Int32Value | – |
MESSAGE | VARBINARY | google.protobuf.BytesValue | – |
MESSAGE | VARCHAR | google.protobuf.StringValue | – |
oneOf | ROW | – | – |
Protobuf 3 nullable field behavior
When working with Protobuf 3 schemas in Confluent Cloud for Apache Flink, it’s important to understand how nullable fields are handled.
When converting to a Protobuf schema, Flink marks all NULLABLE fields as optional.
In Protobuf, expressing something as NULLABLE or NOT NULL is not straightforward.
- All non-MESSAGE types are NOT NULL. If not set explicitly, the default value is assigned.
- Non-MESSAGE types marked with optional can be checked for whether they were set. If not set, Flink assumes NULL.
- MESSAGE types are all NULLABLE, which means that all fields of MESSAGE type are optional, and there is no way to ensure on a format level that they are NOT NULL. To store this information, Flink uses the flink.notNull property, for example:

message Row { .google.type.Date date = 1 [(confluent.field_meta) = { params: [ { key: "flink.version", value: "1" }, { key: "flink.notNull", value: "true" } ] }]; }

- Fields without the optional keyword: In Protobuf 3, fields without the optional keyword are treated as NOT NULL by Flink, because Protobuf 3 doesn’t support nullable getters/setters by default. If a field is omitted in the data, Protobuf 3 assigns the default value, which is 0 for numbers, the empty string for strings, and false for booleans.
- Fields with the optional keyword: Fields marked with optional in Protobuf 3 are treated as nullable by Flink. When such a field is not set in the data, Flink interprets it as NULL.
- Fields with the repeated keyword: Fields marked with repeated in Protobuf 3 are treated as arrays by Flink. The array itself is NOT NULL, but individual elements within the array can be nullable depending on their type. For MESSAGE types, elements are nullable by default. For primitive types, elements are NOT NULL.
This behavior is consistent across all streaming platforms that work with Protobuf 3, including Kafka Streams and other Confluent products, and is not specific to Flink. It’s a fundamental characteristic of the Protobuf 3 specification itself.
In a Protobuf 3 schema, if you want a field to be nullable in Flink, you must explicitly mark it as optional, for example:
message Example {
string required_field = 1; // NOT NULL in Flink
optional string nullable_field = 2; // NULLABLE in Flink
repeated string array_field = 3; // NOT NULL array in Flink
repeated optional string nullable_array_field = 4; // NOT NULL array with nullable elements
}
Changelog formats
Confluent Cloud for Apache Flink supports native interpretation of changelog formats, which enables Flink SQL to understand and process change data capture (CDC) streams from database sources.
When working with CDC data, database changes (inserts, updates, deletes) are captured and written to Apache Kafka® topics. Flink can interpret these change events to build real-time applications that react to data changes, without requiring manual change tracking or complex custom logic.
Debezium format
Confluent Cloud for Apache Flink provides native support for the Debezium CDC format, which is the standard format produced by Debezium CDC connectors like PostgreSQL CDC, MySQL CDC, SQL Server CDC, and Oracle XStream CDC.
The Debezium format wraps each change event in an envelope that contains metadata about the change operation. Flink can automatically detect and interpret this envelope structure based on the schema in Confluent Schema Registry.
Supported Debezium formats
Flink supports Debezium format with all three serialization types:
- avro-debezium-registry: Avro serialization with Debezium envelope
- json-debezium-registry: JSON_SR serialization with Debezium envelope
- proto-debezium-registry: Protobuf serialization with Debezium envelope
Schema Registry requirements
The Debezium format requires an envelope schema to be registered in Schema Registry. Debezium CDC connectors automatically register the message schema when writing to Kafka topics.
If your connector doesn’t register the message schema automatically, you can use schemaless topic support.
Automatic Debezium detection
For schemas created after May 19, 2025 at 09:00 UTC, Flink automatically detects Debezium envelopes and configures tables with appropriate defaults:
- The value.format property defaults to *-debezium-registry instead of *-registry.
- The changelog.mode property defaults to retract instead of append.
Exception: If the Kafka topic has cleanup.policy set to compact, the changelog.mode is set to upsert instead.
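To verify which defaults were applied to a given table, you can inspect its declared properties. This is a sketch, and the table name is hypothetical:

```sql
-- For a Debezium-enveloped Avro schema, expect 'value.format' = 'avro-debezium-registry'
-- and 'changelog.mode' = 'retract' (or 'upsert' if the topic is compacted).
SHOW CREATE TABLE orders_cdc;
```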
Configure Debezium format manually
For schemas created before May 19, 2025, or to override the automatic detection, use the ALTER TABLE statement to configure the Debezium format:
-- For Avro
ALTER TABLE my_avro_table SET (
'value.format' = 'avro-debezium-registry',
'changelog.mode' = 'retract'
);
-- For JSON_SR
ALTER TABLE my_json_table SET (
'value.format' = 'json-debezium-registry',
'changelog.mode' = 'retract'
);
-- For Protobuf
ALTER TABLE my_proto_table SET (
'value.format' = 'proto-debezium-registry',
'changelog.mode' = 'retract'
);
Changelog modes
The changelog.mode property controls how Flink interprets change events. Choose the mode that matches your use case.
- append
Handles all create, read, and update events as INSERT operations. Delete events are ignored. This mode is useful when you are working with insert-only or event-log style streams where deletions should not be applied to the derived table.
Use this mode for append-only use cases, such as audit logs or immutable event streams.
- retract
Full changelog mode. Create and read events produce INSERT messages. Updates produce UPDATE_BEFORE and UPDATE_AFTER messages. Deletes are converted to DELETE messages.
This mode correctly interprets the op field in the Debezium envelope, handling all change operations accurately.
Use this mode when you need to track all change operations, including updates and deletes.
Important
Retract mode is not compatible with the after.state.only option of Debezium connectors. In after.state.only mode, the connector doesn’t emit UPDATE_BEFORE events, which are required for retract mode.
For PostgreSQL Debezium connectors, set the monitored table’s replica identity to FULL (for example, ALTER TABLE <table_name> REPLICA IDENTITY FULL;). For more information, see PostgreSQL CDC Source Connector for Confluent Cloud.
- upsert
Partial changelog mode. Create, read, and update events are converted to UPDATE_AFTER messages, and delete events are converted to DELETE messages that remove the corresponding primary key. This mode groups all operations for a primary key, making it suitable for building materialized views.
Use this mode when you need to maintain the current state of each row, keyed by the primary key, reflecting inserts, updates, and deletes.
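For example, to maintain current per-key state on a table backed by a compacted topic, you could switch it to upsert mode (the table name here is hypothetical):

```sql
ALTER TABLE customer_snapshot SET (
  'changelog.mode' = 'upsert'
);
```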
Debezium format limitations
The Debezium format in Flink has these limitations:
Read-only format: The Debezium format is only supported for Flink SQL sources (reading from Kafka). Writing to Kafka in Debezium format from Flink is not currently supported.
Schema Registry integration: The format requires schemas to be registered in Schema Registry. Direct Debezium messages without Schema Registry integration are not supported.
Envelope structure: The schema must include the Debezium envelope fields (after, before, and op) for automatic detection to work.
Example: Process CDC data
This example shows how to process CDC data from a PostgreSQL database.
Create a Debezium PostgreSQL CDC connector to capture changes from your database. The connector writes change events to a Kafka topic with the Debezium envelope format.
After the connector is running, configure the table in Flink to use the Debezium format:
-- Convert the table to use Debezium format
ALTER TABLE employee_changes SET (
  'value.format' = 'avro-debezium-registry',
  'changelog.mode' = 'retract'
);
Query the table to see all changes to employee data:
-- View all employee changes
SELECT * FROM employee_changes;

-- Filter for specific change types (inserts, updates, deletes)
-- Build real-time aggregations that react to changes
SELECT department_id, COUNT(*) as employee_count
FROM employee_changes
GROUP BY department_id;
The table schema is inferred automatically from the after schema in the Debezium envelope, exposing only the actual data fields without the envelope metadata.
For more information about using CDC with Flink, see Inferred tables.