Upgrading ksqlDB¶
Upgrading to ksqlDB 6.1 from ksqlDB 6.0¶
For a complete list of changes, see the ksqlDB changelog.
Note
Queries written for ksqlDB 6.0 continue to run without changes under ksqlDB 6.1. New queries issued on 6.1 may have different behavior, even if the text of the query statement is the same.
Breaking Changes¶
ksqlDB now creates windowed tables with cleanup policy
compact,delete
, rather thancompact
. Also, topics that back streams are always created with cleanup policydelete
, rather than the broker default (by default,delete
).ksqlDB no longer emits unnecessary tombstones when a
HAVING
clause filters out a row from the source that is not in the output table. For example, given:-- source stream: CREATE STREAM FOO (ID INT KEY, VAL INT) WITH (...); -- aggregate into a table: CREATE TABLE BAR AS SELECT ID, SUM(VAL) AS SUM FROM FOO GROUP BY ID HAVING SUM(VAL) > 0; -- insert some values into the stream: INSERT INTO FOO VALUES(1, -5); INSERT INTO FOO VALUES(1, 6); INSERT INTO FOO VALUES(1, -2); INSERT INTO FOO VALUES(1, -1);
Where previously the contents of the sink topic
BAR
would have contained records:Key Value Notes 1
null
Spurious tombstone: the table does not contain a row with key 1
, so no tombstone is required.1
{sum=1}
Row added as HAVING criteria now met 1
null
Row deleted as HAVING criteria now not met 1
null
Spurious tombstone: the table does not contain a row with key 1
, so no tombstone is required.The topic will now contain:
Key Value 1
{sum=1}
1
null
Upgrading to ksqlDB 6.0 from ksqlDB 5.5¶
For a complete list of changes, see the ksqlDB changelog.
Note
Queries written for ksqlDB 5.5 continue to run without changes under ksqlDB 6.0. New queries must use the updated 6.0 syntax.
Note
When you upgrade from from ksqlDB 5.5 to ksqlDB 6.0 and later, you may encounter this issue: State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables. Upgraded ksqlDB applications may still run, but symptoms include spamming your Schema Registry logs with serialization exceptions. To correct this issue, delete the “phantom” changelog subject. For more information, see The Curious Incident of the State Store in Recovery in ksqlDB.
Breaking Changes¶
The “select star” statement (
select *
), no longer expands to includeROWTIME
column(s). Instead,ROWTIME
is included in the results of queries only if explicitly included in the projection, for example.select rowtime, *
. This change affects only new statements. Any view previously created via a CREATE STREAM AS SELECT or CREATE TABLE AS SELECT statement is unaffected.This release changes the system-generated column name for any columns in projections dereference fields in a struct. Previously, the full path was used when generating the name. In 6.0, only the final field name is used. For example,
SELECT someStruct->someField, ...
previously generated a column name ofSOMESTRUCT__SOMEFIELD
and now generates a name ofSOMEFIELD
. Generated column names may have a numeral appended to the end to ensure uniqueness, for exampleSOMEFIELD_2
.Note
We recommend that you do not rely on system-generated column names for production systems, because naming logic may change between releases. Providing an explicit alias ensures consistent naming across releases, for example,
SELECT someStruct->someField AS someField
. For backward compatibility, existing running queries aren’t affected by this change, and they will continue to run with the same column names. Any statements executed after the upgrade will use the new names, where no explicit alias is provided. Add explicit aliases to your statements if you require the old names, for example:SELECT someStruct->someField AS SOMESTRUCT__SOMEFIELD, ...
.In version 5.5, queries that referenced a single GROUP BY column in the projection would fail if they were resubmitted, due to a duplicate column. In 6.0, the same existing queries will continue to run if they’re running already, which means that this change affects only newly submitted queries. Existing queries use the earlier query semantics.
Push queries that rely on auto-generated column names may see changes in column names. Pull queries and any existing persistent queries are unaffected, for example, those created with CREATE STREAM AS SELECT, CREATE TABLE AS SELECT, or INSERT INTO.
ksqlDB Server no longer ships with Jetty. This means that when you start the server, you must supply Jetty-specific dependencies, like certain login modules used for HTTP Basic authentication, by using the KSQL_CLASSPATH environment variable for ksqlDB to find them.
Any key name¶
Statements containing PARTITION BY, GROUP BY, or JOIN clauses now produce different output schemas.
For PARTITION BY and GROUP BY clauses, the following rules define how the name of the key column in the result is determined:
Where the partitioning or grouping is a single-column reference, then the key column has the same name as this column, for example:
-- OUTPUT has a key column named X; CREATE STREAM OUTPUT AS SELECT * FROM INPUT GROUP BY X;
Where the partitioning or grouping is a single field in a struct, the key column has the same name as the field. For example:
-- OUTPUT has a key column named FIELD1; CREATE STREAM OUTPUT AS SELECT * FROM INPUT GROUP BY X->field1;
Otherwise, the key column name is system-generated and has the form
KSQL_COL_n
, where n is a positive integer.
In all cases, except where grouping by more than one column, you can set the new key column’s name by defining an alias in the projection, for example:
-- OUTPUT has a key column named ID.
CREATE TABLE OUTPUT AS
SELECT
USERID AS ID,
COUNT(*)
FROM USERS
GROUP BY ID;
For groupings of multiple expressions, you can’t provide a name for the system-generated key column. You can work around this by combining the grouping columns manually, which enables you to provide an alias, for example:
-- products_by_sub_cat has a key column named COMPOSITEKEY:
CREATE TABLE products_by_sub_cat AS
SELECT
categoryId + ‘§’ + subCategoryId AS compositeKey
SUM(quantity) as totalQty
FROM purchases
GROUP BY CAST(categoryId AS STRING) + ‘§’ + CAST(subCategoryId AS STRING);
For JOIN statements, the name of the key column in the result is determined by the join criteria.
For INNER and LEFT OUTER joins where the join criteria contain at least one column reference, the key column is named based on the left-most source whose join criteria is a column reference, for example:
-- OUTPUT has a key column named I2_ID. CREATE TABLE OUTPUT AS SELECT * FROM I1 JOIN I2 ON abs(I1.ID) = I2.ID JOIN I3 ON I2.ID = I3.ID;
You can give the key column a new name, if required, by defining an alias in the projection, for example:
-- OUTPUT has a key column named ID. CREATE TABLE OUTPUT AS SELECT I2.ID AS ID, I1.V0, I2.V0, I3.V0 FROM I1 JOIN I2 ON abs(I1.ID) = I2.ID JOIN I3 ON I2.ID = I3.ID;
For FULL OUTER joins and other joins where the join criteria are not on column references, the key column in the output is not equivalent to any column from any source. The key column has a system-generated name in the form
KSQL_COL_n
, where n is a positive integer, for example:-- OUTPUT has a key column named KSQL_COL_0, or similar. CREATE TABLE OUTPUT AS SELECT * FROM I1 FULL OUTER JOIN I2 ON I1.ID = I2.ID;
You can give the key column a new name, if required, by defining an alias in the projection. A new UDF, named
JOINKEY
, has been introduced in 6.0 to help define the alias. It takes the join criteria as its parameters, for example:-- OUTPUT has a key column named ID. CREATE TABLE OUTPUT AS SELECT JOINKEY(I1.ID, I2.ID) AS ID, I1.V0, I2.V0 FROM I1 FULL OUTER JOIN I2 ON I1.ID = I2.ID;
Explicit keys¶
In version 6.0, new CREATE TABLE
statements will fail if the PRIMARY KEY
column isn’t provided. For example, you must update a statement like following to
include the definition of the PRIMARY KEY:
CREATE TABLE FOO (
name STRING
) WITH (
kafka_topic='foo',
value_format='json'
);
Update the previous statement to include the definition of the PRIMARY KEY:
CREATE TABLE FOO (
ID STRING PRIMARY KEY,
name STRING
) WITH (
kafka_topic='foo',
value_format='json'
);
If you load the value columns of the topic from Schema Registry, also known as “schema inference”, you can provide the primary key as a partial schema, for example:
-- FOO has value columns loaded from Schema Registry
CREATE TABLE FOO (
ID INT PRIMARY KEY
) WITH (
kafka_topic='foo',
value_format='avro'
);
CREATE STREAM statements that don’t define a KEY
column no longer have an
implicit ROWKEY
key column, for example:
CREATE STREAM BAR (
NAME STRING
) WITH (...);
In version 5.5, the previous statement would have resulted in a stream that had
two columns: ROWKEY STRING KEY
and NAME STRING
. With this change, the
previous statement results in a stream that has only the NAME STRING
column.
Streams with no KEY column are serialized to Apache Kafka® topics with a null
key.
Key columns required in projection¶
A statement that creates a materialized view must include the key columns in the projection, for example:
CREATE TABLE OUTPUT AS
SELECT
productId, -- key column in projection
SUM(quantity) as unitsSold
FROM sales
GROUP BY productId;
The key column productId
is required in the projection. In previous
versions of ksqlDB, the presence of productId
in the projection would
have placed a copy of the data into the value of the underlying Kafka topic’s
record. But starting in version 6.0, the projection must include the key
columns, and ksqlDB stores these columns in the key of the underlying
Kafka record. Optionally, you may provide an alias for the key column(s), for
example:
CREATE TABLE OUTPUT AS
SELECT
productId as id, -- aliased key column
SUM(quantity) as unitsSold
FROM sales
GROUP BY productId;
If you need a copy of the key column in the Kafka record’s value, use the AS_VALUE function to indicate this to ksqlDB. For example, the following statement produces an output like earlier versions of ksqlDB for the previous example materialized view:
CREATE TABLE OUTPUT AS
SELECT
productId as ROWKEY, -- key column named ROWKEY
AS_VALUE(productId) as productId, -- productId copied into value
SUM(quantity) as unitsSold
FROM sales
GROUP BY productId;
WITH(KEY) syntax removed¶
In previous versions, all key columns were named ROWKEY
. To enable using a
more user-friendly name for the key column in queries, you could supply an
alias for the key column in the WITH clause, for example:
CREATE TABLE INPUT (
ROWKEY INT PRIMARY KEY,
ID INT,
V0 STRING
) WITH (
key='ID', -- removed in ksqlDB 6.0
...
);
With the previous query, the ID
column can be used as an alias for
ROWKEY
. This approach required the Kafka message value to contain an exact
copy of the key.
KLIP-24
removed the restriction that key columns must be named ROWKEY
, eliminating
the need for the WITH(KEY)
syntax, which has been removed. Also, this change
removed the requirement for the Kafka message value to contain an exact copy of
the key.
Update your queries by removing the KEY
from the WITH
clause and naming
your KEY
and PRIMARY KEY
columns appropriately. For example, you can
rewrite the previous CREATE TABLE statement like this:
CREATE TABLE INPUT (
ID INT PRIMARY KEY,
V0 STRING
) WITH (...);
This doesn’t work when the value format is DELIMITED
, because the value
columns are order dependent, so dropping the ID
value column would result
in a deserialization error or the wrong values being loaded. If you’re using
DELIMITED
, consider rewriting the previous example like this:
CREATE TABLE INPUT (
ID INT PRIMARY KEY,
ignoreMe INT,
V0 STRING
) WITH (...);
Basic+Bearer authentication¶
In ksql-server.properties
, remove the following configuration settings:
rest.servlet.initializor.classes
websocket.servlet.initializor.classes
Add the following setting:
ksql.authentication.plugin.class=io.confluent.ksql.security.VertxBearerOrBasicAuthenticationPlugin
Upgrading to ksqlDB 5.5 from KSQL 5.4¶
Warning
The upgrade from KSQL 5.4 to ksqlDB 5.5 is not a rolling restart. You must shut down all KSQL instances and then start up all ksqlDB instances, so there will be downtime.
Complete the following steps to perform the upgrade from KSQL 5.4 to ksqlDB 5.5:
- Capture existing SQL statements
- Stop clients from writing to KSQL
- Stop the existing KSQL deployment
- Deploy a new ksqlDB cluster with a new service ID
- Set up security (optional)
- Recompile user-defined functions (optional)
- Replay SQL statements that you captured in the first step
Capture existing SQL statements¶
To capture existing SQL statements, we recommend using the kafka-console-consumer
to output the existing KSQL command topic. The following example command shows
how to pipe the output to jq and save the SQL commands to a statements.sql
file.
Note
You must provide credentials for the kafka-console-consumer
command by
using the -consumer.config
option. For more information, see
Encryption and Authentication with SSL.
# export KSQL_SERVICE_ID=<ksql.service.id>
# export BROKER=localhost
# export PORT=9092
./bin/kafka-console-consumer --bootstrap-server ${BROKER}:${PORT} --topic _confluent-ksql-${KSQL_SERVICE_ID}_command_topic --from-beginning | jq -r ".statement" > statements.sql
To get the kafka-console-consumer
tool, install Confluent Platform.
Look through the statements to make sure that the command worked as expected. Also, you may want to remove CREATE/DROP pairings, because you will execute all of these statements in the new cluster.
Stop clients that write to KSQL¶
To prevent data loss, stop all client applications and producers that write to the KSQL cluster.
Stop the existing KSQL deployment¶
Stop the KSQL cluster. The procedure for stopping the cluster varies depending
on your deployment. For example, we recommend using systemctl
for RPM
deployments. If you’re using a docker-compose stack, you might use the
docker-compose down
command.
Install ksqlDB packages¶
If your deployment uses DEB or RPM release artifacts, you must uninstall the
old packages and install the new ones. Because the configuration directory
has changed from /etc/ksql
to /etc/ksqldb
you must also copy any
configuration files to the new location:
cp -p ${CONFLUENT_HOME}/etc/ksql/* ${CONFLUENT_HOME}/etc/ksqldb/
Ensure that all of the files have been copied, with the correct owner, group, and file permissions. If the permissions aren’t correct, change them manually.
Change the ksqlDB service ID¶
Different deployment strategies configure ksqlDB differently, but you must
use a different value for ksql.service.id
before you start the new ksqlDB
server. If you use the old value, the server won’t start. Here are some common
deployment mechanisms and how to change this configuration:
- Debian/RPM: change the property in
${CONFLUENT_HOME}/etc/ksqldb/ksql-server.properties
. - Docker: change the environment variable
KSQL_KSQL_SERVICE_ID
. - Confluent for Kubernetes (CFK): see Upgrading with CFK.
Set up security (optional)¶
If you have security enabled, set up security for your ksqlDB app. ksqlDB supports using role-based access control (RBAC), ACLs, and no authorization.
Create new role bindings or assign ACLs for the ksql
service
principal:
Topic:
__consumer_offsets
Topic:
__transaction_state
TransactionalId: The value that you set in the configuration file, for example,
ksqldb_
.If you’re using ACLs for security, these ACLs are required:
DESCRIBE
operation on theTOPIC
withLITERAL
name__consumer_offsets
.DESCRIBE
operation on theTOPIC
withLITERAL
name__transaction_state
.DESCRIBE
andWRITE
operations on theTRANSACTIONAL_ID
withLITERAL
name<ksql.service.id>
.
If you’re using RBAC for security, these role assignments are required:
DeveloperRead
role on the__consumer_offsets
topic.DeveloperRead
role on__transaction_state
topic.DeveloperWrite
role on the<ksql.service.id>
TransactionalId.
Recompile user-defined functions (optional)¶
If your KSQL application uses user-defined functions, you must recompile them with the upgraded dependencies. For more information, see ksqlDB Custom Function Reference (UDF, UDAF, and UDTF).
Start ksqlDB¶
Start the ksqldb
service. The procedure for starting the cluster varies
depending on your deployment. For example, we recommend using systemctl
for RPM
deployments. If you’re using a docker-compose stack, you might use the
docker-compose up
command.
Replay SQL statements¶
To replay SQL statements, start the ksqlDB CLI and issue
RUN SCRIPT <path-to-statements.sql>;
.
Important
There have been backward-incompatible syntax changes between KSQL and ksqlDB,
and some of the statements may fail. If this happens, run the statements in
statements.sql
one-by-one, fixing any statements that have failed. In
particular, continuous and persistent queries now require the
EMIT CHANGES
syntax. For more information, see
Breaking Changes.
Upgrading to KSQL 5.4¶
Upgrade one server at a time in a “rolling restart”. The remaining servers should have sufficient spare capacity to take over temporarily for unavailable, restarting servers.
Notable changes in 5.4:
KSQL Server
Query Id generation
This version of KSQL includes a change to how query ids are generated for Persistent Queries (INSERT INTO/CREATE STREAM AS SELECT/CREATE TABLE AS SELECT). Previously, query ids would be incremented on every successful Persistent Query created. New query ids use the Kafka record offset of the query creating command in the KSQL command topic.
In order to prevent inconsistent query ids, don’t create new Persistent Queries while upgrading your KSQL servers (5.3 or lower). Old running queries will retain their original id on restart, while new queries will utilize the new id convention.
See Github PR #3354 for more info.
Upgrading from KSQL 5.2 to KSQL 5.3¶
Notable changes in 5.3:
KSQL Server
Avro schema compatibility
This version of KSQL fixes a bug where the schemas returned by UDF and UDAFs might not be marked as nullable. This can cause serialization issues in the presence of
null
values, as might be encountered if the UDF fails.With the bug fix all fields are now optional.
This is a forward compatible change in Avro, i.e. after upgrading, KSQL will be able to read old values using the new schema. However, it is important to ensure downstream consumers of the data are using the updated schema before upgrading KSQL, as otherwise deserialization may fail. The updated schema is best obtained from running the query in another KSQL cluster, running version 5.3.
See Github issue #2769 for more info.
Configuration:
ksql.sink.partitions
andksql.sink.replicas
are deprecated. All new queries will use the source topic partition count and replica count for the sink topic instead unless partitions and replicas are set in the WITH clause.- A new config variable,
ksql.internal.topic.replicas
, was introduced to set the replica count for the internal topics created by KSQL Server. The internal topics include command topic or config topic.
Upgrading from KSQL 5.1 to KSQL 5.2¶
Notable changes in 5.2:
- KSQL Server
- Interactive mode:
- The use of the
RUN SCRIPT
statement via the REST API is now deprecated and will be removed in the next major release. (Github issue 2179). The feature circumnavigates certain correctness checks and is unnecessary, given the script content can be supplied in the main body of the request. If you are using theRUN SCRIPT
functionality from the KSQL CLI, your scripts will not be affected, as this will continue to be supported. If you are using theRUN SCRIPT
functionality directly against the REST API your requests will work with the 5.2 server, but will be rejected after the next major version release. Instead, include the contents of the script in the main body of your request.
- The use of the
- Interactive mode:
- Configuration:
- When upgrading your headless (non-interactive) mode application from version 5.0.0 and below, you must include the configs specified in the 5.1 upgrade instructions.
- When upgrading your headless (non-interactive) mode application, you must include the following properties in your properties file:
ksql.windowed.session.key.legacy=true
ksql.named.internal.topics=off
ksql.streams.topology.optimization=none
Version 5.1 of KSQL and earlier incorrectly excluded the end time in the record key in
Kafka for session-windowed data. The ksql.windowed.session.key.legacy=true
config
ensures that KSQL continues to store session keys without the end time.
Version 5.1 also didn’t use Kafka Streams optimizations, which required
enabling internal topic names. For compatibility, the ksql.named.internal.topics
and ksql.streams.topology.optimization
configs are disabled.
Upgrading from KSQL 5.0.0 and below to KSQL 5.1¶
- KSQL server:
- The KSQL engine metrics are now prefixed with the
ksql.service.id
. If you have been using any metric monitoring tool, you need to update your metric names. For instance, assumingksql.service.id
is set todefault_
,messages-produced-per-sec
will be changed to_confluent-ksql-default_messages-consumed-per-sec
.
- The KSQL engine metrics are now prefixed with the
- Configuration:
- When upgrading your headless (non-interactive) mode application, you must
either update your queries to use the new SUBSTRING indexing semantics, or
set
ksql.functions.substring.legacy.args
totrue
. If possible, we recommend that you update your queries accordingly, instead of enabling this configuration setting. Refer to the SUBSTRING documentation in the function guide for details on how to do so. Note that this is NOT required for interactive mode KSQL.
- When upgrading your headless (non-interactive) mode application, you must
either update your queries to use the new SUBSTRING indexing semantics, or
set
Upgrading from KSQL 0.x (Developer Preview) to KSQL 4.1¶
KSQL 4.1 is not backward-compatible with the previous KSQL 0.x developer preview releases.
In particular, you must manually migrate queries running in the older preview releases of KSQL to the 4.1 version by
issuing statements like CREATE STREAM
and CREATE TABLE
again.
Notable changes in 4.1:
- KSQL CLI:
- The
ksql-cli
command was renamed toksql
. - The CLI no longer supports what was formerly called “standalone” or “local” mode, where
ksql-cli
would run both the CLI and also a KSQL server process inside the same JVM. In 4.1,ksql
will only run the CLI. For local development and testing, you can now runconfluent start
(which will also launch a KSQL server), followed byksql
to start the CLI. This setup is used for the Confluent Platform quickstart. Alternatively, you can start the KSQL server directly as described in Starting the ksqlDB Server, followed byksql
to start the CLI.
- The
- KSQL server:
- The default
listeners
address was changed tohttp://localhost:8088
(KSQL 0.x usedhttp://localhost:8080
). - Assigning KSQL servers to a specific KSQL cluster has been simplified and is now done with the
ksql.service.id
setting. See ksql.service.id for details.
- The default
- Executing
.sql
files: To run pre-defined KSQL queries stored in a.sql
file, see Non-interactive (Headless) ksqlDB Usage. - Configuration: Advanced KSQL users can configure the Kafka Streams and Kafka producer/consumer client settings used by KSQL. This is achieved by using prefixes for the respective configuration settings. See Configure ksqlDB Server as well as Configuration Parameter Reference and Configure ksqlDB CLI for details.