Schema Inference With ID in ksqlDB for Confluent Platform¶
For supported Serialization Formats,
ksqlDB can use schema inference
to retrieve (read) and register (write) schemas as needed. If you specify a
KEY_SCHEMA_ID
or VALUE_SCHEMA_ID
explicitly in the CREATE
statements, ksqlDB retrieves and registers the schema specified by the
ID from Schema Registry, and it also serializes data using exactly the same
schema referred to by the ID. This can spare you from defining columns
and data types manually, and it also ensures that the data are serialized
with the specified physical schema, so downstream systems can consume it.
Before using schema inference with explicit IDs in ksqlDB, make
sure that the Schema Registry is up and running and ksqlDB is
configured to use it.
Here’s what you can do with schema inference with IDs in ksqlDB:
- Declare streams and tables on Kafka topics with supported key and value formats by using CREATE STREAM and CREATE TABLE statements with KEY_SCHEMA_ID or VALUE_SCHEMA_ID properties, without the need to declare the key and value columns.
- Declare derived views with CREATE STREAM AS SELECT and CREATE TABLE AS SELECT statements with KEY_SCHEMA_ID or VALUE_SCHEMA_ID properties. The schema of the view is registered in Schema Registry automatically.
- Serialize output data using the schema referred to by KEY_SCHEMA_ID or VALUE_SCHEMA_ID, instead of the logical data source schema stored in ksqlDB.
If you’re declaring a stream or table with a key format that’s different from its value format, and only one of the two formats supports schema inference, you can provide the columns for the format that doesn’t support schema inference explicitly, while ksqlDB still loads the columns for the format that does support it from Schema Registry. This is known as partial schema inference. To infer value columns for a keyless stream, set the key format to NONE.
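For example, the following sketch declares a keyless stream whose value columns are inferred. It assumes an Avro value schema registered under ID 1, as in the examples later in this topic; the stream name is hypothetical.
-- Keyless stream: no key columns are declared or inferred.
CREATE STREAM pageviews_keyless
  WITH (
    KAFKA_TOPIC='pageviews-avro-topic',
    KEY_FORMAT='NONE',
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=1
  );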
Tables require a PRIMARY KEY
, so you must supply one explicitly in
your CREATE TABLE
statement. KEY
columns are optional for
streams, so if you don’t supply one, the stream is created without a key
column.
Note
The following example statements show how to create streams and tables
that have Avro-formatted data. If you want to use Protobuf or
JSON-formatted data, substitute PROTOBUF
or JSON_SR
for AVRO
in each statement.
Create a new stream or table¶
When KEY_SCHEMA_ID
or VALUE_SCHEMA_ID
is used in statements to
create a stream or table, the schema fetched from Schema Registry is used to
infer the data source’s columns and to serialize output data. See
Schema inference and data serialization
for details about how columns are inferred and data are serialized.
Important
- The schemas referred to by KEY_SCHEMA_ID and VALUE_SCHEMA_ID must be registered in Schema Registry. They can be under any subject but must match the formats defined by KEY_FORMAT and VALUE_FORMAT, respectively.
- You can’t define key or value columns in a statement if a corresponding KEY_SCHEMA_ID or VALUE_SCHEMA_ID is supplied. See the sketch after this note.
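For example, a statement like the following sketch is rejected, because it declares a value column, page_name, while also supplying VALUE_SCHEMA_ID:
-- Invalid: a value column is declared together with VALUE_SCHEMA_ID.
CREATE STREAM pageviews (pageId INT KEY, page_name VARCHAR)
  WITH (
    KAFKA_TOPIC='pageviews-avro-topic',
    KEY_FORMAT='KAFKA',
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=1
  );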
Without a key column¶
The following statement shows how to create a new pageviews
stream
by reading from a Kafka topic that has Avro-formatted message
values.
CREATE STREAM pageviews
WITH (
KAFKA_TOPIC='pageviews-avro-topic',
VALUE_FORMAT='AVRO',
VALUE_SCHEMA_ID=1
);
In this example, you don’t need to define any columns in the CREATE
statement. ksqlDB infers this information automatically from Schema Registry
using the provided VALUE_SCHEMA_ID
.
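As the earlier note mentions, the same pattern applies to the other supported formats. The following sketch is a Protobuf variant; the topic name and schema ID 3 are hypothetical and assume a Protobuf schema is registered under that ID.
CREATE STREAM pageviews_proto
  WITH (
    KAFKA_TOPIC='pageviews-proto-topic',
    VALUE_FORMAT='PROTOBUF',
    VALUE_SCHEMA_ID=3
  );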
With a key column¶
The following statement shows how to create a new pageviews
stream
by reading from a Kafka topic that has Avro-formatted key and
message values.
CREATE STREAM pageviews WITH (
KAFKA_TOPIC='pageviews-avro-topic',
KEY_FORMAT='AVRO',
VALUE_FORMAT='AVRO',
KEY_SCHEMA_ID=1,
VALUE_SCHEMA_ID=2
);
In this example, ksqlDB infers the key and value columns automatically
from Schema Registry using the provided KEY_SCHEMA_ID
and
VALUE_SCHEMA_ID
.
With partial schema inference¶
The following statement shows how to create a new pageviews
table by
reading from a Kafka topic that has Avro-formatted message values
and a KAFKA
-formatted INT
primary key.
CREATE TABLE pageviews (
pageId INT PRIMARY KEY
) WITH (
KAFKA_TOPIC='pageviews-avro-topic',
KEY_FORMAT='KAFKA',
VALUE_FORMAT='AVRO',
VALUE_SCHEMA_ID=1
);
In this example, only the key column is supplied in the CREATE
statement. ksqlDB infers the value columns automatically from
Schema Registry using the provided VALUE_SCHEMA_ID
.
Declaring a derived view with a schema ID¶
The following statement shows how to create a materialized view derived
from an existing source with the VALUE_SCHEMA_ID
property. The
schema referred to by VALUE_SCHEMA_ID
is used to check column
compatibility with output columns and serialize output data. For more
information, see
Schema inference and data serialization.
CREATE STREAM pageviews_new
WITH (
VALUE_FORMAT='AVRO',
VALUE_SCHEMA_ID=1
) AS
SELECT
pageId,
ts
FROM pageviews;
Important
The schema referred to by VALUE_SCHEMA_ID
must be compatible with the
logical schema defined by the SELECT
clause. For more information, see
Schema inference and data serialization.
Schema inference and data serialization¶
The schema in Schema Registry is a “physical schema”, and the schema in
ksqlDB is a “logical schema”. The physical schema, not the logical
schema, is registered under the subject <topic-name>-key
or
<topic-name>-value
if corresponding KEY_SCHEMA_ID
or
VALUE_SCHEMA_ID
values are provided.
Schema inference schema requirements¶
If WRAP_SINGLE_VALUE
is set to true
in the SQL statement, the
physical schema is expected to be a struct
type, and the field names
are used as data source column names. The column data types are inferred
from the corresponding field types.
- In AVRO, the struct type corresponds with the record type.
- In PROTOBUF, the struct type corresponds with the message type.
- In JSON_SR, the struct type corresponds with the object type.
Note
In the following examples, the AVRO
schema string in Schema Registry is a
single-line raw string without newline characters (\n
). The strings
are shown as human-readable text for convenience.
For example, the following physical schema is in AVRO format and
is registered in Schema Registry with ID 1:
{
"schema": {
"type": "record",
"name": "PageViewValueSchema",
"namespace": "io.confluent.ksql.avro_schemas",
"fields": [
{
"name": "page_name",
"type": "string",
"default": "abc"
},
{
"name": "ts",
"type": "int",
"default": 123
}
]
}
}
The following CREATE
statement defines a stream on the pageviews
topic and specifies the physical schema that has an ID of 1
.
CREATE STREAM pageviews (pageId INT KEY)
  WITH (
    KAFKA_TOPIC='pageviews-avro-topic',
    KEY_FORMAT='KAFKA',
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=1,
    PARTITIONS=1
  );
The following output from the DESCRIBE pageviews statement shows the
command shows the
inferred logical schema for the pageviews
stream:
DESCRIBE pageviews;
Name : PAGEVIEWS
Field | Type
------------------------------------
PAGEID | INTEGER (key)
page_name | VARCHAR(STRING)
ts | INTEGER
------------------------------------
If WRAP_SINGLE_VALUE
is false
in the statement, and if
KEY_SCHEMA_ID
is set, ROWKEY
is used as the key’s column name.
If VALUE_SCHEMA_ID
is set, ROWVAL
is used as the value’s column
name. The physical schema is used as the column data type.
For example, the following physical schema is AVRO
and is defined in
Schema Registry with ID 2
:
{"schema": "int"}
The following CREATE
statement defines a table on the
pageview-count
topic and specifies the physical schema that has ID
2
.
CREATE TABLE pageview_count (
  pageId INT PRIMARY KEY
) WITH (
  KAFKA_TOPIC='pageview-count',
  KEY_FORMAT='KAFKA',
  VALUE_FORMAT='AVRO',
  VALUE_SCHEMA_ID=2,
  WRAP_SINGLE_VALUE=false,
  PARTITIONS=1
);
The inferred logical schema for the pageview_count
table is:
Name : PAGEVIEW_COUNT
Field | Type
-----------------------------------------
PAGEID | INTEGER (primary key)
ROWVAL | INTEGER
-----------------------------------------
For more information about WRAP_SINGLE_VALUE
, see
Single field unwrapping.
Schema Inference Type Handling¶
ksqlDB supports the null
value for KEY
and VALUE
columns. If
a field in the physical schema is a required type, it’s translated to an
optional type in the logical schema. This has subtle implications for
data serialization, which are explained in the Data Serialization section later in this topic.
Important
- ksqlDB ignores unsupported types in the physical schema and continues translating supported types to the logical schema. You should verify that the logical schema is translated as expected.
- During schema translation from a physical schema to a logical schema, struct type field names are used as column names in the logical schema. Field names are not translated to uppercase, in contrast with schema inference without a schema ID, which does translate field names to uppercase. See the sketch after this note.
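Because the inferred column names keep their original case, queries typically need backtick-quoted identifiers to reference them. The following is a minimal sketch against the pageviews stream whose inferred columns, page_name and ts, appear in the DESCRIBE output earlier in this topic:
-- Backticks preserve the lowercase names inferred from the physical schema.
SELECT pageId, `page_name`, `ts`
  FROM pageviews
  EMIT CHANGES;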
Schema Compatibility Check for Derived View¶
You can use schema IDs when creating a materialized view, but instead of being used to infer the view’s logical schema, the schema is used to check compatibility against the query’s projection and to serialize output data. For the compatibility check, the logical schema inferred from the physical schema must be a superset of the query’s projection schema: corresponding column names, types, and order must match, and the inferred logical schema may have extra columns.
The following example creates the pageviews_new
topic as the result
of a SELECT query:
CREATE STREAM pageviews_new
WITH (
VALUE_FORMAT='AVRO',
VALUE_SCHEMA_ID=1
) AS
SELECT
pageId,
ts
FROM pageviews;
If the pageviews stream has a value column ts INT, the logical schema of
pageviews_new is determined by the projection in the query,
SELECT pageId, ts FROM pageviews. When VALUE_SCHEMA_ID is used,
the logical schema inferred from the physical schema is checked against
ts INT for compatibility.
The following example shows a compatible physical schema:
{
  "schema": {
    "type": "record",
    "name": "PageViewNewSchema",
    "namespace": "io.confluent.ksql.avro_schemas",
    "fields": [
      {
        "name": "ts",
        "type": "int",
        "default": 123
      },
      {
        "name": "title",
        "type": "string",
        "default": "title"
      }
    ]
  }
}
In this AVRO schema, title is an extra field. Because the physical schema
is used for data serialization, the title field appears in the serialized
data with its default value, even though inserted values can never set it:
title isn’t in the logical schema defined by the SELECT clause of the query.
The following example shows an incompatible physical schema, which is
incompatible because of the type mismatch for pageId
.
{
  "schema": {
    "type": "record",
    "name": "PageViewNewSchema",
    "namespace": "io.confluent.ksql.avro_schemas",
    "fields": [
      {
        "name": "pageId",
        "type": "string",
        "default": "id"
      },
      {
        "name": "ts",
        "type": "int",
        "default": 123
      }
    ]
  }
}
Data Serialization¶
When a schema ID is provided, and schema inference is successful, ksqlDB
can create the data source. When writing to the data source, the
physical schema inferred by the schema ID is used to serialize data,
instead of the logical schema that’s used in other cases. Because
ksqlDB’s logical schema accepts null
values but the physical schema
may not, serialization can fail even if the inserted value is valid for
the logical schema.
The following example shows a physical schema that’s defined in
Schema Registry with ID 1
. No default values are specified for the
page_name
and ts
fields.
{
  "schema": {
    "type": "record",
    "name": "PageViewValueSchema",
    "namespace": "io.confluent.ksql.avro_schemas",
    "fields": [
      {
        "name": "page_name",
        "type": "string"
      },
      {
        "name": "ts",
        "type": "int"
      }
    ]
  }
}
The following example creates a stream with schema ID 1
:
CREATE STREAM pageviews (
pageId INT KEY
) WITH (
KAFKA_TOPIC='pageviews-avro-topic',
KEY_FORMAT='KAFKA',
VALUE_FORMAT='AVRO',
VALUE_SCHEMA_ID=1,
PARTITIONS=1
);
ksqlDB infers the following schema for pageviews
:
Name : PAGEVIEWS
Field | Type
------------------------------------
PAGEID | INTEGER (key)
page_name | VARCHAR(STRING)
ts | INTEGER
------------------------------------
If you insert a row with null values into pageviews, ksqlDB
returns an error:
INSERT INTO pageviews VALUES (1, null, null);
Failed to insert values into 'PAGEVIEWS'. Could not serialize value: [ null | null ]. Error serializing message to topic: pageviews-avro-topic1. Invalid value: null used for required field: "page_name", schema type: STRING
This error occurs because page_name
and ts
are required fields
without default values in the specified physical schema.
Important
ksqlDB doesn’t check that null
can be serialized in a physical schema
that contains required fields. You must ensure that null
can be handled
properly, either by making physical schema fields optional or by using the
IFNULL
function to ensure that null
is never inserted.
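For example, the following sketch applies the IFNULL approach before writing with a schema ID. It assumes a hypothetical source stream, pageviews_raw, with nullable page_name and ts columns; the aliases are backtick-quoted so the projected column names match the lowercase field names in the physical schema with ID 1.
CREATE STREAM pageviews_guarded
  WITH (
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=1
  ) AS
  SELECT
    pageId,
    IFNULL(page_name, 'unknown') AS `page_name`,  -- substitute defaults so null is never serialized
    IFNULL(ts, 0) AS `ts`
  FROM pageviews_raw;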
Important
If KEY_SCHEMA_ID is used in table creation with windowed aggregation,
the serialized key also contains window information in addition to
the original key value.
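For example, the following sketch creates a windowed table whose key is serialized using a key schema ID; the ID 3 is hypothetical and assumes a matching Avro key schema is registered under it. The serialized key of each output record carries the window bounds in addition to the original pageId value.
CREATE TABLE pageview_counts
  WITH (
    KEY_FORMAT='AVRO',
    KEY_SCHEMA_ID=3,
    VALUE_FORMAT='AVRO'
  ) AS
  SELECT
    pageId,
    COUNT(*) AS view_count
  FROM pageviews
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY pageId;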