Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
KSQL Serialization¶
Controlling serialization¶
KSQL offers several mechanisms for controlling serialization and deserialization.
The primary mechanism is by choosing the serialization format when you create
a stream or table and specify the VALUE_FORMAT
in the WITH
clause.
CREATE TABLE ORDERS (F0 INT, F1 STRING) WITH (VALUE_FORMAT='JSON', ...);
For more information on the formats that KSQL supports, see Formats.
KSQL provides some additional configuration that allows serialization to be controlled:
Single field (un)wrapping¶
Note
The DELIMITED
and KAFKA
formats do not support single field unwrapping.
Controlling deserializing of single fields¶
When KSQL deserializes a Kafka message into a row, the key is deserialized into the key field, and the message’s value is deserialized into the value fields.
By default, KSQL expects any value with a single-field schema to have been serialized as a named field within a record. However, this is not always the case. KSQL also supports reading data that has been serialized as an anonymous value.
For example, a value with multiple fields might look like the following in JSON:
{
"id": 134,
"name": "John"
}
If the value only had the id
field, KSQL would still expect the value to be serialized as a
named field, for example:
{
"id": 134
}
If your data contains only a single field, and that field is not wrapped within a JSON object,
or an Avro record if using the AVRO
format, then you can use the WRAP_SINGLE_VALUE
property in the WITH
clause of your CREATE TABLE or
CREATE STREAM statements. Setting the property to false
tells KSQL
that the value is not wrapped, so the example above would be a JSON number:
134
For example, the following creates a table where the values in the underlying topic have been serialized as an anonymous JSON number:
CREATE TABLE TRADES (ID INT) WITH (WRAP_SINGLE_VALUE=false, ...);
If a statement doesn’t set the value wrapping explicitly, KSQL uses the system
default, defined by ksql.persistence.wrap.single.values
. You can change the system default.
For more information, see ksql.persistence.wrap.single.values.
Important
KSQL treats null
keys and values as a special case. We recommend avoiding
unwrapped single-field schemas if the field can have a null
value.
A null
value in a table’s topic is treated as a tombstone, which indicates that a row has been
removed. If a table’s source topic has an unwrapped single-field key schema and the value is
null
, it’s treated as a tombstone, resulting in any previous value for the key being removed
from the table.
A null
key or value in a stream’s topic is ignored when the stream is part of a join.
A null
value in a table’s topic is treated as a tombstone, and a null
key is
ignored when the table is part of a join.
When you have an unwrapped single-field schema, ensure that any null
key or
value has the desired result.
Controlling serialization of single fields¶
When KSQL serializes a row into a Kafka message, the key field is serialized into the message’s key, and any value fields are serialized into the message’s value.
By default, if the value has only a single field, KSQL serializes the single field as a named field within a record. However, this doesn’t always match the requirements of downstream consumers, so KSQL allows the value to be serialized as an anonymous value.
For example, consider the statements:
CREATE STREAM x (f0 INT, f1 STRING) WITH (VALUE_FORMAT='JSON', ...);
CREATE STREAM y AS SELECT f0 FROM x EMIT CHANGES;
The second statement defines a stream with only a single field in the value,
named f0
.
By default, when KSQL writes out the result to Kafka, it persists the single field as
a named field within a JSON object, or an Avro record if using the AVRO
format:
{
"F0": 10
}
If you require the value to be serialized as an anonymous value, for example:
10
Then you can use the WRAP_SINGLE_VALUE
property in your statement.
For example,
CREATE STREAM y WITH(WRAP_SINGLE_VALUE=false) AS SELECT f0 FROM x EMIT CHANGES;
If a statement doesn’t set the value wrapping explicitly, KSQL uses the system
default, defined by ksql.persistence.wrap.single.values
. You can change the system default.
For more information, see ksql.persistence.wrap.single.values.
Important
KSQL treats null
keys and values as a special case. We recommended avoiding
unwrapped single-field schemas if the field can have a null
value.
A null
value in a table’s topic is treated as a tombstone, which indicates that a row has been
removed. If a table’s source topic has an unwrapped single-field key schema and the value is
null
, it’s treated as a tombstone, resulting in any previous value for the key being removed
from the table.
A null
key or value in a stream’s topic is ignored when the stream is part of a join.
A null
value in a table’s topic is treated as a tombstone, and a null
key is
ignored when the table is part of a join.
When you have an unwrapped single-field schema, ensure that any null
key or
value has the desired result.
Single-field serialization examples¶
-- Assuming system configuration is at the default:
-- ksql.persistence.wrap.single.values=true
-- creates a stream, picking up the system default of wrapping values.
-- the serialized value is expected to be wrapped.
-- if the serialized forms do not match the expected wrapping it will result in a deserialization error.
CREATE STREAM IMPLICIT_SOURCE (NAME STRING) WITH (...);
-- override 'ksql.persistence.wrap.single.values' to false
-- the serialized value is expected to not be unwrapped.
CREATE STREAM EXPLICIT_SOURCE (ID INT) WITH (WRAP_SINGLE_VALUE=false, ...);
-- results in an error as the value schema is multi-field
CREATE STREAM BAD_SOURCE (ID INT, NAME STRING) WITH (WRAP_SINGLE_VALUE=false, ...);
-- creates a stream, picking up the system default of wrapping values.
-- the serialized values in the sink topic will be wrapped.
CREATE STREAM IMPLICIT_SINK AS SELECT ID FROM S EMIT CHANGES;
-- override 'ksql.persistence.wrap.single.values' to false
-- the serialized values will not be wrapped.
CREATE STREAM EXPLICIT_SINK WITH(WRAP_SINGLE_VALUE=false) AS SELECT ID FROM S EMIT CHANGES;
-- results in an error as the value schema is multi-field
CREATE STREAM BAD_SINK WITH(WRAP_SINGLE_VALUE=true) AS SELECT ID, COST FROM S EMIT CHANGES;
Formats¶
KSQL currently supports three serialization formats:
DELIMITED
supports comma separated values. See DELIMITED below.JSON
supports JSON values. See JSON below.AVRO
supports AVRO serialized values. See Avro below.KAFKA
supports primitives serialized using the standard Kafka serializers. See KAFKA below.
DELIMITED¶
The DELIMITED
format supports comma separated values. You can use other
delimiter characters by specifying the VALUE_DELIMITER when you use
VALUE_FORMAT=’DELIMITED’ in a WITH clause. Only a single character is valid
as a delimiter. The default is the comma character. For space- and
tab-delimited values, use the special values SPACE
or TAB
, not an actual
space or tab character.
The serialized object should be a Kafka-serialized string, which will be split into columns.
For example, given a KSQL statement such as:
CREATE STREAM x (ID BIGINT, NAME STRING, AGE INT) WITH (VALUE_FORMAT='DELIMITED', ...);
KSQL splits a value of 120, bob, 49
into the three fields with ID
of 120
,
NAME
of bob
and AGE
of 49
.
This data format supports all KSQL data types except ARRAY
, MAP
and
STRUCT
.
JSON¶
The JSON
format supports JSON values.
The JSON format supports all KSQL ref:data types <data-types>. As JSON doesn’t itself
support a map type, KSQL serializes MAP
types as JSON objects. Because of this the JSON format can
only support MAP
objects that have STRING
keys.
The serialized object should be a Kafka-serialized string containing a valid JSON value. The format supports JSON objects and top-level primitives, arrays and maps. See below for more info.
JSON Objects¶
Values that are JSON objects are probably the most common.
For example, given a KSQL statement such as:
CREATE STREAM x (ID BIGINT, NAME STRING, AGE INT) WITH (VALUE_FORMAT='JSON', ...);
And a JSON value of:
{
"id": 120,
"name": "bob",
"age": "49"
}
KSQL deserializes the JSON object’s fields into the corresponding fields of the stream.
Top-level primitives, arrays and maps¶
The JSON format supports reading and writing top-level primitives, arrays and maps.
For example, given a KSQL statement with only a single field in the value schema and the
WRAP_SINGLE_VALUE
property set to false
:
CREATE STREAM x (ID BIGINT) WITH (VALUE_FORMAT='JSON', WRAP_SINGLE_VALUE=false, ...);
And a JSON value of:
10
KSQL can deserialize the values into the ID
field of the stream.
When serializing data with a single field, KSQL can serialize the field as an anonymous value if
the WRAP_SINGLE_VALUE
is set to false
, for example:
CREATE STREAM y WITH (WRAP_SINGLE_VALUE=false) AS SELECT id FROM x EMIT CHANGES;
For more information, see Single field (un)wrapping.
Field Name Case Sensitivity¶
The format is case-insensitive when matching a KSQL field name with a JSON document’s property name. The first case-insensitive match is used.
Avro¶
The AVRO
format supports Avro binary serialization of all KSQL ref:data types <data-types>,
including records and top-level primitives, arrays, and maps.
The format requires KSQL to be configured to store and retrieve the Avro schemas from the Confluent Schema Registry. For more information, see Configuring Avro and Schema Registry for KSQL.
Avro Records¶
Avro records can be deserialized into matching KSQL schemas.
For example, given a KSQL statement such as:
CREATE STREAM x (ID BIGINT, NAME STRING, AGE INT) WITH (VALUE_FORMAT='JSON', ...);
And an Avro record serialized with the schema:
{
"type": "record",
"namespace": "com.acme",
"name": "UserDetails",
"fields": [
{ "name": "id", "type": "long" },
{ "name": "name", "type": "string" }
{ "name": "age", "type": "int" }
]
}
KSQL deserializes the Avro record’s fields into the corresponding fields of the stream.
Top-level primitives, arrays and maps¶
The Avro format supports reading and writing top-level primitives, arrays and maps.
For example, given a KSQL statement with only a single field in the value schema and the
WRAP_SINGLE_VALUE
property set to false
:
CREATE STREAM x (ID BIGINT) WITH (VALUE_FORMAT='AVRO', WRAP_SINGLE_VALUE=false, ...);
And an Avro value serialized with the schema:
{
"type": "long"
}
KSQL can deserialize the values into the ID
field of the stream.
When serializing data with a single field, KSQL can serialize the field as an anonymous value if
the WRAP_SINGLE_VALUE
is set to false
, for example:
CREATE STREAM y WITH (WRAP_SINGLE_VALUE=false) AS SELECT id FROM x EMIT CHANGES;
For more information, see Single field (un)wrapping.
Field Name Case Sensitivity¶
The format is case-insensitive when matching a KSQL field name with an Avro record’s field name. The first case-insensitive match is used.
Decimal Serialization¶
KSQL accepts Decimals that are serialized either as numbers, or the text representation of the base 10 equivalent. For example, KSQL can read data from both formats below:
{
"value": 1.12345678912345,
"value": "1.12345678912345"
}
Decimals with specified precision and scale are serialized as JSON floating point numbers. For example:
{
"value": 1.12345678912345
}
KAFKA¶
The KAFKA
format supports``INT``, BIGINT
, DOUBLE
and STRING
primitives that have
been serialized using Kafka’s standard set of serializers.
The format is designed primarily to support primitive message keys. It can be used as a value format, though certain operations aren’t supported when this is the case.
Unlike some other formats, the KAFKA
format does not perform any type coercion, so it’s important
to correctly match the field type to the underlying serialized form to avoid deserialization errors.
The table below details the SQL types the format supports, including details of the associated Kafka
Java Serializer, Deserializer and Connect Converter classes you would need to use to write the key
to Kafka, read the key from Kafka, or use to configure Apache Connect to work with the KAFKA
format,
respectively.
KSQL Field Type | Kafka Type | Kafka Serializer | Kafka Deserializer | Connect Converter |
---|---|---|---|---|
INT / INTEGER | A 32-bit signed integer | org.apache.kafka.common.serialization.IntegerSerializer |
org.apache.kafka.common.serialization.IntegerDeserializer |
org.apache.kafka.connect.storage.IntegerConverter |
BIGINT | A 64-bit signed integer | org.apache.kafka.common.serialization.LongSerializer |
org.apache.kafka.common.serialization.LongDeserializer |
org.apache.kafka.connect.storage.LongConverter |
DOUBLE | A 64-bit floating point number | org.apache.kafka.common.serialization.DoubleSerializer |
org.apache.kafka.common.serialization.DoubleDeserializer |
org.apache.kafka.connect.storage.DoubleConverter |
STRING / VARCHAR | A UTF-8 encoded text string | org.apache.kafka.common.serialization.StringSerializer |
org.apache.kafka.common.serialization.StringDeserializer |
org.apache.kafka.connect.storage.StringConverter |
Because the format supports only primitive types, you can only use it when the schema contains a single field.
For example, if your Kafka messages have a long
key, you can make them available to KSQL a statement
similar to:
CREATE STREAM USERS (ROWKEY BIGINT KEY, NAME STRING) WITH (KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON', ...);