How to Convert a Changelog to a Table in ksqlDB for Confluent Platform

Context

You have a topic or a stream of events that represent a series of changes, known as a changelog. You want a view of the data that reflects only the last change for each key. Because ksqlDB represents change over time using tables, you need a way to convert your changelog into a table. This is broadly called materializing a changelog into a table.

Materializing a changelog topic

If you have a changelog topic and want a view of the data that reflects the latest values for each key, create a table over the changelog topic with the CREATE SOURCE TABLE statement.

Suppose your changelog topic contains the following data, where the first row is the record with the earliest offset and the last row is the record with the latest offset:

+------+------+------+------+
|K     |V1    |V2    |V3    |
+------+------+------+------+
|k1    |0     |a     |true  |
|k2    |1     |b     |false |
|k1    |2     |c     |false |
|k3    |3     |d     |true  |
|k2    |4     |e     |true  |
+------+------+------+------+

Note that the changelog topic contains two records each for the keys k1 and k2.
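To see why the resulting table has one row per key, the following plain-Python sketch models what materialization does conceptually. It replays the records from the earliest to the latest offset and lets each record replace the row stored for its key. This is an illustrative model, not ksqlDB code. The CHANGELOG list and the materialize function are hypothetical names, and the model ignores details such as partitioning and record deletion.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]

# Hypothetical in-memory copy of the changelog topic, listed in offset order
# (index 0 holds the earliest record). Each entry is (key, value columns).
CHANGELOG: List[Tuple[str, Row]] = [
    ("k1", {"v1": 0, "v2": "a", "v3": True}),
    ("k2", {"v1": 1, "v2": "b", "v3": False}),
    ("k1", {"v1": 2, "v2": "c", "v3": False}),
    ("k3", {"v1": 3, "v2": "d", "v3": True}),
    ("k2", {"v1": 4, "v2": "e", "v3": True}),
]


def materialize(changelog: List[Tuple[str, Row]]) -> Dict[str, Row]:
    """Replay records from earliest to latest offset. A later record for a
    key replaces the whole row stored for that key (last write wins)."""
    table: Dict[str, Row] = {}
    for key, value in changelog:
        table[key] = dict(value)  # copy so the table does not alias the log
    return table


if __name__ == "__main__":
    # Sorted only for readable output; the model makes no ordering claim.
    for key, row in sorted(materialize(CHANGELOG).items()):
        print(key, row["v1"], row["v2"], row["v3"])
```

Running the script prints one line per key. Those lines match the rows returned later in this section by the pull query against latest_view.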

Create a table named latest_view with four columns. The k column is the table's primary key, so rows with the same key represent information about the same entity. The v1, v2, and v3 columns hold the values.

CREATE SOURCE TABLE latest_view (
    k VARCHAR PRIMARY KEY,
    v1 INT,
    v2 VARCHAR,
    v3 BOOLEAN
) WITH (
    kafka_topic = 'changelog',
    partitions = 1,
    value_format = 'JSON'
);

Now, you can view the latest values for each key in your changelog topic by issuing a pull query against the latest_view table that you created above:

SELECT * FROM latest_view;
+------+------+------+------+
|K     |V1    |V2    |V3    |
+------+------+------+------+
|k1    |2     |c     |false |
|k2    |4     |e     |true  |
|k3    |3     |d     |true  |
+------+------+------+------+

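Each key appears only once, and its columns hold the values from its most recent record. For example, the row for k1 shows 2, c, and false from its second record, not 0, a, and true from its first.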

To look up the latest values for a particular key, such as k2, add a WHERE clause on the key to the pull query:

SELECT * FROM latest_view WHERE k = 'k2';
+------+------+------+------+
|K     |V1    |V2    |V3    |
+------+------+------+------+
|k2    |4     |e     |true  |
+------+------+------+------+
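The difference between the two pull queries can be modeled in a few lines of Python. The model assumes the materialized state is a dictionary keyed by k. A query without a WHERE clause returns every current row, while a query with k = '...' needs only one lookup. LATEST_VIEW, scan, and lookup are hypothetical names used for illustration. ksqlDB keeps this state in its own stores and does not promise any particular row order.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]

# Hypothetical materialized state of latest_view after the changelog replay.
LATEST_VIEW: Dict[str, Row] = {
    "k1": {"v1": 2, "v2": "c", "v3": False},
    "k2": {"v1": 4, "v2": "e", "v3": True},
    "k3": {"v1": 3, "v2": "d", "v3": True},
}


def scan(table: Dict[str, Row]) -> List[Row]:
    """Models SELECT * FROM latest_view: return every current row.
    Rows are sorted by key only to make the result deterministic."""
    return [{"k": key, **row} for key, row in sorted(table.items())]


def lookup(table: Dict[str, Row], key: str) -> Optional[Row]:
    """Models SELECT * FROM latest_view WHERE k = '<key>': a single
    dictionary lookup. Returns None when the key has no row."""
    row = table.get(key)
    return None if row is None else {"k": key, **row}


if __name__ == "__main__":
    print(len(scan(LATEST_VIEW)), "rows in a full scan")
    print(lookup(LATEST_VIEW, "k2"))
```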

Materializing a changelog STREAM

To materialize an input topic as a table, the CREATE SOURCE TABLE statement is the best choice. If you already have a STREAM, you can instead materialize it as a table with a CREATE TABLE ... AS SELECT statement. This is useful when you have a STREAM of change events in ksqlDB, here called changelog_stream, and you want a view of the data that reflects the latest values for each key. The examples below assume that changelog_stream already exists with the columns k (VARCHAR), v1 (INT), v2 (VARCHAR), and v3 (BOOLEAN).

Begin by telling ksqlDB to start all queries from the earliest point in each topic:

SET 'auto.offset.reset' = 'earliest';
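This setting matters because the CREATE TABLE AS SELECT query later in this section is started after the INSERT INTO statements have run. The sketch below models the two common auto.offset.reset values for a query that has no committed offsets yet. With earliest, the query begins at the first record in the topic. With latest, which is ksqlDB's default, it skips records that already exist. The function names and sample records are hypothetical, and the model covers only these two values.

```python
from typing import List, Tuple

Record = Tuple[str, int]  # (key, v1) is enough for this illustration

# Hypothetical records that already exist in the topic when a query starts.
EXISTING: List[Record] = [("k1", 0), ("k2", 1), ("k1", 2)]


def start_offset(log_length: int, reset: str) -> int:
    """Offset where a query with no committed offsets begins reading."""
    if reset == "earliest":
        return 0  # begin at the first record in the topic
    if reset == "latest":
        return log_length  # skip existing records; read only new ones
    raise ValueError(f"this model only handles earliest/latest, got {reset!r}")


def records_seen(existing: List[Record], reset: str,
                 appended_later: List[Record]) -> List[Record]:
    """Records a query observes if it starts now and more records arrive."""
    begin = start_offset(len(existing), reset)
    return (existing + appended_later)[begin:]


if __name__ == "__main__":
    later = [("k3", 3)]
    print("earliest:", records_seen(EXISTING, "earliest", later))
    print("latest:  ", records_seen(EXISTING, "latest", later))
```

Under latest, a table built from changelog_stream after the inserts would start out empty. Under earliest, it sees all five events.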

Next, simulate a series of changes by inserting a few records into changelog_stream with INSERT INTO statements:

INSERT INTO changelog_stream (
    k, v1, v2, v3
) VALUES (
    'k1', 0, 'a', true
);

INSERT INTO changelog_stream (
    k, v1, v2, v3
) VALUES (
    'k2', 1, 'b', false
);

INSERT INTO changelog_stream (
    k, v1, v2, v3
) VALUES (
    'k1', 2, 'c', false
);

INSERT INTO changelog_stream (
    k, v1, v2, v3
) VALUES (
    'k3', 3, 'd', true
);

INSERT INTO changelog_stream (
    k, v1, v2, v3
) VALUES (
    'k2', 4, 'e', true
);

To view the events in changelog_stream, issue a pull query against the stream:

SELECT * FROM changelog_stream;
+------+------+------+------+
|K     |V1    |V2    |V3    |
+------+------+------+------+
|k1    |0     |a     |true  |
|k2    |1     |b     |false |
|k1    |2     |c     |false |
|k3    |3     |d     |true  |
|k2    |4     |e     |true  |
+------+------+------+------+
Query terminated
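Unlike a table, the stream keeps every event. That is why the pull query returns all five rows, including both events for k1 and both events for k2. The following hypothetical append-only model of changelog_stream makes this concrete. Each insert appends an event and assigns it the next offset, and nothing is overwritten. The ChangelogStream class is illustrative only. It assumes a single, strictly increasing offset sequence, as in a single-partition topic.

```python
from typing import Any, Dict, List, Tuple

Event = Tuple[int, str, Dict[str, Any]]  # (offset, key, value columns)


class ChangelogStream:
    """Hypothetical append-only model of changelog_stream. Each insert adds
    one event with the next offset; earlier events are never modified."""

    def __init__(self) -> None:
        self.events: List[Event] = []

    def insert(self, k: str, v1: int, v2: str, v3: bool) -> int:
        offset = len(self.events)  # single increasing offset sequence
        self.events.append((offset, k, {"v1": v1, "v2": v2, "v3": v3}))
        return offset

    def select_all(self) -> List[Tuple[str, int, str, bool]]:
        """Models SELECT * FROM changelog_stream: every event, in offset order."""
        return [(k, row["v1"], row["v2"], row["v3"]) for _, k, row in self.events]

    def offsets_for(self, key: str) -> List[int]:
        """Offsets of all events recorded for one key."""
        return [offset for offset, k, _ in self.events if k == key]


if __name__ == "__main__":
    stream = ChangelogStream()
    for args in [("k1", 0, "a", True), ("k2", 1, "b", False),
                 ("k1", 2, "c", False), ("k3", 3, "d", True),
                 ("k2", 4, "e", True)]:
        stream.insert(*args)
    for row in stream.select_all():
        print(row)
    print("k1 offsets:", stream.offsets_for("k1"))
```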

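To view only the last change for each key in changelog_stream, derive a table called latest_events with a CREATE TABLE AS SELECT statement. The statement groups by k and applies the LATEST_BY_OFFSET aggregate function to each value column. For each key, LATEST_BY_OFFSET returns the most recent value of the column, as determined by offset: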

CREATE TABLE latest_events AS
    SELECT k,
           LATEST_BY_OFFSET(v1) AS v1,
           LATEST_BY_OFFSET(v2) AS v2,
           LATEST_BY_OFFSET(v3) AS v3
    FROM changelog_stream
    GROUP BY k
    EMIT CHANGES;
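LATEST_BY_OFFSET works column by column within each GROUP BY group. The following Python sketch models that behavior: for every key, each column keeps the value from the highest-offset event for that key. The sketch assumes the function's documented default of skipping null values. Recent ksqlDB versions also accept an optional ignoreNulls argument, so check the function reference for your version. latest_by_offset_table, EVENTS, and COLUMNS are hypothetical names, and events are given as (offset, key, row) tuples.

```python
from typing import Any, Dict, List, Tuple

Event = Tuple[int, str, Dict[str, Any]]  # (offset, key, value columns)

COLUMNS = ["v1", "v2", "v3"]

# Hypothetical copy of the five events inserted into changelog_stream.
EVENTS: List[Event] = [
    (0, "k1", {"v1": 0, "v2": "a", "v3": True}),
    (1, "k2", {"v1": 1, "v2": "b", "v3": False}),
    (2, "k1", {"v1": 2, "v2": "c", "v3": False}),
    (3, "k3", {"v1": 3, "v2": "d", "v3": True}),
    (4, "k2", {"v1": 4, "v2": "e", "v3": True}),
]


def latest_by_offset_table(events: List[Event], columns: List[str],
                           ignore_nulls: bool = True) -> Dict[str, Dict[str, Any]]:
    """Models GROUP BY k with LATEST_BY_OFFSET applied to each column.

    Events are processed in offset order. For each key, every column keeps
    the value from the highest-offset event for that key. When ignore_nulls
    is True (the assumed default), a None value leaves the column unchanged,
    so the column holds the highest-offset non-null value instead.
    """
    table: Dict[str, Dict[str, Any]] = {}
    for _offset, key, row in sorted(events, key=lambda e: e[0]):
        current = table.setdefault(key, {c: None for c in columns})
        for c in columns:
            value = row.get(c)
            if value is None and ignore_nulls:
                continue  # skip nulls: keep the previous latest value
            current[c] = value
    return table


if __name__ == "__main__":
    for key, row in sorted(latest_by_offset_table(EVENTS, COLUMNS).items()):
        print(key, row)
```

With the five events from this section, the model produces the same three rows as the latest_events query. When no values are null, the result equals a last-write-wins replacement of the whole row for each key. The two approaches differ only when an event carries a null column.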

Now, you can view the latest values for each key in your changelog_stream by issuing a pull query against the latest_events table that you created above:

SELECT * FROM latest_events;
+------+------+------+------+
|K     |V1    |V2    |V3    |
+------+------+------+------+
|k1    |2     |c     |false |
|k2    |4     |e     |true  |
|k3    |3     |d     |true  |
+------+------+------+------+

As with latest_view, each key appears only once, and its columns hold the values from its most recent event.