Schema Formats, Serializers, and Deserializers

Confluent Platform 5.5 adds support for Protocol Buffers and JSON Schema along with Avro, the original default format for Confluent Platform. Support for these new serialization formats is not limited to Schema Registry, but is provided throughout Confluent Platform. Additionally, as of Confluent Platform 5.5, Schema Registry is extensible to support adding custom schema formats as schema plugins.

New Kafka serializers and deserializers are available for Protobuf and JSON Schema, along with Avro. The serializers can automatically register schemas when serializing a Protobuf message or a JSON-serializable object. The Protobuf serializer can recursively register all imported schemas.

The serializers and deserializers are available in multiple languages, including Java, .NET and Python.

Schema Registry supports multiple formats at the same time. For example, you can have Avro schemas in one subject and Protobuf schemas in another. Furthermore, both Protobuf and JSON Schema have their own compatibility rules, so you can have your Protobuf schemas evolve in a backward or forward compatible manner, just as with Avro.

Schema Registry in Confluent Platform 5.5 also adds support for schema references in Protobuf by modeling the import statement.

Supported Formats

The following schema formats are supported out of the box with Confluent Platform, with serializers, deserializers, and command line tools available for each format:

Format | Producer | Consumer
Avro | io.confluent.kafka.serializers.KafkaAvroSerializer | io.confluent.kafka.serializers.KafkaAvroDeserializer
Protobuf | io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer | io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
JSON Schema | io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer | io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer

Use the serializer and deserializer for your schema format. Specify the serializer in the code for the Kafka producer to send messages, and specify the deserializer in the code for the Kafka consumer to read messages.

The new Protobuf and JSON Schema serializers and deserializers support many of the same configuration properties as the Avro equivalents, including subject name strategies for the key and value. In the case of the RecordNameStrategy (and TopicRecordNameStrategy), the subject name will be:

  • For Avro, the record name.
  • For Protobuf, the message name.
  • For JSON Schema, the title.

When using RecordNameStrategy with Protobuf and JSON Schema, additional configuration is required. This, along with examples and command line testing utilities, is covered in the deep dive section for each format.

The serializers and Kafka Connect converters for all supported schema formats automatically register schemas by default. The Protobuf serializer recursively registers all referenced schemas separately.

With Protobuf and JSON Schema support, Confluent Platform adds the ability to add new schema formats using schema plugins (the existing Avro support has been wrapped with an Avro schema plugin).

Schema References

Confluent Platform versions 5.5.0 and later provide full support for the notion of schema references, the ability of a schema to refer to other schemas. Support for schema references is provided for out-of-the-box schema formats: Avro, JSON Schema, and Protobuf. Schema references use the import statement of Protobuf and the $ref field of JSON Schema. Avro in Confluent Platform is also updated to support schema references.

A schema reference consists of the following:

  • A name for the reference. (For Avro, the reference name is the fully qualified schema name, for JSON Schema it is a URL, and for Protobuf, it is the name of another Protobuf file.)
  • A subject, representing the subject under which the referenced schema is registered.
  • A version, representing the exact version of the schema under the registered subject.

When registering a schema, you must provide the associated references, if any. Typically the referenced schemas would be registered first, then their subjects and versions can be used when registering the schema that references them.
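
As a rough illustration of the three parts of a reference listed above, the following Python sketch builds the JSON body that could be sent to Schema Registry when registering a Protobuf schema that imports another file. The schema text, the file name other.proto, and the subject other-value are hypothetical; the body fields (schemaType, schema, references) follow the Schema Registry REST API for registering a schema under a subject. The sketch only builds the payload and does not contact a server.

```python
import json


def build_registration_payload(schema_text, schema_type, references):
    """Build a request body for POST /subjects/{subject}/versions.

    `references` is a list of (name, subject, version) tuples, one per
    schema that `schema_text` refers to.
    """
    return {
        "schemaType": schema_type,
        "schema": schema_text,
        "references": [
            {"name": name, "subject": subject, "version": version}
            for (name, subject, version) in references
        ],
    }


# Hypothetical Protobuf schema that imports "other.proto".
schema_text = (
    'syntax = "proto3"; import "other.proto"; '
    "message Order { Other other = 1; }"
)

payload = build_registration_payload(
    schema_text,
    "PROTOBUF",
    # (reference name, subject of the referenced schema, exact version)
    [("other.proto", "other-value", 1)],
)
print(json.dumps(payload, indent=2))
```

In this sketch the referenced schema is assumed to be registered already under other-value at version 1, which mirrors the typical ordering described above.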

When a schema that has references is retrieved from Schema Registry, the referenced schemas are also retrieved if needed.

See the documentation for each format for examples of schema references.

Multiple Event Types in the Same Topic

In addition to providing a way for one schema to refer to other schemas, schema references can be used to efficiently combine multiple event types in the same topic and still maintain subject-topic constraints. Using schema references to achieve this is a new approach to putting multiple event types in the same topic.

See the documentation for each format for examples.

APIs

Starting with Confluent Platform 5.5.0, two additional endpoints are available, as further described in the Schema Registry API Reference.

  • This endpoint shows the IDs of schemas that reference the schema with the given subject and version.

    GET /subjects/{subject}/versions/{version}/referencedby
    
  • This endpoint shows all subject-version pairs where the ID is used.

    GET /schemas/ids/{id}/versions
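
The two endpoints above can be addressed as in the following Python sketch, which only constructs the request URLs. The base URL http://localhost:8081 and the subject other-value are assumptions for illustration; the helper names are hypothetical.

```python
from urllib.parse import quote

BASE_URL = "http://localhost:8081"  # assumed Schema Registry address


def referenced_by_url(subject, version, base_url=BASE_URL):
    """URL listing the IDs of schemas that reference subject/version."""
    # Percent-encode the subject so characters such as "/" stay in one path segment.
    return f"{base_url}/subjects/{quote(subject, safe='')}/versions/{version}/referencedby"


def schema_versions_url(schema_id, base_url=BASE_URL):
    """URL listing all subject-version pairs where the schema ID is used."""
    return f"{base_url}/schemas/ids/{schema_id}/versions"


print(referenced_by_url("other-value", 1))
print(schema_versions_url(42))
```

Each URL can then be fetched with any HTTP client using a GET request.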
    

Schema Format Extensibility

By default, Schema Registry loads schema plugins for Avro, Protobuf, and JSON Schema. When using the REST API, specify the schema type as AVRO, PROTOBUF, or JSON, respectively.

You can create custom schema plugins by implementing the SchemaProvider and ParsedSchema interfaces. To load the custom schema plugin into Schema Registry, place the JARs for the plugins on the CLASSPATH and then use the following Schema Registry configuration property to identify the comma separated list of additional plugin provider classes to be used:

schema.providers=com.acme.MySchemaProvider

Do not include the schema plugins for AVRO, PROTOBUF, or JSON, since Schema Registry always loads them.

Subject Name Strategy

A serializer registers a schema in Schema Registry under a subject name, which defines a namespace in the registry:

  • Compatibility checks are per subject
  • Versions are tied to subjects
  • When schemas evolve, they are still associated to the same subject but get a new schema ID and version

Overview

The subject name depends on the subject name strategy. The three supported strategies are:

Strategy | Description
TopicNameStrategy | Derives subject name from topic name. (This is the default.)
RecordNameStrategy | Derives subject name from record name, and provides a way to group logically related events that may have different data structures under a subject.
TopicRecordNameStrategy | Derives the subject name from topic and record name, as a way to group logically related events that may have different data structures under a subject.

Note

The full class names for the above strategies consist of the strategy name prefixed by io.confluent.kafka.serializers.subject.

Group by Topic or Other Relationships

The default naming strategy (TopicNameStrategy) names the schema based on the topic name and implicitly requires that all messages in the same topic conform to the same schema; otherwise, a new record type could break compatibility checks on the topic. This is a good strategy for scenarios where grouping messages by topic name makes sense, such as aggregating logged activities or stream processing website comment threads.

The non-default naming strategies (RecordNameStrategy and TopicRecordNameStrategy) support schema management for use cases where grouping by topic isn’t optimal, for example, when a single topic has records that use multiple schemas. This is useful when your data represents a time-ordered sequence of events, and the messages have different data structures. In this case, it is more useful to keep together an ordered sequence of related messages that play a part in a chain of events, regardless of topic names. For example, a financial service that tracks a customer account might include initiating checking and savings, making a deposit, then a withdrawal, applying for a loan, getting approval, and so forth.

How the Naming Strategies Work

The following table compares the strategies.

Behavior | TopicNameStrategy | RecordNameStrategy | TopicRecordNameStrategy
Subject format | <topic name> plus “-key” or “-value” depending on configuration | <fully-qualified record name> | <topic name>-<fully-qualified record name>
Unique subject per topic | Yes | No | Yes
Schema Registry checks compatibility across all schemas in a topic | Yes | No, checks compatibility of any occurrences of the same record name across all topics. | Yes, moreover, different topics may contain mutually incompatible versions of the same record name, since the compatibility check is scoped to a particular record name within a particular topic.
Multiple topics can have records with the same schema | Yes | Yes | Yes
Multiple subjects can have schemas with the same schema ID, if schema is identical | Yes | Yes | Yes
A single topic can have multiple schemas for the same record type, i.e. schema evolution | Yes | Yes | Yes
A single topic can have multiple record types | Not generally, because a new record type could break Schema Registry compatibility checks done on the topic | Yes | Yes
Requires client application to change setting | No, because it is already the default for all clients | Yes | Yes
The same subject can be reused for replicated topics that have been renamed, i.e., Replicator configured with topic.rename.format | No, requires manual subject registration with new topic name | Yes | No, requires manual subject registration with new topic name
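
The subject formats compared above can be sketched in a few lines of Python. This is an illustration only, not the serializers' actual implementation: the function name and the example topic and record names are hypothetical, and it assumes that only TopicNameStrategy appends -key or -value, which matches the strategy descriptions under Configuration Details.

```python
def subject_name(strategy, topic, record_name, is_key=False):
    """Return the subject a schema would be registered under.

    `record_name` stands for the fully-qualified record name (Avro),
    message name (Protobuf), or title (JSON Schema).
    """
    if strategy == "TopicNameStrategy":
        suffix = "key" if is_key else "value"
        return f"{topic}-{suffix}"
    if strategy == "RecordNameStrategy":
        # Independent of the topic, so the same subject is shared across topics.
        return record_name
    if strategy == "TopicRecordNameStrategy":
        return f"{topic}-{record_name}"
    raise ValueError(f"unknown strategy: {strategy}")


for strategy in ("TopicNameStrategy", "RecordNameStrategy", "TopicRecordNameStrategy"):
    print(strategy, "->", subject_name(strategy, "payments", "com.acme.Deposit"))
```

Because the RecordNameStrategy result does not depend on the topic, renaming a replicated topic leaves the subject unchanged, which is the behavior noted in the last table row.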

Limitations

  • The Go client currently has no Schema Registry integration.
  • ksqlDB uses only the default TopicNameStrategy, and does not currently support multiple schemas in a single topic.
  • Confluent Control Center options to view and edit schemas through the user interface are available only for schemas that use the default TopicNameStrategy.

Configuration Details

The Kafka serializers and deserializers default to using <topicName>-key and <topicName>-value as the corresponding subject name while registering or retrieving the schema.

This behavior can be modified by using the following configs:

key.subject.name.strategy

Determines how to construct the subject name under which the key schema is registered with the Schema Registry.

Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-key is used as the subject. Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and, if used, may degrade performance.

  • Type: class
  • Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • Importance: medium

value.subject.name.strategy

Determines how to construct the subject name under which the value schema is registered with Schema Registry.

Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-value is used as the subject. Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and, if used, may degrade performance.

  • Type: class
  • Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
  • Importance: medium

The other available options that can be configured out of the box include:

io.confluent.kafka.serializers.subject.RecordNameStrategy
For any Avro record type that is published to Kafka, registers the schema in the registry under the fully-qualified record name (regardless of the topic). This strategy allows a topic to contain a mixture of different record types, since no intra-topic compatibility checking is performed. Instead, checks compatibility of any occurrences of the same record name across all topics.
io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
For any Avro record type that is published to Kafka topic <topicName>, registers the schema in the registry under the subject name <topicName>-<recordName>, where <recordName> is the fully-qualified Avro record name. This strategy allows a topic to contain a mixture of different record types, since no intra-topic compatibility checking is performed. Moreover, different topics may contain mutually incompatible versions of the same record name, since the compatibility check is scoped to a particular record name within a particular topic.

Compatibility Checks

For Schema Evolution and Compatibility, the following compatibility levels can be defined for all schema formats:

  • BACKWARD
  • FORWARD
  • FULL
  • BACKWARD_TRANSITIVE
  • FORWARD_TRANSITIVE
  • FULL_TRANSITIVE
  • NONE

One of the methods in ParsedSchema is isBackwardCompatible(ParsedSchema previousSchema). As long as a schema type uses this method to define backward compatibility, the other types of compatibility can be derived from it.
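
One way to picture how the other levels can be derived from a single backward check is the Python sketch below. It is a toy model under stated assumptions, not Schema Registry's implementation: a schema is a dict mapping field names to whether the field has a default, and the backward rule used here (every added field needs a default) is a simplified stand-in for the real per-format rules. The derivation it illustrates is that a forward check is the backward check with the arguments swapped, a full check requires both, and the transitive levels check every earlier version instead of only the latest.

```python
def is_backward_compatible(new, previous):
    """Toy rule: `new` can read data written with `previous` if every
    field that `new` adds has a default value."""
    return all(has_default for name, has_default in new.items() if name not in previous)


def is_compatible(level, new, history):
    """Check `new` against `history` (earlier versions, oldest first)."""
    if level == "NONE" or not history:
        return True
    base = level.replace("_TRANSITIVE", "")
    # Transitive levels compare against all versions, others only the latest.
    to_check = history if level.endswith("_TRANSITIVE") else history[-1:]
    for previous in to_check:
        backward = is_backward_compatible(new, previous)
        forward = is_backward_compatible(previous, new)  # roles swapped
        if base == "BACKWARD" and not backward:
            return False
        if base == "FORWARD" and not forward:
            return False
        if base == "FULL" and not (backward and forward):
            return False
    return True


v1 = {"id": False}
v2 = {"id": False, "note": True}                    # adds a field with a default
v3 = {"id": False, "note": True, "amount": False}   # adds a field without a default
v4 = {"id": False, "note": False}                   # same fields as v2, no default on "note"

print(is_compatible("BACKWARD", v2, [v1]))
print(is_compatible("BACKWARD", v3, [v1, v2]))
print(is_compatible("FORWARD", v3, [v1, v2]))
print(is_compatible("BACKWARD", v4, [v1, v2]))
print(is_compatible("BACKWARD_TRANSITIVE", v4, [v1, v2]))
```

The last two calls differ only in the transitive suffix: v4 passes against the latest version v2 but fails once v1 is also checked.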

The rules for Avro are detailed in the Avro specification under Schema Resolution.

The rules for Protobuf backward compatibility are derived from the Protobuf language specification. These are as follows:

  • Fields can be added. All fields in Protobuf are optional, by default. If you specify defaults, these will be used for backward compatibility.
  • Fields can be removed. A field number can be reused by a new field of the same type. A field number cannot be reused by a new field of a different type.
  • Types int32, uint32, int64, uint64, and bool are compatible (can be swapped in the same field).
  • Types sint32 and sint64 are compatible (can be swapped in the same field).
  • Types string and bytes are compatible (can be swapped in the same field).
  • Types fixed32 and sfixed32 are compatible (can be swapped in the same field).
  • Types fixed64 and sfixed64 are compatible (can be swapped in the same field).
  • Type enum is compatible with int32, uint32, int64, and uint64 (can be swapped in the same field).
  • Changing a single value into a member of a new oneof is compatible.
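
The type-swapping rules in the list above lend themselves to a small lookup, sketched here in Python. The function and constant names are hypothetical, the sketch covers only the scalar and enum type rules (not field addition, removal, or oneof changes), and it is not how Schema Registry itself implements the check.

```python
# Groups of scalar types that may be swapped within the same field.
COMPATIBLE_GROUPS = [
    {"int32", "uint32", "int64", "uint64", "bool"},
    {"sint32", "sint64"},
    {"string", "bytes"},
    {"fixed32", "sfixed32"},
    {"fixed64", "sfixed64"},
]

# Scalar types that an enum may be swapped with.
ENUM_COMPATIBLE = {"int32", "uint32", "int64", "uint64"}


def can_swap(old_type, new_type):
    """Return True if a field may change from old_type to new_type."""
    if old_type == new_type:
        return True
    if "enum" in (old_type, new_type):
        other = new_type if old_type == "enum" else old_type
        return other in ENUM_COMPATIBLE
    return any(old_type in group and new_type in group for group in COMPATIBLE_GROUPS)


print(can_swap("int32", "bool"))
print(can_swap("int32", "sint32"))
print(can_swap("enum", "int64"))
```

Note that sint32 and int32 land in different groups, so swapping them is reported as incompatible even though both are 32-bit integers.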

The rules for JSON Schema backward compatibility are a bit more involved and so appear in the last section in the JSON Schema deep dive, under JSON Schema Compatibility Rules.

Command-Line Utilities and JSON Encoding of Messages

Both Avro and Protobuf provide options to use human-readable JSON or storage-efficient binary format to encode the messages of either schema format, as described in the respective specifications.

The command line utilities (as well as REST Proxy and Confluent Control Center) make use of these JSON encodings.

To start the Protobuf command line producer:

kafka-protobuf-console-producer --broker-list localhost:9092 --topic t1 --property value.schema='message Foo { required string f1 = 1; }'

To start the Protobuf command line consumer (at a separate terminal):

kafka-protobuf-console-consumer --topic t1 --bootstrap-server localhost:9092

You can now send JSON messages in the form: { "f1": "some-value" }.

Likewise, to start the JSON Schema command line producer:

kafka-json-schema-console-producer --broker-list localhost:9092 --topic t2 --property value.schema='{"type":"object","properties":{"f1":{"type":"string"}}}'

To start the JSON Schema command line consumer:

kafka-json-schema-console-consumer --topic t2 --bootstrap-server localhost:9092

You can send JSON messages of the form { "f1": "some-value" }.

The producers can also be passed references as either key.refs or value.refs, for example:

--property value.refs='[ { "name": "myName", "subject": "mySubject", "version": 1 } ]'

More examples of using these command line utilities are provided in the “Test Drive ..” sections for each of the formats.

Basic Auth Security for Producers and Consumers

Schema Registry supports the ability to authenticate requests using Basic Auth headers. You can send the Basic Auth headers by setting the following configuration in your producer or consumer.

basic.auth.credentials.source

Specify how to pick the credentials for the Basic Auth header. The supported values are URL, USER_INFO, and SASL_INHERIT.

  • Type: string
  • Default: "URL"
  • Importance: medium

URL - The user info is configured as part of the schema.registry.url config in the form of http://<username>:<password>@sr-host:<sr-port>

USER_INFO - The user info is configured using the below configuration.

basic.auth.user.info

Specify the user info for Basic Auth in the form of {username}:{password}

  • Type: password
  • Default: ""
  • Importance: medium

SASL_INHERIT - Inherit the settings used by the Kafka client to communicate with the broker using SASL SCRAM or SASL PLAIN.
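
For the USER_INFO source, the configured {username}:{password} value ends up in a standard HTTP Basic Auth header. The Python sketch below shows that encoding step only; the credentials alice:secret and the registry URL are made up for illustration, the helper name is hypothetical, and the dictionary simply mirrors the property names described above rather than any particular client's constructor.

```python
import base64


def basic_auth_header(user_info):
    """Build the Authorization header from '{username}:{password}'."""
    token = base64.b64encode(user_info.encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Property names as described above; the values are illustrative assumptions.
client_config = {
    "schema.registry.url": "http://localhost:8081",
    "basic.auth.credentials.source": "USER_INFO",
    "basic.auth.user.info": "alice:secret",
}

header = basic_auth_header(client_config["basic.auth.user.info"])
print(header)
```

Because Base64 is an encoding and not encryption, the credentials are readable by anyone who can see the request, so this is normally combined with HTTPS.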

Wire Format

In most cases, you can use the serializers and formatter directly and not worry about the details of how messages are mapped to bytes. However, if you’re working with a language that Confluent has not developed serializers for, or simply want a deeper understanding of how the Confluent Platform works, here is more detail on how data is mapped to low-level bytes.

The wire format currently has three components:

Bytes | Area | Description
0 | Magic Byte | Confluent serialization format version number; currently always 0.
1-4 | Schema ID | 4-byte schema ID as returned by Schema Registry.
5-… | Data | Serialized data for the specified schema format (for example, binary encoding for Avro or Protocol Buffers). The only exception is raw bytes, which will be written directly without any special encoding.

Important

  • All components are encoded with big-endian ordering; that is, standard network byte order.
  • The wire format applies to both Kafka message keys and message values.
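
For readers working in a language without Confluent serializers, the framing described above can be sketched in Python as follows. This handles only the magic byte and schema ID header; the function names are hypothetical, and the data portion is treated as opaque bytes that a format-specific encoder or decoder would handle.

```python
import struct

MAGIC_BYTE = 0
HEADER = struct.Struct(">bI")  # big-endian: 1-byte magic, 4-byte schema ID


def encode(schema_id, data):
    """Prefix already-serialized data with the magic byte and schema ID."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + data


def decode(message):
    """Split a framed message into (schema_id, data)."""
    if len(message) < HEADER.size:
        raise ValueError("message is shorter than the 5-byte header")
    magic, schema_id = HEADER.unpack(message[:HEADER.size])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[HEADER.size:]


framed = encode(1, b"abc")
print(framed)
print(decode(framed))
```

A consumer would use the returned schema ID to fetch the schema from Schema Registry before decoding the remaining bytes.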

Compatibility Guarantees across Confluent Platform Versions

The serialization format used by Confluent Platform serializers is guaranteed to be stable over major releases. No changes are made without advance warning. This is critical because the serialization format affects how keys are mapped across partitions. Since many applications depend on keys with the same logical format being routed to the same physical partition, it is usually important that the physical byte format of serialized data does not change unexpectedly for an application. Even the smallest modification can result in records with the same logical key being routed to different partitions because messages are routed to partitions based on the hash of the key.

To prevent variation even as the serializers are updated with new formats, the serializers are very conservative when updating output formats. To guarantee stability for clients, Confluent Platform and its serializers ensure the following:

  • The format (including magic byte) will not change without significant warning over multiple Confluent Platform major releases. Although the default may change infrequently to allow adoption of new features by default, this will be done very conservatively and with at least one major release between changes, during which the relevant changes will result in user-facing warnings to make sure users are not caught off guard by the need for transition. Significant, compatibility-affecting changes will guarantee at least 1 major release of warning and 2 major releases before an incompatible change is made.
  • Within the version specified by the magic byte, the format will never change in any backwards-incompatible way. Any changes made will be fully backward compatible with documentation in release notes and at least one version of warning provided if it introduces a new serialization feature that requires additional downstream support.
  • Deserialization will be supported over multiple major releases. This does not guarantee indefinite support, but deserialization of earlier formats will continue to be supported as long as there is no notified reason for incompatibility.

For more information about compatibility or support, reach out to the community mailing list.