Kafka Streams のデータ型とシリアル化¶
すべての Kafka Streams アプリケーションでは、必要時にデータを具現化できるように、レコードキーとレコード値のデータ型(java.lang.String
や Avro オブジェクトなど)に対応する SerDe(シリアライザーと逆シリアライザー)を提供する必要があります。このような SerDe の情報が要求される操作には、stream()
、table()
、to()
、repartition()
、groupByKey()
、groupBy()
があります。
SerDe を指定するには、次のいずれかの方法を使用します。
Properties
インスタンスを通じてデフォルトの SerDe を設定する。- 適切な API メソッドの呼び出し時に明示的に SerDe を指定する。これにより、デフォルトがオーバーライドされます。
Java ストリームアプリケーションは、Kafka コンソールプロデューサー、JDBC Source Connector、Java クライアントプロデューサーなどの複数の方法で、データを逆シリアル化したり取り込んだりするように構成できます。完全なコードの例については、「Kafka Connect と Kafka Streams によるパイプライン処理」を参照してください。
SerDe の構成¶
Properties
構成を通じて Streams の構成に指定されている SerDe は、Kafka Streams アプリケーションでデフォルトとして使用されます。
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
Properties settings = new Properties();
// Default serde for keys of data records (here: built-in serde for String type)
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Default serde for values of data records (here: built-in serde for Long type)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
注釈
Properties
を通じて Serde
を指定する場合、その Serde
クラスにジェネリック型を含めることはできません。つまり、MySerde<T extends Number> implements Serde<T>
というクラスは使用できません。これは、Serdes.serdeFrom(Serializer<T>, Deserializer<T>)
によって作成された Serde
は使用できないことを意味します。Java の型消去の影響により、MySerde implements Serde<MyCustomType>
のように完全に型指定された Serde
クラスだけがサポートされます。
デフォルトの SerDe のオーバーライド¶
適切な API メソッドに SerDe を渡すと、明示的に指定した SerDe でデフォルトの serde 設定をオーバーライドできます。
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// The stream userCountByRegion has type `String` for record keys (for region)
// and type `Long` for record values (for user counts).
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.with(stringSerde, longSerde));
serde を選択的にオーバーライドして、一部のフィールドではデフォルトを保持する場合は、デフォルト設定の使用が必要な場面に対して serde を指定しないようにします。
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
// Use the default serializer for record keys (here: region as String) by not specifying the key serde,
// but override the default serializer for record values (here: userCount as Long).
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.Long()));
注釈
受信レコードの一部が破損している場合やフォーマットが正しくない場合は、逆シリアライザークラスによってエラーが報告されます。リリース 4.0.0 から導入された org.apache.kafka.streams.errors.DeserializationExceptionHandler
インターフェイスを使用すると、このようなレコードの処理方法をカスタマイズできます。カスタマイズしたインターフェイスの実装は、StreamsConfig
を通じて指定できます。詳細については、FAQ の「エラーと例外処理」を参照してください。
使用可能な SerDe¶
プリミティブ型と基本型¶
Apache Kafka® の kafka-clients
Maven アーティファクトには、byte[]
などの Java のプリミティブ型および基本型に対応する組み込みの serde の実装がいくつか含まれています。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>6.1.5-ccs</version>
</dependency>
このアーティファクトでは、パッケージ org.apache.kafka.common.serialization で以下の serde の実装を提供します。これらは、Streams 構成でデフォルトのシリアライザーを定義するときなどに利用できます。
データ型 | Serde |
---|---|
byte[] | Serdes.ByteArray() 、Serdes.Bytes() (下記のヒントを参照) |
ByteBuffer | Serdes.ByteBuffer() |
Double | Serdes.Double() |
Integer | Serdes.Integer() |
Long | Serdes.Long() |
String | Serdes.String() |
UUID | Serdes.UUID() |
ちなみに
Bytes は Java の byte[]
(バイト配列)に対するラッパーで、適切な等価性と順序付けのセマンティクスをサポートします。アプリケーションでは、byte[]
の代わりに Bytes
を使用することを検討できます。
String の serde を例として、組み込みの SerDe の使用方法を以下に示します。
// When configuring the default SerDes of StreamConfig
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// When you want to override SerDes explicitly/selectively
final Serde<String> stringSerde = Serdes.String();
StreamsBuilder builder = new StreamsBuilder();
builder.stream("my-avro-topic", Consumed.with(keyGenericAvroSerde, valueGenericAvroSerde));
Avro¶
Confluent は、汎用的な Avro と固有の Avro フォーマットのデータに対して、Schema Registry 互換の Avro serde を提供しています。
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>6.1.5</version>
</dependency>
汎用と固有のどちらの Avro serde でも、schema.registry.url
設定を通じて Confluent Schema Registry のエンドポイントを構成する必要があります。
StreamsConfig
を使用して汎用または固有の Avro serde をデフォルトの serde として定義する場合は、StreamsConfig
に Schema Registry のエンドポイントも設定する必要があります。- 汎用または固有の Avro serde を直接インスタンス化する場合は(
new GenericAvroSerde()
など)、serde インスタンスを使用する前に、serde インスタンスのSerde#configure()
を呼び出して Schema Registry のエンドポイントを設定する必要があります。さらに、Serde#configure()
でブール型のパラメーターを使用して、レコード "キー"(true
)またはレコード "値"(false
)のシリアル化と逆シリアル化に serde インスタンスを使用するかどうかを指定する必要があります。
Confluent の GenericAvroSerde
を使用する例を以下に示します。
// Generic Avro serde example
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
// When configuring the default serdes of StreamConfig
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
streamsConfiguration.put("schema.registry.url", "http://my-schema-registry:8081");
// When you want to override serdes explicitly/selectively
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
"http://my-schema-registry:8081");
final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true); // `true` for record keys
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values
StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> textLines =
builder.stream(keyGenericAvroSerde, valueGenericAvroSerde, "my-avro-topic");
Confluent の SpecificAvroSerde
を使用する例を以下に示します。
// Specific Avro serde example
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
// When configuring the default serdes of StreamConfig
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsConfiguration.put("schema.registry.url", "http://my-schema-registry:8081");
// When you want to override serdes explicitly/selectively
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
"http://my-schema-registry:8081");
// `Foo` and `Bar` are Java classes generated from Avro schemas
final Serde<Foo> keySpecificAvroSerde = new SpecificAvroSerde<>();
keySpecificAvroSerde.configure(serdeConfig, true); // `true` for record keys
final Serde<Bar> valueSpecificAvroSerde = new SpecificAvroSerde<>();
valueSpecificAvroSerde.configure(serdeConfig, false); // `false` for record values
StreamsBuilder builder = new StreamsBuilder();
KStream<Foo, Bar> textLines = builder.stream("my-avro-topic", Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde));
ソースストリームを作成するときは、Streams DSL を使用して入力 serde を指定します。低レベルの Processor API を使用してプロセッサートポロジーを構築する場合は、Confluent の GenericAvroSerde
クラスや SpecificAvroSerde
クラスなどの serde クラスを指定できます。
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("Source", keyGenericAvroSerde.deserializer(), valueGenericAvroSerde.deserializer(), inputTopic);
以下のエンドツーエンドのデモは、Confluent Avro serde の使用例を示しています。
リフレクション Avro¶
バージョン 5.4.0 以降の Confluent Platform には、"リフレクション Avro" フォーマットでデータを読み書きするためのシリアライザーと逆シリアライザーも用意されています。この serde の "汎用 Avro" バージョンが GenericAvroSerde
です。この serde は、「フォーマット、シリアライザー、逆シリアライザー」に定義されているワイヤ形式に従ってデータの読み取りと書き込みを行います。Schema Registry エンドポイントへのアクセスが必要とされ、schema.registry.url パラメーターを使用して GenericAvroDeserializer で定義する必要があります。
以下のコード例では、レコードキーとレコード値の両方に対して、この serde を Kafka Streams アプリケーションのデフォルトの serde として構成します。
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, ReflectionAvroSerde.class);
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, ReflectionAvroSerde.class);
streamsConfiguration.put(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://confluent-schema-registry-server:8081/");
以下のコード例では、アプリケーションのデフォルトの serde がどのように構成されているかにかかわらず、Kafka Streams のような特定の操作でのみこの serde を使用するように、明示的にオーバーライドします。
Serde<MyJavaClassGeneratedFromAvroSchema> reflectionAvroSerde = new ReflectionAvroSerde<>();
boolean isKeySerde = false;
reflectionAvroSerde.configure(
Collections.singletonMap(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://confluent-schema-registry-server:8081/"),
isKeySerde);
KStream<String, MyJavaClassGeneratedFromAvroSchema> stream = ...;
stream.to(Serdes.String(), reflectionAvroSerde, "my-output-topic");
JSON¶
Kafka Streams のコード例には、JSON スキーマ用の基本的な serde の実装も含まれています。
サンプルファイルに示されているように、JSONSerdes の内部クラスの Serdes.serdeFrom(<serializerInstance>, <deserializerInstance>)
を使用すると、JSON と互換性のあるシリアライザーと逆シリアライザーを構築できます。
JSON スキーマ¶
Confluent は、JSON フォーマットのデータに対応する Schema Registry 互換の JSON スキーマ Serde を提供しています。
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-json-schema-serde</artifactId>
<version>6.1.5</version>
</dependency>
Avro デシリアライザーが具体的な Avro レコード型のインスタンスまたは GenericRecord
を返すように、JSON Schema デシリアライザーも、具体的な Java クラスまたは JsonNode
のインスタンスを返します。詳細については、「JSON Schema のシリアライザーと逆シリアライザー」を参照してください。
以下のコード例では、KafkaJsonSchemaSerde
クラスを使用して、スキーマ付きの JSON レコードをシリアル化および逆シリアル化する方法を示します。コード全体の記述については、KafkaJsonSchemaSerdeTest.java を参照してください。
private static final String ANY_TOPIC = "any-topic";
private static ObjectMapper objectMapper = new ObjectMapper();
private static final String recordSchemaString = "{\"properties\": {\n"
+ " \"null\": {\"type\": \"null\"},\n"
+ " \"boolean\": {\"type\": \"boolean\"},\n"
+ " \"number\": {\"type\": \"number\"},\n"
+ " \"string\": {\"type\": \"string\"}\n"
+ " },\n"
+ " \"additionalProperties\": false\n"
+ "}";
private static final JsonSchema recordSchema = new JsonSchema(recordSchemaString);
private Object createJsonRecord() throws IOException {
String json = "{\n"
+ " \"null\": null,\n"
+ " \"boolean\": true,\n"
+ " \"number\": 12,\n"
+ " \"string\": \"string\"\n"
+ "}";
return objectMapper.readValue(json, Object.class);
}
private static KafkaJsonSchemaSerde<Object> createConfiguredSerdeForRecordValues() {
SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient();
KafkaJsonSchemaSerde<Object> serde = new KafkaJsonSchemaSerde<>(schemaRegistryClient);
Map<String, Object> serdeConfig = new HashMap<>();
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "demo");
serde.configure(serdeConfig, false);
return serde;
}
public void shouldRoundTripRecords() throws Exception {
// Given
KafkaJsonSchemaSerde<Object> serde = createConfiguredSerdeForRecordValues();
Object record = createJsonRecord();
// When
Object roundtrippedRecord = serde.deserializer().deserialize(
ANY_TOPIC, serde.serializer().serialize(ANY_TOPIC, record));
// Then
assertThat(roundtrippedRecord, equalTo(record));
// Cleanup
serde.close();
}
Protobuf¶
Confluent は、Protobuf フォーマットのデータに対応する Schema Registry 互換の Protobuf Serde を提供しています。
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-protobuf-serde</artifactId>
<version>6.1.5</version>
</dependency>
Protobuf serde では、スキーマから他のスキーマを参照する "参照スキーマ" という機能がサポートされます。Protobuf スキーマは、参照スキーマを自動的に登録することもできます。詳細については、「Protobuf スキーマのシリアライザーと逆シリアライザー」を参照してください。
以下のコード例では、KafkaProtobufSerde
クラスを使用して、スキーマ付きの Protoobuf レコードをシリアル化および逆シリアル化する方法を示します。コード全体の記述については、KafkaProtobufSerdeTest.java を参照してください。
private static final String ANY_TOPIC = "any-topic";
private static final String recordSchemaString = "syntax = \"proto3\";\n"
+ "\n"
+ "option java_package = \"io.confluent.kafka.serializers.protobuf.test\";\n"
+ "option java_outer_classname = \"TestMessageProtos\";\n"
+ "\n"
+ "import \"google/protobuf/descriptor.proto\";\n"
+ "\n"
+ "message TestMessage {\n"
+ " string test_string = 1 [json_name = \"test_str\"];\n"
+ " bool test_bool = 2;\n"
+ " bytes test_bytes = 3;\n"
...
+ "}\n";
private static final ProtobufSchema recordSchema = new ProtobufSchema(recordSchemaString);
private DynamicMessage createDynamicMessage() {
DynamicMessage.Builder builder = recordSchema.newMessageBuilder();
Descriptors.Descriptor desc = builder.getDescriptorForType();
Descriptors.FieldDescriptor fd = desc.findFieldByName("test_string");
builder.setField(fd, "string");
fd = desc.findFieldByName("test_bool");
builder.setField(fd, true);
fd = desc.findFieldByName("test_bytes");
builder.setField(fd, ByteString.copyFromUtf8("hello"));
...
return builder.build();
}
private static KafkaProtobufSerde<Message> createConfiguredSerdeForRecordValues() {
SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient();
KafkaProtobufSerde<Message> serde = new KafkaProtobufSerde<>(schemaRegistryClient);
Map<String, Object> serdeConfig = new HashMap<>();
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "demo");
serde.configure(serdeConfig, false);
return serde;
}
public void shouldRoundTripRecords() {
// Given
KafkaProtobufSerde<Message> serde = createConfiguredSerdeForRecordValues();
DynamicMessage record = createDynamicMessage();
// When
Message roundtrippedRecord = serde.deserializer().deserialize(
ANY_TOPIC, serde.serializer().serialize(ANY_TOPIC, record));
// Then
assertThat(roundtrippedRecord, equalTo(record));
// Cleanup
serde.close();
}
その他の serde¶
Confluent のサンプルリポジトリ には、テンプレート化された serde の実装方法を示す例が含まれています。
PriorityQueue<T>
serde: PriorityQueueSerde
カスタム SerDe の実装¶
カスタムの SerDe を実装する必要がある場合は、最初の手順として、既存の SerDe のソースコード(前のセクションを参照)を参考にするのが最適です。一般に、実装のワークフローは次のようになります。
- org.apache.kafka.common.serialization.Serializer を実装して、データ型
T
に対応する "シリアライザー" を記述します。 - org.apache.kafka.common.serialization.Deserializer を実装して、
T
に対応する "逆シリアライザー" を記述します。 - org.apache.kafka.common.serialization.Serde を実装して、
T
に対応する "serde" を記述します。これは手動で行うか(前のセクションで紹介した既存の SerDe を参照)、Serdes に含まれているSerdes.serdeFrom(Serializer<T>, Deserializer<T>)
などのヘルパー関数を利用します。KafkaStreams
に提供する構成でカスタム serde を使用する場合は、独自のクラス(ジェネリック型を含まないもの)を実装する必要があることに注意してください。serde クラスにジェネリック型が含まれている場合、またはSerdes.serdeFrom(Serializer<T>, Deserializer<T>)
を使用している場合、その serde はメソッド呼び出し(builder.stream("topicName", Consumed.with(...))
など)にのみ渡すことができます。
注釈
このウェブサイトには、Apache License v2 の条件に基づいて Apache Software Foundation で開発されたコンテンツが含まれています。