Kafka Streams Data Types and Serialization for Confluent Platform¶
Every Kafka Streams application must provide Serdes (Serializer/Deserializer) for the data types of record keys and record values (e.g. java.lang.String or Avro objects) to materialize the data when necessary. Operations that require such Serdes information include: stream(), table(), to(), repartition(), groupByKey(), groupBy().
You can provide Serdes in either of the following ways, and you must use at least one of them:
- By setting default Serdes via a Properties instance.
- By specifying explicit Serdes when calling the appropriate API methods, thus overriding the defaults.
You can configure Java streams applications to deserialize and ingest data in multiple ways, including Kafka console producers, JDBC source connectors, and Java client producers. For full code examples, see Pipelining with Kafka Connect and Kafka Streams.
Configuring Serdes¶
Serdes specified in the Streams configuration via the Properties config are used as the default in your Kafka Streams application. Because this config’s default is null, you must either set a default Serde by using this configuration or pass in Serdes explicitly, as described below.
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
Properties settings = new Properties();
// Default serde for keys of data records (here: built-in serde for String type)
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Default serde for values of data records (here: built-in serde for Long type)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
If a Serde is specified via Properties, the Serde class can’t have generic types, which means that you can’t use a class like MySerde<T extends Number> implements Serde<T>. This implies that you can’t use any Serde that is created via Serdes.serdeFrom(Serializer<T>, Deserializer<T>). Only fully typed Serde classes like MySerde implements Serde<MyCustomType> are supported, due to Java type erasure.
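The restriction can be illustrated with plain reflection. The following is a minimal sketch, assuming that a serde named in the configuration is instantiated reflectively from its class name through a no-argument constructor. `Codec`, `GenericCodec`, and `LongCodec` are hypothetical stand-ins for `Serde` and its implementations, chosen so that the block compiles without Kafka on the classpath.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

final class ErasureDemo {
    // Hypothetical stand-in for Serde<T>; this is not the Kafka interface.
    interface Codec<T> {}

    // Generic class: T is only known at each use site and is erased at runtime.
    static final class GenericCodec<T extends Number> implements Codec<T> {}

    // Fully typed class: the binding Codec<Long> is recorded in the class itself.
    static final class LongCodec implements Codec<Long> {}

    // Returns the type argument that codecClass binds for Codec<T>, as seen by reflection.
    static String boundTypeArgument(Class<?> codecClass) {
        for (Type type : codecClass.getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) type;
                if (parameterized.getRawType() == Codec.class) {
                    return parameterized.getActualTypeArguments()[0].getTypeName();
                }
            }
        }
        return "unknown";
    }

    public static void main(String[] args) throws Exception {
        // Instantiating from a class name leaves no place to say which T is meant.
        Object fromName = Class.forName(GenericCodec.class.getName())
                .getDeclaredConstructor()
                .newInstance();
        System.out.println(fromName.getClass().getSimpleName());
        // The generic class only exposes the unresolved type variable ...
        System.out.println(boundTypeArgument(GenericCodec.class));
        // ... while the fully typed class exposes a concrete type.
        System.out.println(boundTypeArgument(LongCodec.class));
    }
}
```

For the generic class, reflection only sees the type variable itself, whereas the fully typed class carries a concrete type that a framework can rely on.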
Overriding default Serdes¶
You can also specify Serdes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// The stream userCountByRegion has type `String` for record keys (for region)
// and type `Long` for record values (for user counts).
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.with(stringSerde, longSerde));
To override serdes selectively, that is, to keep the defaults for some fields, omit the serde wherever you want the default setting to apply:
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
// Use the default serializer for record keys (here: region as String) by not specifying the key serde,
// but override the default serializer for record values (here: userCount as Long).
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(longSerde));
If some of your incoming records are corrupted or ill-formatted, they will cause the deserializer class to report an error. An interface named org.apache.kafka.streams.errors.DeserializationExceptionHandler enables you to customize how to handle such records. Specify the customized implementation of the interface by using StreamsConfig. For more information, see Failure and exception handling FAQ.
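As a configuration sketch, a handler can be selected by class name. It assumes the `default.deserialization.exception.handler` key and the built-in `LogAndContinueExceptionHandler` class, which should match the Kafka Streams release this page targets. Newer releases may use a different key, so check the `StreamsConfig` reference for your version. String literals are used instead of `StreamsConfig` constants only so that the fragment compiles on its own.

```java
import java.util.Properties;

final class DeserializationHandlerConfig {
    // Builds Streams settings that log and skip records that fail deserialization.
    static Properties settings() {
        Properties settings = new Properties();
        // Assumed key; in application code prefer the corresponding StreamsConfig constant.
        settings.put("default.deserialization.exception.handler",
                "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return settings;
    }

    public static void main(String[] args) {
        System.out.println(settings());
    }
}
```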
Available Serdes¶
Primitive and basic types¶
Apache Kafka® includes several built-in serde implementations for Java primitives and basic types such as byte[] in its kafka-clients Maven artifact:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>7.5.6-ccs</version>
</dependency>
This artifact provides the following serde implementations under the package org.apache.kafka.common.serialization, which you can use, for example, when defining default serdes in your Streams configuration.
Data type | Serde |
---|---|
byte[] | Serdes.ByteArray() , Serdes.Bytes() (see tip below) |
ByteBuffer | Serdes.ByteBuffer() |
Double | Serdes.Double() |
Integer | Serdes.Integer() |
Long | Serdes.Long() |
String | Serdes.String() |
UUID | Serdes.UUID() |
Tip
Bytes is a wrapper for Java’s byte[] (byte array) that supports proper equality and ordering semantics. You may want to consider using Bytes instead of byte[] in your applications.
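The difference in equality semantics can be seen with plain Java collections. This is a minimal sketch that uses `java.nio.ByteBuffer` as a stand-in for a content-comparing wrapper. It does not use Kafka's `Bytes` class, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

final class ByteArrayEqualityDemo {
    // A raw byte[] key is compared by identity, so an equal-content array misses.
    static boolean rawArrayLookupHits() {
        Map<byte[], String> store = new HashMap<>();
        store.put(new byte[] {1, 2, 3}, "value");
        return store.containsKey(new byte[] {1, 2, 3});
    }

    // A wrapper that compares content finds the entry. ByteBuffer is only a stand-in here.
    static boolean wrappedLookupHits() {
        Map<ByteBuffer, String> store = new HashMap<>();
        store.put(ByteBuffer.wrap(new byte[] {1, 2, 3}), "value");
        return store.containsKey(ByteBuffer.wrap(new byte[] {1, 2, 3}));
    }

    public static void main(String[] args) {
        System.out.println("raw byte[] lookup hits: " + rawArrayLookupHits());
        System.out.println("wrapped lookup hits: " + wrappedLookupHits());
    }
}
```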
You would use the built-in Serdes as follows, using the example of the String serde:
// When configuring the default Serdes of StreamsConfig
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// When you want to override Serdes explicitly/selectively
final Serde<String> stringSerde = Serdes.String();
StreamsBuilder builder = new StreamsBuilder();
// "my-text-topic" is a placeholder topic name
KStream<String, String> textLines = builder.stream("my-text-topic", Consumed.with(stringSerde, stringSerde));
Avro¶
Confluent provides Schema Registry-compatible Avro serdes for data in generic Avro and in specific Avro format:
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>7.5.6</version>
</dependency>
Both the generic and the specific Avro serde require you to configure the endpoint of Confluent Schema Registry via the schema.registry.url setting:
- When you define the generic or specific Avro serde as a default serde via StreamsConfig, you must also set the Schema Registry endpoint in StreamsConfig.
- When you instantiate the generic or specific Avro serde directly (e.g., new GenericAvroSerde()), you must call Serde#configure() on the serde instance to set the Schema Registry endpoint before using the serde instance. Additionally, you must tell Serde#configure() via a boolean parameter whether the serde instance is used for serializing/deserializing record keys (true) or record values (false).
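One reason the key/value flag matters: under Schema Registry's default subject naming (`TopicNameStrategy`), key schemas and value schemas for the same topic are registered under different subjects. The helper below is a hypothetical illustration of that naming rule, assuming the default strategy is in effect. It is not part of the Confluent API.

```java
final class SubjectNames {
    // Derives the subject name used for a topic's key or value schema
    // under the default TopicNameStrategy (assumed here).
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        System.out.println(subjectFor("my-avro-topic", true));
        System.out.println(subjectFor("my-avro-topic", false));
    }
}
```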
Usage example for Confluent GenericAvroSerde:
// Generic Avro serde example
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
// When configuring the default serdes of StreamsConfig
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
streamsConfiguration.put("schema.registry.url", "http://my-schema-registry:8081");
// When you want to override serdes explicitly/selectively
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
"http://my-schema-registry:8081");
// The generic Avro serde reads and writes Avro `GenericRecord` instances
final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true); // `true` for record keys
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values
StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> avroRecords = builder.stream("my-avro-topic", Consumed.with(keyGenericAvroSerde, valueGenericAvroSerde));
Usage example for Confluent SpecificAvroSerde:
// Specific Avro serde example
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
// When configuring the default serdes of StreamsConfig
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsConfiguration.put("schema.registry.url", "http://my-schema-registry:8081");
// When you want to override serdes explicitly/selectively
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
"http://my-schema-registry:8081");
// `Foo` and `Bar` are Java classes generated from Avro schemas
final Serde<Foo> keySpecificAvroSerde = new SpecificAvroSerde<>();
keySpecificAvroSerde.configure(serdeConfig, true); // `true` for record keys
final Serde<Bar> valueSpecificAvroSerde = new SpecificAvroSerde<>();
valueSpecificAvroSerde.configure(serdeConfig, false); // `false` for record values
StreamsBuilder builder = new StreamsBuilder();
KStream<Foo, Bar> textLines = builder.stream("my-avro-topic", Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde));
When you create source streams, you specify input serdes by using the Streams DSL. When you construct the processor topology by using the lower-level Processor API, you can specify the serde class, like the Confluent GenericAvroSerde and SpecificAvroSerde classes.
Topology topology = new Topology();
topology.addSource("Source", keyGenericAvroSerde.deserializer(), valueGenericAvroSerde.deserializer(), inputTopic);
The following end-to-end demos showcase using the Confluent Avro serdes:
Avro Primitive¶
Starting with version 5.5.0, Confluent Platform provides a serializer and deserializer for writing and reading data in “Avro primitive” format.
The Avro primitive types are null, boolean, int, long, float, double, bytes, and string. Other types aren’t supported by this serde.
The primary use case for PrimitiveAvroSerde is for keys.
This serde’s specific Avro counterpart is SpecificAvroSerde, and its generic Avro counterpart is GenericAvroSerde.
This serde reads and writes data according to the Schema Registry wire format. It requires access to a Schema Registry endpoint, which you must define in PrimitiveAvroSerde#configure(Map, boolean) by using the schema.registry.url parameter.
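To make the wire format concrete, here is a minimal sketch of its framing: one magic byte (0), a 4-byte big-endian schema ID, then the encoded payload. The class and method names are hypothetical, and the payload bytes are treated as opaque. A real serde also registers or looks up the schema in Schema Registry, which this sketch omits.

```java
import java.nio.ByteBuffer;

final class WireFormatDemo {
    static final byte MAGIC_BYTE = 0x0;

    // Frames a payload: 1 magic byte, 4-byte big-endian schema ID, then the payload.
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(1 + 4 + payload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(payload)
                .array();
    }

    // Reads the schema ID back from a framed message, validating the magic byte first.
    static int schemaIdOf(byte[] framed) {
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        if (buffer.remaining() < 5 || buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a Schema Registry framed message");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {7});
        System.out.println("framed length: " + framed.length);
        System.out.println("schema id: " + schemaIdOf(framed));
    }
}
```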
Usage example for Confluent PrimitiveAvroSerde:
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, PrimitiveAvroSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PrimitiveAvroSerde.class);
streamsConfiguration.put(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://confluent-schema-registry-server:8081/");
The following example shows how to explicitly override the application’s default serdes so that only specific operations, like KStream#to, use this serde.
Serde<Long> longAvroSerde = new PrimitiveAvroSerde<Long>();
boolean isKeySerde = true;
longAvroSerde.configure(
Collections.singletonMap(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://confluent-schema-registry-server:8081/"),
isKeySerde);
Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde();
isKeySerde = false;
genericAvroSerde.configure(
Collections.singletonMap(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://confluent-schema-registry-server:8081/"),
isKeySerde);
KStream<Long, GenericRecord> stream = builder.stream("my-input-topic",
Consumed.with(longAvroSerde, genericAvroSerde));
Reflection Avro¶
Starting with version 5.4.0, Confluent Platform also provides a serializer and deserializer for writing and reading data in “reflection Avro” format. This serde’s “generic Avro” counterpart is GenericAvroSerde. This serde reads and writes data according to the wire format defined at Formats, Serializers, and Deserializers for Schema Registry. It requires access to a Schema Registry endpoint, which you must define when configuring the serde by using the schema.registry.url parameter.
The following code example configures this serde as a Kafka Streams application’s default serde for both record keys and record values:
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, ReflectionAvroSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ReflectionAvroSerde.class);
streamsConfiguration.put(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://confluent-schema-registry-server:8081/");
The following code example explicitly overrides the application’s default serdes, whatever they were configured to be, so that only specific operations, like KStream#to(), use this serde:
Serde<MyJavaClassGeneratedFromAvroSchema> reflectionAvroSerde = new ReflectionAvroSerde<>();
boolean isKeySerde = false;
reflectionAvroSerde.configure(
Collections.singletonMap(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://confluent-schema-registry-server:8081/"),
isKeySerde);
KStream<String, MyJavaClassGeneratedFromAvroSchema> stream = ...;
stream.to("my-output-topic", Produced.with(Serdes.String(), reflectionAvroSerde));
JSON¶
The Kafka Streams code examples also include a basic serde implementation for JSON Schema:
As shown in the example file, you can pass the JSONSerdes inner classes to Serdes.serdeFrom(<serializerInstance>, <deserializerInstance>) to construct JSON-compatible serializers and deserializers.
JSON Schema¶
Confluent provides a Schema Registry-compatible JSON Schema Serde for data in JSON format:
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-json-schema-serde</artifactId>
<version>7.5.6</version>
</dependency>
Similar to how the Avro deserializer can return an instance of a specific Avro record type or a GenericRecord, the JSON Schema deserializer can return an instance of a specific Java class, or an instance of JsonNode. For more information, see JSON Schema Serializer and Deserializer for Schema Registry on Confluent Platform.
The following code example shows how to use the KafkaJsonSchemaSerde class to serialize and deserialize a JSON record with a schema. For a related code listing, see SerializationTutorial.java.
// `Movie` is a placeholder for a plain Java class that models the JSON record
private static KafkaJsonSchemaSerde<Movie> movieJsonSchemaSerde(Properties envProps) {
    final KafkaJsonSchemaSerde<Movie> jsonSchemaSerde = new KafkaJsonSchemaSerde<>();
    Map<String, Object> serdeConfig = new HashMap<>();
    serdeConfig.put(SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));
    jsonSchemaSerde.configure(serdeConfig, false); // `false` for record values
    return jsonSchemaSerde;
}
Protobuf¶
Confluent provides a Schema Registry-compatible Protobuf Serde for data in Protobuf format:
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-protobuf-serde</artifactId>
<version>7.5.6</version>
</dependency>
The Protobuf serde provides support for referenced schemas, the ability of a schema to refer to other schemas. Also, a Protobuf schema can register a referenced schema automatically. For more information, see Protobuf Schema Serializer and Deserializer for Schema Registry on Confluent Platform.
The following code example shows how to use the KafkaProtobufSerde class to serialize and deserialize a Protobuf record with a schema. For the full code listing, see SerializationTutorial.java.
protected KafkaProtobufSerde<MovieProtos.Movie> movieProtobufSerde(Properties envProps) {
final KafkaProtobufSerde<MovieProtos.Movie> protobufSerde = new KafkaProtobufSerde<>();
Map<String, String> serdeConfig = new HashMap<>();
serdeConfig.put(SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));
protobufSerde.configure(serdeConfig, false);
return protobufSerde;
}
Further serdes¶
The Confluent examples repository demonstrates how to implement templated serdes:
- PriorityQueue<T> serde: PriorityQueueSerde
Implementing custom Serdes¶
If you need to implement custom Serdes, your best starting point is to take a look at the source code references of existing Serdes (see previous section). Typically, your workflow will be similar to:
- Write a serializer for your data type T by implementing org.apache.kafka.common.serialization.Serializer.
- Write a deserializer for T by implementing org.apache.kafka.common.serialization.Deserializer.
- Write a serde for T by implementing org.apache.kafka.common.serialization.Serde, which you either do manually (see existing Serdes in the previous section) or by leveraging helper functions in Serdes such as Serdes.serdeFrom(Serializer<T>, Deserializer<T>). Note that you will need to implement your own class (that has no generic types) if you want to use your custom serde in the configuration provided to KafkaStreams. If your serde class has generic types or you use Serdes.serdeFrom(Serializer<T>, Deserializer<T>), you can pass your serde only via method calls (for example, builder.stream("topicName", Consumed.with(...))).
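The first two steps above come down to a pair of functions that convert between T and byte[]. The following is a minimal sketch of that byte-level logic for a hypothetical `RegionCount` type. The two static methods mirror the shapes of `Serializer#serialize(String topic, T data)` and `Deserializer#deserialize(String topic, byte[] data)`, but the Kafka interfaces are deliberately not imported so that the block compiles on its own. The binary layout (an 8-byte count followed by the UTF-8 region name) is an assumption made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class RegionCountCodec {
    // Hypothetical value type used only for this sketch.
    static final class RegionCount {
        final String region;
        final long count;

        RegionCount(String region, long count) {
            this.region = region;
            this.count = count;
        }
    }

    // Mirrors Serializer#serialize(String topic, T data); null in, null out.
    static byte[] serialize(String topic, RegionCount data) {
        if (data == null) {
            return null;
        }
        byte[] region = data.region.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + region.length)
                .putLong(data.count)
                .put(region)
                .array();
    }

    // Mirrors Deserializer#deserialize(String topic, byte[] data); null in, null out.
    static RegionCount deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        long count = buffer.getLong();
        byte[] region = new byte[buffer.remaining()];
        buffer.get(region);
        return new RegionCount(new String(region, StandardCharsets.UTF_8), count);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("RegionCountsTopic", new RegionCount("emea", 7L));
        RegionCount roundTripped = deserialize("RegionCountsTopic", bytes);
        System.out.println(roundTripped.region + " -> " + roundTripped.count);
    }
}
```

In an application, these method bodies would live in classes that implement `Serializer<RegionCount>` and `Deserializer<RegionCount>`, combined either through `Serdes.serdeFrom(...)` or in a non-generic `Serde<RegionCount>` class if the serde must be usable from the configuration.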
Note
This website includes content developed at the Apache Software Foundation under the terms of the Apache License v2.