Configure a Dead Letter Queue
A Dead Letter Queue (DLQ) captures records that Confluent Cloud for Apache Flink cannot deserialize, enabling your statements to continue processing instead of failing. When a deserialization error occurs, Flink routes the problematic record to a DLQ table, where you can inspect it later.
DLQ error handling applies only to deserialization errors at the source. Errors in user-defined functions (UDFs), serialization, or windowed aggregations are not routed to the DLQ. For recommendations on handling errors inside UDFs, see Error handling best practices.
Note
Tableflow uses the same error-handling.mode and error-handling.log.target table properties and the same DLQ schema. For Tableflow-specific error handling behavior, see Error-handling mode.
Prerequisites
Access to Confluent Cloud.
A provisioned Flink compute pool.
The appropriate RBAC roles and ACL permissions. For details, see Dead Letter Queue permissions.
Configure a DLQ when creating a new table
You can enable DLQ error handling when creating a new table by setting the error-handling.mode and error-handling.log.target table properties in your CREATE TABLE statement.
CREATE TABLE my_source_table (
id INT,
name STRING,
event_time TIMESTAMP_LTZ(3)
) WITH (
'error-handling.mode' = 'log',
'error-handling.log.target' = 'my_source_table_error_log'
);
Flink attempts to create the DLQ topic and register the schema automatically. If you don’t specify error-handling.log.target, the default DLQ table name is error_log.
Configure a DLQ for an existing table
You can add DLQ error handling to an existing table by using an ALTER TABLE statement.
ALTER TABLE my_source_table SET (
'error-handling.mode' = 'log',
'error-handling.log.target' = 'my_source_table_error_log'
);
Flink attempts to create the DLQ topic and register the schema at the time the ALTER TABLE statement runs.
Important
If DLQ creation fails (for example, the topic cannot be created, the schema cannot be registered, or permissions are missing), the CREATE TABLE or ALTER TABLE statement itself does not fail. The failure surfaces later when the first deserialization error occurs and Flink cannot write the record to the DLQ, causing the job to fail.
To verify that the DLQ was created successfully, check that the DLQ topic and schema exist in your Kafka cluster and Schema Registry after running the statement.
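This verification can be partially scripted. The sketch below, with hypothetical helper names, derives the Schema Registry subject names for a DLQ topic (following the `<topic_name>-key` / `<topic_name>-value` pattern described later in this topic) and compares them against a list of registered subjects, such as the response of Schema Registry's GET /subjects endpoint:

```python
def expected_dlq_subjects(dlq_topic: str) -> list[str]:
    # Flink registers a key schema and a value schema for the DLQ topic,
    # using TopicNameStrategy subject names: <topic>-key and <topic>-value.
    return [f"{dlq_topic}-key", f"{dlq_topic}-value"]

def missing_dlq_subjects(dlq_topic: str, registered_subjects: list[str]) -> list[str]:
    # Return every expected subject that is absent from the registry listing.
    return [s for s in expected_dlq_subjects(dlq_topic)
            if s not in registered_subjects]
```

If `missing_dlq_subjects` returns a non-empty list, the DLQ schema registration did not complete and the first deserialization error will fail the job.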
Pre-create a DLQ outside Flink
By default, Flink creates the DLQ topic and schema automatically with default settings. If you need control over the topic configuration, such as partition count, retention time, or cleanup policy, you can pre-create the DLQ topic and schema before enabling error handling.
Option A: Create the DLQ table in Flink with a CREATE TABLE statement
Run a CREATE TABLE statement with the full DLQ schema and your desired topic configuration.
CREATE TABLE my_source_table_error_log (
`error_timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
`error_code` INT NOT NULL,
`error_reason` VARCHAR(2147483647) NOT NULL,
`error_message` VARCHAR(2147483647) NOT NULL,
`error_details` MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)> NOT NULL,
`processor` VARCHAR(2147483647) NOT NULL,
`statement_name` VARCHAR(2147483647),
`affected_type` VARCHAR(2147483647) NOT NULL,
`affected_catalog` VARCHAR(2147483647),
`affected_database` VARCHAR(2147483647),
`affected_name` VARCHAR(2147483647),
`source_record` ROW<
`topic` VARCHAR(2147483647),
`partition` INT,
`offset` BIGINT,
`timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE,
`timestamp_type` VARCHAR(2147483647),
`headers` MAP<VARCHAR(2147483647) NOT NULL, VARBINARY(2147483647)>,
`key` VARBINARY(2147483647),
`value` VARBINARY(2147483647)>
)
DISTRIBUTED INTO 6 BUCKETS
WITH (
'changelog.mode' = 'append',
'kafka.cleanup-policy' = 'delete',
'kafka.retention.time' = '7 d',
'value.format' = 'avro-registry'
);
Then configure your source table to use this DLQ table:
ALTER TABLE my_source_table SET (
'error-handling.mode' = 'log',
'error-handling.log.target' = 'my_source_table_error_log'
);
Option B: Create the topic and schema manually
If you need full control and want to create the DLQ topic and schema outside of Flink, follow these steps:
1. Create the Kafka topic with your desired configuration (partition count, retention, cleanup policy) using the Confluent Cloud Console, CLI, or API.
2. Register both a key schema and a value schema for the topic in Schema Registry. The subject names must follow the pattern <topic_name>-key and <topic_name>-value. See Schema definitions for manual registration for the required schemas.
3. Configure your source table to use the pre-created DLQ:
ALTER TABLE my_source_table SET (
'error-handling.mode' = 'log',
'error-handling.log.target' = 'my_source_table_error_log'
);
DLQ table schema reference
The DLQ table uses the following schema. The error_timestamp field is stored in the Kafka message key, and all other fields are stored in the message value.
Key fields
| Column | Type | Nullable | Description |
|---|---|---|---|
| error_timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE | NOT NULL | When the error occurred. |
Value fields
| Column | Type | Nullable | Description |
|---|---|---|---|
| error_code | INT | NOT NULL | Numeric error code. |
| error_reason | STRING | NOT NULL | Error category or reason. |
| error_message | STRING | NOT NULL | Human-readable error message. |
| error_details | MAP<STRING NOT NULL, STRING> | NOT NULL | Additional key-value error details. |
| processor | STRING | NOT NULL | The processor that encountered the error. |
| statement_name | STRING | Nullable | Name of the statement that triggered the error. |
| affected_type | STRING | NOT NULL | Type of the affected resource. |
| affected_catalog | STRING | Nullable | Catalog of the affected table. |
| affected_database | STRING | Nullable | Database of the affected table. |
| affected_name | STRING | Nullable | Name of the affected table. |
| source_record | ROW | Nullable | The original Kafka record that caused the error. Contains nested fields: topic, partition, offset, timestamp, timestamp_type, headers, key, and value. |
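The key/value placement described above can be made concrete with a small sketch. The helper below is illustrative only (not part of any Confluent API) and splits a flat DLQ row into the portion stored in the Kafka message key and the portion stored in the message value:

```python
# Per the layout above, only error_timestamp lives in the message key.
KEY_FIELDS = {"error_timestamp"}

def split_dlq_row(row: dict) -> tuple[dict, dict]:
    # Partition a flat DLQ row into (key fields, value fields).
    key = {k: v for k, v in row.items() if k in KEY_FIELDS}
    value = {k: v for k, v in row.items() if k not in KEY_FIELDS}
    return key, value
```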
Schema definitions for manual registration
When pre-creating a DLQ outside of Flink, you must register both a key schema and a value schema in Schema Registry. The subject names must follow the pattern <topic_name>-key and <topic_name>-value.
Note
In the Avro schemas below, the name field uses error_log as a placeholder (matching the default DLQ table name). Replace error_log with your actual DLQ table name in the name and nested record name fields.
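Because the placeholder also appears in nested record names, the renaming is easy to miss by hand. A minimal sketch of automating it, assuming the Avro schemas are handled as plain JSON (the function names are hypothetical):

```python
import json

DEFAULT_TABLE_NAME = "error_log"

def rename_record_names(node, table_name):
    # Recursively rewrite every Avro "name" that embeds the default
    # "error_log" placeholder, including nested record names such as
    # error_log_value_source_record. Field names like "error_code"
    # do not contain the placeholder and are left untouched.
    if isinstance(node, dict):
        return {
            k: v.replace(DEFAULT_TABLE_NAME, table_name)
            if k == "name" and isinstance(v, str) and DEFAULT_TABLE_NAME in v
            else rename_record_names(v, table_name)
            for k, v in node.items()
        }
    if isinstance(node, list):
        return [rename_record_names(item, table_name) for item in node]
    return node

def rename_schema(schema_json: str, table_name: str) -> str:
    # Convenience wrapper for schemas held as JSON strings.
    return json.dumps(rename_record_names(json.loads(schema_json), table_name))
```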
Key schema
{
"fields": [
{
"name": "error_timestamp",
"type": {
"logicalType": "timestamp-millis",
"type": "long"
}
}
],
"name": "error_log_key",
"namespace": "org.apache.flink.avro.generated.record",
"type": "record"
}
{
"additionalProperties": false,
"properties": {
"error_timestamp": {
"connect.index": 0,
"connect.type": "int64",
"title": "org.apache.kafka.connect.data.Timestamp",
"type": "number"
}
},
"required": ["error_timestamp"],
"title": "Record",
"type": "object"
}
syntax = "proto3";
package org.apache.flink.proto.generated.record;
import "google/protobuf/timestamp.proto";
message Record {
.google.protobuf.Timestamp error_timestamp = 1 [(confluent.field_meta) = {
params: [
{ key: "flink.version", value: "1" },
{ key: "flink.notNull", value: "true" },
{ key: "flink.precision", value: "3" }
]
}];
}
Value schema
{
"fields": [
{
"name": "error_code",
"type": "int"
},
{
"name": "error_reason",
"type": "string"
},
{
"name": "error_message",
"type": "string"
},
{
"name": "error_details",
"type": {
"type": "map",
"values": ["null", "string"]
}
},
{
"name": "processor",
"type": "string"
},
{
"default": null,
"name": "statement_name",
"type": ["null", "string"]
},
{
"name": "affected_type",
"type": "string"
},
{
"default": null,
"name": "affected_catalog",
"type": ["null", "string"]
},
{
"default": null,
"name": "affected_database",
"type": ["null", "string"]
},
{
"default": null,
"name": "affected_name",
"type": ["null", "string"]
},
{
"default": null,
"name": "source_record",
"type": [
"null",
{
"fields": [
{
"default": null,
"name": "topic",
"type": ["null", "string"]
},
{
"default": null,
"name": "partition",
"type": ["null", "int"]
},
{
"default": null,
"name": "offset",
"type": ["null", "long"]
},
{
"default": null,
"name": "timestamp",
"type": [
"null",
{
"logicalType": "timestamp-millis",
"type": "long"
}
]
},
{
"default": null,
"name": "timestamp_type",
"type": ["null", "string"]
},
{
"default": null,
"name": "headers",
"type": [
"null",
{
"type": "map",
"values": ["null", "bytes"]
}
]
},
{
"default": null,
"name": "key",
"type": ["null", "bytes"]
},
{
"default": null,
"name": "value",
"type": ["null", "bytes"]
}
],
"name": "error_log_value_source_record",
"type": "record"
}
]
}
],
"name": "error_log_value",
"namespace": "org.apache.flink.avro.generated.record",
"type": "record"
}
{
"additionalProperties": false,
"properties": {
"error_code": {
"connect.index": 0,
"connect.type": "int32",
"type": "number"
},
"error_reason": {
"connect.index": 1,
"type": "string"
},
"error_message": {
"connect.index": 2,
"type": "string"
},
"error_details": {
"additionalProperties": {
"oneOf": [
{ "type": "null" },
{ "type": "string" }
]
},
"connect.index": 3,
"connect.type": "map",
"type": "object"
},
"processor": {
"connect.index": 4,
"type": "string"
},
"statement_name": {
"connect.index": 5,
"oneOf": [
{ "type": "null" },
{ "type": "string" }
]
},
"affected_type": {
"connect.index": 6,
"type": "string"
},
"affected_catalog": {
"connect.index": 7,
"oneOf": [
{ "type": "null" },
{ "type": "string" }
]
},
"affected_database": {
"connect.index": 8,
"oneOf": [
{ "type": "null" },
{ "type": "string" }
]
},
"affected_name": {
"connect.index": 9,
"oneOf": [
{ "type": "null" },
{ "type": "string" }
]
},
"source_record": {
"connect.index": 10,
"oneOf": [
{ "type": "null" },
{
"additionalProperties": false,
"properties": {
"topic": {
"connect.index": 0,
"oneOf": [
{ "type": "null" },
{ "type": "string" }
]
},
"partition": {
"connect.index": 1,
"oneOf": [
{ "type": "null" },
{ "connect.type": "int32", "type": "number" }
]
},
"offset": {
"connect.index": 2,
"oneOf": [
{ "type": "null" },
{ "connect.type": "int64", "type": "number" }
]
},
"timestamp": {
"connect.index": 3,
"oneOf": [
{ "type": "null" },
{
"connect.type": "int64",
"title": "org.apache.kafka.connect.data.Timestamp",
"type": "number"
}
]
},
"timestamp_type": {
"connect.index": 4,
"oneOf": [
{ "type": "null" },
{ "type": "string" }
]
},
"headers": {
"connect.index": 5,
"oneOf": [
{ "type": "null" },
{
"additionalProperties": {
"oneOf": [
{ "type": "null" },
{ "connect.type": "bytes", "type": "string" }
]
},
"connect.type": "map",
"type": "object"
}
]
},
"key": {
"connect.index": 6,
"oneOf": [
{ "type": "null" },
{ "connect.type": "bytes", "type": "string" }
]
},
"value": {
"connect.index": 7,
"oneOf": [
{ "type": "null" },
{ "connect.type": "bytes", "type": "string" }
]
}
},
"title": "Record_source_record",
"type": "object"
}
]
}
},
"required": [
"error_code",
"error_reason",
"error_message",
"error_details",
"processor",
"affected_type"
],
"title": "Record",
"type": "object"
}
syntax = "proto3";
package org.apache.flink.proto.generated.record;
import "google/protobuf/timestamp.proto";
message Record {
int32 error_code = 1;
string error_reason = 2;
string error_message = 3;
repeated ErrorDetailsEntry error_details = 4;
string processor = 5;
optional string statement_name = 6;
string affected_type = 7;
optional string affected_catalog = 8;
optional string affected_database = 9;
optional string affected_name = 10;
optional source_record_Row source_record = 11;
message ErrorDetailsEntry {
string key = 1;
optional string value = 2;
}
message source_record_Row {
optional string topic = 1;
optional int32 partition = 2;
optional int64 offset = 3;
optional .google.protobuf.Timestamp timestamp = 4 [(confluent.field_meta) = {
params: [
{ key: "flink.precision", value: "3" },
{ key: "flink.version", value: "1" }
]
}];
optional string timestamp_type = 5;
headersRepeatedWrapper headers = 6 [(confluent.field_meta) = {
params: [
{ key: "flink.wrapped", value: "true" },
{ key: "flink.version", value: "1" }
]
}];
optional bytes key = 7;
optional bytes value = 8;
message headersRepeatedWrapper {
repeated ValueEntry value = 1;
message ValueEntry {
string key = 1;
optional bytes value = 2;
}
}
}
}
Inspect DLQ records
Query the DLQ table to view captured deserialization errors:
SELECT * FROM my_source_table_error_log;
Each record contains the error details and, when available, the original Kafka record (topic, partition, offset, key, and value) that caused the error.
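The source_record key, value, and header values are raw bytes, so they are not directly readable in query output. A minimal sketch of making a fetched record inspectable, assuming the row has already been materialized as a Python dict (the helper name is hypothetical):

```python
def decode_source_record(source_record: dict) -> dict:
    # key, value, and header values arrive as raw bytes; decode what is
    # valid UTF-8 and fall back to a hex string for binary payloads.
    def readable(b):
        if b is None:
            return None
        try:
            return b.decode("utf-8")
        except UnicodeDecodeError:
            return b.hex()

    decoded = dict(source_record)
    decoded["key"] = readable(source_record.get("key"))
    decoded["value"] = readable(source_record.get("value"))
    decoded["headers"] = {k: readable(v)
                          for k, v in (source_record.get("headers") or {}).items()}
    return decoded
```

The topic, partition, and offset fields pass through unchanged, so the decoded record still identifies exactly which source message failed to deserialize.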
Limitations and known behaviors
DLQ error handling applies only to deserialization errors at the source. Errors in UDFs, serialization, or windowed aggregations are not routed to the DLQ. For UDF error handling, see Error handling best practices.
Error handling is configured at the table level, not per statement. All statements reading from a table with error-handling.mode set to log share the same DLQ configuration.
Each source table can have one DLQ target at a time. DLQ configuration is set at the source table level, not per job or per consumer.
Best practices
Use a dedicated service account for DLQ setup when consuming from shared or team-owned source topics. Because configuring a DLQ requires DeveloperManage permissions on the source topic, consider using a platform or admin service account to run the ALTER TABLE statement. This avoids granting elevated permissions on source topics to individual consumer service accounts.