KSQL Serialization

Controlling serialization

KSQL offers several mechanisms for controlling serialization and deserialization.

The primary mechanism is the serialization format itself, which you choose by setting VALUE_FORMAT in the WITH clause when you create a stream or table.

CREATE TABLE x (F0 INT, F1 STRING) WITH (VALUE_FORMAT='JSON', ...);

For more information on the formats that KSQL supports, see Formats.

KSQL provides some additional configuration that allows serialization to be controlled:

Formats

KSQL currently supports three serialization formats:

  • DELIMITED supports comma-separated values. See DELIMITED below.
  • JSON supports JSON values. See JSON below.
  • AVRO supports Avro-serialized values. See Avro below.

DELIMITED

The DELIMITED format supports comma-separated values.

The serialized object should be a Kafka-serialized string, which will be split into columns.

For example, given a KSQL statement such as:

CREATE STREAM x (ID BIGINT, NAME STRING, AGE INT) WITH (VALUE_FORMAT='DELIMITED', ...);

KSQL splits a value of 120, bob, 49 into the three fields with ID of 120, NAME of bob and AGE of 49.
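
The split described above can be sketched in a few lines of Python. This is only an illustration of the idea, not KSQL's actual parser: the COLUMNS list and the split_delimited helper are hypothetical names, and trimming the blank after each comma is an assumption made to match the example value.

```python
import csv
import io

# Hypothetical column list mirroring the CREATE STREAM statement above.
COLUMNS = [("ID", int), ("NAME", str), ("AGE", int)]


def split_delimited(value: str) -> dict:
    """Split one comma-separated value into typed columns."""
    # skipinitialspace drops the blank after each comma (an assumption about trimming).
    row = next(csv.reader(io.StringIO(value), skipinitialspace=True))
    if len(row) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} columns, got {len(row)}")
    # Convert each text field to the type declared for its column.
    return {name: convert(field) for (name, convert), field in zip(COLUMNS, row)}


record = split_delimited("120, bob, 49")
print(record)
```

Because every column is a single flat text field, a sketch like this has no natural place for nested values, which is consistent with the restriction on ARRAY, MAP and STRUCT.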

This data format supports all KSQL data types except ARRAY, MAP and STRUCT.

JSON

The JSON format supports JSON values.

The JSON format supports all of KSQL's data types. As JSON does not itself support a map type, KSQL serializes MAPs as JSON objects. Because of this, the JSON format can only support MAP objects that have STRING keys.
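
The reason for the STRING key restriction can be seen with any JSON library. The following Python sketch is not KSQL code; it only shows that the member names of a JSON object are always strings, so the original type of a non-string key is lost once a map is written as a JSON object.

```python
import json

# A map with integer keys, written as a JSON object and read back.
original = {1: "one", 2: "two"}
encoded = json.dumps(original)   # Python's json module turns the keys into strings
decoded = json.loads(encoded)

print(encoded)
print(decoded == original)       # the integer keys did not survive the round trip

# A map with string keys round-trips unchanged.
string_keyed = {"a": 1, "b": 2}
round_tripped = json.loads(json.dumps(string_keyed))
print(round_tripped == string_keyed)
```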

The serialized object should be a Kafka-serialized string containing a valid JSON value. The format supports JSON objects only. Top-level primitives, arrays and maps are not currently supported.
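
As a rough illustration of this restriction, the Python sketch below checks whether a payload would qualify. The helper name is_supported_value is hypothetical, and treating the Kafka-serialized string as UTF-8 bytes is an assumption; KSQL's own validation may differ in detail.

```python
import json


def is_supported_value(raw: bytes) -> bool:
    """Return True only if the payload is valid JSON whose top-level value is an object."""
    try:
        # Assumption: the Kafka-serialized string is UTF-8 encoded.
        value = json.loads(raw.decode("utf-8"))
    except ValueError:
        return False
    return isinstance(value, dict)


print(is_supported_value(b'{"id": 120}'))   # JSON object
print(is_supported_value(b'[1, 2, 3]'))     # top-level array
print(is_supported_value(b'42'))            # top-level primitive
```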

JSON Objects

Values that are JSON objects are probably the most common.

For example, given a KSQL statement such as:

CREATE STREAM x (ID BIGINT, NAME STRING, AGE INT) WITH (VALUE_FORMAT='JSON', ...);

And a JSON value of:

{
  "id": 120,
  "name": "bob",
  "age": "49"
}

KSQL deserializes the JSON object’s fields into the corresponding fields of the stream.
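
A minimal Python sketch of that mapping follows. It is an illustration under stated assumptions rather than KSQL's implementation: the COLUMNS dictionary and the to_row helper are hypothetical, field names are assumed to match column names regardless of case (as the lowercase keys in the example suggest), and a column with no matching field is left empty.

```python
import json

# Hypothetical column list mirroring the CREATE STREAM statement above.
COLUMNS = {"ID": int, "NAME": str, "AGE": int}


def to_row(raw: str) -> dict:
    """Map the fields of one JSON object onto the stream's columns."""
    obj = json.loads(raw)
    # Assumption: JSON field names are matched to column names ignoring case.
    by_upper = {key.upper(): val for key, val in obj.items()}
    # Columns with no matching JSON field are left as None in this sketch.
    return {
        col: convert(by_upper[col]) if by_upper.get(col) is not None else None
        for col, convert in COLUMNS.items()
    }


row = to_row('{"id": 120, "name": "bob", "age": 49}')
print(row)
```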

Avro

The AVRO format supports Avro binary serialization of all of KSQL's data types.

The format supports Avro records only. Top-level primitives, arrays and maps are not supported at this time.

The format requires KSQL to be configured to store and retrieve the Avro schemas from the Confluent Schema Registry. For more information, see Configuring Avro and Schema Registry for KSQL.

Avro Records

Avro records can be deserialized into matching KSQL schemas.

For example, given a KSQL statement such as:

CREATE STREAM x (ID BIGINT, NAME STRING, AGE INT) WITH (VALUE_FORMAT='AVRO', ...);

And an Avro record serialized with the schema:

{
  "type": "record",
  "namespace": "com.acme",
  "name": "UserDetails",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "name", "type": "string" },
    { "name": "age", "type": "int" }
  ]
}

KSQL deserializes the Avro record’s fields into the corresponding fields of the stream.
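
To make the correspondence between the record schema and the stream's columns concrete, the Python sketch below derives a column list from the schema. It uses only the standard library and does not decode Avro binary data. The AVRO_TO_KSQL table and the ksql_columns helper are hypothetical, and the type mapping covers only the three types in this example.

```python
import json

# Assumed mapping for the three Avro types used in the example; other types are out of scope.
AVRO_TO_KSQL = {"long": "BIGINT", "string": "STRING", "int": "INT"}

SCHEMA = """
{
  "type": "record",
  "namespace": "com.acme",
  "name": "UserDetails",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "name", "type": "string" },
    { "name": "age", "type": "int" }
  ]
}
"""


def ksql_columns(schema_json: str) -> list:
    """List (column name, KSQL type) pairs for a top-level Avro record schema."""
    schema = json.loads(schema_json)
    if schema.get("type") != "record":
        raise ValueError("only Avro records are supported at the top level")
    return [(f["name"].upper(), AVRO_TO_KSQL[f["type"]]) for f in schema["fields"]]


columns = ksql_columns(SCHEMA)
print(", ".join(f"{name} {ksql_type}" for name, ksql_type in columns))
```

The printed column list lines up with the columns declared in the CREATE STREAM statement above.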