KSQL Serialization

Controlling serialization

KSQL offers several mechanisms for controlling serialization and deserialization.

The primary mechanism is choosing the serialization format: when you create a stream or table, specify the VALUE_FORMAT in the WITH clause.

CREATE TABLE ORDERS (F0 INT, F1 STRING) WITH (VALUE_FORMAT='JSON', ...);

For more information on the formats that KSQL supports, see Formats.

KSQL provides some additional configuration that allows serialization to be controlled:

Single field (un)wrapping

Note

The DELIMITED and KAFKA formats do not support single field unwrapping.

Controlling deserializing of single fields

When KSQL deserializes a Kafka message into a row, the key is deserialized into the key field, and the message’s value is deserialized into the value fields.

By default, KSQL expects any value with a single-field schema to have been serialized as a named field within a record. However, this is not always the case. KSQL also supports reading data that has been serialized as an anonymous value.

For example, a value with multiple fields might look like the following in JSON:

{
   "id": 134,
   "name": "John"
}

If the value only had the id field, KSQL would still expect the value to be serialized as a named field, for example:

{
   "id": 134
}

If your data contains only a single field, and that field is not wrapped within a JSON object, or an Avro record if using the AVRO format, then you can use the WRAP_SINGLE_VALUE property in the WITH clause of your CREATE TABLE or CREATE STREAM statements. Setting the property to false tells KSQL that the value is not wrapped, so the example above would be a JSON number:

134

For example, the following creates a table where the values in the underlying topic have been serialized as an anonymous JSON number:

CREATE TABLE TRADES (ID INT) WITH (WRAP_SINGLE_VALUE=false, ...);
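The difference between the two layouts can be sketched from a consumer's point of view. The Python below is only an illustration under stated assumptions, not KSQL's implementation: the helper name read_single_field and its wrapped parameter (standing in for WRAP_SINGLE_VALUE) are hypothetical.

```python
import json


def read_single_field(payload: bytes, field_name: str, wrapped: bool = True):
    """Return the single value field from a JSON-encoded message value.

    Hypothetical helper: `wrapped` plays the role of WRAP_SINGLE_VALUE.
    """
    decoded = json.loads(payload)
    if wrapped:
        # Wrapped: the field is a named property of a JSON object.
        if not isinstance(decoded, dict):
            raise ValueError("expected a JSON object holding the named field")
        return decoded[field_name]
    # Unwrapped: the whole payload is the anonymous value.
    return decoded


wrapped_id = read_single_field(b'{"id": 134}', "id")
anonymous_id = read_single_field(b"134", "id", wrapped=False)
print(wrapped_id, anonymous_id)
```

Reading the anonymous payload with wrapped=True raises an error in this sketch, which mirrors the need for the declared wrapping to match the data in the topic.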

If a statement doesn’t set the value wrapping explicitly, KSQL uses the system default, defined by ksql.persistence.wrap.single.values. You can change the system default. For more information, see ksql.persistence.wrap.single.values.

Important

KSQL treats null keys and values as a special case. We recommend avoiding unwrapped single-field schemas if the field can have a null value.

A null value in a table’s topic is treated as a tombstone, which indicates that a row has been removed. If a table’s source topic has an unwrapped single-field key schema and the value is null, it’s treated as a tombstone, resulting in any previous value for the key being removed from the table.

A null key or value in a stream’s topic is ignored when the stream is part of a join. A null value in a table’s topic is treated as a tombstone, and a null key is ignored when the table is part of a join.

When you have an unwrapped single-field schema, ensure that any null key or value has the desired result.
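To see why a nullable unwrapped field is risky, consider a minimal Python model of how a table is materialized from its topic. This is a sketch, not KSQL code: apply_record is a hypothetical name, and it assumes, as the recommendation above implies, that a null unwrapped field arrives as a null message value.

```python
from typing import Dict, Optional


def apply_record(table: Dict[str, bytes], key: str, value: Optional[bytes]) -> None:
    """Apply one record from a table's topic to an in-memory model of the table."""
    if value is None:
        # A null message value is a tombstone: the row for this key is removed.
        table.pop(key, None)
    else:
        table[key] = value


table: Dict[str, bytes] = {}
apply_record(table, "k1", b"134")
apply_record(table, "k2", b"7")
# Intended as "the field is null", but indistinguishable from a delete:
apply_record(table, "k1", None)
print(table)
```

After the third record, the row for k1 is gone rather than holding a null field, which is the outcome to check for before choosing an unwrapped schema.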

Controlling serialization of single fields

When KSQL serializes a row into a Kafka message, the key field is serialized into the message’s key, and any value fields are serialized into the message’s value.

By default, if the value has only a single field, KSQL serializes the single field as a named field within a record. However, this doesn’t always match the requirements of downstream consumers, so KSQL allows the value to be serialized as an anonymous value.

For example, consider the statements:

CREATE STREAM x (f0 INT, f1 STRING) WITH (VALUE_FORMAT='JSON', ...);
CREATE STREAM y AS SELECT f0 FROM x EMIT CHANGES;

The second statement defines a stream with only a single field in the value, named f0.

By default, when KSQL writes out the result to Kafka, it persists the single field as a named field within a JSON object, or an Avro record if using the AVRO format:

{
   "F0": 10
}

If you require the value to be serialized as an anonymous value, for example:

10

then set the WRAP_SINGLE_VALUE property to false in your statement. For example:

CREATE STREAM y WITH(WRAP_SINGLE_VALUE=false) AS SELECT f0 FROM x EMIT CHANGES;
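The two output shapes can be sketched in Python as follows. This is an illustration only: write_single_field is a hypothetical helper, its wrapped parameter stands in for WRAP_SINGLE_VALUE, and the exact whitespace KSQL emits in JSON is not modeled.

```python
import json


def write_single_field(field_name: str, value, wrapped: bool = True) -> bytes:
    """Serialize a single-field row value as JSON (hypothetical helper)."""
    if wrapped:
        # Default: the field is persisted as a named field within a JSON object.
        return json.dumps({field_name: value}).encode("utf-8")
    # Unwrapped: the field is persisted as an anonymous JSON value.
    return json.dumps(value).encode("utf-8")


wrapped_bytes = write_single_field("F0", 10)
anonymous_bytes = write_single_field("F0", 10, wrapped=False)
print(wrapped_bytes, anonymous_bytes)
```

A downstream consumer that expects a bare number would need the second form.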

If a statement doesn’t set the value wrapping explicitly, KSQL uses the system default, defined by ksql.persistence.wrap.single.values. You can change the system default. For more information, see ksql.persistence.wrap.single.values.

Important

KSQL treats null keys and values as a special case. We recommend avoiding unwrapped single-field schemas if the field can have a null value.

A null value in a table’s topic is treated as a tombstone, which indicates that a row has been removed. If a table’s source topic has an unwrapped single-field key schema and the value is null, it’s treated as a tombstone, resulting in any previous value for the key being removed from the table.

A null key or value in a stream’s topic is ignored when the stream is part of a join. A null value in a table’s topic is treated as a tombstone, and a null key is ignored when the table is part of a join.

When you have an unwrapped single-field schema, ensure that any null key or value has the desired result.

Single-field serialization examples

-- Assuming system configuration is at the default:
--  ksql.persistence.wrap.single.values=true

-- creates a stream, picking up the system default of wrapping values.
-- the serialized value is expected to be wrapped.
-- if the serialized forms do not match the expected wrapping it will result in a deserialization error.
CREATE STREAM IMPLICIT_SOURCE (NAME STRING) WITH (...);

-- override 'ksql.persistence.wrap.single.values' to false
-- the serialized value is expected to not be wrapped.
CREATE STREAM EXPLICIT_SOURCE (ID INT) WITH (WRAP_SINGLE_VALUE=false, ...);

-- results in an error as the value schema is multi-field
CREATE STREAM BAD_SOURCE (ID INT, NAME STRING) WITH (WRAP_SINGLE_VALUE=false, ...);

-- creates a stream, picking up the system default of wrapping values.
-- the serialized values in the sink topic will be wrapped.
CREATE STREAM IMPLICIT_SINK AS SELECT ID FROM S EMIT CHANGES;

-- override 'ksql.persistence.wrap.single.values' to false
-- the serialized values will not be wrapped.
CREATE STREAM EXPLICIT_SINK WITH(WRAP_SINGLE_VALUE=false) AS SELECT ID FROM S EMIT CHANGES;

-- results in an error as the value schema is multi-field
CREATE STREAM BAD_SINK WITH(WRAP_SINGLE_VALUE=true) AS SELECT ID, COST FROM S EMIT CHANGES;
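The rules these examples exercise can be summarized as a small validation routine. The Python below is a hypothetical sketch of those rules as this document states them, not KSQL's actual validation logic. The function name and error messages are invented for illustration, and rejecting only WRAP_SINGLE_VALUE=false for DELIMITED and KAFKA is one reading of the earlier note.

```python
from typing import Optional, Sequence

# Formats the document says do not support single field unwrapping.
NO_UNWRAP_FORMATS = {"DELIMITED", "KAFKA"}


def check_wrap_single_value(
    value_columns: Sequence[str],
    value_format: str,
    wrap_single_value: Optional[bool] = None,
) -> None:
    """Raise ValueError if WRAP_SINGLE_VALUE is used where the text says it is invalid."""
    if wrap_single_value is None:
        return  # Not set explicitly: the system default applies.
    if len(value_columns) != 1:
        # Matches BAD_SOURCE and BAD_SINK: the value schema is multi-field.
        raise ValueError("WRAP_SINGLE_VALUE requires a single-field value schema")
    if not wrap_single_value and value_format.upper() in NO_UNWRAP_FORMATS:
        raise ValueError(f"{value_format} does not support single field unwrapping")


check_wrap_single_value(["ID"], "JSON", False)   # like EXPLICIT_SOURCE: accepted
check_wrap_single_value(["NAME"], "JSON")        # like IMPLICIT_SOURCE: accepted
```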

Formats

KSQL currently supports four serialization formats:

  • DELIMITED supports comma-separated values. See DELIMITED below.
  • JSON supports JSON values. See JSON below.
  • AVRO supports Avro serialized values. See Avro below.
  • KAFKA supports primitives serialized using the standard Kafka serializers. See KAFKA below.

DELIMITED

The DELIMITED format supports comma-separated values. You can use other delimiter characters by specifying the VALUE_DELIMITER when you use VALUE_FORMAT='DELIMITED' in a WITH clause. Only a single character is valid as a delimiter. The default is the comma character. For space- and tab-delimited values, use the special values SPACE or TAB, not an actual space or tab character.

The serialized object should be a Kafka-serialized string, which will be split into columns.

For example, given a KSQL statement such as:

CREATE STREAM x (ID BIGINT, NAME STRING, AGE INT) WITH (VALUE_FORMAT='DELIMITED', ...);

KSQL splits a value of 120, bob, 49 into the three fields with ID of 120, NAME of bob and AGE of 49.
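The splitting step can be approximated with Python's csv module. This is a rough sketch rather than KSQL's parser: split_delimited and its converters argument are hypothetical, and trimming whitespace around each column is an assumption made so that the example value above parses cleanly.

```python
import csv
from typing import Callable, List, Sequence

# The special VALUE_DELIMITER values named in the text.
DELIMITER_ALIASES = {"SPACE": " ", "TAB": "\t"}


def split_delimited(
    line: str,
    converters: Sequence[Callable[[str], object]],
    value_delimiter: str = ",",
) -> List[object]:
    """Split one delimited string into typed columns (hypothetical helper)."""
    delimiter = DELIMITER_ALIASES.get(value_delimiter, value_delimiter)
    if len(delimiter) != 1:
        raise ValueError("only a single character is valid as a delimiter")
    fields = next(csv.reader([line], delimiter=delimiter))
    if len(fields) != len(converters):
        raise ValueError("column count does not match the schema")
    # Assumption for this sketch: surrounding whitespace is trimmed.
    return [convert(field.strip()) for convert, field in zip(converters, fields)]


# ID BIGINT, NAME STRING, AGE INT
row = split_delimited("120, bob, 49", [int, str, int])
print(row)  # [120, 'bob', 49]
```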

This data format supports all KSQL data types except ARRAY, MAP and STRUCT.

JSON

The JSON format supports JSON values.

The JSON format supports all KSQL data types. As JSON doesn't itself support a map type, KSQL serializes MAP types as JSON objects. Because of this, the JSON format can only support MAP objects that have STRING keys.
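The STRING-key restriction follows from JSON itself, where object property names are always strings. The short Python sketch below illustrates this with the standard json module. The field name PRICES and the sample maps are invented for the example, and the key coercion shown is Python's behavior, not KSQL's.

```python
import json

# A map with string keys survives a round trip through a JSON object.
prices = {"apple": 1.5, "pear": 2.0}
encoded = json.dumps({"PRICES": prices})
round_tripped = json.loads(encoded)["PRICES"]
print(round_tripped == prices)  # True

# A map with integer keys does not: Python's json module coerces the keys
# to strings on the way out, so the original key type is lost.
counts = {1: "one", 2: "two"}
decoded = json.loads(json.dumps(counts))
print(decoded)  # {'1': 'one', '2': 'two'}
```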

The serialized object should be a Kafka-serialized string containing a valid JSON value. The format supports JSON objects and top-level primitives, arrays and maps. See below for more info.

JSON Objects

Values that are JSON objects are probably the most common.

For example, given a KSQL statement such as:

CREATE STREAM x (ID BIGINT, NAME STRING, AGE INT) WITH (VALUE_FORMAT='JSON', ...);

And a JSON value of:

{
  "id": 120,
  "name": "bob",
  "age": "49"
}

KSQL deserializes the JSON object’s fields into the corresponding fields of the stream.

Top-level primitives, arrays and maps

The JSON format supports reading and writing top-level primitives, arrays and maps.

For example, given a KSQL statement with only a single field in the value schema and the WRAP_SINGLE_VALUE property set to false:

CREATE STREAM x (ID BIGINT) WITH (VALUE_FORMAT='JSON', WRAP_SINGLE_VALUE=false, ...);

And a JSON value of:

10

KSQL can deserialize the values into the ID field of the stream.

When serializing data with a single field, KSQL can serialize the field as an anonymous value if WRAP_SINGLE_VALUE is set to false, for example:

CREATE STREAM y WITH (WRAP_SINGLE_VALUE=false) AS SELECT id FROM x EMIT CHANGES;

For more information, see Single field (un)wrapping.

Field Name Case Sensitivity

The format is case-insensitive when matching a KSQL field name with a JSON document’s property name. The first case-insensitive match is used.
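A minimal Python sketch of this matching rule is shown below. It is not KSQL's implementation: find_property is a hypothetical helper, and it assumes that "first" means first in document order and that a missing property yields null.

```python
import json


def find_property(document: dict, field_name: str):
    """Return the value of the first property whose name matches, ignoring case."""
    wanted = field_name.upper()
    for name, value in document.items():
        if name.upper() == wanted:
            return value
    return None  # Assumption for this sketch: a missing property reads as null.


# json.loads keeps properties in document order, so "Name" is seen before "NAME".
doc = json.loads('{"id": 120, "Name": "bob", "NAME": "alice"}')
print(find_property(doc, "ID"))    # 120
print(find_property(doc, "NAME"))  # bob
```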

Avro

The AVRO format supports Avro binary serialization of all KSQL data types, including records and top-level primitives, arrays, and maps.

The format requires KSQL to be configured to store and retrieve the Avro schemas from the Confluent Schema Registry. For more information, see Configuring Avro and Schema Registry for KSQL.

Avro Records

Avro records can be deserialized into matching KSQL schemas.

For example, given a KSQL statement such as:

CREATE STREAM x (ID BIGINT, NAME STRING, AGE INT) WITH (VALUE_FORMAT='AVRO', ...);

And an Avro record serialized with the schema:

{
  "type": "record",
  "namespace": "com.acme",
  "name": "UserDetails",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "name", "type": "string" },
    { "name": "age", "type": "int" }
  ]
}

KSQL deserializes the Avro record’s fields into the corresponding fields of the stream.

Top-level primitives, arrays and maps

The Avro format supports reading and writing top-level primitives, arrays and maps.

For example, given a KSQL statement with only a single field in the value schema and the WRAP_SINGLE_VALUE property set to false:

CREATE STREAM x (ID BIGINT) WITH (VALUE_FORMAT='AVRO', WRAP_SINGLE_VALUE=false, ...);

And an Avro value serialized with the schema:

{
  "type": "long"
}

KSQL can deserialize the values into the ID field of the stream.
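For a sense of what such an anonymous value looks like on the wire, the Avro specification encodes a long as a zigzag, variable-length integer. The Python sketch below hand-rolls that encoding for illustration. encode_avro_long is a hypothetical name, and the sketch omits the framing that the Confluent Schema Registry serializers prepend (a magic byte and a schema ID).

```python
def encode_avro_long(n: int) -> bytes:
    """Encode a 64-bit signed integer using Avro's zigzag varint encoding."""
    if not -(1 << 63) <= n < (1 << 63):
        raise ValueError("value does not fit in a 64-bit signed long")
    # Zigzag maps small negative and positive numbers to small unsigned ones.
    zigzag = (n << 1) ^ (n >> 63)
    out = bytearray()
    # Emit 7 bits per byte, least significant group first; the high bit
    # marks that more bytes follow.
    while zigzag > 0x7F:
        out.append((zigzag & 0x7F) | 0x80)
        zigzag >>= 7
    out.append(zigzag)
    return bytes(out)


print(encode_avro_long(10).hex())  # 14
print(encode_avro_long(64).hex())  # 8001
```

Per the Avro specification, a record is encoded as the concatenation of its fields' encodings, so a record with a single long field yields the same payload bytes. The wrapped and unwrapped forms therefore differ in the schema rather than in the encoded value.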

When serializing data with a single field, KSQL can serialize the field as an anonymous value if WRAP_SINGLE_VALUE is set to false, for example:

CREATE STREAM y WITH (WRAP_SINGLE_VALUE=false) AS SELECT id FROM x EMIT CHANGES;

For more information, see Single field (un)wrapping.

Field Name Case Sensitivity

The format is case-insensitive when matching a KSQL field name with an Avro record’s field name. The first case-insensitive match is used.

Decimal Serialization

When reading JSON, KSQL accepts decimals that are serialized either as numbers or as the text representation of the base-10 equivalent. For example, KSQL can read data in either of the forms below:

{
  "value": 1.12345678912345
}

{
  "value": "1.12345678912345"
}

Decimals with specified precision and scale are serialized as JSON floating point numbers. For example:

{
  "value": 1.12345678912345
}
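Accepting both input forms can be sketched in Python with the standard json and decimal modules. This is an illustration only, not KSQL's parser, and read_decimal is a hypothetical helper. It uses the parse_float and parse_int hooks of json.loads so that numeric literals are not rounded through a binary float.

```python
import json
from decimal import Decimal


def read_decimal(payload: str, field_name: str) -> Decimal:
    """Read a decimal field that may be a JSON number or a JSON string."""
    # Parse numeric literals straight into Decimal to keep every digit.
    document = json.loads(payload, parse_float=Decimal, parse_int=Decimal)
    raw = document[field_name]
    return raw if isinstance(raw, Decimal) else Decimal(raw)


from_number = read_decimal('{"value": 1.12345678912345}', "value")
from_text = read_decimal('{"value": "1.12345678912345"}', "value")
print(from_number == from_text)  # True
```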

KAFKA

The KAFKA format supports INT, BIGINT, DOUBLE and STRING primitives that have been serialized using Kafka's standard set of serializers.

The format is designed primarily to support primitive message keys. It can be used as a value format, though certain operations aren’t supported when this is the case.

Unlike some other formats, the KAFKA format does not perform any type coercion, so it’s important to correctly match the field type to the underlying serialized form to avoid deserialization errors.

The table below lists the SQL types the format supports, along with the Kafka Java Serializer and Deserializer classes you would use to write the key to Kafka and read the key from Kafka, and the Connect Converter class you would use to configure Kafka Connect to work with the KAFKA format.

| KSQL Field Type | Kafka Type | Kafka Serializer | Kafka Deserializer | Connect Converter |
|-----------------|------------|------------------|--------------------|-------------------|
| INT / INTEGER | A 32-bit signed integer | org.apache.kafka.common.serialization.IntegerSerializer | org.apache.kafka.common.serialization.IntegerDeserializer | org.apache.kafka.connect.converters.IntegerConverter |
| BIGINT | A 64-bit signed integer | org.apache.kafka.common.serialization.LongSerializer | org.apache.kafka.common.serialization.LongDeserializer | org.apache.kafka.connect.converters.LongConverter |
| DOUBLE | A 64-bit floating point number | org.apache.kafka.common.serialization.DoubleSerializer | org.apache.kafka.common.serialization.DoubleDeserializer | org.apache.kafka.connect.converters.DoubleConverter |
| STRING / VARCHAR | A UTF-8 encoded text string | org.apache.kafka.common.serialization.StringSerializer | org.apache.kafka.common.serialization.StringDeserializer | org.apache.kafka.connect.storage.StringConverter |

Because the format supports only primitive types, you can only use it when the schema contains a single field.
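For producers that are not written in Java, it can help to see the byte layouts the standard Kafka serializers produce: big-endian fixed-width integers, big-endian IEEE 754 doubles, and UTF-8 text. The Python sketch below reproduces those layouts with the struct module. The function names are hypothetical, and the length checks model the lack of type coercion described above.

```python
import struct

# struct format codes matching the fixed-width Kafka serializers.
_FIXED = {"INT": ">i", "INTEGER": ">i", "BIGINT": ">q", "DOUBLE": ">d"}


def serialize_kafka(sql_type: str, value) -> bytes:
    """Encode a primitive the way the matching standard Kafka serializer lays it out."""
    sql_type = sql_type.upper()
    if sql_type in _FIXED:
        return struct.pack(_FIXED[sql_type], value)
    if sql_type in ("STRING", "VARCHAR"):
        return value.encode("utf-8")
    raise ValueError(f"unsupported type: {sql_type}")


def deserialize_kafka(sql_type: str, data: bytes):
    """Decode bytes as the declared type; no coercion between widths is attempted."""
    sql_type = sql_type.upper()
    if sql_type in _FIXED:
        fmt = _FIXED[sql_type]
        if len(data) != struct.calcsize(fmt):
            raise ValueError(f"{len(data)} bytes cannot be read as {sql_type}")
        return struct.unpack(fmt, data)[0]
    if sql_type in ("STRING", "VARCHAR"):
        return data.decode("utf-8")
    raise ValueError(f"unsupported type: {sql_type}")


key_bytes = serialize_kafka("BIGINT", 42)
print(key_bytes.hex())                       # 000000000000002a
print(deserialize_kafka("BIGINT", key_bytes))  # 42
```

Declaring such a key as INT instead of BIGINT fails in this sketch because eight bytes cannot be read as a 32-bit integer, which is the kind of mismatch that leads to deserialization errors.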

For example, if your Kafka messages have a long key, you can make them available to KSQL by using a statement similar to:

CREATE STREAM USERS (ROWKEY BIGINT KEY, NAME STRING) WITH (KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON', ...);