Configure Tableflow in Confluent Cloud

Snapshot retention

Snapshot retention manages the metadata that enables you to query a previous state of your table, also known as running “time-travel queries”. Tableflow creates a snapshot every time it commits a change to your table. This includes any time Tableflow adds or updates data in your table, and when it performs maintenance tasks, like compaction.

Tableflow always maintains a minimum number of snapshots, but you can configure how long additional snapshots are retained before they expire by setting the retention_ms configuration. When a snapshot passes its expiration time, or when more than 100 snapshots exist, Tableflow asynchronously removes the snapshot from the table, along with any data files that are no longer needed by any remaining snapshot.
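As a hedged sketch only: the payload fragment below assumes that retention_ms is accepted in the same spec.config block that the data TTL example later in this section uses for data_retention_ms. Confirm the exact field name and placement in the Confluent Cloud API reference before using it.

```json
{
  "spec": {
    "config": {
      "retention_ms": "432000000"
    }
  }
}
```

Here, 432000000 milliseconds corresponds to a five-day retention window for additional snapshots.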

Data TTL

With the data time-to-live (TTL) setting, you can configure Tableflow to automatically delete old data in your tables. Tableflow deletes table rows when they become older than the retention period that you specify. By setting the TTL, you can reduce storage costs for situations where you only need data for a limited time. For example, data TTL is useful if you analyze only recent data or if you run periodic aggregations, such as hourly or daily summaries.

Note

Data TTL in Tableflow is available to some Confluent customers as a Limited Availability feature. For those with access, Confluent fully supports data TTL and recommends it for production use. To request access, contact Confluent Support.

Tableflow measures the age of a row based on the timestamp of the corresponding Kafka record.

Data TTL differs from snapshot retention, which manages metadata for time-travel queries but does not delete rows from your table.

How data TTL works

When you configure data TTL on a table, Tableflow asynchronously deletes each row that exceeds your retention period.

First, Tableflow logically deletes expired rows in your table after a waiting period. Tableflow waits up to 24 hours after at least one row in a Parquet file expires, then logically deletes the expired rows, making them invisible to queries. If all rows in the file have expired within that time, Tableflow deletes the entire file. If only some rows have expired, Tableflow removes just those rows, which requires compaction: Tableflow rewrites the file without the expired rows. The waiting period minimizes two impacts on query performance: the logical deletions and the compaction.

Subsequently, Tableflow physically deletes the Parquet file. After every snapshot that references a Parquet file expires, Tableflow physically deletes the file from storage. By waiting for snapshots to expire, Tableflow ensures that you can run time-travel queries for as long as your snapshot retention settings permit.
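The row-expiration rule described above can be sketched in a few lines. This is an illustrative model, not Tableflow's implementation; the function name is hypothetical. It assumes, as stated earlier in this section, that row age is measured from the timestamp of the corresponding Kafka record.

```python
MS_PER_DAY = 86_400_000  # milliseconds in one day

def row_expired(record_timestamp_ms: int, now_ms: int, data_retention_ms: int) -> bool:
    # Illustrative rule only: a row is expired when its Kafka record
    # timestamp is older than the configured retention period.
    return now_ms - record_timestamp_ms > data_retention_ms

# A row written at t=0 with 90-day retention expires just past the 90-day mark.
retention = 90 * MS_PER_DAY  # 7776000000, matching the example later in this section
print(row_expired(0, retention + 1, retention))   # True
print(row_expired(0, retention, retention))       # False
```

The actual deletion then happens in the two asynchronous phases described above: a logical delete after the waiting period, and a physical delete once no snapshot references the file.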

Configure data TTL

To configure the data TTL setting, use the Confluent Cloud API. Set the data_retention_ms parameter when you create or update a table, specifying the retention period in milliseconds. Tableflow rounds this period up to the nearest day.

The following example configures data TTL to retain data for 90 days:

{
  "spec": {
    "display_name": "high-volume-events",
    "config": {
      "data_retention_ms": "7776000000"
    },
    "table_formats": ["ICEBERG"],
    "environment": {"id": "env-xxxxx"},
    "kafka_cluster": {"id": "lkc-xxxxx"}
  }
}
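The value in the example above is 90 days expressed in milliseconds, which you can derive as follows (the variable names here are illustrative):

```python
MS_PER_DAY = 24 * 60 * 60 * 1000  # 86,400,000 ms in a day

data_retention_ms = 90 * MS_PER_DAY
print(data_retention_ms)  # 7776000000
```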

Error-handling mode

Tableflow offers three modes for handling per-record materialization failures: suspend, skip, and log. The default mode is suspend. Assign the value to the error-handling.mode table property, for example:

ALTER TABLE my_table SET (
  'error-handling.mode' = 'log'
);

You can configure error-handling mode by using Tableflow configurations or the Flink error-handling.mode table property.

Suspend mode

The default mode, suspend, causes Tableflow to enter the Pause state whenever a record can’t be materialized and added to the table. This means that if your topic ingests a corrupted record, Tableflow pauses processing.

Failures that occur for reasons that are not record-specific always cause Tableflow to enter the Pause state, regardless of the configured error-handling.mode. These include, but are not limited to, catalog-access and storage-access errors and invalid schema changes.

Skip mode

When the Tableflow failure strategy is set to skip, Tableflow skips over records that fail to materialize. Tableflow reports the number of skipped records in the rows_skipped metric.

Log mode

When the Tableflow failure strategy is set to log, Tableflow sends records that fail to materialize to a Dead Letter Queue (DLQ) table. The DLQ is an Apache Kafka® topic in the same cluster as your Tableflow table. When records are sent to the DLQ, they are also recorded in the rows_skipped metric.

Important

Failure handling with log mode is available only for topics that are configured with Avro or Protobuf schemas.

Error-handling log target

When the Tableflow error-handling mode is set to log, you can optionally designate a specific topic for your DLQ, either by creating a new topic or by using an existing one. The DLQ topic must follow the DLQ topic schema.

If no destination for the DLQ is specified, Tableflow defaults to using a topic named error_log.

The account that enables or updates Tableflow to use the log error-handling mode must have write permissions on the DLQ topic; otherwise, Tableflow encounters an error and enters the Pause state.

Assign the value to the error-handling.log.target table property, for example:

ALTER TABLE my_table SET (
  'error-handling.log.target' = 'my_error_topic'
);

You can configure error-handling mode by using Tableflow configurations or the Flink error-handling.log.target table property.

DLQ topic schema

Tableflow DLQ topics must use the following Avro schema to operate correctly. If you create the topic as part of DLQ enablement, it’s automatically registered with this schema. For the full schema definitions in Avro, JSON Schema, and Protobuf (including both key and value schemas), see Schema definitions for manual registration.

{
  "fields": [
    {
      "name": "error_timestamp",
      "type": {
        "logicalType": "timestamp-millis",
        "type": "long"
      }
    },
    {
      "name": "error_code",
      "type": "int"
    },
    {
      "name": "error_reason",
      "type": "string"
    },
    {
      "name": "error_message",
      "type": "string"
    },
    {
      "name": "error_details",
      "type": {
        "type": "map",
        "values": [
          "null",
          "string"
        ]
      }
    },
    {
      "name": "processor",
      "type": "string"
    },
    {
      "default": null,
      "name": "statement_name",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "affected_type",
      "type": "string"
    },
    {
      "default": null,
      "name": "affected_catalog",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "affected_database",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "affected_name",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "source_record",
      "type": [
        "null",
        {
          "fields": [
            {
              "default": null,
              "doc": "The topic of the Kafka source record.",
              "name": "topic",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "default": null,
              "doc": "The partition of the Kafka source record.",
              "name": "partition",
              "type": [
                "null",
                "int"
              ]
            },
            {
              "default": null,
              "doc": "The offset of the Kafka source record.",
              "name": "offset",
              "type": [
                "null",
                "long"
              ]
            },
            {
              "default": null,
              "doc": "The timestamp of the Kafka source record. The value is in milliseconds since epoch. The specific meaning of the timestamp depends on timestamp_type.",
              "name": "timestamp",
              "type": [
                "null",
                {
                  "logicalType": "timestamp-millis",
                  "type": "long"
                }
              ]
            },
            {
              "default": null,
              "doc": "The type of the timestamp in the Kafka source record. Possible values are 'CREATE_TIME' and 'LOG_APPEND_TIME'.",
              "name": "timestamp_type",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "default": null,
              "doc": "The headers of the Kafka source record. The keys are strings and values are byte arrays. Note that for multiple headers with the same key, only the first one is kept in the map.",
              "name": "headers",
              "type": [
                "null",
                {
                  "type": "map",
                  "values": [
                    "null",
                    "bytes"
                  ]
                }
              ]
            },
            {
              "default": null,
              "doc": "The key of the Kafka source record. May be null if the record has no key.",
              "name": "key",
              "type": [
                "null",
                "bytes"
              ]
            },
            {
              "default": null,
              "doc": "The value of the Kafka source record. May be null if the record has no value (tombstone record).",
              "name": "value",
              "type": [
                "null",
                "bytes"
              ]
            }
          ],
          "name": "stocks_realtime_feed_dlq_value_source_record",
          "type": "record"
        }
      ]
    }
  ],
  "name": "stocks_realtime_feed_dlq_value",
  "namespace": "org.apache.flink.avro.generated.record",
  "type": "record"
}
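As a hedged illustration of how a consumer might use the schema above, the following sketch extracts the source-record coordinates from an already-deserialized DLQ value. The helper function and the sample record are hypothetical; in practice, the value would come from a Kafka consumer with Avro deserialization configured against this schema.

```python
def source_coordinates(dlq_value: dict):
    # Return (topic, partition, offset) from a deserialized DLQ value,
    # or None if the optional source_record field is null.
    src = dlq_value.get("source_record")
    if src is None:
        return None
    return (src.get("topic"), src.get("partition"), src.get("offset"))

# Hypothetical deserialized DLQ value, showing only the fields used here.
sample = {
    "error_timestamp": 1700000000000,
    "error_code": 1,
    "error_reason": "example reason",
    "error_message": "example message",
    "source_record": {"topic": "high-volume-events", "partition": 3, "offset": 42},
}
print(source_coordinates(sample))  # ('high-volume-events', 3, 42)
```

Because source_record and its nested fields are nullable unions in the schema, any consumer of the DLQ topic should handle null values for each of them, as this sketch does for source_record itself.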