Configure a Dead Letter Queue

A Dead Letter Queue (DLQ) captures records that Confluent Cloud for Apache Flink cannot deserialize, enabling your statements to continue processing instead of failing. When a deserialization error occurs, Flink routes the problematic record to a DLQ table, where you can inspect it later.

DLQ error handling applies only to deserialization errors at the source. Errors in user-defined functions (UDFs), serialization, or windowed aggregations are not routed to the DLQ. For recommendations on handling errors inside UDFs, see Error handling best practices.

Note

Tableflow uses the same error-handling.mode and error-handling.log.target table properties and the same DLQ schema. For Tableflow-specific error handling behavior, see Error-handling mode.

Prerequisites

  • Access to Confluent Cloud.

  • A provisioned Flink compute pool.

  • The appropriate RBAC roles and ACL permissions. For details, see Dead Letter Queue permissions.

Configure a DLQ when creating a new table

You can enable DLQ error handling when creating a new table by setting the error-handling.mode and error-handling.log.target table properties in your CREATE TABLE statement.

CREATE TABLE my_source_table (
  id INT,
  name STRING,
  event_time TIMESTAMP_LTZ(3)
) WITH (
  'error-handling.mode' = 'log',
  'error-handling.log.target' = 'my_source_table_error_log'
);

Flink attempts to create the DLQ topic and register the schema automatically. If you don’t specify error-handling.log.target, the default DLQ table name is error_log.
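
If you want to confirm which error-handling properties were applied, you can print the table definition. This sketch assumes the table name from the example above:

```sql
-- Lists the full table definition, including the error-handling.mode
-- and error-handling.log.target properties.
SHOW CREATE TABLE my_source_table;
```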

Configure a DLQ for an existing table

You can add DLQ error handling to an existing table by using an ALTER TABLE statement.

ALTER TABLE my_source_table SET (
  'error-handling.mode' = 'log',
  'error-handling.log.target' = 'my_source_table_error_log'
);

Flink attempts to create the DLQ topic and register the schema at the time the ALTER TABLE statement runs.

Important

If DLQ creation fails (for example, the topic cannot be created, the schema cannot be registered, or permissions are missing), the CREATE TABLE or ALTER TABLE statement itself does not fail. The failure surfaces later, when the first deserialization error occurs and Flink cannot write the record to the DLQ, causing the statement to fail.

To verify that the DLQ was created successfully, check that the DLQ topic and schema exist in your Kafka cluster and Schema Registry after running the statement.
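
For example, you can verify from Flink that the DLQ table exists and has the expected columns. This sketch uses the table names from the examples above:

```sql
-- The DLQ table should be listed in the current database.
SHOW TABLES;

-- Inspect the columns of the DLQ table.
DESCRIBE my_source_table_error_log;
```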

Pre-create a DLQ outside Flink

By default, Flink creates the DLQ topic and schema automatically with default settings. If you need control over the topic configuration, such as partition count, retention time, or cleanup policy, you can pre-create the DLQ topic and schema before enabling error handling.

Option A: Create the DLQ table in Flink with a CREATE TABLE statement

Run a CREATE TABLE statement with the full DLQ schema and your desired topic configuration.

CREATE TABLE my_source_table_error_log (
  `error_timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,
  `error_code` INT NOT NULL,
  `error_reason` VARCHAR(2147483647) NOT NULL,
  `error_message` VARCHAR(2147483647) NOT NULL,
  `error_details` MAP<VARCHAR(2147483647) NOT NULL, VARCHAR(2147483647)> NOT NULL,
  `processor` VARCHAR(2147483647) NOT NULL,
  `statement_name` VARCHAR(2147483647),
  `affected_type` VARCHAR(2147483647) NOT NULL,
  `affected_catalog` VARCHAR(2147483647),
  `affected_database` VARCHAR(2147483647),
  `affected_name` VARCHAR(2147483647),
  `source_record` ROW<
    `topic` VARCHAR(2147483647),
    `partition` INT,
    `offset` BIGINT,
    `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE,
    `timestamp_type` VARCHAR(2147483647),
    `headers` MAP<VARCHAR(2147483647) NOT NULL, VARBINARY(2147483647)>,
    `key` VARBINARY(2147483647),
    `value` VARBINARY(2147483647)>
)
DISTRIBUTED INTO 6 BUCKETS
WITH (
  'changelog.mode' = 'append',
  'kafka.cleanup-policy' = 'delete',
  'kafka.retention.time' = '7 d',
  'value.format' = 'avro-registry'
);

Then configure your source table to use this DLQ table:

ALTER TABLE my_source_table SET (
  'error-handling.mode' = 'log',
  'error-handling.log.target' = 'my_source_table_error_log'
);

Option B: Create the topic and schema manually

If you need full control and want to create the DLQ topic and schema outside of Flink, follow these steps:

  1. Create the Kafka topic with your desired configuration (partition count, retention, cleanup policy) using the Confluent Cloud Console, CLI, or API.

  2. Register both a key schema and a value schema for the topic in Schema Registry. The subject names must follow the pattern <topic_name>-key and <topic_name>-value. See Schema definitions for manual registration for the required schemas.

  3. Configure your source table to use the pre-created DLQ:

    ALTER TABLE my_source_table SET (
      'error-handling.mode' = 'log',
      'error-handling.log.target' = 'my_source_table_error_log'
    );
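
The first two steps can be sketched with the Confluent CLI. The command names, flags, and file names below are assumptions to verify against your CLI version; the schema files contain the key and value definitions from Schema definitions for manual registration:

```shell
# Create the DLQ topic with an explicit partition count, cleanup policy,
# and retention (hypothetical values).
confluent kafka topic create my_source_table_error_log \
  --partitions 6 \
  --config cleanup.policy=delete,retention.ms=604800000

# Register the key and value schemas under the required subject names.
confluent schema-registry schema create --subject my_source_table_error_log-key \
  --schema dlq_key.avsc --type avro
confluent schema-registry schema create --subject my_source_table_error_log-value \
  --schema dlq_value.avsc --type avro
```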
    

DLQ table schema reference

The DLQ table uses the following schema. The error_timestamp field is stored in the Kafka message key, and all other fields are stored in the message value.

Key fields

  • error_timestamp (TIMESTAMP_LTZ(3), NOT NULL): When the error occurred.

Value fields

  • error_code (INT, NOT NULL): Numeric error code.

  • error_reason (STRING, NOT NULL): Error category or reason.

  • error_message (STRING, NOT NULL): Human-readable error message.

  • error_details (MAP<STRING, STRING>, NOT NULL): Additional key-value error details.

  • processor (STRING, NOT NULL): The processor that encountered the error.

  • statement_name (STRING, nullable): Name of the statement that triggered the error.

  • affected_type (STRING, NOT NULL): Type of the affected resource.

  • affected_catalog (STRING, nullable): Catalog of the affected table.

  • affected_database (STRING, nullable): Database of the affected table.

  • affected_name (STRING, nullable): Name of the affected table.

  • source_record (ROW<...>, nullable): The original Kafka record that caused the error. Contains nested fields: topic, partition, offset, timestamp, timestamp_type, headers, key (bytes), and value (bytes).

Schema definitions for manual registration

When pre-creating a DLQ outside of Flink, you must register both a key schema and a value schema in Schema Registry. The subject names must follow the pattern <topic_name>-key and <topic_name>-value.

Note

In the Avro schemas below, the name field uses error_log as a placeholder (matching the default DLQ table name). Replace error_log with your actual DLQ table name in the name and nested record name fields.

Key schema

Avro schema:

{
  "fields": [
    {
      "name": "error_timestamp",
      "type": {
        "logicalType": "timestamp-millis",
        "type": "long"
      }
    }
  ],
  "name": "error_log_key",
  "namespace": "org.apache.flink.avro.generated.record",
  "type": "record"
}
JSON Schema:

{
  "additionalProperties": false,
  "properties": {
    "error_timestamp": {
      "connect.index": 0,
      "connect.type": "int64",
      "title": "org.apache.kafka.connect.data.Timestamp",
      "type": "number"
    }
  },
  "required": ["error_timestamp"],
  "title": "Record",
  "type": "object"
}
Protobuf schema:

syntax = "proto3";
package org.apache.flink.proto.generated.record;

import "google/protobuf/timestamp.proto";

message Record {
  .google.protobuf.Timestamp error_timestamp = 1 [(confluent.field_meta) = {
    params: [
      { key: "flink.version", value: "1" },
      { key: "flink.notNull", value: "true" },
      { key: "flink.precision", value: "3" }
    ]
  }];
}

Value schema

Avro schema:

{
  "fields": [
    {
      "name": "error_code",
      "type": "int"
    },
    {
      "name": "error_reason",
      "type": "string"
    },
    {
      "name": "error_message",
      "type": "string"
    },
    {
      "name": "error_details",
      "type": {
        "type": "map",
        "values": ["null", "string"]
      }
    },
    {
      "name": "processor",
      "type": "string"
    },
    {
      "default": null,
      "name": "statement_name",
      "type": ["null", "string"]
    },
    {
      "name": "affected_type",
      "type": "string"
    },
    {
      "default": null,
      "name": "affected_catalog",
      "type": ["null", "string"]
    },
    {
      "default": null,
      "name": "affected_database",
      "type": ["null", "string"]
    },
    {
      "default": null,
      "name": "affected_name",
      "type": ["null", "string"]
    },
    {
      "default": null,
      "name": "source_record",
      "type": [
        "null",
        {
          "fields": [
            {
              "default": null,
              "name": "topic",
              "type": ["null", "string"]
            },
            {
              "default": null,
              "name": "partition",
              "type": ["null", "int"]
            },
            {
              "default": null,
              "name": "offset",
              "type": ["null", "long"]
            },
            {
              "default": null,
              "name": "timestamp",
              "type": [
                "null",
                {
                  "logicalType": "timestamp-millis",
                  "type": "long"
                }
              ]
            },
            {
              "default": null,
              "name": "timestamp_type",
              "type": ["null", "string"]
            },
            {
              "default": null,
              "name": "headers",
              "type": [
                "null",
                {
                  "type": "map",
                  "values": ["null", "bytes"]
                }
              ]
            },
            {
              "default": null,
              "name": "key",
              "type": ["null", "bytes"]
            },
            {
              "default": null,
              "name": "value",
              "type": ["null", "bytes"]
            }
          ],
          "name": "error_log_value_source_record",
          "type": "record"
        }
      ]
    }
  ],
  "name": "error_log_value",
  "namespace": "org.apache.flink.avro.generated.record",
  "type": "record"
}
JSON Schema:

{
  "additionalProperties": false,
  "properties": {
    "error_code": {
      "connect.index": 0,
      "connect.type": "int32",
      "type": "number"
    },
    "error_reason": {
      "connect.index": 1,
      "type": "string"
    },
    "error_message": {
      "connect.index": 2,
      "type": "string"
    },
    "error_details": {
      "additionalProperties": {
        "oneOf": [
          { "type": "null" },
          { "type": "string" }
        ]
      },
      "connect.index": 3,
      "connect.type": "map",
      "type": "object"
    },
    "processor": {
      "connect.index": 4,
      "type": "string"
    },
    "statement_name": {
      "connect.index": 5,
      "oneOf": [
        { "type": "null" },
        { "type": "string" }
      ]
    },
    "affected_type": {
      "connect.index": 6,
      "type": "string"
    },
    "affected_catalog": {
      "connect.index": 7,
      "oneOf": [
        { "type": "null" },
        { "type": "string" }
      ]
    },
    "affected_database": {
      "connect.index": 8,
      "oneOf": [
        { "type": "null" },
        { "type": "string" }
      ]
    },
    "affected_name": {
      "connect.index": 9,
      "oneOf": [
        { "type": "null" },
        { "type": "string" }
      ]
    },
    "source_record": {
      "connect.index": 10,
      "oneOf": [
        { "type": "null" },
        {
          "additionalProperties": false,
          "properties": {
            "topic": {
              "connect.index": 0,
              "oneOf": [
                { "type": "null" },
                { "type": "string" }
              ]
            },
            "partition": {
              "connect.index": 1,
              "oneOf": [
                { "type": "null" },
                { "connect.type": "int32", "type": "number" }
              ]
            },
            "offset": {
              "connect.index": 2,
              "oneOf": [
                { "type": "null" },
                { "connect.type": "int64", "type": "number" }
              ]
            },
            "timestamp": {
              "connect.index": 3,
              "oneOf": [
                { "type": "null" },
                {
                  "connect.type": "int64",
                  "title": "org.apache.kafka.connect.data.Timestamp",
                  "type": "number"
                }
              ]
            },
            "timestamp_type": {
              "connect.index": 4,
              "oneOf": [
                { "type": "null" },
                { "type": "string" }
              ]
            },
            "headers": {
              "connect.index": 5,
              "oneOf": [
                { "type": "null" },
                {
                  "additionalProperties": {
                    "oneOf": [
                      { "type": "null" },
                      { "connect.type": "bytes", "type": "string" }
                    ]
                  },
                  "connect.type": "map",
                  "type": "object"
                }
              ]
            },
            "key": {
              "connect.index": 6,
              "oneOf": [
                { "type": "null" },
                { "connect.type": "bytes", "type": "string" }
              ]
            },
            "value": {
              "connect.index": 7,
              "oneOf": [
                { "type": "null" },
                { "connect.type": "bytes", "type": "string" }
              ]
            }
          },
          "title": "Record_source_record",
          "type": "object"
        }
      ]
    }
  },
  "required": [
    "error_code",
    "error_reason",
    "error_message",
    "error_details",
    "processor",
    "affected_type"
  ],
  "title": "Record",
  "type": "object"
}
Protobuf schema:

syntax = "proto3";
package org.apache.flink.proto.generated.record;

import "google/protobuf/timestamp.proto";

message Record {
  int32 error_code = 1;
  string error_reason = 2;
  string error_message = 3;
  repeated ErrorDetailsEntry error_details = 4;
  string processor = 5;
  optional string statement_name = 6;
  string affected_type = 7;
  optional string affected_catalog = 8;
  optional string affected_database = 9;
  optional string affected_name = 10;
  optional source_record_Row source_record = 11;

  message ErrorDetailsEntry {
    string key = 1;
    optional string value = 2;
  }

  message source_record_Row {
    optional string topic = 1;
    optional int32 partition = 2;
    optional int64 offset = 3;
    optional .google.protobuf.Timestamp timestamp = 4 [(confluent.field_meta) = {
      params: [
        { key: "flink.precision", value: "3" },
        { key: "flink.version", value: "1" }
      ]
    }];
    optional string timestamp_type = 5;
    headersRepeatedWrapper headers = 6 [(confluent.field_meta) = {
      params: [
        { key: "flink.wrapped", value: "true" },
        { key: "flink.version", value: "1" }
      ]
    }];
    optional bytes key = 7;
    optional bytes value = 8;

    message headersRepeatedWrapper {
      repeated ValueEntry value = 1;

      message ValueEntry {
        string key = 1;
        optional bytes value = 2;
      }
    }
  }
}

Inspect DLQ records

Query the DLQ table to view captured deserialization errors:

SELECT * FROM my_source_table_error_log;

Each record contains the error details and, when available, the original Kafka record (topic, partition, offset, key, and value) that caused the error.
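
For example, to see where failing records came from, you can project fields of the nested source_record row. This sketch assumes the DLQ table name used throughout and backtick-quotes nested names that collide with reserved words:

```sql
SELECT
  error_timestamp,
  error_message,
  source_record.`topic`     AS source_topic,
  source_record.`partition` AS source_partition,
  source_record.`offset`    AS source_offset
FROM my_source_table_error_log;
```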

Limitations and known behaviors

  • DLQ error handling applies only to deserialization errors at the source. Errors in UDFs, serialization, or windowed aggregations are not routed to the DLQ. For UDF error handling, see Error handling best practices.

  • Error handling is configured at the table level, not per statement. All statements reading from a table with error-handling.mode set to log share the same DLQ configuration.

  • Each source table can have one DLQ target at a time. DLQ configuration is set at the source table level, not per job or per consumer.

Best practices

  • Use a dedicated service account for DLQ setup when consuming from shared or team-owned source topics. Because configuring a DLQ requires DeveloperManage permissions on the source topic, consider using a platform or admin service account to run the ALTER TABLE statement. This avoids granting elevated permissions on source topics to individual consumer service accounts.