.. title:: Kafka Streams Data Types and Serialization for Confluent Platform

.. meta::
   :description: Learn about data types and Serdes you can use in your Kafka Streams applications.

.. _streams_developer-guide_serdes:

|kstreams| Data Types and Serialization for |cp|
------------------------------------------------

Every |kstreams| application must provide Serdes (Serializer/Deserializer) for the data types of record keys and record values (for example, ``java.lang.String`` or Avro objects) to materialize the data when necessary. Operations that require such Serdes information include: ``stream()``, ``table()``, ``to()``, ``repartition()``, ``groupByKey()``, and ``groupBy()``.

You can provide Serdes in either of the following ways, and you must use at least one of them:

- By setting default Serdes via a ``Properties`` instance.
- By specifying explicit Serdes when calling the appropriate API methods, thus overriding the defaults.

.. include:: ../../includes/connect-streams-pipeline-link.rst
   :start-line: 2
   :end-line: 6

Configuring Serdes
^^^^^^^^^^^^^^^^^^

Serdes specified in the Streams configuration via the ``Properties`` config are used as the default in your |kstreams| application. Because this config's default is null, you must either set a default Serde by using this configuration or pass in Serdes explicitly, as described below.
.. sourcecode:: java

   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.StreamsConfig;

   Properties settings = new Properties();
   // Default serde for keys of data records (here: built-in serde for String type)
   settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
   // Default serde for values of data records (here: built-in serde for Long type)
   settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());

If a ``Serde`` is specified via ``Properties``, the ``Serde`` class can't have generic types, which means that you can't use a class like ``MySerde<T> implements Serde<T>``. This also implies that you can't use any ``Serde`` that is created via ``Serdes.serdeFrom(Serializer<T>, Deserializer<T>)``. Only fully typed ``Serde`` classes like ``MySerde implements Serde<MyType>`` are supported, due to Java type erasure.

Overriding default Serdes
^^^^^^^^^^^^^^^^^^^^^^^^^

You can also specify Serdes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:

.. sourcecode:: java

   import org.apache.kafka.common.serialization.Serde;
   import org.apache.kafka.common.serialization.Serdes;

   final Serde<String> stringSerde = Serdes.String();
   final Serde<Long> longSerde = Serdes.Long();

   // The stream userCountByRegion has type `String` for record keys (for region)
   // and type `Long` for record values (for user counts).
   KStream<String, Long> userCountByRegion = ...;
   userCountByRegion.to("RegionCountsTopic", Produced.with(stringSerde, longSerde));

If you want to override serdes selectively, that is, keep the defaults for some fields, then don't specify the serde whenever you want to leverage the default settings:
.. sourcecode:: java

   import org.apache.kafka.common.serialization.Serde;
   import org.apache.kafka.common.serialization.Serdes;

   // Use the default serializer for record keys (here: region as String) by not specifying the key serde,
   // but override the default serializer for record values (here: userCount as Long).
   final Serde<Long> longSerde = Serdes.Long();
   KStream<String, Long> userCountByRegion = ...;
   userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.Long()));

If some of your incoming records are corrupted or ill-formatted, they cause the deserializer class to report an error. The ``org.apache.kafka.streams.errors.DeserializationExceptionHandler`` interface enables you to customize how such records are handled. Specify your implementation of the interface by using ``StreamsConfig``. For more information, see :ref:`Failure and exception handling FAQ `.

.. _streams_developer-guide_serdes-available:

Available Serdes
^^^^^^^^^^^^^^^^

Primitive and basic types
"""""""""""""""""""""""""

|ak-tm| includes several built-in serde implementations for Java primitives and basic types such as ``byte[]`` in its ``kafka-clients`` Maven artifact:

.. codewithvars:: xml

   <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>|kafka_release|</version>
   </dependency>

This artifact provides the following serde implementations under the package :kafka-file:`org.apache.kafka.common.serialization|clients/src/main/java/org/apache/kafka/common/serialization`, which you can leverage when, for example, defining default serializers in your Streams configuration.
+------------+------------------------------------------------------------+
| Data type  | Serde                                                      |
+============+============================================================+
| byte[]     | ``Serdes.ByteArray()``, ``Serdes.Bytes()`` (see tip below) |
+------------+------------------------------------------------------------+
| ByteBuffer | ``Serdes.ByteBuffer()``                                    |
+------------+------------------------------------------------------------+
| Double     | ``Serdes.Double()``                                        |
+------------+------------------------------------------------------------+
| Integer    | ``Serdes.Integer()``                                       |
+------------+------------------------------------------------------------+
| Long       | ``Serdes.Long()``                                          |
+------------+------------------------------------------------------------+
| String     | ``Serdes.String()``                                        |
+------------+------------------------------------------------------------+
| UUID       | ``Serdes.UUID()``                                          |
+------------+------------------------------------------------------------+

.. tip:: :kafka-file:`Bytes|clients/src/main/java/org/apache/kafka/common/utils/Bytes.java` is a wrapper for Java's ``byte[]`` (byte array) that supports proper equality and ordering semantics. You may want to consider using ``Bytes`` instead of ``byte[]`` in your applications.

You would use the built-in Serdes as follows, using the example of the String serde:

.. sourcecode:: java

   // When configuring the default Serdes of StreamsConfig
   Properties streamsConfiguration = new Properties();
   streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
   streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

   // When you want to override Serdes explicitly/selectively
   final Serde<String> stringSerde = Serdes.String();
   StreamsBuilder builder = new StreamsBuilder();
   builder.stream("my-topic", Consumed.with(stringSerde, stringSerde));
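For intuition, the built-in serdes boil down to standard byte conversions. The following is a minimal sketch in plain Java, without Kafka dependencies; the class and method names are illustrative, not the actual library classes. It mirrors the byte-level contract of ``Serdes.Long()`` (8-byte big-endian encoding) and ``Serdes.String()`` (UTF-8):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BuiltinSerdeSketch {

    // Roughly what the built-in Long serde does: 8-byte big-endian encoding.
    static byte[] serializeLong(Long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static Long deserializeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    // Roughly what the built-in String serde does: UTF-8 encoding.
    static byte[] serializeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String deserializeString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Round trip: value -> bytes -> value
        System.out.println(deserializeLong(serializeLong(42L)));          // prints 42
        System.out.println(deserializeString(serializeString("region"))); // prints region
    }
}
```

In an application you would use the real ``Serdes.Long()`` and ``Serdes.String()`` instances rather than hand-rolled conversions; the sketch only illustrates what bytes end up on the wire.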
.. _streams-data-avro:

Avro
""""

Confluent provides |sr|-compatible `Avro serdes `__ for data in generic Avro and in specific Avro format:

.. codewithvars:: xml

   <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>kafka-streams-avro-serde</artifactId>
       <version>|release|</version>
   </dependency>

Both the generic and the specific Avro serde require you to configure the endpoint of :ref:`Confluent Schema Registry ` via the ``schema.registry.url`` setting:

* When you define the generic or specific Avro serde as a default serde via ``StreamsConfig``, then you must also set the |sr| endpoint in ``StreamsConfig``.
* When you instantiate the generic or specific Avro serde directly (e.g., ``new GenericAvroSerde()``), you must call ``Serde#configure()`` on the serde instance to set the |sr| endpoint before using the serde instance. Additionally, you must tell ``Serde#configure()`` via a boolean parameter whether the serde instance is used for serializing/deserializing record *keys* (``true``) or record *values* (``false``).

Usage example for Confluent ``GenericAvroSerde``:

.. sourcecode:: java

   // Generic Avro serde example
   import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;

   // When configuring the default serdes of StreamsConfig
   final Properties streamsConfiguration = new Properties();
   streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
   streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
   streamsConfiguration.put("schema.registry.url", "http://my-schema-registry:8081");

   // When you want to override serdes explicitly/selectively
   final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
                                                                    "http://my-schema-registry:8081");
   final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
   keyGenericAvroSerde.configure(serdeConfig, true); // `true` for record keys
   final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
   valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values

   StreamsBuilder builder = new StreamsBuilder();
   KStream<GenericRecord, GenericRecord> textLines =
       builder.stream("my-avro-topic", Consumed.with(keyGenericAvroSerde, valueGenericAvroSerde));

Usage example for Confluent ``SpecificAvroSerde``:

.. sourcecode:: java

   // Specific Avro serde example
   import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

   // When configuring the default serdes of StreamsConfig
   final Properties streamsConfiguration = new Properties();
   streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
   streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
   streamsConfiguration.put("schema.registry.url", "http://my-schema-registry:8081");

   // When you want to override serdes explicitly/selectively
   final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
                                                                    "http://my-schema-registry:8081");
   // `Foo` and `Bar` are Java classes generated from Avro schemas
   final Serde<Foo> keySpecificAvroSerde = new SpecificAvroSerde<>();
   keySpecificAvroSerde.configure(serdeConfig, true); // `true` for record keys
   final Serde<Bar> valueSpecificAvroSerde = new SpecificAvroSerde<>();
   valueSpecificAvroSerde.configure(serdeConfig, false); // `false` for record values

   StreamsBuilder builder = new StreamsBuilder();
   KStream<Foo, Bar> textLines =
       builder.stream("my-avro-topic", Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde));

When you create source streams, you specify input serdes by using the Streams DSL. When you construct the processor topology by using the lower-level :ref:`Processor API`, you can specify the serde class, like the Confluent ``GenericAvroSerde`` and ``SpecificAvroSerde`` classes.
.. sourcecode:: java

   TopologyBuilder builder = new TopologyBuilder();
   builder.addSource("Source", keyGenericAvroSerde.deserializer(), valueGenericAvroSerde.deserializer(), inputTopic);

The following end-to-end demos showcase using the Confluent Avro serdes:

* Java:

  * :cp-examples:`GenericAvroIntegrationTest|src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java`
  * :cp-examples:`SpecificAvroIntegrationTest|src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java`

* Scala:

  * :cp-examples:`GenericAvroScalaIntegrationTest|src/test/scala/io/confluent/examples/streams/GenericAvroScalaIntegrationTest.scala`
  * :cp-examples:`SpecificAvroScalaIntegrationTest|src/test/scala/io/confluent/examples/streams/SpecificAvroScalaIntegrationTest.scala`

.. _streams-data-avro-primitive:

Avro Primitive
""""""""""""""

Starting with version 5.5.0, |cp| provides a serializer and deserializer for writing and reading data in "Avro primitive" format. The `Avro primitive types `__ are ``null``, ``boolean``, ``int``, ``long``, ``float``, ``double``, ``bytes``, and ``string``. Other types aren't supported by this serde.

The primary use case for ``PrimitiveAvroSerde`` is for keys. This serde's specific Avro counterpart is ``SpecificAvroSerde``, and its generic Avro counterpart is ``GenericAvroSerde``.

This serde reads and writes data according to the |sr| :ref:`wire format `. It requires access to a |sr| endpoint, which you must define in ``PrimitiveAvroSerde#configure(Map, boolean)`` by using the ``schema.registry.url`` parameter.

Usage example for Confluent ``PrimitiveAvroSerde``:
.. sourcecode:: java

   Properties streamsConfiguration = new Properties();
   streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, PrimitiveAvroSerde.class);
   streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PrimitiveAvroSerde.class);
   streamsConfiguration.put(
       AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
       "http://confluent-schema-registry-server:8081/");

The following example shows how to explicitly override the application's default serdes so that only specific operations, like ``KStream#to``, use this serde.

.. sourcecode:: java

   Serde<Long> longAvroSerde = new PrimitiveAvroSerde<>();
   boolean isKeySerde = true;
   longAvroSerde.configure(
       Collections.singletonMap(
           AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
           "http://confluent-schema-registry-server:8081/"),
       isKeySerde);

   Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde();
   isKeySerde = false;
   genericAvroSerde.configure(
       Collections.singletonMap(
           AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
           "http://confluent-schema-registry-server:8081/"),
       isKeySerde);

   KStream<Long, GenericRecord> stream =
       builder.stream("my-input-topic", Consumed.with(longAvroSerde, genericAvroSerde));

.. _streams-data-reflection-avro:

Reflection Avro
"""""""""""""""

Starting with version 5.4.0, |cp| also provides a serializer and deserializer for writing and reading data in "reflection Avro" format. This serde's "generic Avro" counterpart is ``GenericAvroSerde``.

This serde reads and writes data according to the wire format defined at `Formats, Serializers, and Deserializers `__. It requires access to a |sr| endpoint, which you must define in the serde configuration by using the ``schema.registry.url`` parameter.

The following code example configures this serde as a |kstreams| application's default serde for both record keys and record values:
.. code:: java

   Properties streamsConfiguration = new Properties();
   streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, ReflectionAvroSerde.class);
   streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ReflectionAvroSerde.class);
   streamsConfiguration.put(
       AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
       "http://confluent-schema-registry-server:8081/");

The following code example explicitly overrides the application's default serdes, regardless of how they were configured, so that only specific operations, like ``KStream#to``, use this serde:

.. code:: java

   // `MyPojo` stands for a plain Java class serialized via Avro reflection
   Serde<MyPojo> reflectionAvroSerde = new ReflectionAvroSerde<>();
   boolean isKeySerde = false;
   reflectionAvroSerde.configure(
       Collections.singletonMap(
           AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
           "http://confluent-schema-registry-server:8081/"),
       isKeySerde);

   KStream<String, MyPojo> stream = ...;
   stream.to("my-output-topic", Produced.with(Serdes.String(), reflectionAvroSerde));

JSON
""""

The |kstreams| code examples also include a basic serde implementation for JSON:

* :kafka-file:`PageViewTypedDemo|streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java#L83`

As shown in the example file, you can use the ``JSONSerde`` inner classes and ``Serdes.serdeFrom(<serializerInstance>, <deserializerInstance>)`` to construct JSON-compatible serializers and deserializers.

JSON Schema
"""""""""""

Confluent provides a |sr|-compatible `JSON Schema Serde `__ for data in JSON format:

.. codewithvars:: xml

   <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>kafka-streams-json-schema-serde</artifactId>
       <version>|release|</version>
   </dependency>

Similar to how the Avro deserializer can return an instance of a specific Avro record type or a ``GenericRecord``, the JSON Schema deserializer can return an instance of a specific Java class, or an instance of ``JsonNode``. For more information, see :ref:`serdes_and_formatter_json`.

The following code example shows how to use the ``KafkaJsonSchemaSerde`` class to serialize and deserialize a JSON record with a schema.
For a related code listing, see `SerializationTutorial.java `__.

.. code:: java

   private static KafkaJsonSchemaSerde<Movie> movieJsonSchemaSerde(Properties envProps) {
     final KafkaJsonSchemaSerde<Movie> jsonSchemaSerde = new KafkaJsonSchemaSerde<>();
     Map<String, String> serdeConfig = new HashMap<>();
     serdeConfig.put(SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));
     jsonSchemaSerde.configure(serdeConfig, false);
     return jsonSchemaSerde;
   }

Protobuf
""""""""

Confluent provides a |sr|-compatible `Protobuf Serde `__ for data in Protobuf format:

.. codewithvars:: xml

   <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>kafka-streams-protobuf-serde</artifactId>
       <version>|release|</version>
   </dependency>

The Protobuf serde provides support for *referenced schemas*, the ability of a schema to refer to other schemas. Also, a Protobuf schema can register a referenced schema automatically. For more information, see :ref:`serdes_and_formatter_protobuf`.

The following code example shows how to use the ``KafkaProtobufSerde`` class to serialize and deserialize a Protobuf record with a schema. For the full code listing, see `SerializationTutorial.java `__.

.. code:: java

   protected KafkaProtobufSerde<Movie> movieProtobufSerde(Properties envProps) {
     final KafkaProtobufSerde<Movie> protobufSerde = new KafkaProtobufSerde<>();
     Map<String, String> serdeConfig = new HashMap<>();
     serdeConfig.put(SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));
     protobufSerde.configure(serdeConfig, false);
     return protobufSerde;
   }

Further serdes
""""""""""""""

The :cp-examples:`Confluent examples repository|` demonstrates how to implement templated serdes:

* ``PriorityQueue`` serde: :cp-examples:`PriorityQueueSerde|src/main/java/io/confluent/examples/streams/utils/PriorityQueueSerde.java`

.. _streams_developer-guide_serdes-custom:

Implementing custom Serdes
^^^^^^^^^^^^^^^^^^^^^^^^^^

If you need to implement custom Serdes, your best starting point is to take a look at the source code references of existing Serdes (see previous section). Typically, your workflow will be similar to:

1. Write a *serializer* for your data type ``T`` by implementing :kafka-file:`org.apache.kafka.common.serialization.Serializer|clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java`.
2. Write a *deserializer* for ``T`` by implementing :kafka-file:`org.apache.kafka.common.serialization.Deserializer|clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java`.
3. Write a *serde* for ``T`` by implementing :kafka-file:`org.apache.kafka.common.serialization.Serde|clients/src/main/java/org/apache/kafka/common/serialization/Serde.java`, which you either do manually (see existing Serdes in the previous section) or by leveraging helper functions in :kafka-file:`Serdes|clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java` such as ``Serdes.serdeFrom(Serializer<T>, Deserializer<T>)``.

Note that you must implement your own class (one that has no generic types) if you want to use your custom serde in the configuration provided to ``KafkaStreams``. If your serde class has generic types, or if you use ``Serdes.serdeFrom(Serializer<T>, Deserializer<T>)``, you can pass your serde only via method calls (for example, ``builder.stream("topicName", Consumed.with(...))``).

.. include:: ../../.hidden/docs-common/home/includes/ak-share.rst
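As a rough illustration of steps 1 through 3, the following is a minimal sketch in plain Java without Kafka dependencies. The ``RegionCount`` type and its length-prefixed byte layout are hypothetical; in a real application, the two conversion methods would live inside classes implementing Kafka's ``Serializer<T>`` and ``Deserializer<T>`` interfaces, combined via ``Serdes.serdeFrom(serializer, deserializer)``:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CustomSerdeSketch {

    // A hypothetical application data type.
    static final class RegionCount {
        final String region;
        final long count;
        RegionCount(String region, long count) { this.region = region; this.count = count; }
    }

    // Step 1 (serializer logic): 4-byte length prefix, UTF-8 region bytes, 8-byte count.
    static byte[] serialize(RegionCount rc) {
        byte[] regionBytes = rc.region.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + regionBytes.length + 8)
                .putInt(regionBytes.length)
                .put(regionBytes)
                .putLong(rc.count)
                .array();
    }

    // Step 2 (deserializer logic): read back the same layout in the same order.
    static RegionCount deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] regionBytes = new byte[buf.getInt()];
        buf.get(regionBytes);
        return new RegionCount(new String(regionBytes, StandardCharsets.UTF_8), buf.getLong());
    }

    public static void main(String[] args) {
        // Step 3 in a real app: wrap these in Serializer<T>/Deserializer<T> implementations
        // and combine them with Serdes.serdeFrom(serializer, deserializer).
        RegionCount out = deserialize(serialize(new RegionCount("eu-west", 42L)));
        System.out.println(out.region + "=" + out.count); // prints eu-west=42
    }
}
```

Because this serde would be constructed via ``Serdes.serdeFrom(...)`` rather than as a fully typed class, it could only be passed via method calls such as ``Consumed.with(...)``, not through the ``KafkaStreams`` configuration, as noted above.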