Configure Tableflow in Confluent Cloud
Snapshot retention
Snapshot retention manages the metadata that enables you to query a previous state of your table, also known as “time-travel queries”. Tableflow creates a snapshot every time it commits a change to your table, which includes any time Tableflow adds or updates data in your table and any time it performs maintenance tasks, like compaction.
Tableflow always maintains a minimum number of snapshots, but you can configure how long additional snapshots are retained before they expire by setting the retention_ms configuration. When a snapshot is past its expiration time, or when more than 1000 snapshots exist, Tableflow asynchronously removes the snapshot from the table, along with any data files that are no longer needed by the remaining snapshots.
Assign the value to the retention_ms table property. For example, to retain snapshots for one day (86400000 milliseconds):
ALTER TABLE my_table SET (
  'retention_ms' = '86400000'
);
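Retained snapshots are what make time-travel queries possible. The exact syntax depends on the engine that reads your Iceberg table; the following sketch assumes an engine that supports Flink-style time travel and uses an illustrative timestamp:
-- Query the table as it existed at a point in time within the retained
-- snapshot history (the timestamp shown is only an example).
SELECT *
FROM my_table FOR SYSTEM_TIME AS OF TIMESTAMP '2024-06-01 00:00:00';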
Error-handling mode
Tableflow offers three modes for handling per-record materialization failures: suspend, skip, and log. The default mode is suspend. Assign the value to the error-handling.mode table property, for example:
ALTER TABLE my_table SET (
  'error-handling.mode' = 'log'
);
You can configure error-handling mode by using Tableflow configurations or the Flink error-handling.mode table property.
Suspend mode
The default mode, suspend, causes Tableflow to enter the Pause state whenever a record can’t be materialized and added to the table. For example, if your topic ingests a corrupted record, Tableflow pauses processing when it reaches that record.
Failures that occur for reasons that are not record-specific always cause Tableflow to enter the Pause state, regardless of the configured error-handling.mode. This includes, but is not limited to, catalog-access and storage-access errors and invalid schema changes.
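Because suspend is the default, you don’t need to set it explicitly. If you have switched to another mode, one way to return to the default is to set the property again, for example:
ALTER TABLE my_table SET (
  'error-handling.mode' = 'suspend'
);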
Skip mode
When the error-handling mode is set to skip, Tableflow skips over records that fail to materialize and reports the number of skipped records in the rows_skipped metric.
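For example, to switch a table to skip mode, set the same error-handling.mode property:
ALTER TABLE my_table SET (
  'error-handling.mode' = 'skip'
);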
Log mode
When the error-handling mode is set to log, Tableflow sends records that fail to materialize to a Dead Letter Queue (DLQ) table. The DLQ is an Apache Kafka® topic in the same cluster as your Tableflow table. Records sent to the DLQ are also counted in the rows_skipped metric.
Important
Failure handling with log mode is available only for topics that are configured with Avro or Protobuf schemas.
Error-handling log target
When the Tableflow error-handling mode is set to log, you can optionally specify the topic used as your DLQ, either by creating a new topic or by choosing an existing one. The DLQ topic must follow the DLQ topic schema.
If you don’t specify a DLQ destination, Tableflow defaults to a topic named error_log.
The account that enables or updates Tableflow to use the log error-handling mode must have write permission on the DLQ topic; otherwise, Tableflow encounters an error and enters the Pause state.
Assign the value to the error-handling.log.target table property, for example:
ALTER TABLE my_table SET (
  'error-handling.log.target' = 'my_error_topic'
);
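You can also set the mode and the log target together in a single statement; the following sketch assumes the same my_table and my_error_topic names used in the earlier examples:
ALTER TABLE my_table SET (
  'error-handling.mode' = 'log',
  'error-handling.log.target' = 'my_error_topic'
);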
You can configure the error-handling log target by using Tableflow configurations or the Flink error-handling.log.target table property.
DLQ topic schema
Tableflow DLQ topics must use the following Avro schema to operate correctly. If you create the topic as part of DLQ enablement, it’s automatically registered with this schema.
{
  "fields": [
    {
      "name": "error_timestamp",
      "type": {
        "logicalType": "timestamp-millis",
        "type": "long"
      }
    },
    {
      "name": "error_code",
      "type": "int"
    },
    {
      "name": "error_reason",
      "type": "string"
    },
    {
      "name": "error_message",
      "type": "string"
    },
    {
      "name": "error_details",
      "type": {
        "type": "map",
        "values": [
          "null",
          "string"
        ]
      }
    },
    {
      "name": "processor",
      "type": "string"
    },
    {
      "default": null,
      "name": "statement_name",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "affected_type",
      "type": "string"
    },
    {
      "default": null,
      "name": "affected_catalog",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "affected_database",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "affected_name",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "source_record",
      "type": [
        "null",
        {
          "fields": [
            {
              "default": null,
              "doc": "The topic of the Kafka source record.",
              "name": "topic",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "default": null,
              "doc": "The partition of the Kafka source record.",
              "name": "partition",
              "type": [
                "null",
                "int"
              ]
            },
            {
              "default": null,
              "doc": "The offset of the Kafka source record.",
              "name": "offset",
              "type": [
                "null",
                "long"
              ]
            },
            {
              "default": null,
              "doc": "The timestamp of the Kafka source record. The value is in milliseconds since epoch. The specific meaning of the timestamp depends on timestamp_type.",
              "name": "timestamp",
              "type": [
                "null",
                {
                  "logicalType": "timestamp-millis",
                  "type": "long"
                }
              ]
            },
            {
              "default": null,
              "doc": "The type of the timestamp in the Kafka source record. Possible values are 'CREATE_TIME' and 'LOG_APPEND_TIME'.",
              "name": "timestamp_type",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "default": null,
              "doc": "The headers of the Kafka source record. The keys are strings and values are byte arrays. Note that for multiple headers with the same key, only the first one is kept in the map.",
              "name": "headers",
              "type": [
                "null",
                {
                  "type": "map",
                  "values": [
                    "null",
                    "bytes"
                  ]
                }
              ]
            },
            {
              "default": null,
              "doc": "The key of the Kafka source record. May be null if the record has no key.",
              "name": "key",
              "type": [
                "null",
                "bytes"
              ]
            },
            {
              "default": null,
              "doc": "The value of the Kafka source record. May be null if the record has no value (tombstone record).",
              "name": "value",
              "type": [
                "null",
                "bytes"
              ]
            }
          ],
          "name": "stocks_realtime_feed_dlq_value_source_record",
          "type": "record"
        }
      ]
    }
  ],
  "name": "stocks_realtime_feed_dlq_value",
  "namespace": "org.apache.flink.avro.generated.record",
  "type": "record"
}
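Once records have been routed to the DLQ, you can inspect them directly. A minimal sketch, assuming the default error_log topic and a query engine, such as Confluent Cloud for Apache Flink, that exposes the topic as a table:
-- Inspect materialization failures recorded in the DLQ
-- (assumes the default error_log topic is queryable as a table).
SELECT error_timestamp, error_code, error_reason, error_message, affected_name
FROM error_log;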