Kafka Streams のデータ型とシリアル化

すべての Kafka Streams アプリケーションでは、必要時にデータを具現化できるように、レコードキーとレコード値のデータ型(java.lang.String や Avro オブジェクトなど)に対応する Serde(シリアライザーと逆シリアライザー)を提供する必要があります。このような Serde の情報が要求される操作には、stream()table()to()repartition()groupByKey()groupBy() があります。

Serde を指定するには、次のいずれかの方法を使用します。どちらかを必ず使用してください。

  • Properties インスタンスを通じてデフォルトの Serde を設定する。
  • 適切な API メソッドの呼び出し時に明示的に Serde を指定する。これにより、デフォルトがオーバーライドされます。

Java ストリームアプリケーションは、Kafka コンソールプロデューサー、JDBC Source Connector、Java クライアントプロデューサーなどの複数の方法で、データを逆シリアル化したり取り込んだりするように構成できます。完全なコードの例については、「Kafka Connect と Kafka Streams によるパイプライン処理」を参照してください。

Serde の構成

Properties 構成を通じて Streams の構成に指定されている Serde は、Kafka Streams アプリケーションでデフォルトとして使用されます。この構成のデフォルトは null であるため、この構成を使用してデフォルトの Serde を設定するか、以下の説明どおりに明示的に Serde を渡す必要があります。

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Default serde for keys of data records (here: built-in serde for String type)
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Default serde for values of data records (here: built-in serde for Long type)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());

注釈

Properties を通じて Serde を指定する場合、その Serde クラスにジェネリック型を含めることはできません。つまり、MySerde<T extends Number> implements Serde<T> というクラスは使用できません。これは、Serdes.serdeFrom(Serializer&lt;T&gt;, Deserializer&lt;T&gt;) によって作成された Serde は使用できないことを意味します。Java の型消去の影響により、MySerde implements Serde<MyCustomType> のように完全に型指定された Serde クラスだけがサポートされます。

デフォルトの Serde のオーバーライド

適切な API メソッドに Serde を渡すと、明示的に指定した Serde でデフォルトの serde 設定をオーバーライドできます。

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// The stream userCountByRegion has type `String` for record keys (for region)
// and type `Long` for record values (for user counts).
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.with(stringSerde, longSerde));

serde を選択的にオーバーライドして、一部のフィールドではデフォルトを保持する場合は、デフォルト設定の使用が必要な場面に対して serde を指定しないようにします。

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// Use the default serializer for record keys (here: region as String) by not specifying the key serde,
// but override the default serializer for record values (here: userCount as Long).
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.Long()));

注釈

受信レコードの一部が破損している場合やフォーマットが正しくない場合は、逆シリアライザークラスによってエラーが報告されます。リリース 4.0.0 から導入された org.apache.kafka.streams.errors.DeserializationExceptionHandler インターフェイスを使用すると、このようなレコードの処理方法をカスタマイズできます。カスタマイズしたインターフェイスの実装は、StreamsConfig を通じて指定できます。詳細については、FAQ の「エラーと例外処理」を参照してください。

使用可能な Serde

プリミティブ型と基本型

Apache Kafka® の kafka-clients Maven アーティファクトには、byte[] などの Java のプリミティブ型および基本型に対応する組み込みの serde の実装がいくつか含まれています。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>7.1.1-ccs</version>
</dependency>

このアーティファクトでは、パッケージ org.apache.kafka.common.serialization で以下の serde の実装を提供します。これらは、Streams 構成でデフォルトのシリアライザーを定義するときなどに利用できます。

データ型 Serde
byte[] Serdes.ByteArray()Serdes.Bytes() (下記のヒントを参照)
ByteBuffer Serdes.ByteBuffer()
Double Serdes.Double()
Integer Serdes.Integer()
Long Serdes.Long()
String Serdes.String()
UUID Serdes.UUID()

ちなみに

Bytes は Java の byte[] (バイト配列)に対するラッパーで、適切な等価性と順序付けのセマンティクスをサポートします。アプリケーションでは、byte[] の代わりに Bytes を使用することを検討できます。

String の serde を例として、組み込みの Serde の使用方法を以下に示します。

// When configuring the default Serdes of StreamConfig
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,   Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

// When you want to override Serdes explicitly/selectively
final Serde<String> stringSerde = Serdes.String();
StreamsBuilder builder = new StreamsBuilder();
builder.stream("my-avro-topic", Consumed.with(keyGenericAvroSerde, valueGenericAvroSerde));

Avro

Confluent は、汎用的な Avro と固有の Avro フォーマットのデータに対して、Schema Registry 互換の Avro serde を提供しています。

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-streams-avro-serde</artifactId>
    <version>7.1.1</version>
</dependency>

汎用と固有のどちらの Avro serde でも、schema.registry.url 設定を通じて Confluent Schema Registry のエンドポイントを構成する必要があります。

  • StreamsConfig を使用して汎用または固有の Avro serde をデフォルトの serde として定義する場合は、StreamsConfig に Schema Registry のエンドポイントも設定する必要があります。
  • 汎用または固有の Avro serde を直接インスタンス化する場合は(new GenericAvroSerde() など)、serde インスタンスを使用する前に、serde インスタンスの Serde#configure() を呼び出して Schema Registry のエンドポイントを設定する必要があります。さらに、Serde#configure() でブール型のパラメーターを使用して、レコード "キー"(true)またはレコード "値"(false)のシリアル化と逆シリアル化に serde インスタンスを使用するかどうかを指定する必要があります。

Confluent の GenericAvroSerde を使用する例を以下に示します。

// Generic Avro serde example
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;

// When configuring the default serdes of StreamConfig
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
streamsConfiguration.put("schema.registry.url", "http://my-schema-registry:8081");

// When you want to override serdes explicitly/selectively
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
                                                                 "http://my-schema-registry:8081");
final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true); // `true` for record keys
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values

StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> textLines =
  builder.stream(keyGenericAvroSerde, valueGenericAvroSerde, "my-avro-topic");

Confluent の SpecificAvroSerde を使用する例を以下に示します。

// Specific Avro serde example
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

// When configuring the default serdes of StreamConfig
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsConfiguration.put("schema.registry.url", "http://my-schema-registry:8081");

// When you want to override serdes explicitly/selectively
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
                                                                 "http://my-schema-registry:8081");
// `Foo` and `Bar` are Java classes generated from Avro schemas
final Serde<Foo> keySpecificAvroSerde = new SpecificAvroSerde<>();
keySpecificAvroSerde.configure(serdeConfig, true); // `true` for record keys
final Serde<Bar> valueSpecificAvroSerde = new SpecificAvroSerde<>();
valueSpecificAvroSerde.configure(serdeConfig, false); // `false` for record values

StreamsBuilder builder = new StreamsBuilder();
KStream<Foo, Bar> textLines = builder.stream("my-avro-topic", Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde));

ソースストリームを作成するときは、Streams DSL を使用して入力 serde を指定します。低レベルの Processor API を使用してプロセッサートポロジーを構築する場合は、Confluent の GenericAvroSerde クラスや SpecificAvroSerde クラスなどの serde クラスを指定できます。

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("Source", keyGenericAvroSerde.deserializer(), valueGenericAvroSerde.deserializer(), inputTopic);

以下のエンドツーエンドのデモは、Confluent Avro serde の使用例を示しています。

Avro のプリミティブ型

バージョン 5.5.0 以降の Confluent Platform には、"Avro のプリミティブ型" のフォーマットでデータを読み書きするためのシリアライザーと逆シリアライザーが用意されています。

Avro プリミティブ型 は、nullbooleanintlongfloatdoublebytesstring です。その他の型は、この serde ではサポートされていません。

PrimitiveAvroSerde の主なユースケースはキーです。

この serde の固有の Avro のバージョンは SpecificAvroSerde で、その汎用 Avro バージョンは GenericAvroSerde です。

この serde は、Schema Registry ワイヤ形式 に従ってデータの読み取りと書き込みを行います。Schema Registry エンドポイントへのアクセスが必要とされ、schema.registry.url パラメーターを使用して PrimitiveAvroSerde#configure(Map, boolean) で定義する必要があります。

Confluent の PrimitiveAvroSerde を使用する例を以下に示します。

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, PrimitiveAvroSerde.class);
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, PrimitiveAvroSerde.class);
streamsConfiguration.put(
    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
    "http://confluent-schema-registry-server:8081/");

次の例では、アプリケーションのデフォルトの serde を明示的にオーバーライドして、この serde を KStream#to のような特定の操作でのみ使用する方法を示しています。

Serde<Long> longAvroSerde = new PrimitiveAvroSerde<Long>();
boolean isKeySerde = true;
longAvroSerde.configure(
    Collections.singletonMap(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://confluent-schema-registry-server:8081/"),
    isKeySerde);

Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde();
isKeySerde = false;
genericAvroSerde.configure(
    Collections.singletonMap(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://confluent-schema-registry-server:8081/"),
    isKeySerde);

KStream<Long, GenericRecord> stream = builder.stream("my-input-topic",
    Consumed.with(longAvroSerde, genericAvroSerde));

リフレクション Avro

バージョン 5.4.0 以降の Confluent Platform には、"リフレクション Avro" フォーマットでデータを読み書きするためのシリアライザーと逆シリアライザーも用意されています。この serde の "汎用 Avro" バージョンが GenericAvroSerde です。この serde は、「フォーマット、シリアライザー、逆シリアライザー」に定義されているワイヤ形式に従ってデータの読み取りと書き込みを行います。Schema Registry エンドポイントへのアクセスが必要とされ、schema.registry.url パラメーターを使用して GenericAvroDeserializer で定義する必要があります。

以下のコード例では、レコードキーとレコード値の両方に対して、この serde を Kafka Streams アプリケーションのデフォルトの serde として構成します。

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, ReflectionAvroSerde.class);
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, ReflectionAvroSerde.class);
streamsConfiguration.put(
       AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
       "http://confluent-schema-registry-server:8081/");

以下のコード例では、アプリケーションのデフォルトの serde がどのように構成されているかにかかわらず、Kafka Streams のような特定の操作でのみこの serde を使用するように、明示的にオーバーライドします。

Serde<MyJavaClassGeneratedFromAvroSchema> reflectionAvroSerde = new ReflectionAvroSerde<>();
   boolean isKeySerde = false;
   reflectionAvroSerde.configure(
       Collections.singletonMap(
           AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
           "http://confluent-schema-registry-server:8081/"),
       isKeySerde);
KStream<String, MyJavaClassGeneratedFromAvroSchema> stream = ...;
stream.to(Serdes.String(), reflectionAvroSerde, "my-output-topic");

JSON

Kafka Streams のコード例には、JSON スキーマ用の基本的な serde の実装も含まれています。

サンプルファイルに示されているように、JSONSerdes の内部クラスの Serdes.serdeFrom(&lt;serializerInstance&gt;, &lt;deserializerInstance&gt;) を使用すると、JSON と互換性のあるシリアライザーと逆シリアライザーを構築できます。

JSON スキーマ

Confluent は、JSON フォーマットのデータに対応する Schema Registry 互換の JSON スキーマ Serde を提供しています。

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-streams-json-schema-serde</artifactId>
    <version>7.1.1</version>
</dependency>

Avro デシリアライザーが具体的な Avro レコード型のインスタンスまたは GenericRecord を返すように、JSON Schema デシリアライザーも、具体的な Java クラスまたは JsonNode のインスタンスを返します。詳細については、「JSON Schema のシリアライザーと逆シリアライザー」を参照してください。

以下のコード例では、KafkaJsonSchemaSerde クラスを使用して、スキーマ付きの JSON レコードをシリアル化および逆シリアル化する方法を示します。コード全体の記述については、KafkaJsonSchemaSerdeTest.java を参照してください。

private static final String ANY_TOPIC = "any-topic";

private static ObjectMapper objectMapper = new ObjectMapper();

private static final String recordSchemaString = "{\"properties\": {\n"
    + "     \"null\": {\"type\": \"null\"},\n"
    + "     \"boolean\": {\"type\": \"boolean\"},\n"
    + "     \"number\": {\"type\": \"number\"},\n"
    + "     \"string\": {\"type\": \"string\"}\n"
    + "  },\n"
    + "  \"additionalProperties\": false\n"
    + "}";

private static final JsonSchema recordSchema = new JsonSchema(recordSchemaString);

private Object createJsonRecord() throws IOException {
  String json = "{\n"
      + "    \"null\": null,\n"
      + "    \"boolean\": true,\n"
      + "    \"number\": 12,\n"
      + "    \"string\": \"string\"\n"
      + "}";

  return objectMapper.readValue(json, Object.class);
}

private static KafkaJsonSchemaSerde<Object> createConfiguredSerdeForRecordValues() {
  SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient();
  KafkaJsonSchemaSerde<Object> serde = new KafkaJsonSchemaSerde<>(schemaRegistryClient);
  Map<String, Object> serdeConfig = new HashMap<>();
  serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "demo");
  serde.configure(serdeConfig, false);
  return serde;
}

public void shouldRoundTripRecords() throws Exception {
  // Given
  KafkaJsonSchemaSerde<Object> serde = createConfiguredSerdeForRecordValues();
  Object record = createJsonRecord();

  // When
  Object roundtrippedRecord = serde.deserializer().deserialize(
      ANY_TOPIC, serde.serializer().serialize(ANY_TOPIC, record));

  // Then
  assertThat(roundtrippedRecord, equalTo(record));

  // Cleanup
  serde.close();
}

Protobuf

Confluent は、Protobuf フォーマットのデータに対応する Schema Registry 互換の Protobuf Serde を提供しています。

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-streams-protobuf-serde</artifactId>
  <version>7.1.1</version>
</dependency>

Protobuf serde では、スキーマから他のスキーマを参照する "参照スキーマ" という機能がサポートされます。Protobuf スキーマは、参照スキーマを自動的に登録することもできます。詳細については、「Protobuf スキーマのシリアライザーと逆シリアライザー」を参照してください。

以下のコード例では、KafkaProtobufSerde クラスを使用して、スキーマ付きの Protoobuf レコードをシリアル化および逆シリアル化する方法を示します。コード全体の記述については、KafkaProtobufSerdeTest.java を参照してください。

private static final String ANY_TOPIC = "any-topic";

private static final String recordSchemaString = "syntax = \"proto3\";\n"
    + "\n"
    + "option java_package = \"io.confluent.kafka.serializers.protobuf.test\";\n"
    + "option java_outer_classname = \"TestMessageProtos\";\n"
    + "\n"
    + "import \"google/protobuf/descriptor.proto\";\n"
    + "\n"
    + "message TestMessage {\n"
    + "    string test_string = 1 [json_name = \"test_str\"];\n"
    + "    bool test_bool = 2;\n"
    + "    bytes test_bytes = 3;\n"

    ...

    + "}\n";

private static final ProtobufSchema recordSchema = new ProtobufSchema(recordSchemaString);

private DynamicMessage createDynamicMessage() {
  DynamicMessage.Builder builder = recordSchema.newMessageBuilder();
  Descriptors.Descriptor desc = builder.getDescriptorForType();
  Descriptors.FieldDescriptor fd = desc.findFieldByName("test_string");
  builder.setField(fd, "string");
  fd = desc.findFieldByName("test_bool");
  builder.setField(fd, true);
  fd = desc.findFieldByName("test_bytes");
  builder.setField(fd, ByteString.copyFromUtf8("hello"));

  ...

  return builder.build();
}

private static KafkaProtobufSerde<Message> createConfiguredSerdeForRecordValues() {
  SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient();
  KafkaProtobufSerde<Message> serde = new KafkaProtobufSerde<>(schemaRegistryClient);
  Map<String, Object> serdeConfig = new HashMap<>();
  serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "demo");
  serde.configure(serdeConfig, false);
  return serde;
}

public void shouldRoundTripRecords() {
  // Given
  KafkaProtobufSerde<Message> serde = createConfiguredSerdeForRecordValues();
  DynamicMessage record = createDynamicMessage();

  // When
  Message roundtrippedRecord = serde.deserializer().deserialize(
      ANY_TOPIC, serde.serializer().serialize(ANY_TOPIC, record));

  // Then
  assertThat(roundtrippedRecord, equalTo(record));

  // Cleanup
  serde.close();
}

その他の serde

Confluent のサンプルリポジトリ には、テンプレート化された serde の実装方法を示す例が含まれています。

カスタム Serde の実装

カスタムの Serde を実装する必要がある場合は、最初の手順として、既存の Serde のソースコード(前のセクションを参照)を参考にするのが最適です。一般に、実装のワークフローは次のようになります。

  1. org.apache.kafka.common.serialization.Serializer を実装して、データ型 T に対応する "シリアライザー" を記述します。
  2. org.apache.kafka.common.serialization.Deserializer を実装して、T に対応する "逆シリアライザー" を記述します。
  3. org.apache.kafka.common.serialization.Serde を実装して、T に対応する "serde" を記述します。これは手動で行うか(前のセクションで紹介した既存の Serde を参照)、Serde に含まれている Serdes.serdeFrom(Serializer<T>, Deserializer<T>) などのヘルパー関数を利用します。KafkaStreams に提供する構成でカスタム serde を使用する場合は、独自のクラス(ジェネリック型を含まないもの)を実装する必要があることに注意してください。serde クラスにジェネリック型が含まれている場合、または Serdes.serdeFrom(Serializer<T>, Deserializer<T>) を使用している場合、その serde はメソッド呼び出し(builder.stream("topicName", Consumed.with(...)) など)にのみ渡すことができます。

注釈

このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。