Data Type Mappings in Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® supports records in the Avro Schema Registry, JSON_SR, and Protobuf Schema Registry formats.

The following table shows the mapping of Flink SQL types to JSON Schema, Protobuf, and Avro types. For the mapping of Flink SQL types to Java and Python types, see Data Types in Confluent Cloud for Apache Flink.

| Flink SQL type | JSON Schema type | Protobuf type | Avro type | Avro logical type |
|---|---|---|---|---|
| ARRAY | Array | repeated T | array | |
| BIGINT | Number | INT64 | long | |
| BINARY | String | BYTES | fixed | |
| BOOLEAN | Boolean | BOOL | boolean | |
| BYTES / VARBINARY | String | BYTES | bytes | |
| CHAR | String | STRING | string | |
| DATE | Number | MESSAGE | int | date |
| DECIMAL | Number | MESSAGE | bytes | decimal |
| DOUBLE | Number | DOUBLE | double | |
| FLOAT | Number | FLOAT | float | |
| INT | Number | INT32 | int | |
| INTERVAL DAY TO SECOND | Not supported | Not supported | Not supported | |
| INTERVAL YEAR TO MONTH | Not supported | Not supported | Not supported | |
| MAP | Array[Object] / Object | repeated MESSAGE | map / array | |
| MULTISET | Array[Object] / Object | repeated MESSAGE | map / array | |
| NULL | oneOf(Null, T) | [1] | union(avro_type, null) | |
| ROW | Object | MESSAGE | record [2] | |
| SMALLINT | Number | INT32 | int | |
| TIME | Number | | int | time-millis |
| TIMESTAMP | Number | MESSAGE | long | local-timestamp-millis / local-timestamp-micros |
| TIMESTAMP_LTZ | Number | MESSAGE | long | timestamp-millis / timestamp-micros |
| TINYINT | Number | INT32 | int | |
| VARCHAR / STRING | String | STRING | string | |

Avro schemas

Known limitations

  • Avro enums have limited support. Flink supports reading and writing enums but treats them as a STRING type. From Flink’s perspective, enums are not distinguishable from the STRING type. You can’t create an Avro schema from Flink that has an enum field.

  • Flink doesn’t support reading Avro time-micros as a TIME type, because Flink supports TIME only with precision up to 3. time-micros values are read and written as BIGINT.

  • Field names must match Avro criteria. Avro expects field names to start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_].

  • These Flink types are not supported:

    • INTERVAL_DAY_TIME

    • INTERVAL_YEAR_MONTH

    • TIMESTAMP_WITH_TIMEZONE
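
The field-name rule above can be checked with a simple pattern. The following sketch is purely illustrative; the regex mirrors Avro's documented naming rule, and the function name is invented, not a Confluent or Avro API:

```python
import re

# Avro field names must start with [A-Za-z_] and then contain only [A-Za-z0-9_].
AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def is_valid_avro_field_name(name: str) -> bool:
    """Return True if `name` satisfies Avro's field-naming rule."""
    return AVRO_NAME.fullmatch(name) is not None

print(is_valid_avro_field_name("user_id"))     # valid
print(is_valid_avro_field_name("2fast"))       # invalid: starts with a digit
print(is_valid_avro_field_name("first-name"))  # invalid: hyphen not allowed
```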

JSON Schema

JSON deserialization conversion behavior

When deserializing JSON data, Confluent Cloud for Apache Flink® enforces specific conversion rules when an incoming data type does not exactly match the target type defined in the Flink SQL table schema.

If an incoming JSON type cannot be safely converted to the target SQL type, the statement will fail with a deserialization error.

The following matrix shows the allowed conversions. The “From” type represents the data type in the incoming JSON record, and the “To” type represents the target data type defined in your Flink SQL table.

| From/To | ARRAY | BOOLEAN | NUMBER | OBJECT | STRING |
|---|---|---|---|---|---|
| ARRAY | Allowed | Fail | Fail | Fail | Allowed [3] |
| BOOLEAN | Fail | Allowed | Fail | Fail | Allowed [3] |
| NUMBER | Fail | Fail | Allowed | Fail | Allowed [3] |
| OBJECT | Fail | Fail | Fail | Allowed | Allowed [3] |
| STRING | Fail | Allowed [4] | Allowed [5] | Fail | Allowed [3] |
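
As a reading aid, the matrix can be restated as a lookup table. This sketch only encodes the allowed/fail outcomes shown above; it does not replicate Confluent's deserializer, and the function name is invented:

```python
# Allowed JSON-to-Flink-SQL conversions (pairs not listed fail with a
# deserialization error). Any JSON type can be converted to STRING;
# STRING can additionally be parsed into BOOLEAN and NUMBER.
ALLOWED = {
    ("ARRAY", "ARRAY"), ("ARRAY", "STRING"),
    ("BOOLEAN", "BOOLEAN"), ("BOOLEAN", "STRING"),
    ("NUMBER", "NUMBER"), ("NUMBER", "STRING"),
    ("OBJECT", "OBJECT"), ("OBJECT", "STRING"),
    ("STRING", "STRING"), ("STRING", "BOOLEAN"), ("STRING", "NUMBER"),
}

def conversion_allowed(from_type: str, to_type: str) -> bool:
    """True if the incoming JSON type can convert to the target SQL type."""
    return (from_type, to_type) in ALLOWED

print(conversion_allowed("STRING", "NUMBER"))  # allowed, subject to parsing
print(conversion_allowed("OBJECT", "ARRAY"))   # fails
```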

Conversion Rule Details:

Protobuf schema

Protobuf 3 nullable field behavior

When working with Protobuf 3 schemas in Confluent Cloud for Apache Flink, it’s important to understand how nullable fields are handled.

When converting to a Protobuf schema, Flink marks all NULLABLE fields as optional.

In Protobuf, expressing whether a field is NULLABLE or NOT NULL is not straightforward:

  • All non-MESSAGE types are NOT NULL. If not set explicitly, the default value is assigned.

  • Non-MESSAGE types marked with optional can be checked for presence. If a field is not set, Flink assumes NULL.

  • MESSAGE types are all NULLABLE, which means that all fields of MESSAGE type are optional, and there is no way to enforce at the format level that they are NOT NULL. To store this information, Flink uses the flink.notNull property, for example:

    message Row {
      .google.type.Date date = 1 [(confluent.field_meta) = {
        params: [
          {
            key: "flink.version",
            value: "1"
          },
          {
            key: "flink.notNull",
            value: "true"
          }
        ]
      }];
    }
    
Fields without the optional keyword

In Protobuf 3, fields without the optional keyword are treated as NOT NULL by Flink. This is because Protobuf 3 doesn’t support nullable getters/setters by default. If a field is omitted in the data, Protobuf 3 assigns the default value, which is 0 for numbers, the empty string for strings, and false for booleans.

Fields with the optional keyword

Fields marked with optional in Protobuf 3 are treated as nullable by Flink. When such a field is not set in the data, Flink interprets it as NULL.

Fields with the repeated keyword

Fields marked with repeated in Protobuf 3 are treated as arrays by Flink. The array itself is NOT NULL, but individual elements within the array can be nullable depending on their type. For MESSAGE types, elements are nullable by default. For primitive types, elements are NOT NULL.

This behavior is consistent across all streaming platforms that work with Protobuf 3, including Kafka Streams and other Confluent products, and is not specific to Flink. It’s a fundamental characteristic of the Protobuf 3 specification itself.

In a Protobuf 3 schema, if you want a field to be nullable in Flink, you must explicitly mark it as optional, for example:

import "google/protobuf/wrappers.proto";

message Example {
  string required_field = 1;           // NOT NULL in Flink
  optional string nullable_field = 2;  // NULLABLE in Flink
  repeated string array_field = 3;     // NOT NULL array with NOT NULL elements
  // Protobuf 3 doesn't allow combining `repeated` with `optional`; use a
  // wrapper MESSAGE type to get a NOT NULL array with nullable elements.
  repeated google.protobuf.StringValue nullable_array_field = 4;
}
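
How these presence rules look to a consumer can be modeled with a toy sketch. The `SCHEMA` structure and field names below are invented for illustration and are not a real Protobuf or Flink API:

```python
from typing import Any, Optional

# Toy stand-in for a proto3 schema: field name -> (default value, is_optional).
SCHEMA = {
    "required_field": ("", False),  # plain proto3 string: absent => default ""
    "nullable_field": ("", True),   # optional string: absent => NULL
}

def read_field(record: dict, name: str) -> Optional[Any]:
    """Mimic Flink's view of a proto3 field that may be missing from the wire."""
    default, is_optional = SCHEMA[name]
    if name in record:
        return record[name]
    # Without `optional`, proto3 can't distinguish "unset" from the default
    # value, so Flink sees the default; with `optional`, it sees NULL.
    return None if is_optional else default

msg = {}  # neither field was set on the wire
print(read_field(msg, "required_field"))  # "" (proto3 default, NOT NULL)
print(read_field(msg, "nullable_field"))  # None (NULL in Flink)
```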

Changelog formats

Confluent Cloud for Apache Flink supports native interpretation of changelog formats, which enables Flink SQL to understand and process change data capture (CDC) streams from database sources.

When working with CDC data, database changes (inserts, updates, deletes) are captured and written to Apache Kafka® topics. Flink can interpret these change events to build real-time applications that react to data changes, without requiring manual change tracking or complex custom logic.

Debezium format

Confluent Cloud for Apache Flink provides native support for the Debezium CDC format, which is the standard format produced by Debezium CDC connectors like PostgreSQL CDC, MySQL CDC, SQL Server CDC, and Oracle XStream CDC.

The Debezium format wraps each change event in an envelope that contains metadata about the change operation. Flink can automatically detect and interpret this envelope structure based on the schema in Confluent Schema Registry.

Supported Debezium formats

Flink supports Debezium format with all three serialization types:

  • avro-debezium-registry - Avro serialization with Debezium envelope

  • json-debezium-registry - JSON_SR serialization with Debezium envelope

  • proto-debezium-registry - Protobuf serialization with Debezium envelope

Schema Registry requirements

The Debezium format requires an envelope schema to be registered in Schema Registry. Debezium CDC connectors automatically register the message schema when writing to Kafka topics.

If your connector doesn’t register the message schema automatically, you can use schemaless topic support.

Automatic Debezium detection

For schemas created after May 19, 2025 at 09:00 UTC, Flink automatically detects Debezium envelopes and configures tables with appropriate defaults:

  • The value.format property defaults to *-debezium-registry instead of *-registry

  • The changelog.mode property defaults to retract instead of append

Exception: If the Kafka topic has cleanup.policy set to compact, the changelog.mode is set to upsert instead.
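
The default-selection rules above amount to a small decision function. This hypothetical helper simply restates them; the function name and its parameters are invented for illustration, while the `value.format` and `changelog.mode` values match the properties described above:

```python
def default_table_options(is_debezium_envelope: bool,
                          cleanup_policy: str,
                          base_format: str = "avro") -> dict:
    """Pick value.format / changelog.mode defaults per the detection rules."""
    if not is_debezium_envelope:
        # Plain schemas keep the *-registry format and append mode.
        return {"value.format": f"{base_format}-registry",
                "changelog.mode": "append"}
    # Debezium envelopes default to retract, unless the topic is compacted.
    mode = "upsert" if cleanup_policy == "compact" else "retract"
    return {"value.format": f"{base_format}-debezium-registry",
            "changelog.mode": mode}

print(default_table_options(True, "delete"))
print(default_table_options(True, "compact"))
```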

Configure Debezium format manually

For schemas created before May 19, 2025, or to override the automatic detection, use the ALTER TABLE statement to configure the Debezium format:

-- For Avro
ALTER TABLE my_avro_table SET (
  'value.format' = 'avro-debezium-registry',
  'changelog.mode' = 'retract'
);

-- For JSON_SR
ALTER TABLE my_json_table SET (
  'value.format' = 'json-debezium-registry',
  'changelog.mode' = 'retract'
);

-- For Protobuf
ALTER TABLE my_proto_table SET (
  'value.format' = 'proto-debezium-registry',
  'changelog.mode' = 'retract'
);

Changelog modes

The changelog.mode property controls how Flink interprets change events. Choose the mode that matches your use case.

append

Handles all create, read, and update events as INSERT operations. Delete events are ignored. This mode is useful when you are working with insert-only or event-log style streams where deletions should not be applied to the derived table.

Use this mode for append-only use cases, such as audit logs or immutable event streams.

retract

Full changelog mode. Create and read events produce INSERT messages. Updates produce UPDATE_BEFORE and UPDATE_AFTER messages. Deletes are converted to DELETE messages.

This mode correctly interprets the op field in the Debezium envelope, handling all change operations accurately.

Use this mode when you need to track all change operations, including updates and deletes.

Important

Retract mode is not compatible with the after.state.only option of Debezium connectors. In after.state.only mode, the connector doesn’t emit UPDATE_BEFORE events, which are required for retract mode.

For PostgreSQL Debezium connectors, set the monitored table’s replica identity to FULL (for example, ALTER TABLE <table_name> REPLICA IDENTITY FULL;). For more information, see PostgreSQL CDC Source Connector for Confluent Cloud.

upsert

Partial changelog mode. Create, read, and update events are converted to UPDATE_AFTER messages, and delete events are converted to DELETE messages that remove the corresponding primary key. This mode groups all operations for a primary key, making it suitable for building materialized views.

Use this mode when you need to maintain the current state of each row, keyed by the primary key, reflecting inserts, updates, and deletes.
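
The three modes can be contrasted with a toy state machine that applies Debezium `op` codes (`c` = create, `r` = read, `u` = update, `d` = delete) to a keyed table. This is an illustrative model of the resulting state, not Flink's implementation; in particular, it elides the UPDATE_BEFORE messages that retract mode additionally emits:

```python
def apply_changes(mode: str, events: list) -> dict:
    """Apply (op, key, value) change events to a keyed table under a changelog mode."""
    state: dict = {}
    for op, key, value in events:
        if mode == "append":
            if op != "d":                 # deletes are ignored in append mode
                state.setdefault(key, []).append(value)
        else:                             # retract and upsert converge on latest state
            if op == "d":
                state.pop(key, None)      # DELETE removes the row for the key
            else:
                state[key] = value        # INSERT / UPDATE_AFTER keeps the newest value
    return state

events = [("c", "k1", "v1"), ("u", "k1", "v2"), ("d", "k1", None)]
print(apply_changes("append", events))  # every non-delete event is kept
print(apply_changes("upsert", events))  # the row for k1 is gone
```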

Debezium format limitations

The Debezium format in Flink has these limitations:

  • Read-only format: The Debezium format is only supported for Flink SQL sources (reading from Kafka). Writing to Kafka in Debezium format from Flink is not currently supported.

  • Schema Registry integration: The format requires schemas to be registered in Schema Registry. Direct Debezium messages without Schema Registry integration are not supported.

  • Envelope structure: The schema must include the Debezium envelope fields (after, before, and op) for automatic detection to work.

Example: Process CDC data

This example shows how to process CDC data from a PostgreSQL database.

  1. Create a Debezium PostgreSQL CDC connector to capture changes from your database. The connector writes change events to a Kafka topic with the Debezium envelope format.

  2. After the connector is running, configure the table in Flink to use the Debezium format:

    -- Convert the table to use Debezium format
    ALTER TABLE employee_changes SET (
      'value.format' = 'avro-debezium-registry',
      'changelog.mode' = 'retract'
    );
    
  3. Query the table to see all changes to employee data:

    -- View all employee changes
    SELECT * FROM employee_changes;
    
    -- Filter for specific change types (inserts, updates, deletes)
    -- Build real-time aggregations that react to changes
    SELECT
      department_id,
      COUNT(*) as employee_count
    FROM employee_changes
    GROUP BY department_id;
    

    The table schema is inferred automatically from the after schema in the Debezium envelope, exposing only the actual data fields without the envelope metadata.

For more information about using CDC with Flink, see Inferred tables.