Data Types and Serialization

Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. java.lang.String or Avro objects) to materialize the data when necessary. Operations that require such SerDes information include: stream(), table(), to(), through(), groupByKey(), groupBy().

You can provide SerDes by using either of these methods:

  • By setting default SerDes via a StreamsConfig instance.
  • By specifying explicit SerDes when calling the appropriate API methods, thus overriding the defaults.

Configuring SerDes

SerDes specified in the Streams configuration via StreamsConfig are used as the default in your Kafka Streams application.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Default serde for keys of data records (here: built-in serde for String type)
settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Default serde for values of data records (here: built-in serde for Long type)
settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());

StreamsConfig config = new StreamsConfig(settings);

Overriding default SerDes

You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// The stream userCountByRegion has type `String` for record keys (for region)
// and type `Long` for record values (for user counts).
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to(stringSerde, longSerde, "RegionCountsTopic");

If you want to override SerDes selectively, i.e., keep the defaults for some fields, then pass null whenever you want to leverage the default serde settings:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// Use the default serializer for record keys (here: region as String) by passing `null`,
// but override the default serializer for record values (here: userCount as Long).
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to(null, longSerde, "RegionCountsTopic");

Available SerDes

Apache Kafka includes several built-in serde implementations in its kafka-clients maven artifact:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1-cp1</version>
</dependency>

This artifact provides the following serde implementations under the package org.apache.kafka.common.serialization, which you can leverage when e.g., defining default serializers in your Streams configuration.

Data type Serde
byte[] Serdes.ByteArray(), Serdes.Bytes() (see tip below)
ByteBuffer Serdes.ByteBuffer()
Double Serdes.Double()
Integer Serdes.Integer()
Long Serdes.Long()
String Serdes.String()

Tip

Bytes is a wrapper for Java’s byte[] (byte array) that supports proper equality and ordering semantics. You may want to consider using Bytes instead of byte[] in your applications.

You would use the built-in SerDes as follows, using the example of the String serde:

// When configuring the default SerDes of StreamConfig
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,   Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

// When you want to override SerDes explicitly/selectively
final Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "TextLinesTopic");

The code examples of Kafka Streams also include a basic serde implementation for JSON:

Lastly, the Confluent examples repository includes basic serde implementations for Apache Avro:

As well as templated serde implementations:

Implementing custom SerDes

If you need to implement custom SerDes, your best starting point is to take a look at the source code references of existing SerDes (see previous section). Typically, your workflow will be similar to:

  1. Write a serializer for your data type T by implementing org.apache.kafka.common.serialization.Serializer.
  2. Write a deserializer for T by implementing org.apache.kafka.common.serialization.Deserializer.
  3. Write a serde for T by implementing org.apache.kafka.common.serialization.Serde, which you either do manually (see existing SerDes in the previous section) or by leveraging helper functions in Serdes such as Serdes.serdeFrom(Serializer<T>, Deserializer<T>).