Write Modes with Tableflow in Confluent Cloud
Tableflow can operate in two distinct write modes: APPEND and UPSERT. The write mode is determined by the combination of your topic’s Apache Kafka® cleanup.policy and Flink changelog.mode. Both configurations can be set in Flink CREATE TABLE or ALTER TABLE statements.
Using the Flink property names, the following combinations are supported by Tableflow:
| kafka.cleanup-policy | changelog.mode | Tableflow Write Mode |
|---|---|---|
| delete | append | APPEND |
| compact | upsert | UPSERT |
| delete, compact | upsert | UPSERT |
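For example, the UPSERT combination in the second row could be configured with a Flink ALTER TABLE statement like the following sketch. The table name `orders` is illustrative; the option names follow the Flink property names shown in the table above.

```sql
ALTER TABLE orders SET (
  'kafka.cleanup-policy' = 'compact',
  'changelog.mode' = 'upsert'
);
```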
Write mode can’t be changed while Tableflow is enabled. To change write mode, you must disable Tableflow, change your Kafka or Flink configurations, and then re-enable Tableflow, which creates a new table.
Use with APPEND mode
When the Tableflow write mode is set to APPEND, Kafka records are appended to the table immutably. Tableflow interprets every Kafka record as a new row that is added to the table exactly once.
Use with UPSERT mode
When Tableflow write mode is set to UPSERT, the Kafka message key and partition number form the composite primary key of the Tableflow table. Upsert tables allow you to insert, update, and delete rows from a table. Tableflow retains the most recent version of data for each key in the table. This is the same functional behavior as a Flink table in upsert mode. When in upsert mode, Tableflow maintains an additional index table to improve performance at scale.
If your key size exceeds 256 bytes, Tableflow uses a hash of your key to check for uniqueness. When using the upsert write mode, you are billed on Topic-Hours and GB Processed dimensions. The GB Processed dimension also includes processing related to index operations.
If your key fields are stored in both the Kafka record key and the Kafka record value, Tableflow uses the values from the record value, not the record key.
Note
Upsert tables have a maximum limit of 30 billion unique keys.
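The composite-key behavior described above can be sketched with a minimal model. This is an illustrative simulation, not the actual Tableflow implementation: rows are keyed by (Kafka message key, partition number), and the latest record for a key wins.

```python
# Illustrative model of UPSERT materialization (not Tableflow's
# internal implementation). The composite primary key is
# (Kafka message key, partition number); the latest value wins.

def materialize(records):
    """records: iterable of (partition, key, value) tuples in offset order."""
    table = {}
    for partition, key, value in records:
        table[(key, partition)] = value  # last write for a key wins
    return table

rows = materialize([
    (0, "user-1", {"name": "Ada"}),
    (0, "user-2", {"name": "Grace"}),
    (0, "user-1", {"name": "Ada L."}),  # update: same key, same partition
])
# rows holds one row per (key, partition); user-1 was updated in place
```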
Performance
Tableflow tables in the upsert write mode that have fewer than 6 billion unique rows are rated to support 20,000 Kafka events per second. Tables with between 6 billion and 8 billion unique rows are rated to support 10,000 Kafka events per second. While there is no hard limit on the number of rows in a Tableflow upsert table, tables with more than 8 billion unique rows may see a significant reduction in the maximum number of Kafka events per second that can be materialized without the table falling behind.
Deletes
To delete rows in a Tableflow upsert table, you must send tombstone messages to your Kafka topic. Tombstone messages are Kafka messages that have a key but a null (empty) value. In a compacted Kafka topic, tombstones can be used to delete specific Kafka messages within a topic. Similarly, Tableflow removes a row from the table when it encounters a tombstone message with a matching key.
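Continuing the simplified model of upsert materialization, a tombstone can be sketched as a record whose value is `None`; this is illustrative only, not Tableflow's internal logic.

```python
# Illustrative sketch of tombstone handling in UPSERT mode (not the
# actual Tableflow implementation): a record with a key and a None
# (empty) value removes the matching row.

def apply_record(table, partition, key, value):
    if value is None:                      # tombstone: delete the row
        table.pop((key, partition), None)
    else:                                  # regular record: insert or update
        table[(key, partition)] = value
    return table

table = {}
apply_record(table, 0, "user-1", {"status": "active"})
apply_record(table, 0, "user-1", None)     # tombstone for user-1
# table is now empty: the row for user-1 was deleted
```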
Partition count increases and duplicate rows
If you increase the partition count on a compacted topic that you use with upsert mode, you might cause duplicate rows in your materialized table. Don’t increase partition count unless you understand the impact on key uniqueness.
If you increase the partition count, Kafka producers might route keys that were previously on one partition to a different partition. This affects deduplication because Kafka log compaction and Tableflow upsert logic are both scoped to individual partitions. If the same key appears in two partitions, it creates two separate rows in the materialized table.
To remove duplicate rows, send tombstone messages for the affected keys to their original partitions. Both Kafka log compaction and Tableflow then remove the outdated records, leaving only the record from the new partition.
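The remapping that causes these duplicates can be demonstrated with a toy hash partitioner. This uses MD5 as a stand-in for Kafka's default murmur2-based partitioner; the exact hash differs, but the effect is the same: growing the partition count moves many keys to a new partition.

```python
import hashlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Toy key-to-partition mapping (a stand-in for Kafka's default
    murmur2-based partitioner; not the real hash)."""
    digest = int.from_bytes(hashlib.md5(key).digest(), "big")
    return digest % num_partitions

# Keys whose partition assignment changes when a topic grows from
# 3 to 4 partitions. New records for these keys land on a different
# partition than the existing records, so partition-scoped compaction
# and upsert logic can no longer deduplicate them.
moved = [
    k for k in (f"user-{i}".encode() for i in range(100))
    if partition_for(k, 3) != partition_for(k, 4)
]
```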
Limitations
If you use upsert write mode with Debezium Change Data Capture, refer to Materialize Change Data Capture (CDC) Streams for current limitations.
In upsert mode, Tableflow maintains an index that contains message keys for faster insertion. The total size of all keys must not exceed the index size.
In upsert mode, Tableflow does not support evolving the key schema.