Transform a Stream With KSQL

KSQL enables streaming transformations, which you can use to convert streaming data from one format to another in real time. A streaming transformation converts every record that arrives on the source stream, and you can also configure KSQL to convert all records that already exist in the stream.

Tip

Run the following to tell KSQL to read from the beginning of the topic:

SET 'auto.offset.reset' = 'earliest';

You can skip this if you have already run it within your current KSQL CLI session.

Transform a Stream By Using the WITH Clause

When you transform a stream into a new stream, you can change the following aspects of the stream:

  • The data format for message values
  • The number of partitions
  • The number of replicas
  • The timestamp field and/or the timestamp format
  • The new stream's underlying Apache Kafka® topic name
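
As a minimal sketch of how these aspects map onto the WITH clause, the following statement copies a stream into a new topic while changing its value format, partition count, and replica count. The stream name pageviews_avro and the topic name pageviews_avro_topic are hypothetical. The example also assumes that Schema Registry is configured (required for Avro) and that a single replica is acceptable, as on a one-broker development cluster.

```sql
-- Hypothetical names: pageviews_avro (stream), pageviews_avro_topic (topic).
-- KAFKA_TOPIC sets the underlying topic name, VALUE_FORMAT the data format
-- for message values, PARTITIONS and REPLICAS the topic layout.
CREATE STREAM pageviews_avro
  WITH (KAFKA_TOPIC='pageviews_avro_topic',
        VALUE_FORMAT='AVRO',
        PARTITIONS=3,
        REPLICAS=1) AS
  SELECT *
  FROM pageviews;
```

Any property that is left out of the WITH clause keeps its default, so you only need to list the aspects you want to change.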

For this example, imagine that you want to create a new stream by transforming a pageviews stream in the following way:

  • The viewtime column value is used as the record timestamp in the new stream’s underlying Kafka topic.
  • The new stream’s Kafka topic has five partitions.
  • The data in the new stream is in JSON format.
  • A new column is added that shows the message timestamp in human-readable string format.
  • The userid column is the key for the new stream.

The following statement generates a new stream, named pageviews_transformed, that has the specified properties:

CREATE STREAM pageviews_transformed
  WITH (TIMESTAMP='viewtime',
        PARTITIONS=5,
        VALUE_FORMAT='JSON') AS
  SELECT viewtime,
         userid,
         pageid,
         TIMESTAMPTOSTRING(viewtime, 'yyyy-MM-dd HH:mm:ss.SSS') AS timestring
  FROM pageviews
  PARTITION BY userid;
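
To check the result, one option is to inspect the new stream and sample a few records. This is a sketch that assumes the preceding statement completed and that the pageviews topic contains data. In newer ksqlDB releases, the SELECT may also need an EMIT CHANGES clause before LIMIT.

```sql
-- Show the schema, key, timestamp field, and backing topic of the new stream.
DESCRIBE EXTENDED pageviews_transformed;

-- Sample three records, including the human-readable timestring column.
SELECT userid, pageid, timestring
FROM pageviews_transformed
LIMIT 3;
```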

Content-based Routing

Frequently, you need to route messages from a source stream to multiple destination streams, based on conditions in the data. This is called content-based routing or data routing.

Use the WHERE clause to select a subset of data. To route records that match different criteria to streams that are backed by different underlying Kafka topics, write multiple KSQL queries, each with its own WHERE clause.

In this example, two streams are derived from a pageviews stream, and each one selects a different set of users into its output.

CREATE STREAM pageviews_for_first_two_users AS
  SELECT viewtime,
         userid,
         pageid
  FROM pageviews
  WHERE userid='User_1' OR userid='User_2'
  PARTITION BY userid;

CREATE STREAM pageviews_for_other_users AS
  SELECT viewtime,
         userid,
         pageid
  FROM pageviews
  WHERE userid<>'User_1' AND userid<>'User_2'
  PARTITION BY userid;