Materialize Change Data Capture (CDC) Streams with Tableflow in Confluent Cloud

Tableflow can materialize change data capture (CDC) streams produced by Debezium source connectors. CDC change logs can be ingested into Apache Kafka® topics using any Debezium CDC source connector, such as those for MySQL, PostgreSQL, or SQL Server. After the CDC events are written to Kafka, you can enable Tableflow on those topics to process and materialize the data efficiently.

To ensure compatibility when Tableflow reads the CDC topic directly (without a Flink decoding layer), the CDC connector must be explicitly configured with after.state.only = true, so that only the latest state of each change event is captured in the stream.

Configure Debezium for CDC

Tableflow does not natively support the *-debezium-registry value format in the append and upsert write modes, but you can still use Tableflow in CDC workloads that use Debezium source connectors, such as those for MySQL, PostgreSQL, or SQL Server. There are two recommended workflows today: use Confluent Cloud for Apache Flink® as a decoding layer, or set the after.state.only connector property.

Use with Flink decoding layer

To use Tableflow in the upsert write mode with Debezium messages, use Confluent Cloud for Apache Flink as a decoding layer. Confluent Cloud for Apache Flink supports value.format = *-debezium-registry, which automatically opens the Debezium envelope and creates a table that replicates the CDC source schema. You can then use Confluent Cloud for Apache Flink to copy the decoded table into another topic and enable Tableflow on that topic.

Here are the steps:

  1. Create a compacted topic and a Debezium source connector.

  2. Register the Debezium schema as a data contract on the topic.

  3. Run the following Flink statement:

    SHOW CREATE TABLE <topic-name>;
    
  4. Copy the previous statement, change changelog.mode to upsert and value.format to avro-registry, and create a new table with the modified statement. An example of the modified statement appears after these steps.

  5. Create a Flink job which inserts the data from the source table into the new table, for example:

    INSERT INTO <new-topic> SELECT * FROM <source-topic>;
    

You can now enable Tableflow on the newly created topic and query your data without manually handling the Debezium envelope.

Use without Flink decoding layer

You can also use Tableflow with Debezium source connectors by setting after.state.only to true, which results in Debezium producing Kafka messages that closely resemble the schema of your source CDC table. This option enables you to use Tableflow with Debezium messages without Confluent Cloud for Apache Flink as a decoding layer.

  • When using the Tableflow append write mode, set tombstones.on.delete to false, or set the connector's error handling mode to skip.
  • When using the Tableflow upsert write mode, set tombstones.on.delete to true; otherwise, rows won't be deleted from the table when they are deleted from the source.
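
For reference, here is a minimal sketch of the relevant connector properties for the upsert write mode. Only after.state.only and tombstones.on.delete are the properties discussed above; the connector class and output format shown are illustrative assumptions that vary by connector type and version.

    # Minimal sketch: Debezium source connector properties for the
    # Tableflow upsert write mode. connector.class and output.data.format
    # are illustrative assumptions; adjust for your connector and version.
    connector.class=PostgresCdcSourceV2
    output.data.format=AVRO
    after.state.only=true
    tombstones.on.delete=true

For the append write mode, set tombstones.on.delete to false instead, as described above.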
