Kafka Streams のデータ型とシリアル化

すべての Kafka Streams アプリケーションでは、必要時にデータを具現化できるように、レコードキーとレコード値のデータ型(java.lang.String や Avro オブジェクトなど)に対応する Serde(シリアライザーと逆シリアライザー)を提供する必要があります。このような Serde の情報が要求される操作には、stream()table()to()repartition()groupByKey()groupBy() があります。

Serde を指定するには、次のいずれかの方法を使用します。どちらかを必ず使用してください。

  • Properties インスタンスを通じてデフォルトの Serde を設定する。
  • 適切な API メソッドの呼び出し時に明示的に Serde を指定する。これにより、デフォルトがオーバーライドされます。

Java ストリームアプリケーションは、Kafka コンソールプロデューサー、JDBC Source Connector、Java クライアントプロデューサーなどの複数の方法で、データを逆シリアル化したり取り込んだりするように構成できます。完全なコードの例については、「Kafka Connect と Kafka Streams によるパイプライン処理」を参照してください。

Serde の構成

Properties 構成を通じて Streams の構成に指定されている Serde は、Kafka Streams アプリケーションでデフォルトとして使用されます。この構成のデフォルトは null であるため、この構成を使用してデフォルトの Serde を設定するか、以下の説明どおりに明示的に Serde を渡す必要があります。

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Default serde for keys of data records (here: built-in serde for String type)
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Default serde for values of data records (here: built-in serde for Long type)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());

注釈

Properties を通じて Serde を指定する場合、その Serde クラスにジェネリック型を含めることはできません。つまり、MySerde<T extends Number> implements Serde<T> というクラスは使用できません。これは、Serdes.serdeFrom(Serializer&lt;T&gt;, Deserializer&lt;T&gt;) によって作成された Serde は使用できないことを意味します。Java の型消去の影響により、MySerde implements Serde<MyCustomType> のように完全に型指定された Serde クラスだけがサポートされます。

デフォルトの Serde のオーバーライド

適切な API メソッドに Serde を渡すと、明示的に指定した Serde でデフォルトの serde 設定をオーバーライドできます。

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// The stream userCountByRegion has type `String` for record keys (for region)
// and type `Long` for record values (for user counts).
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.with(stringSerde, longSerde));

serde を選択的にオーバーライドして、一部のフィールドではデフォルトを保持する場合は、デフォルト設定の使用が必要な場面に対して serde を指定しないようにします。

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// Use the default serializer for record keys (here: region as String) by not specifying the key serde,
// but override the default serializer for record values (here: userCount as Long).
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.Long()));

注釈

受信レコードの一部が破損している場合やフォーマットが正しくない場合は、逆シリアライザークラスによってエラーが報告されます。リリース 4.0.0 から導入された org.apache.kafka.streams.errors.DeserializationExceptionHandler インターフェイスを使用すると、このようなレコードの処理方法をカスタマイズできます。カスタマイズしたインターフェイスの実装は、StreamsConfig を通じて指定できます。詳細については、FAQ の「エラーと例外処理」を参照してください。

使用可能な Serde

プリミティブ型と基本型

Apache Kafka® の kafka-clients Maven アーティファクトには、byte[] などの Java のプリミティブ型および基本型に対応する組み込みの serde の実装がいくつか含まれています。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>6.2.4-ccs</version>
</dependency>

このアーティファクトでは、パッケージ org.apache.kafka.common.serialization で以下の serde の実装を提供します。これらは、Streams 構成でデフォルトのシリアライザーを定義するときなどに利用できます。

データ型 Serde
byte[] Serdes.ByteArray()Serdes.Bytes() (下記のヒントを参照)
ByteBuffer Serdes.ByteBuffer()
Double Serdes.Double()
Integer Serdes.Integer()
Long Serdes.Long()
String Serdes.String()
UUID Serdes.UUID()

ちなみに

Bytes は Java の byte[] (バイト配列)に対するラッパーで、適切な等価性と順序付けのセマンティクスをサポートします。アプリケーションでは、byte[] の代わりに Bytes を使用することを検討できます。

String の serde を例として、組み込みの Serde の使用方法を以下に示します。

// When configuring the default Serdes of StreamConfig
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,   Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

// When you want to override Serdes explicitly/selectively
final Serde<String> stringSerde = Serdes.String();
StreamsBuilder builder = new StreamsBuilder();
builder.stream("my-avro-topic", Consumed.with(keyGenericAvroSerde, valueGenericAvroSerde));

Avro

Confluent は、汎用的な Avro と固有の Avro フォーマットのデータに対して、Schema Registry 互換の Avro serde を提供しています。

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-streams-avro-serde</artifactId>
    <version>6.2.4</version>
</dependency>

汎用と固有のどちらの Avro serde でも、schema.registry.url 設定を通じて Confluent Schema Registry のエンドポイントを構成する必要があります。

  • StreamsConfig を使用して汎用または固有の Avro serde をデフォルトの serde として定義する場合は、StreamsConfig に Schema Registry のエンドポイントも設定する必要があります。
  • 汎用または固有の Avro serde を直接インスタンス化する場合は(new GenericAvroSerde() など)、serde インスタンスを使用する前に、serde インスタンスの Serde#configure() を呼び出して Schema Registry のエンドポイントを設定する必要があります。さらに、Serde#configure() でブール型のパラメーターを使用して、レコード "キー"(true)またはレコード "値"(false)のシリアル化と逆シリアル化に serde インスタンスを使用するかどうかを指定する必要があります。

Confluent の GenericAvroSerde を使用する例を以下に示します。

// Generic Avro serde example
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;

// When configuring the default serdes of StreamConfig
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
streamsConfiguration.put("schema.registry.url", "http://my-schema-registry:8081");

// When you want to override serdes explicitly/selectively
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
                                                                 "http://my-schema-registry:8081");
final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true); // `true` for record keys
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values

StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> textLines =
  builder.stream(keyGenericAvroSerde, valueGenericAvroSerde, "my-avro-topic");

Confluent の SpecificAvroSerde を使用する例を以下に示します。

// Specific Avro serde example
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

// When configuring the default serdes of StreamConfig
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsConfiguration.put("schema.registry.url", "http://my-schema-registry:8081");

// When you want to override serdes explicitly/selectively
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
                                                                 "http://my-schema-registry:8081");
// `Foo` and `Bar` are Java classes generated from Avro schemas
final Serde<Foo> keySpecificAvroSerde = new SpecificAvroSerde<>();
keySpecificAvroSerde.configure(serdeConfig, true); // `true` for record keys
final Serde<Bar> valueSpecificAvroSerde = new SpecificAvroSerde<>();
valueSpecificAvroSerde.configure(serdeConfig, false); // `false` for record values

StreamsBuilder builder = new StreamsBuilder();
KStream<Foo, Bar> textLines = builder.stream("my-avro-topic", Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde));

ソースストリームを作成するときは、Streams DSL を使用して入力 serde を指定します。低レベルの Processor API を使用してプロセッサートポロジーを構築する場合は、Confluent の GenericAvroSerde クラスや SpecificAvroSerde クラスなどの serde クラスを指定できます。

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("Source", keyGenericAvroSerde.deserializer(), valueGenericAvroSerde.deserializer(), inputTopic);

以下のエンドツーエンドのデモは、Confluent Avro serde の使用例を示しています。

リフレクション Avro

バージョン 5.4.0 以降の Confluent Platform には、"リフレクション Avro" フォーマットでデータを読み書きするためのシリアライザーと逆シリアライザーも用意されています。この serde の "汎用 Avro" バージョンが GenericAvroSerde です。この serde は、「フォーマット、シリアライザー、逆シリアライザー」に定義されているワイヤ形式に従ってデータの読み取りと書き込みを行います。Schema Registry エンドポイントへのアクセスが必要とされ、schema.registry.url パラメーターを使用して GenericAvroDeserializer で定義する必要があります。

以下のコード例では、レコードキーとレコード値の両方に対して、この serde を Kafka Streams アプリケーションのデフォルトの serde として構成します。

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, ReflectionAvroSerde.class);
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, ReflectionAvroSerde.class);
streamsConfiguration.put(
       AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
       "http://confluent-schema-registry-server:8081/");

以下のコード例では、アプリケーションのデフォルトの serde がどのように構成されているかにかかわらず、Kafka Streams のような特定の操作でのみこの serde を使用するように、明示的にオーバーライドします。

Serde<MyJavaClassGeneratedFromAvroSchema> reflectionAvroSerde = new ReflectionAvroSerde<>();
   boolean isKeySerde = false;
   reflectionAvroSerde.configure(
       Collections.singletonMap(
           AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
           "http://confluent-schema-registry-server:8081/"),
       isKeySerde);
KStream<String, MyJavaClassGeneratedFromAvroSchema> stream = ...;
stream.to(Serdes.String(), reflectionAvroSerde, "my-output-topic");

JSON

Kafka Streams のコード例には、JSON スキーマ用の基本的な serde の実装も含まれています。

サンプルファイルに示されているように、JSONSerdes の内部クラスの Serdes.serdeFrom(&lt;serializerInstance&gt;, &lt;deserializerInstance&gt;) を使用すると、JSON と互換性のあるシリアライザーと逆シリアライザーを構築できます。

JSON スキーマ

Confluent は、JSON フォーマットのデータに対応する Schema Registry 互換の JSON スキーマ Serde を提供しています。

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-streams-json-schema-serde</artifactId>
    <version>6.2.4</version>
</dependency>

Avro デシリアライザーが具体的な Avro レコード型のインスタンスまたは GenericRecord を返すように、JSON Schema デシリアライザーも、具体的な Java クラスまたは JsonNode のインスタンスを返します。詳細については、「JSON Schema のシリアライザーと逆シリアライザー」を参照してください。

以下のコード例では、KafkaJsonSchemaSerde クラスを使用して、スキーマ付きの JSON レコードをシリアル化および逆シリアル化する方法を示します。コード全体の記述については、KafkaJsonSchemaSerdeTest.java を参照してください。

private static final String ANY_TOPIC = "any-topic";

private static ObjectMapper objectMapper = new ObjectMapper();

private static final String recordSchemaString = "{\"properties\": {\n"
    + "     \"null\": {\"type\": \"null\"},\n"
    + "     \"boolean\": {\"type\": \"boolean\"},\n"
    + "     \"number\": {\"type\": \"number\"},\n"
    + "     \"string\": {\"type\": \"string\"}\n"
    + "  },\n"
    + "  \"additionalProperties\": false\n"
    + "}";

private static final JsonSchema recordSchema = new JsonSchema(recordSchemaString);

private Object createJsonRecord() throws IOException {
  String json = "{\n"
      + "    \"null\": null,\n"
      + "    \"boolean\": true,\n"
      + "    \"number\": 12,\n"
      + "    \"string\": \"string\"\n"
      + "}";

  return objectMapper.readValue(json, Object.class);
}

private static KafkaJsonSchemaSerde<Object> createConfiguredSerdeForRecordValues() {
  SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient();
  KafkaJsonSchemaSerde<Object> serde = new KafkaJsonSchemaSerde<>(schemaRegistryClient);
  Map<String, Object> serdeConfig = new HashMap<>();
  serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "demo");
  serde.configure(serdeConfig, false);
  return serde;
}

public void shouldRoundTripRecords() throws Exception {
  // Given
  KafkaJsonSchemaSerde<Object> serde = createConfiguredSerdeForRecordValues();
  Object record = createJsonRecord();

  // When
  Object roundtrippedRecord = serde.deserializer().deserialize(
      ANY_TOPIC, serde.serializer().serialize(ANY_TOPIC, record));

  // Then
  assertThat(roundtrippedRecord, equalTo(record));

  // Cleanup
  serde.close();
}

Protobuf

Confluent は、Protobuf フォーマットのデータに対応する Schema Registry 互換の Protobuf Serde を提供しています。

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-streams-protobuf-serde</artifactId>
  <version>6.2.4</version>
</dependency>

Protobuf serde では、スキーマから他のスキーマを参照する "参照スキーマ" という機能がサポートされます。Protobuf スキーマは、参照スキーマを自動的に登録することもできます。詳細については、「Protobuf スキーマのシリアライザーと逆シリアライザー」を参照してください。

以下のコード例では、KafkaProtobufSerde クラスを使用して、スキーマ付きの Protoobuf レコードをシリアル化および逆シリアル化する方法を示します。コード全体の記述については、KafkaProtobufSerdeTest.java を参照してください。

private static final String ANY_TOPIC = "any-topic";

private static final String recordSchemaString = "syntax = \"proto3\";\n"
    + "\n"
    + "option java_package = \"io.confluent.kafka.serializers.protobuf.test\";\n"
    + "option java_outer_classname = \"TestMessageProtos\";\n"
    + "\n"
    + "import \"google/protobuf/descriptor.proto\";\n"
    + "\n"
    + "message TestMessage {\n"
    + "    string test_string = 1 [json_name = \"test_str\"];\n"
    + "    bool test_bool = 2;\n"
    + "    bytes test_bytes = 3;\n"

    ...

    + "}\n";

private static final ProtobufSchema recordSchema = new ProtobufSchema(recordSchemaString);

private DynamicMessage createDynamicMessage() {
  DynamicMessage.Builder builder = recordSchema.newMessageBuilder();
  Descriptors.Descriptor desc = builder.getDescriptorForType();
  Descriptors.FieldDescriptor fd = desc.findFieldByName("test_string");
  builder.setField(fd, "string");
  fd = desc.findFieldByName("test_bool");
  builder.setField(fd, true);
  fd = desc.findFieldByName("test_bytes");
  builder.setField(fd, ByteString.copyFromUtf8("hello"));

  ...

  return builder.build();
}

private static KafkaProtobufSerde<Message> createConfiguredSerdeForRecordValues() {
  SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient();
  KafkaProtobufSerde<Message> serde = new KafkaProtobufSerde<>(schemaRegistryClient);
  Map<String, Object> serdeConfig = new HashMap<>();
  serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "demo");
  serde.configure(serdeConfig, false);
  return serde;
}

public void shouldRoundTripRecords() {
  // Given
  KafkaProtobufSerde<Message> serde = createConfiguredSerdeForRecordValues();
  DynamicMessage record = createDynamicMessage();

  // When
  Message roundtrippedRecord = serde.deserializer().deserialize(
      ANY_TOPIC, serde.serializer().serialize(ANY_TOPIC, record));

  // Then
  assertThat(roundtrippedRecord, equalTo(record));

  // Cleanup
  serde.close();
}

その他の serde

Confluent のサンプルリポジトリ には、テンプレート化された serde の実装方法を示す例が含まれています。

カスタム Serde の実装

カスタムの Serde を実装する必要がある場合は、最初の手順として、既存の Serde のソースコード(前のセクションを参照)を参考にするのが最適です。一般に、実装のワークフローは次のようになります。

  1. org.apache.kafka.common.serialization.Serializer を実装して、データ型 T に対応する "シリアライザー" を記述します。
  2. org.apache.kafka.common.serialization.Deserializer を実装して、T に対応する "逆シリアライザー" を記述します。
  3. org.apache.kafka.common.serialization.Serde を実装して、T に対応する "serde" を記述します。これは手動で行うか(前のセクションで紹介した既存の Serde を参照)、Serde に含まれている Serdes.serdeFrom(Serializer<T>, Deserializer<T>) などのヘルパー関数を利用します。KafkaStreams に提供する構成でカスタム serde を使用する場合は、独自のクラス(ジェネリック型を含まないもの)を実装する必要があることに注意してください。serde クラスにジェネリック型が含まれている場合、または Serdes.serdeFrom(Serializer<T>, Deserializer<T>) を使用している場合、その serde はメソッド呼び出し(builder.stream("topicName", Consumed.with(...)) など)にのみ渡すことができます。

注釈

このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。