Upgrading ksqlDB

Upgrading to ksqlDB 6.2 from ksqlDB 6.1

For a complete list of changes, see the ksqlDB changelog.

Note

Queries written for ksqlDB 6.1 continue to run without changes under ksqlDB 6.2. New queries issued on 6.2 may have different behavior, even if the text of the query statement is the same.

Breaking Changes

  • Queries with GROUP BY clauses that contain multiple grouping expressions now result in multiple key columns, one for each grouping expression, rather than a single key column that is the string concatenation of the grouping expressions. This new behavior (and breaking change) applies only to new queries; existing queries continue to run uninterrupted with the previous behavior, even across ksqlDB server upgrades.
  • Stream-table key-to-key joins on mismatched formats now repartition the table (right-hand side) instead of the stream. Previously enqueued commands are not affected, so this change should remain invisible to the end user.
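
To make the GROUP BY change concrete, the following minimal Python sketch (not ksqlDB code) contrasts the two key shapes. The separator string and both function names are assumptions chosen for illustration only.

```python
from typing import Any, Tuple

SEPARATOR = "|+|"  # assumed separator, for illustration only


def legacy_group_key(*grouping_values: Any) -> str:
    """Earlier shape: one STRING key built by concatenating the grouping values."""
    return SEPARATOR.join(str(value) for value in grouping_values)


def multi_column_group_key(*grouping_values: Any) -> Tuple[Any, ...]:
    """6.2 shape for new queries: one key column per grouping expression."""
    return tuple(grouping_values)


if __name__ == "__main__":
    print(legacy_group_key(10, "books"))
    print(multi_column_group_key(10, "books"))
```

Consumers that parse the concatenated key of an existing query keep working, but a consumer of a newly created query should expect separate key columns instead.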

Upgrading to ksqlDB 6.1 from ksqlDB 6.0

For a complete list of changes, see the ksqlDB changelog.

Note

Queries written for ksqlDB 6.0 continue to run without changes under ksqlDB 6.1. New queries issued on 6.1 may have different behavior, even if the text of the query statement is the same.

Breaking Changes

  • ksqlDB now creates windowed tables with the cleanup policy compact,delete, rather than compact. Also, topics that back streams are always created with the cleanup policy delete, rather than inheriting the broker default (which is itself delete unless reconfigured).

  • ksqlDB no longer emits unnecessary tombstones when a HAVING clause filters out a row from the source that is not in the output table. For example, given:

    -- source stream:
    CREATE STREAM FOO (ID INT KEY, VAL INT) WITH (...);
    -- aggregate into a table:
    CREATE TABLE BAR AS
    SELECT ID, SUM(VAL) AS SUM
    FROM FOO
    GROUP BY ID
    HAVING SUM(VAL) > 0;
    -- insert some values into the stream:
    INSERT INTO FOO VALUES(1, -5);
    INSERT INTO FOO VALUES(1, 6);
    INSERT INTO FOO VALUES(1, -2);
    INSERT INTO FOO VALUES(1, -1);
    

    Previously, the sink topic BAR would have contained the following records:

    Key | Value   | Notes
    ----+---------+------------------------------------------------------------
    1   | null    | Spurious tombstone: the table does not contain a row with key 1, so no tombstone is required.
    1   | {sum=1} | Row added as HAVING criteria now met
    1   | null    | Row deleted as HAVING criteria now not met
    1   | null    | Spurious tombstone: the table does not contain a row with key 1, so no tombstone is required.

    The topic will now contain:

    Key | Value
    ----+--------
    1   | {sum=1}
    1   | null
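
The tombstone behavior described above can be simulated with a short Python sketch. It is an illustration of the rule (emit a tombstone only when a row actually leaves the table), not ksqlDB's implementation; the function name and the `suppress_spurious` flag are hypothetical.

```python
from typing import Dict, List, Optional, Set, Tuple

# (key, value) pairs written to the sink topic; a value of None is a tombstone.
Record = Tuple[int, Optional[Dict[str, int]]]


def sink_records(events: List[Tuple[int, int]], suppress_spurious: bool) -> List[Record]:
    """Simulate SUM(VAL) ... GROUP BY ID HAVING SUM(VAL) > 0 over (ID, VAL) events."""
    sums: Dict[int, int] = {}
    in_table: Set[int] = set()  # keys that currently have a row in the output table
    out: List[Record] = []
    for key, val in events:
        sums[key] = sums.get(key, 0) + val
        if sums[key] > 0:
            in_table.add(key)
            out.append((key, {"sum": sums[key]}))
        elif key in in_table:
            in_table.discard(key)
            out.append((key, None))  # genuine delete: the row was in the table
        elif not suppress_spurious:
            out.append((key, None))  # spurious tombstone (behavior before 6.1)
    return out


if __name__ == "__main__":
    inserts = [(1, -5), (1, 6), (1, -2), (1, -1)]
    print(sink_records(inserts, suppress_spurious=False))
    print(sink_records(inserts, suppress_spurious=True))
```

With the four inserts from the example, the sketch yields the four-record sequence for the earlier behavior and the two-record sequence for the 6.1 behavior.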

Upgrading to ksqlDB 6.0 from ksqlDB 5.5

For a complete list of changes, see the ksqlDB changelog.

Note

Queries written for ksqlDB 5.5 continue to run without changes under ksqlDB 6.0. New queries must use the updated 6.0 syntax.

Note

When you upgrade from ksqlDB 5.5 to ksqlDB 6.0 and later, you may encounter this issue: State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables. Upgraded ksqlDB applications may still run, but symptoms include spamming your Schema Registry logs with serialization exceptions. To correct this issue, delete the “phantom” changelog subject. For more information, see The Curious Incident of the State Store in Recovery in ksqlDB.

Breaking Changes

  • The “select star” statement (select *) no longer expands to include ROWTIME column(s). Instead, ROWTIME is included in the results of queries only if explicitly included in the projection, for example, select rowtime, *. This change affects only new statements. Any view previously created via a CREATE STREAM AS SELECT or CREATE TABLE AS SELECT statement is unaffected.

  • This release changes the system-generated column name for any columns in projections that dereference fields in a struct. Previously, the full path was used when generating the name. In 6.0, only the final field name is used. For example, SELECT someStruct->someField, ... previously generated a column name of SOMESTRUCT__SOMEFIELD and now generates a name of SOMEFIELD. Generated column names may have a numeral appended to the end to ensure uniqueness, for example SOMEFIELD_2.

    Note

    We recommend that you do not rely on system-generated column names for production systems, because naming logic may change between releases. Providing an explicit alias ensures consistent naming across releases, for example, SELECT someStruct->someField AS someField. For backward compatibility, existing running queries aren’t affected by this change, and they will continue to run with the same column names. Any statements executed after the upgrade will use the new names, where no explicit alias is provided. Add explicit aliases to your statements if you require the old names, for example: SELECT someStruct->someField AS SOMESTRUCT__SOMEFIELD, ....

  • In version 5.5, queries that referenced a single GROUP BY column in the projection would fail if they were resubmitted, due to a duplicate column. In 6.0, such queries continue to run if they are already running, so this change affects only newly submitted queries. Existing queries keep the earlier query semantics.

  • Push queries that rely on auto-generated column names may see changes in column names. Pull queries and any existing persistent queries are unaffected, for example, those created with CREATE STREAM AS SELECT, CREATE TABLE AS SELECT, or INSERT INTO.

  • ksqlDB Server no longer ships with Jetty. This means that when you start the server, you must supply Jetty-specific dependencies, like certain login modules used for HTTP Basic authentication, by using the KSQL_CLASSPATH environment variable for ksqlDB to find them.

Any key name

Statements containing PARTITION BY, GROUP BY, or JOIN clauses now produce different output schemas.

For PARTITION BY and GROUP BY clauses, the following rules define how the name of the key column in the result is determined:

  1. Where the partitioning or grouping is a single-column reference, the key column has the same name as this column, for example:

    -- OUTPUT has a key column named X;
    CREATE STREAM OUTPUT AS
      SELECT *
      FROM INPUT
      PARTITION BY X;
    
  2. Where the partitioning or grouping is a single field in a struct, the key column has the same name as the field. For example:

    -- OUTPUT has a key column named FIELD1;
    CREATE STREAM OUTPUT AS
      SELECT *
      FROM INPUT
      PARTITION BY X->field1;
    
  3. Otherwise, the key column name is system-generated and has the form KSQL_COL_n, where n is a positive integer.
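
The three naming rules above can be sketched as a small Python function. This is a simplified illustration, not ksqlDB's parser: the function name, the identifier pattern, and the `generated_index` parameter are assumptions, and the real index of a generated name is chosen by ksqlDB.

```python
import re

_IDENT = r"[A-Za-z_][A-Za-z0-9_]*"  # simplified pattern for an unquoted identifier


def key_column_name(expression: str, generated_index: int = 0) -> str:
    """Derive the result key column name for a single PARTITION BY / GROUP BY expression."""
    expr = expression.strip()
    # Rule 1: a plain column reference keeps its name (unquoted names are upper-cased).
    if re.fullmatch(_IDENT, expr):
        return expr.upper()
    # Rule 2: a struct field dereference takes the name of the final field.
    if re.fullmatch(rf"{_IDENT}(?:->{_IDENT})+", expr):
        return expr.rsplit("->", 1)[1].upper()
    # Rule 3: anything else gets a system-generated name.
    return f"KSQL_COL_{generated_index}"


if __name__ == "__main__":
    for candidate in ("X", "X->field1", "X + 1"):
        print(candidate, "->", key_column_name(candidate))
```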

In all cases, except where grouping by more than one column, you can set the new key column’s name by defining an alias in the projection, for example:

 -- OUTPUT has a key column named ID.
 CREATE TABLE OUTPUT AS
   SELECT
     USERID AS ID,
     COUNT(*)
   FROM USERS
   GROUP BY USERID;

For groupings of multiple expressions, you can’t provide a name for the system-generated key column. You can work around this by combining the grouping columns manually, which enables you to provide an alias, for example:

-- products_by_sub_cat has a key column named COMPOSITEKEY:
CREATE TABLE products_by_sub_cat AS
  SELECT
    CAST(categoryId AS STRING) + '§' + CAST(subCategoryId AS STRING) AS compositeKey,
    SUM(quantity) AS totalQty
  FROM purchases
  GROUP BY CAST(categoryId AS STRING) + '§' + CAST(subCategoryId AS STRING);

For JOIN statements, the name of the key column in the result is determined by the join criteria.

  1. For INNER and LEFT OUTER joins where the join criteria contain at least one column reference, the key column is named based on the left-most source whose join criteria is a column reference, for example:

    -- OUTPUT has a key column named I2_ID.
    CREATE TABLE OUTPUT AS
      SELECT *
      FROM I1
        JOIN I2 ON abs(I1.ID) = I2.ID
        JOIN I3 ON I2.ID = I3.ID;
    

    You can give the key column a new name, if required, by defining an alias in the projection, for example:

    -- OUTPUT has a key column named ID.
    CREATE TABLE OUTPUT AS
      SELECT
        I2.ID AS ID,
        I1.V0,
        I2.V0,
        I3.V0
      FROM I1
        JOIN I2 ON abs(I1.ID) = I2.ID
        JOIN I3 ON I2.ID = I3.ID;
    
  2. For FULL OUTER joins and other joins where the join criteria are not on column references, the key column in the output is not equivalent to any column from any source. The key column has a system-generated name in the form KSQL_COL_n, where n is a positive integer, for example:

    -- OUTPUT has a key column named KSQL_COL_0, or similar.
    CREATE TABLE OUTPUT AS
      SELECT *
      FROM I1
        FULL OUTER JOIN I2 ON I1.ID = I2.ID;
    

    You can give the key column a new name, if required, by defining an alias in the projection. A new UDF, named JOINKEY, has been introduced in 6.0 to help define the alias. It takes the join criteria as its parameters, for example:

    -- OUTPUT has a key column named ID.
    CREATE TABLE OUTPUT AS
      SELECT
        JOINKEY(I1.ID, I2.ID) AS ID,
        I1.V0,
        I2.V0
      FROM I1
        FULL OUTER JOIN I2 ON I1.ID = I2.ID;
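
The following Python sketch illustrates why the key of a FULL OUTER join is not equivalent to either source column: for rows that exist on only one side, the other side's ID is null. It assumes that the join key resolves to whichever side is non-null; the function names are hypothetical and this is not how ksqlDB implements JOINKEY.

```python
from typing import Dict, Optional, Tuple


def join_key(left_id: Optional[int], right_id: Optional[int]) -> Optional[int]:
    """Pick whichever join column is non-null (assumed JOINKEY-like behavior)."""
    return left_id if left_id is not None else right_id


def full_outer_join(
    i1: Dict[int, str], i2: Dict[int, str]
) -> Dict[Optional[int], Tuple[Optional[str], Optional[str]]]:
    """Join two keyed tables, keeping rows that exist on only one side."""
    rows = {}
    for key in sorted(set(i1) | set(i2)):
        left_id = key if key in i1 else None    # I1.ID is null when I1 has no row
        right_id = key if key in i2 else None   # I2.ID is null when I2 has no row
        rows[join_key(left_id, right_id)] = (i1.get(key), i2.get(key))
    return rows


if __name__ == "__main__":
    print(full_outer_join({1: "a", 2: "b"}, {2: "x", 3: "y"}))
```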
    

Explicit keys

In version 6.0, new CREATE TABLE statements fail if the PRIMARY KEY column isn’t provided. For example, a statement like the following no longer works, because it doesn’t define a PRIMARY KEY:

CREATE TABLE FOO (
    name STRING
  ) WITH (
    kafka_topic='foo',
    value_format='json'
  );

Update the previous statement to include the definition of the PRIMARY KEY:

 CREATE TABLE FOO (
     ID STRING PRIMARY KEY,
     name STRING
   ) WITH (
     kafka_topic='foo',
     value_format='json'
   );

If you load the value columns of the topic from Schema Registry, also known as “schema inference”, you can provide the primary key as a partial schema, for example:

-- FOO has value columns loaded from Schema Registry
CREATE TABLE FOO (
    ID INT PRIMARY KEY
  ) WITH (
    kafka_topic='foo',
    value_format='avro'
  );
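
When reviewing a batch of statements before running them on 6.0, a heuristic check for CREATE TABLE statements that lack a PRIMARY KEY may help. The Python sketch below is regex-based and deliberately simple; the function name is hypothetical, and it does not handle quoted identifiers or SQL comments.

```python
import re
from typing import List

_CREATE_TABLE = re.compile(r"^\s*CREATE\s+TABLE\s+([A-Za-z_][A-Za-z0-9_]*)", re.IGNORECASE)
_AS_SELECT = re.compile(r"\bAS\s+SELECT\b", re.IGNORECASE)
_PRIMARY_KEY = re.compile(r"\bPRIMARY\s+KEY\b", re.IGNORECASE)


def tables_missing_primary_key(statements: List[str]) -> List[str]:
    """Return names of CREATE TABLE statements that declare no PRIMARY KEY column."""
    missing = []
    for statement in statements:
        match = _CREATE_TABLE.match(statement)
        if not match:
            continue  # not a CREATE TABLE statement
        if _AS_SELECT.search(statement):
            continue  # CREATE TABLE AS SELECT takes its key from the query
        if not _PRIMARY_KEY.search(statement):
            missing.append(match.group(1))
    return missing


if __name__ == "__main__":
    print(tables_missing_primary_key([
        "CREATE TABLE FOO (name STRING) WITH (kafka_topic='foo', value_format='json');",
        "CREATE TABLE BAZ (ID STRING PRIMARY KEY, name STRING) WITH (kafka_topic='baz', value_format='json');",
    ]))
```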

CREATE STREAM statements that don’t define a KEY column no longer have an implicit ROWKEY key column, for example:

CREATE STREAM BAR (
    NAME STRING
  ) WITH (...);

In version 5.5, the previous statement would have resulted in a stream that had two columns: ROWKEY STRING KEY and NAME STRING. With this change, the previous statement results in a stream that has only the NAME STRING column.

Streams with no KEY column are serialized to Apache Kafka® topics with a null key.

Key columns required in projection

A statement that creates a materialized view must include the key columns in the projection, for example:

 CREATE TABLE OUTPUT AS
   SELECT
       productId,            -- key column in projection
       SUM(quantity) as unitsSold
   FROM sales
   GROUP BY productId;

The key column productId is required in the projection. In previous versions of ksqlDB, the presence of productId in the projection would have placed a copy of the data into the value of the underlying Kafka topic’s record. But starting in version 6.0, the projection must include the key columns, and ksqlDB stores these columns in the key of the underlying Kafka record. Optionally, you may provide an alias for the key column(s), for example:

 CREATE TABLE OUTPUT AS
   SELECT
       productId as id,            -- aliased key column
       SUM(quantity) as unitsSold
   FROM sales
   GROUP BY productId;

If you need a copy of the key column in the Kafka record’s value, use the AS_VALUE function to indicate this to ksqlDB. For example, the following statement produces an output like earlier versions of ksqlDB for the previous example materialized view:

 CREATE TABLE OUTPUT AS
   SELECT
       productId as ROWKEY,                -- key column named ROWKEY
       AS_VALUE(productId) as productId,   -- productId copied into value
       SUM(quantity) as unitsSold
   FROM sales
   GROUP BY productId;

WITH(KEY) syntax removed

In previous versions, all key columns were named ROWKEY. To enable using a more user-friendly name for the key column in queries, you could supply an alias for the key column in the WITH clause, for example:

 CREATE TABLE INPUT (
     ROWKEY INT PRIMARY KEY,
     ID INT,
     V0 STRING
   ) WITH (
     key='ID',    -- removed in ksqlDB 6.0
     ...
   );

With the previous query, the ID column can be used as an alias for ROWKEY. This approach required the Kafka message value to contain an exact copy of the key. KLIP-24 removed the restriction that key columns must be named ROWKEY, eliminating the need for the WITH(KEY) syntax, which has been removed. Also, this change removed the requirement for the Kafka message value to contain an exact copy of the key.

Update your queries by removing the KEY from the WITH clause and naming your KEY and PRIMARY KEY columns appropriately. For example, you can rewrite the previous CREATE TABLE statement like this:

 CREATE TABLE INPUT (
     ID INT PRIMARY KEY,
     V0 STRING
   ) WITH (...);

This doesn’t work when the value format is DELIMITED, because the value columns are order dependent, so dropping the ID value column would result in a deserialization error or the wrong values being loaded. If you’re using DELIMITED, consider rewriting the previous example like this:

 CREATE TABLE INPUT (
     ID INT PRIMARY KEY,
     ignoreMe INT,
     V0 STRING
   ) WITH (...);
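
To find statements that still use the removed WITH(KEY) syntax, a simple scan of the WITH clause can serve as a starting point. The Python sketch below is a heuristic with a hypothetical function name; it assumes property values contain no commas or parentheses and that the statement has no SQL comments.

```python
import re


def uses_with_key(statement: str) -> bool:
    """Return True if the statement's WITH clause sets a property named KEY."""
    match = re.search(r"\bWITH\s*\((.*)\)", statement, re.IGNORECASE | re.DOTALL)
    if not match:
        return False
    for prop in match.group(1).split(","):
        # Property names may be quoted; compare case-insensitively.
        name = prop.split("=", 1)[0].strip().strip("'\"`")
        if name.upper() == "KEY":
            return True
    return False


if __name__ == "__main__":
    old = ("CREATE TABLE INPUT (ROWKEY INT PRIMARY KEY, ID INT, V0 STRING) "
           "WITH (key='ID', kafka_topic='input', value_format='json');")
    print(uses_with_key(old))
```

Statements flagged this way still need a manual rewrite, because the right fix depends on the value format, as the DELIMITED case above shows.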

Basic+Bearer authentication

In ksql-server.properties, remove the following configuration settings:

  • rest.servlet.initializor.classes
  • websocket.servlet.initializor.classes

Add the following setting:

ksql.authentication.plugin.class=io.confluent.ksql.security.VertxBearerOrBasicAuthenticationPlugin

Upgrading to ksqlDB 5.5 from KSQL 5.4

Warning

The upgrade from KSQL 5.4 to ksqlDB 5.5 is not a rolling restart. You must shut down all KSQL instances and then start up all ksqlDB instances, so there will be downtime.

Complete the following steps to perform the upgrade from KSQL 5.4 to ksqlDB 5.5:

  1. Capture existing SQL statements
  2. Stop clients from writing to KSQL
  3. Stop the existing KSQL deployment
  4. Deploy a new ksqlDB cluster with a new service ID
  5. Set up security (optional)
  6. Recompile user-defined functions (optional)
  7. Replay SQL statements that you captured in the first step

Capture existing SQL statements

To capture existing SQL statements, we recommend using the kafka-console-consumer to output the existing KSQL command topic. The following example command shows how to pipe the output to jq and save the SQL commands to a statements.sql file.

Note

You must provide credentials for the kafka-console-consumer command by using the --consumer.config option. For more information, see Encryption and Authentication with SSL.

# export KSQL_SERVICE_ID=<ksql.service.id>
# export BROKER=localhost
# export PORT=9092
./bin/kafka-console-consumer --bootstrap-server ${BROKER}:${PORT} --topic _confluent-ksql-${KSQL_SERVICE_ID}_command_topic --from-beginning | jq -r ".statement" > statements.sql

To get the kafka-console-consumer tool, install Confluent Platform.

Look through the statements to make sure that the command worked as expected. Also, you may want to remove CREATE/DROP pairings, because you will execute all of these statements in the new cluster.
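
As an alternative to jq, and as a first pass at removing CREATE/DROP pairings, the Python sketch below may be useful. It assumes that each consumer output line is a JSON object with a `statement` field (as the jq filter above implies). The function names are hypothetical, and the pairing logic is intentionally naive: it ignores TERMINATE statements and queries that reference a dropped source, so review the result by hand.

```python
import json
import re
from typing import Iterable, List

_NAME = r"([A-Za-z_][A-Za-z0-9_]*)"
_CREATE = re.compile(rf"^\s*CREATE\s+(?:STREAM|TABLE)\s+{_NAME}", re.IGNORECASE)
_DROP = re.compile(rf"^\s*DROP\s+(?:STREAM|TABLE)\s+(?:IF\s+EXISTS\s+)?{_NAME}", re.IGNORECASE)


def extract_statements(lines: Iterable[str]) -> List[str]:
    """Pull the "statement" field out of each JSON line, like jq -r ".statement"."""
    return [json.loads(line)["statement"] for line in lines if line.strip()]


def remove_create_drop_pairs(statements: List[str]) -> List[str]:
    """Drop each DROP statement together with the most recent matching CREATE."""
    kept: List[str] = []
    for statement in statements:
        drop = _DROP.match(statement)
        if not drop:
            kept.append(statement)
            continue
        name = drop.group(1).upper()
        for i in range(len(kept) - 1, -1, -1):
            create = _CREATE.match(kept[i])
            if create and create.group(1).upper() == name:
                del kept[i]  # remove the paired CREATE and skip the DROP
                break
        else:
            kept.append(statement)  # no matching CREATE; keep for manual review
    return kept
```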

Stop clients that write to KSQL

To prevent data loss, stop all client applications and producers that write to the KSQL cluster.

Stop the existing KSQL deployment

Stop the KSQL cluster. The procedure for stopping the cluster varies depending on your deployment. For example, we recommend using systemctl for RPM deployments. If you’re using a docker-compose stack, you might use the docker-compose down command.

Install ksqlDB packages

If your deployment uses DEB or RPM release artifacts, you must uninstall the old packages and install the new ones. Because the configuration directory has changed from /etc/ksql to /etc/ksqldb, you must also copy any configuration files to the new location:

cp -p ${CONFLUENT_HOME}/etc/ksql/* ${CONFLUENT_HOME}/etc/ksqldb/

Ensure that all of the files have been copied, with the correct owner, group, and file permissions. If the permissions aren’t correct, change them manually.

Change the ksqlDB service ID

Different deployment strategies configure ksqlDB differently, but you must use a different value for ksql.service.id before you start the new ksqlDB server. If you use the old value, the server won’t start. Here are some common deployment mechanisms and how to change this configuration:

  • Debian/RPM: change the property in ${CONFLUENT_HOME}/etc/ksqldb/ksql-server.properties.
  • Docker: change the environment variable KSQL_KSQL_SERVICE_ID.
  • Confluent for Kubernetes (CFK): see Upgrading with CFK.
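
For the properties-file case, the edit can be scripted. The following Python sketch rewrites (or appends) the ksql.service.id entry in the text of a properties file; the function name is hypothetical, and reading and writing the actual file is left to the caller.

```python
import re

_SERVICE_ID = re.compile(r"^\s*ksql\.service\.id\s*[=:]")


def set_service_id(properties_text: str, new_service_id: str) -> str:
    """Return the properties text with ksql.service.id set to new_service_id."""
    replacement = f"ksql.service.id={new_service_id}"
    lines = properties_text.splitlines()
    found = False
    for i, line in enumerate(lines):
        if _SERVICE_ID.match(line):  # commented-out entries do not match
            lines[i] = replacement
            found = True
    if not found:
        lines.append(replacement)
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(set_service_id("bootstrap.servers=localhost:9092\nksql.service.id=old_\n", "new_"))
```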

Set up security (optional)

If you have security enabled, set up security for your ksqlDB app. ksqlDB supports using role-based access control (RBAC), ACLs, and no authorization.

Create new role bindings or assign ACLs for the ksql service principal:

  • Topic: __consumer_offsets

  • Topic: __transaction_state

  • TransactionalId: The value that you set in the configuration file, for example, ksqldb_.

    If you’re using ACLs for security, these ACLs are required:

    • DESCRIBE operation on the TOPIC with LITERAL name __consumer_offsets.
    • DESCRIBE operation on the TOPIC with LITERAL name __transaction_state.
    • DESCRIBE and WRITE operations on the TRANSACTIONAL_ID with LITERAL name <ksql.service.id>.

    If you’re using RBAC for security, these role assignments are required:

    • DeveloperRead role on the __consumer_offsets topic.
    • DeveloperRead role on the __transaction_state topic.
    • DeveloperWrite role on the <ksql.service.id> TransactionalId.

Recompile user-defined functions (optional)

If your KSQL application uses user-defined functions, you must recompile them with the upgraded dependencies. For more information, see ksqlDB Custom Function Reference (UDF, UDAF, and UDTF).

Start ksqlDB

Start the ksqldb service. The procedure for starting the cluster varies depending on your deployment. For example, we recommend using systemctl for RPM deployments. If you’re using a docker-compose stack, you might use the docker-compose up command.

Replay SQL statements

To replay SQL statements, start the ksqlDB CLI and issue RUN SCRIPT <path-to-statements.sql>;.

Important

There have been backward-incompatible syntax changes between KSQL and ksqlDB, and some of the statements may fail. If this happens, run the statements in statements.sql one-by-one, fixing any statements that have failed. In particular, continuous and persistent queries now require the EMIT CHANGES syntax. For more information, see Breaking Changes.
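
To run the statements one by one, it can help to split the script into individual statements first. The Python sketch below splits on semicolons that fall outside single-quoted strings; the function name is hypothetical, and SQL comments containing quotes or semicolons are not handled.

```python
from typing import List


def split_statements(script: str) -> List[str]:
    """Split a SQL script into statements, ignoring semicolons inside single quotes."""
    statements: List[str] = []
    current: List[str] = []
    in_quote = False
    for ch in script:
        current.append(ch)
        if ch == "'":
            in_quote = not in_quote  # an escaped quote ('') toggles twice
        elif ch == ";" and not in_quote:
            statement = "".join(current).strip()
            if statement.strip(";").strip():  # skip empty statements
                statements.append(statement)
            current = []
    tail = "".join(current).strip()
    if tail:
        statements.append(tail)
    return statements


if __name__ == "__main__":
    with_semicolon_in_literal = (
        "CREATE STREAM A (X STRING) WITH (kafka_topic='a;b', value_format='json');\n"
        "CREATE STREAM B AS SELECT * FROM A EMIT CHANGES;\n"
    )
    for statement in split_statements(with_semicolon_in_literal):
        print(statement)
```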

Upgrading to KSQL 5.4

Upgrade one server at a time in a “rolling restart”. The remaining servers should have sufficient spare capacity to take over temporarily for unavailable, restarting servers.

Notable changes in 5.4:

  • KSQL Server

    • Query Id generation

      • This version of KSQL includes a change to how query ids are generated for Persistent Queries (INSERT INTO/CREATE STREAM AS SELECT/CREATE TABLE AS SELECT). Previously, query ids would be incremented on every successful Persistent Query created. New query ids use the Kafka record offset of the query creating command in the KSQL command topic.

        In order to prevent inconsistent query ids, don’t create new Persistent Queries while upgrading your KSQL servers (5.3 or lower). Old running queries will retain their original id on restart, while new queries will utilize the new id convention.

        See GitHub PR #3354 for more info.
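
The difference between the two ID schemes can be sketched in Python as follows. The `CSAS_OUT_<n>` ID format, the counter starting at zero, and all names here are assumptions for illustration; only the counter-versus-offset distinction comes from the description above.

```python
import itertools


class LegacyQueryIds:
    """Earlier scheme: a counter incremented for each persistent query created."""

    def __init__(self) -> None:
        self._counter = itertools.count()  # assumed to start at 0

    def next_id(self, prefix: str) -> str:
        return f"{prefix}_{next(self._counter)}"


def offset_query_id(prefix: str, command_offset: int) -> str:
    """5.4 scheme: use the command topic offset of the statement that created the query."""
    return f"{prefix}_{command_offset}"


if __name__ == "__main__":
    legacy = LegacyQueryIds()
    print([legacy.next_id("CSAS_OUT") for _ in range(3)])
    print(offset_query_id("CSAS_OUT", 7))
```

Because the two schemes can produce different IDs for the same command, servers on different versions could disagree about a query created mid-upgrade, which is why creating queries during the upgrade is discouraged.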

Upgrading from KSQL 5.2 to KSQL 5.3

Notable changes in 5.3:

  • KSQL Server

    • Avro schema compatibility

      • This version of KSQL fixes a bug where the schemas returned by UDFs and UDAFs might not be marked as nullable. This can cause serialization issues in the presence of null values, as might be encountered if the UDF fails.

        With the bug fix, all fields are now optional.

        This is a forward-compatible change in Avro; that is, after upgrading, KSQL can read old values using the new schema. However, ensure that downstream consumers of the data are using the updated schema before you upgrade KSQL, because otherwise deserialization may fail. The updated schema is best obtained by running the query in another KSQL cluster that runs version 5.3.

        See GitHub issue #2769 for more info.

  • Configuration:

    • ksql.sink.partitions and ksql.sink.replicas are deprecated. All new queries will use the source topic partition count and replica count for the sink topic instead, unless partitions and replicas are set in the WITH clause. For backward compatibility, if you specify these configs, KSQL uses them, but Confluent recommends you discontinue their use.
    • A new config variable, ksql.internal.topic.replicas, was introduced to set the replica count for the internal topics created by KSQL Server. The internal topics include the command topic and the config topic.

Upgrading from KSQL 5.1 to KSQL 5.2

Notable changes in 5.2:

  • KSQL Server
    • Interactive mode:
      • The use of the RUN SCRIPT statement via the REST API is now deprecated and will be removed in the next major release (GitHub issue #2179). The feature circumvents certain correctness checks and is unnecessary, because the script content can be supplied in the main body of the request. If you use RUN SCRIPT from the KSQL CLI, your scripts are not affected, because this usage continues to be supported. If you use RUN SCRIPT directly against the REST API, your requests will work with the 5.2 server but will be rejected after the next major version release. Instead, include the contents of the script in the main body of your request.
  • Configuration:
    • When upgrading your headless (non-interactive) mode application from version 5.0.0 and below, you must include the configs specified in the 5.1 upgrade instructions.
    • When upgrading your headless (non-interactive) mode application, you must include the following properties in your properties file:
        ksql.windowed.session.key.legacy=true
        ksql.named.internal.topics=off
        ksql.streams.topology.optimization=none

Version 5.1 of KSQL and earlier incorrectly excluded the end time in the record key in Kafka for session-windowed data. The ksql.windowed.session.key.legacy=true config ensures that KSQL continues to store session keys without the end time.

Version 5.1 also didn’t use Kafka Streams optimizations, which required enabling internal topic names. For compatibility, the ksql.named.internal.topics and ksql.streams.topology.optimization configs are disabled.

Upgrading from KSQL 5.0.0 and below to KSQL 5.1

  • KSQL server:
    • The KSQL engine metrics are now prefixed with the ksql.service.id. If you use a metric monitoring tool, you need to update your metric names. For instance, assuming ksql.service.id is set to default_, messages-consumed-per-sec changes to _confluent-ksql-default_messages-consumed-per-sec.
  • Configuration:
    • When upgrading your headless (non-interactive) mode application, you must either update your queries to use the new SUBSTRING indexing semantics, or set ksql.functions.substring.legacy.args to true. If possible, we recommend that you update your queries accordingly, instead of enabling this configuration setting. Refer to the SUBSTRING documentation in the function guide for details on how to do so. Note that this is NOT required for interactive mode KSQL.
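
Judging by the metric example above, the new metric name appears to be the literal `_confluent-ksql-`, followed by the service ID, followed by the original metric name. A minimal Python sketch under that assumption (the function name is hypothetical) for updating monitoring configuration:

```python
def prefixed_metric_name(service_id: str, metric: str) -> str:
    """Build the 5.1-style metric name from the service ID and the old metric name."""
    return f"_confluent-ksql-{service_id}{metric}"


if __name__ == "__main__":
    print(prefixed_metric_name("default_", "messages-consumed-per-sec"))
```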

Upgrading from KSQL 0.x (Developer Preview) to KSQL 4.1

KSQL 4.1 is not backward-compatible with the previous KSQL 0.x developer preview releases. In particular, you must manually migrate queries running in the older preview releases of KSQL to the 4.1 version by issuing statements like CREATE STREAM and CREATE TABLE again.

Notable changes in 4.1:

  • KSQL CLI:
    • The ksql-cli command was renamed to ksql.
    • The CLI no longer supports what was formerly called “standalone” or “local” mode, where ksql-cli would run both the CLI and also a KSQL server process inside the same JVM. In 4.1, ksql will only run the CLI. For local development and testing, you can now run confluent start (which will also launch a KSQL server), followed by ksql to start the CLI. This setup is used for the Confluent Platform quickstart. Alternatively, you can start the KSQL server directly as described in Starting the ksqlDB Server, followed by ksql to start the CLI.
  • KSQL server:
    • The default listeners address was changed to http://localhost:8088 (KSQL 0.x used http://localhost:8080).
    • Assigning KSQL servers to a specific KSQL cluster has been simplified and is now done with the ksql.service.id setting. See ksql.service.id for details.
  • Executing .sql files: To run pre-defined KSQL queries stored in a .sql file, see Non-interactive (Headless) ksqlDB Usage.
  • Configuration: Advanced KSQL users can configure the Kafka Streams and Kafka producer/consumer client settings used by KSQL. This is achieved by using prefixes for the respective configuration settings. See Configure ksqlDB Server as well as Configuration Parameter Reference and Configure ksqlDB CLI for details.