Upgrade ksqlDB for Confluent Platform¶
About backward compatibility¶
Most releases of ksqlDB are backward compatible. But backward compatibility comes at a cost: progress is slower and the code base incurs increased complexity. ksqlDB is a young product and we want to move fast, so we have decided to choose speed of development over strong backward compatibility guarantees.
Until version 1.0 of ksqlDB, each minor release will potentially have breaking changes in it, which may mean that you can’t simply update the ksqlDB binaries and restart the server(s).
The data models and binary formats used within ksqlDB are in flux. This means data local to each ksqlDB node and stored centrally within internal Kafka topics may not be compatible with the new version you’re trying to deploy.
Should I upgrade?¶
It’s great that you’re interested in trying out the new features and fixes that new versions of ksqlDB bring. But before rushing off to upgrade all your ksqlDB clusters, ask yourself, “Do I need to upgrade this cluster”?
If you’re running ksqlDB in production, and you don’t yet need the features or fixes the new version brings, consider delaying any upgrade until either another release has features or fixes you need, or until ksqlDB reaches version 1.0 and promises backward compatibility.
How to upgrade¶
When possible, ksqlDB maintains runtime compatibility between versions, which means you can upgrade in-place by stopping your ksqlDB servers and restarting them with the new version, and your existing query pipelines will resume from where they left off. New query statements may need to be adjusted for syntax changes required by the new version.
However, until ksqlDB 1.0, some ksqlDB versions may not support in-place upgrades. In these situations, upgrading a cluster involves leaving the old cluster running on the old version, bringing up a new cluster on the new version, porting across your database schema, and finally thinking about your data. Read on for details.
In-Place upgrade¶
Follow these steps to do an in-place upgrade of ksqlDB.
- Download the new ksqlDB version to upgrade to.
- Set it up with all required configs. For the vast majority of upgrades, you may simply copy existing configs over to the new version.
- Copy source UDFs and recompile against the new ksqlDB version, to accommodate any changes to the UDF framework.
- Stop the ksqlDB process running the current version and start the newer version with the latest ksqlDB config file, which you created in Step 2.
- ksqlDB starts executing queries and rebuilds all states, using the
command topic, because the value of
ksql.service.id
is the same as the previous version. ksqlDB Server with the latest version is up and running.
Troubleshooting an Upgrade¶
ksqlDB Server fails to start with an error message like “KsqlException: UdfFactory not compatible with existing factory. function: existing”:
This error occurs when the new ksqlDB version introduces a built-in function that has the same name as yours, creating a conflict with your UDF. Create a new UDF jar by renaming or eliminating the conflicting function(s), and restart ksqlDB with the new UDF jar.
DESCRIBE EXTENDED command doesn’t work: Usually, this happens when the ksqlDB CLI and ksqlDB Server versions are different. You can create an alias for ksqlDB CLI to make sure to change it to a newer version. You should ensure that the ksqlDB CLI and Server versions are the same. In “/.bashrc” file, you can create an alias for logging into ksqlDB cli. Eg :
- Open .bashrc file
- Create alias using command : “alias ksqldb=‘cd && ./ksql http://:8088’” and save the changes. Now you can type “ksqldb” on shell and can login into ksqlDB.
Steps to follow when “In-Place” upgrade is not supported¶
Port the database schema¶
To port your database schema from one cluster to another you need to recreate all the streams, tables and types in the source cluster.
The recommended process is to use the commandTopicConsumer Python script to dump the ksqlDB command topic.
commandTopicConsumer.py
#!/usr/bin/env python3
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Confluent Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# http://www.confluent.io/confluent-community-license
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
#
# Usage: commandTopicConsumer.py [-h] [-f CONFFILE] [-b BROKERS][-k KSQLSERVICEID]
# Command topic consumer that dumps CREATE, DROP and TERMINATE queries to
# stdout. If no arguments are provided, default values are used. Default broker
# is 'localhost:9092'. Default ksqlServiceId is 'default_'. You may optionally
# provide a configuration file with broker specific configuration parameters.
# Every run of this script will consume the topic from the beginning.
# optional arguments:
# -h, --help show this help message and exit
# -b BROKERS Bootstrap servers
# -f CONFFILE Configuration file (configProp=value format)
# -k KSQLSERVICEID KsqlDB service ID
# -d Enable debug logging
from confluent_kafka import KafkaError, KafkaException, version
from confluent_kafka import Producer, Consumer
import json
import logging
import argparse
import uuid
import sys
import re
class CommandRecord (object):
def __init__(self, stmt):
self.stmt = stmt
def __str__(self):
return "({})".format(self.stmt)
@classmethod
def deserialize(cls, binstr):
d = json.loads(binstr)
return CommandRecord(d['statement'])
class CommandConsumer(object):
def __init__(self, ksqlServiceId, conf):
self.consumer = Consumer(conf)
self.topic = '_confluent-ksql-{}_command_topic'.format(ksqlServiceId)
def consumer_run(self):
max_offset = -1001
def latest_offsets(consumer, partitions):
nonlocal max_offset
for p in partitions:
high_water = consumer.get_watermark_offsets(p)[1]
if high_water >= max_offset:
max_offset = high_water
logging.debug("Max offset in command topic = %d", max_offset)
self.consumer.subscribe([self.topic], on_assign=latest_offsets)
self.msg_cnt = 0
self.msg_err_cnt = 0
stmts = {}
try:
while True:
msg = self.consumer.poll(0.2)
if msg is None:
continue
if msg.error() is not None:
print("consumer: error: {}".format(msg.error()))
self.consumer_err_cnt += 1
continue
try:
#print("Read msg with offset ", msg.offset())
self.msg_cnt += 1
record = CommandRecord.deserialize(msg.value())
#print(record)
# match statements CREATE/DROP STREAM, CREATE/DROP TABLE
match = re.search(r'(?:create|drop) (?:stream|table) ([a-zA-z0-9-]+?)(:?\(|AS|\s|;)', record.stmt, re.I)
if match:
name = match.group(1).upper()
if name == "KSQL_PROCESSING_LOG":
continue
if name not in stmts:
stmts[name] = []
stmts[name].append(record.stmt)
# match statements TERMINATE query
match2 = re.search(r'(?:terminate) (?:ctas|csas)_(.+?)_', record.stmt, re.I)
if match2:
name = match2.group(1).upper()
stmts[name].append(record.stmt)
# match statements INSERT INTO stream or table
match3 = re.search(r'(?:insert into) ([a-zA-z0-9-]+?)(:?\(|\s|\()', record.stmt, re.I)
if match3:
name = match3.group(1).upper()
stmts[name].append(record.stmt)
#match statements CREATE TYPE
match4 = re.search(r'(?:create|drop) type ([a-zA-z0-9-]+?)(:?AS|\s|;)', record.stmt, re.I)
if match4:
name = match4.group(1).upper()
if name not in stmts:
stmts[name] = []
stmts[name].append(record.stmt)
if match is None and match2 is None and match3 is None and match4 is None:
if 'UNRECOGNIZED' not in stmts:
stmts['UNRECOGNIZED'] = []
stmts['UNRECOGNIZED'].append(record.stmt)
# High watermark is +1 from last offset
if msg.offset() >= max_offset-1:
break
except ValueError as ex:
print("consumer: Failed to deserialize message in "
"{} [{}] at offset {} (headers {}): {}".format(
msg.topic(), msg.partition(), msg.offset(), msg.headers(), ex))
self.msg_err_cnt += 1
except KeyboardInterrupt:
pass
finally:
self.consumer.close()
logging.debug("Consumed {} messages, erroneous message = {}.".format(self.msg_cnt, self.msg_err_cnt))
outer_json = []
for key, value in stmts.items():
inner_json = {}
inner_json['subject'] = key
inner_json['statements'] = value
outer_json.append(inner_json)
print(json.dumps(outer_json ))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Command topic consumer that dumps CREATE, DROP and TERMINATE queries to "+
"stdout. If no arguments are provided, default values are used. Default broker is "
"'localhost:9092'. Default ksqlServiceId is 'default_'. You may optionally provide a configuration file with "+
"broker specific configuration parameters. Every run of this script will consume the topic from the beginning. ")
parser.add_argument('-f', dest='confFile', type=argparse.FileType('r'),
help='Configuration file (configProp=value format)')
parser.add_argument('-b', dest='brokers', type=str, default=None, help='Bootstrap servers')
parser.add_argument('-k', dest='ksqlServiceId', type=str, default=None, help='KsqlDB service ID')
parser.add_argument("-d", dest='debug', action="store_true", default=False, help="Enable debug logging")
args = parser.parse_args()
if args.debug:
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
conf = dict()
if args.confFile is not None:
# Parse client configuration file
for line in args.confFile:
line = line.strip()
if len(line) == 0 or line[0] == '#':
continue
i = line.find('=')
if i <= 0:
raise ValueError("Configuration lines must be `name=value..`, not {}".format(line))
name = line[:i]
value = line[i+1:]
conf[name] = value
if args.brokers is not None:
# Overwrite any brokers specified in configuration file with
# brokers from -b command line argument
conf['bootstrap.servers'] = args.brokers
elif 'bootstrap.servers' not in conf:
conf['bootstrap.servers'] = 'localhost:9092'
if args.ksqlServiceId is None:
args.ksqlServiceId = 'default_'
conf['auto.offset.reset'] = 'earliest'
conf['enable.auto.commit']= 'False'
conf['client.id'] = 'commandClient'
# Generate a unique group.id
conf['group.id'] = 'commandTopicConsumer.py-{}'.format(uuid.uuid4())
c = CommandConsumer(args.ksqlServiceId, conf)
c.consumer_run()
If you prefer to recover the schema manually, use the following steps.
Tip
You can use the SPOOL <ksqldb-reference-spool> command to capture the output of the commands you run in the CLI to a file.
- Capture streams SQL:
- Run
list streams extended;
to list all of the streams. - Grab the SQL statement that created each stream from the output,
ignoring
KSQL_PROCESSING_LOG
. - Capture tables SQL:
- Run
list tables extended;
to list all of the tables. - Grab the SQL statement that created each table from the output.
- Capture custom types SQL:
- Run
list types;
to list all of the custom types. - Convert the output into
CREATE TYPE <name> AS <schema>
syntax by grabbing the name from the first column and the schema from the second column of the output. - Order by dependency: you’ll now have the list of SQL statements to rebuild the schema, but they are not yet ordered in terms of dependencies. You will need to reorder the statements to ensure each statement comes after any other statements it depends on.
- Update the script to take into account any changes in syntax or
functionality between the old and new clusters. The release notes
can help here. It can also be useful to have a test ksqlDB cluster,
pointing to a different test Kafka cluster, where you can try
running the script to get feedback on any errors. Note: you may want
to temporarily add
PARTITIONS=1
to theWITH
clause of anyCREATE TABLE
orCREATE STREAM
command, so that the command will run without requiring you to first create the necessary topics in the test Kafka cluster. - Stop the old cluster: if you do not do so then both the old and new cluster will be publishing to sink topics, resulting in undefined behavior.
- Build the schema in the new instance. Now you have the SQL file you can run this against the new cluster to build a copy of the schema. This is best achieved with the RUN SCRIPT command, which takes a SQL file as an input.
Rebuild state¶
Porting the database schema to the new cluster will cause ksqlDB to start processing data. As this is a new cluster it will start processing all data from the start, i.e. it will likely be processing data the old cluster has already processed.
IMPORTANT: while ksqlDB is processing historic data it will output historic results to sink topics. Such historic results may cause issues with downstream consumers of these data. The historic results will be correctly timestamped, allowing well behaved consumers to correctly process or ignore the historic results.
NOTE: source data that the old cluster processed may not longer be available in Kafka for the new cluster to process, e.g. topics with limited retention. It is therefore possible for the new cluster to have different results to the old.
It is possible to monitor how far behind the processing is through JMX.
Monitor the
kafka.consumer/consumer-fetch-manager-metrics/<consumer-name>/records-lag-max
metrics to observe the new nodes processing the historic data.
Destroy the old cluster¶
Once you’re happy with your new cluster you can destroy the old one using the terminate endpoint. This will stop all processing and delete any internal topics in Kafka.
Upgrade notes¶
Upgrading from ksqlDB 0.14.0 to 0.29.0¶
In-place upgrades are supported from ksqlDB 0.14.0 to 0.29.0. See the changelog for potential breaking changes that may affect the behavior or required syntax for new queries.
Join statements¶
The GRACE PERIOD clause is now supported in stream-stream joins since 0.20.0. However, when set explicitly, the grace period now defines when the left/outer non-joined results are emitted. Contrary to the old syntax (without GRACE PERIOD) that emits left/outer non-joined results eagerly, which may cause “spurious” result records.
The use of GRACE PERIOD is highly recommended now that it is supported. It helps to reduce high disk usage when using small grace period values (default 24 hours if not set) and also provides better semantics for left/outer joins.
Upgrading from ksqlDB 0.10.0 to 0.14.0¶
In-place upgrades are supported from ksqlDB 0.10.0 to 0.14.0. See the changelog for potential breaking changes that may affect the behavior or required syntax for new queries.
Upgrading from ksqlDB 0.9.0 to 0.10.0¶
In-place upgrades are supported from ksqlDB 0.9.0 to 0.10.0. However, in-place upgrades from pre-0.7.0 versions to 0.10.0 are not supported, as ksqlDB 0.7.0 is not backward compatible. Do not upgrade in place from a pre-0.7.0 version to 0.10.0.
The following changes in SQL syntax and functionality may mean SQL statements that ran previously no longer run.
Any key name¶
Statements containing PARTITION BY, GROUP BY, or JOIN clauses now produce different output schemas.
For PARTITION BY and GROUP BY statements, the name of the key column in the result is determined by the PARTITION BY or GROUP BY clause: 1. Where the partitioning or grouping is a single column reference, then the key column has the same name as this column. For example:
-- OUTPUT will have a key column called X;
CREATE STREAM OUTPUT AS
SELECT *
FROM INPUT
GROUP BY X;
- Where the partitioning or grouping is a single struct field, then the key column has the same name as the field. For example:
-- OUTPUT will have a key column called FIELD1;
CREATE STREAM OUTPUT AS
SELECT *
FROM INPUT
GROUP BY X->field1;
- Otherwise, the key column name is system-generated and has the form
KSQL_COL_n
, wheren
is a positive integer.
In all cases, except where grouping by more than one column, you can set the new key column’s name by defining an alias in the projection. For example:
-- OUTPUT will have a key column named ID.
CREATE TABLE OUTPUT AS
SELECT
USERID AS ID,
COUNT(*)
FROM USERS
GROUP BY ID;
For groupings of multiple expressions, you can’t provide a name for the system-generated key column. However, a work around is to combine the grouping columns yourself, which does enable you to provide an alias:
-- products_by_sub_cat will have a key column named COMPOSITEKEY:
CREATE TABLE products_by_sub_cat AS
SELECT
categoryId + Ԥ' + subCategoryId AS compositeKey
SUM(quantity) as totalQty
FROM purchases
GROUP BY CAST(categoryId AS STRING) + Ԥ' + CAST(subCategoryId AS STRING);
For JOIN statements, the name of the key column in the result is determined by the join criteria. 1. For INNER and LEFT OUTER joins where the join criteria contain at least one column reference, the key column is named based on the left-most source whose join criteria is a column reference. For example:
-- OUTPUT will have a key column named I2_ID.
CREATE TABLE OUTPUT AS
SELECT *
FROM I1
JOIN I2 ON abs(I1.ID) = I2.ID JOIN I3 ON I2.ID = I3.ID;
The key column can be given a new name, if required, by defining an alias in the projection. For example:
-- OUTPUT will have a key column named ID.
CREATE TABLE OUTPUT AS
SELECT
I2.ID AS ID,
I1.V0,
I2.V0,
I3.V0
FROM I1
JOIN I2 ON abs(I1.ID) = I2.ID
JOIN I3 ON I2.ID = I3.ID;
- For FULL OUTER joins and other joins where the join criteria are not
on column references, the key column in the output is not equivalent
to any column from any source. The key column has a system-generated
name in the form
KSQL_COL_n
, wheren
is a positive integer. For example:
-- OUTPUT will have a key column named KSQL_COL_0, or similar.
CREATE TABLE OUTPUT AS
SELECT *
FROM I1
FULL OUTER JOIN I2 ON I1.ID = I2.ID;
The key column can be given a new name, if required, by defining an
alias in the projection. A new UDF has been introduced to help define
the alias called JOINKEY
. It takes the join criteria as its
parameters. For example:
-- OUTPUT will have a key column named ID.
CREATE TABLE OUTPUT AS
SELECT
JOINKEY(I1.ID, I2.ID) AS ID,
I1.V0,
I2.V0
FROM I1
FULL OUTER JOIN I2 ON I1.ID = I2.ID;
JOINKEY
will be deprecated in a future release of ksqlDB once
multiple key columns are supported.
Explicit keys¶
CREATE TABLE
statements will now fail if the PRIMARY KEY
column
is not provided.
For example, a statement such as:
CREATE TABLE FOO (
name STRING
) WITH (
kafka_topic='foo',
value_format='json'
);
Will need to be updated to include the definition of the PRIMARY KEY, for example:
CREATE TABLE FOO (
ID STRING PRIMARY KEY,
name STRING
) WITH (
kafka_topic='foo',
value_format='json'
);
If using schema inference, i.e. loading the value columns of the topic from the Schema Registry, the primary key can be provided as a partial schema, for example:
-- FOO will have value columns loaded from the Schema Registry
CREATE TABLE FOO (
ID INT PRIMARY KEY
) WITH (
kafka_topic='foo',
value_format='avro'
);
CREATE STREAM
statements that do not define a KEY
column no
longer have an implicit ROWKEY
key column.
For example:
CREATE STREAM BAR (
NAME STRING
) WITH (...);
Previously, the above statement would have resulted in a stream with two
columns: ROWKEY STRING KEY
and NAME STRING
.
With this change, the above statement results in a stream with only the
NAME STRING
column.
Streams with no KEY column are serialized to Kafka topics with a
null
key.
Key columns required in projection¶
A statement that creates a materialized view must include the key columns in the projection. For example:
CREATE TABLE OUTPUT AS
SELECT
productId, // <-- key column in projection
SUM(quantity) as unitsSold
FROM sales
GROUP BY productId;
The key column productId
is required in the projection. In previous
versions of ksqlDB, the presence
of productId
in the projection would have placed a copy of the
data into the value of the underlying Kafka topic’s record. But starting
in version 0.10.0, the projection must include the key columns, and
ksqlDB stores these columns
in the key of the underlying Kafka record. Optionally, you may provide an alias for
the key column(s).
CREATE TABLE OUTPUT AS
SELECT
productId as id, // <-- aliased key column
SUM(quantity) as unitsSold
FROM sales
GROUP BY productId;
If you need a copy of the key column in the Kafka record’s value, use the AS_VALUE function to indicate this to ksqlDB. For example, the following statement produces an output inline with the previous version of ksqlDB
for the above example materialized view:
CREATE TABLE OUTPUT AS
SELECT
productId as ROWKEY, // <-- key column named ROWKEY
AS_VALUE(productId) as productId, // <-- productId copied into value
SUM(quantity) as unitsSold
FROM sales
GROUP BY productId;
WITH(KEY) syntax removed¶
In previous versions, all key columns were called ROWKEY
. To enable
using a more
user-friendly name for the key column in queries, it was possible
to supply an alias for the key column in the WITH clause, for example:
CREATE TABLE INPUT (
ROWKEY INT PRIMARY KEY,
ID INT,
V0 STRING
) WITH (
key='ID',
...
);
With the previous query, the ID
column can be used as an alias for
ROWKEY
. This approach required the Kafka message value to contain an
exact copy of the key.
KLIP-24
removed the restriction that key columns must be named ROWKEY
,
negating the need for the WITH(KEY)
syntax, which has been removed.
Also, this change removed the requirement for the Kafka message value to
contain an exact copy of the key.
Update your queries by removing the KEY
from the WITH
clause and
naming
your KEY
and PRIMARY KEY
columns appropriately. For example, the
previous CREATE TABLE statement can now be rewritten as:
CREATE TABLE INPUT (
ID INT PRIMARY KEY,
V0 STRING
) WITH (...);
Unless the value format is DELIMITED
, which means the value columns
are order dependent, so dropping the ID
value column would result
in a
deserialization error or the wrong values being loaded. If you’re using
DELIMITED
, consider rewriting as:
CREATE TABLE INPUT (
ID INT PRIMARY KEY,
ignoreMe INT,
V0 STRING
) WITH (...);
Upgrading from ksqlDB 0.8.0 to 0.9.0¶
In-place upgrades are supported from ksqlDB 0.8.0 to 0.9.0. However, in-place upgrades from pre-0.7.0 versions to 0.9.0 are not supported, as ksqlDB 0.7.0 is not backward compatible. Do not upgrade in place from a pre-0.7.0 version to 0.9.0.
The following changes in SQL syntax and functionality may mean SQL statements that ran previously no longer run.
Table PRIMARY KEYs¶
Tables now use PRIMARY KEY
to define their primary key column rather
than KEY
. Update your CREATE TABLE
statements as required. For
example, statements like this:
CREATE TABLE OUTPUT (ROWKEY INT KEY, V0 STRING, V1 DOUBLE) WITH (...);
Must be updated to:
CREATE TABLE OUTPUT (ROWKEY INT PRIMARY KEY, V0 STRING, V1 DOUBLE) WITH (...);
Upgrading from ksqlDB 0.7.0 to 0.8.0¶
In-place upgrades are supported from ksqlDB 0.7.0 to 0.8.0. See the changelog for bug fixes and other changes.
Upgrading from ksqlDB 0.6.0 to 0.7.0¶
Important
ksqlDB 0.7.0 is not backward compatible. Do not upgrade in-place.
The following changes in SQL syntax and functionality may mean SQL statements that ran previously no longer run.
PARTITION BY
and GROUP BY
result schema changes:¶
Materialized views created with CREATE TABLE AS SELECT
or
CREATE STREAM AS SELECT
that include a PARTITION BY
or
GROUP BY
on a single column may fail or result in a different result
schema.
The statement may fail if the type of the single column is not a
supported primitive type: INT
, BIGINT
, DOUBLE
or STRING
.
For example:
CREATE STREAM input (col0 ARRAY<STRING>) WITH (kafka_topic='input', value_format='json');
-- following will fail due to grouping by non-primitive key:
CREATE TABLE output AS SELECT count(1) FROM input GROUP BY col0;
Workaround: Change the statement to CAST
the GROUP BY
or
PARTITION BY
column to a STRING
, for example
CREATE TABLE output AS SELECT count(1) FROM input GROUP BY CAST(col0 AS STRING);
The statement will result in a stream or table with a different schema
if the single column has a supported primitive key other than
STRING
. The type of the ROWKEY
system column in the resulting
table or stream will match the type of the single column. This can cause
downstream statements to fail.
CREATE STREAM input (col0 INT) WITH (kafka_topic='input', value_format='json');
-- schema of output has changed!
-- was: ROWKEY STRING KEY, COUNT BIGINT
-- now: ROWKEY INT KEY, COUNT BIGINT
CREATE TABLE output AS SELECT count(1) AS COUNT FROM input GROUP BY col0;
Workaround 1: Fix the downstream queries to use the new SQL type.
Workaround 2: Change the statement to CAST
the GROUP BY
or
PARTITION BY
column to a STRING
. This allows the query to
operate as it did previously, for example
-- schema: ROWKEY STRING KEY, COUNT BIGINT
CREATE TABLE output AS SELECT count(1) AS COUNT FROM input GROUP BY CAST(col0 AS STRING);
Joins may result in different schema or may fail¶
Some existing joins may now fail and others may see that the type of
ROWKEY
in the result schema may have changed, due to primitive key
support.
The statement may fail if the two sides of the join have a different SQL Type. For example:
CREATE STREAM L (ID INT) WITH (...);
CREATE TABLE R (ID BIGINT) WITH (...);
-- previously the following statement would have succeeded, but will now fail:
SELECT * FROM L JOIN R ON L.ID = R.ID;
The join now fails because joins require the join keys to exactly match, which can not be the case if they are a different SQL type.
workaround:: ksqlDB now supports arbitrary expressions in the join
criteria, allowing you to CAST
either, or both, sides to the same
SQL type. For example,
SELECT * FROM L JOIN R ON CAST(L.ID AS BIGINT) = R.ID;
The statement will result in a stream or table with a different schema
if the join column has a supported primitive key other than STRING
.
The type of the ROWKEY
system column in the resulting table or
stream will match the type of the join column. This can cause downstream
statements to fail.
CREATE STREAM L (ID INT, COUNT BIGINT) WITH (...);
CREATE TABLE R (ID INT, NAME STRING) WITH (...);
-- schema of output has changed!
-- was: ROWKEY STRING KEY, COUNT BIGINT, NAME STRING
-- now: ROWKEY INT KEY, COUNT BIGINT, NAME STRING
SELECT COUNT, NAME FROM L JOIN R ON L.ID = R.ID;
Workaround 1: Fix the downstream queries to use the new SQL type.
Workaround 2: Change the statement to CAST
the join criteria to
a STRING
. This allows the query to operate as it did previously, for
example
-- schema: ROWKEY STRING KEY, COUNT BIGINT, NAME STRING
SELECT COUNT, NAME FROM L JOIN R ON CAST(L.ID AS STRING) = CAST(R.ID AS STRING);
Incompatibility between ROWKEY
type and WITH(KEY)
type¶
CREATE TABLE
and CREATE STREAM
statements accept an optional
KEY
property in their WITH
clause. ksqlDB now requires the
column identified in the KEY
property to have the same SQL type as
key of the Kafka message, i.e. same type as ROWKEY
. This makes
sense, given the column identified in the KEY
property is expected
to hold the exact same data as ROWKEY
if things are to work
correctly.
For example, the statement below would previously have executed, but will now fail:
-- fails with error due to SQL type of `col0` not matching the type of `ROWKEY` column.
CREATE STREAM input (col0 INT) WITH (key='col0', kafka_topic='input', value_format='json');
Workaround 1: Update the statement to explicitly set the type of
ROWKEY
If the key type matches the column type, e.g. in the example above the
key is an INT
, then the statement can be updated to explicitly set
ROWKEY
type:
sql CREATE STREAM input (ROWKEY INT KEY, col0 INT) WITH (key='col0', kafka_topic='input', value_format='json');
Note: in version 0.7 the key column must be called ROWKEY
. This
requirement was removed by
Issue #3536.
Workaround 2: remove the KEY
property from the WITH
clause.
Though this may mean ksqlDB needs to repartition the data where
previously it did not.
Removal of WindowStart()
and WindowEnd()
UDAFs¶
These UDAFs allowed access to the window bounds in the projection of
GROUP BY
queries, for example:
SELECT ROWKEY, WindowStart(), WindowEnd(), COUNT() from input WINDOW SESSION (30 SECONDS) GROUP BY ROWKEY;
These window bound UDAFs have been removed in ksqlDB 0.7.
workaround: Please use the WindowStart
and WindowEnd
system
columns instead, for example:
SELECT ROWKEY, WindowStart, WindowEnd, COUNT() from input WINDOW SESSION (30 SECONDS) GROUP BY ROWKEY;
Windowed ROWKEY
data change¶
Any query of a windowed source that uses ROWKEY
in the SELECT
projection will see the contents of ROWKEY
change, for example:
-- assuming `input` is a windowed stream:
CREATE STREAM output AS SELECT ROWKEY as KEY_COPY, ID, NAME From input WHERE COUNT > 100;
For previous versions of ksqlDB KEY_COPY
would be of type STRING
and would contain data in the format
<key> : Window{start=<window-start>, end=<window-end>}
. From v0.7
onwards the type of KEY_COPY
will match the type and contents of
ROWKEY
.
workaround: if required, the statement can be updated to reconstruct
the old string value by accessing the window bounds using the
WINDOWSTART
and WINDOWEND
system columns, for example:
CREATE STREAM output AS SELECT
(CAST(ROWKEY AS STRING) + " : Window{start=" + CAST(WINDOWSTART AS STRING) + ", end=-}" ) as KEY_COPY,
ID,
NAME
From input WHERE COUNT > 100;
Change in array base index¶
Previous versions of ksqlDB used base-0 indexing when accessing array elements. For example:
CREATE STREAM input (ids ARRAY<BIGINT>) WITH (...);
-- access array using base-zero indexing:
SELECT ids[0] AS firstId, ids[1] AS secondId from input;
Starting from v0.7 ksqlDB more correctly uses base-one indexing.
Workaround: update the statements to use base-one indexing. For example:
SELECT ids[1] AS firstId, ids[2] AS secondId from input;
Change in required order for EMIT CHANGES
and PARTITION BY
¶
Previous releases of ksqlDB required the EMIT CHANGES
before the
PARTITION BY
. For example:
SELECT * FROM input EMIT CHANGES PARTITION BY col0;
Starting from v0.7, ksqlDB requires the PARTITION BY
to come before
the EMIT CHANGES
Workaround: update the statement to reflect the new required order. For example:
SELECT * FROM input PARTITION BY col0 EMIT CHANGES;
ALL
, WINDOWSTART
and WINDOWEND
are now reserved identifiers¶
Any query using these identifiers will need to be changed to either use some other identifier, or to quote them.