Schema Formats, Serializers, and Deserializers¶
Confluent Platform 5.5 adds support for Protocol Buffers and JSON Schema along with Avro, the original default format for Confluent Platform. Support for these new serialization formats is not limited to Schema Registry, but provided throughout Confluent Platform. Additionally, as of Confluent Platform 5.5, Schema Registry is extensible to support adding custom schema formats as schema plugins.
New Kafka serializers and deserializers are available for Protobuf and JSON Schema, along with Avro. The serializers can automatically register schemas when serializing a Protobuf message or a JSON-serializable object. The Protobuf serializer can recursively register all imported schemas.
The serializers and deserializers are available in multiple languages, including Java, .NET and Python.
Schema Registry supports multiple formats at the same time. For example, you can have Avro schemas in one subject and Protobuf schemas in another. Furthermore, both Protobuf and JSON Schema have their own compatibility rules, so you can have your Protobuf schemas evolve in a backward or forward compatible manner, just as with Avro.
Schema Registry in Confluent Platform 5.5 also adds support for schema references in Protobuf by modeling the import statement.
Supported Formats¶
The following schema formats are supported out-of-the box with Confluent Platform, with serializers, deserializers, and command line tools available for each format:
Format | Producer | Consumer |
---|---|---|
Avro | io.confluent.kafka.serializers.KafkaAvroSerializer | io.confluent.kafka.serializers.KafkaAvroDeserializer |
Protobuf | io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer | io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer |
JSON Schema | io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer | io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer |
Use the serializer and deserializer for your schema format. Specify the serializer in the code for the Kafka producer to send messages, and specify the deserializer in the code for the Kafka consumer to read messages.
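For example, a minimal Java producer that uses the Protobuf serializer might look like the following sketch. MyRecord is a hypothetical protoc-generated message class, and the broker and Schema Registry addresses are placeholders; the same pattern applies to the Avro and JSON Schema serializers by swapping in the corresponding serializer class.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;

public class ProtobufProducerExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
    // The serializer needs to know where Schema Registry is running.
    props.put("schema.registry.url", "http://localhost:8081");
    // auto.register.schemas defaults to true, so the schema is registered on first use.

    // MyRecord is a hypothetical class generated by protoc from a .proto file.
    try (KafkaProducer<String, MyRecord> producer = new KafkaProducer<>(props)) {
      MyRecord value = MyRecord.newBuilder().setF1("some-value").build();
      producer.send(new ProducerRecord<>("t1", "key-1", value));
      producer.flush();
    }
  }
}
```

The matching consumer configures KafkaProtobufDeserializer as its value deserializer in the same way.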
The new Protobuf and JSON Schema serializers and deserializers support many of the same configuration properties as the Avro equivalents, including subject name strategies for the key and value. In the case of the RecordNameStrategy (and TopicRecordNameStrategy), the subject name will be:
- For Avro, the record name.
- For Protobuf, the message name.
- For JSON Schema, the title.
When using RecordNameStrategy with Protobuf and JSON Schema, additional configuration is required. This, along with examples and command line testing utilities, is covered in the format-specific deep dive sections.
The serializers and Kafka Connect converters for all supported schema formats automatically register schemas by default. The Protobuf serializer recursively registers all referenced schemas separately.
With Protobuf and JSON Schema support, Confluent Platform also gains the ability to add new schema formats using schema plugins (the existing Avro support has been wrapped with an Avro schema plugin).
Schema References¶
Confluent Platform versions 5.5.0 and later provide full support for the notion of schema references, the ability of a schema to refer to other schemas. Support for schema references is provided for the out-of-the-box schema formats: Avro, JSON Schema, and Protobuf. Schema references use the import statement of Protobuf and the $ref field of JSON Schema. Avro in Confluent Platform is also updated to support schema references.
A schema reference consists of the following:
- A name for the reference. (For Avro, the reference name is the fully qualified schema name, for JSON Schema it is a URL, and for Protobuf, it is the name of another Protobuf file.)
- A subject, representing the subject under which the referenced schema is registered.
- A version, representing the exact version of the schema under the registered subject.
When registering a schema, you must provide the associated references, if any. Typically the referenced schemas would be registered first, then their subjects and versions can be used when registering the schema that references them.
When a schema that has references is retrieved from Schema Registry, the referenced schemas are also retrieved if needed.
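As an illustration, the following sketch registers a Protobuf schema with a reference by calling the Schema Registry REST API with Java 11's built-in HTTP client. The host, subject names, and schema strings are assumptions; it presumes the referenced schema was already registered under the subject "other" as version 1.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegisterSchemaWithReference {
  public static void main(String[] args) throws Exception {
    // The referencing schema imports other.proto, which was previously registered
    // under the subject "other" as version 1 (names and versions are illustrative).
    String body = "{"
        + "\"schemaType\": \"PROTOBUF\","
        + "\"references\": [{\"name\": \"other.proto\", \"subject\": \"other\", \"version\": 1}],"
        + "\"schema\": \"syntax = \\\"proto3\\\"; import \\\"other.proto\\\"; message MyRecord { Other other = 1; }\""
        + "}";

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/subjects/my-topic-value/versions"))
        .header("Content-Type", "application/vnd.schemaregistry.v1+json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();

    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body()); // for example: {"id":2}
  }
}
```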
See the format-specific deep dive sections for examples of schema references in each of the formats.
Multiple Event Types in the Same Topic¶
In addition to providing a way for one schema to reference others, schema references can be used to efficiently combine multiple event types in the same topic while still maintaining subject-topic constraints. Using schema references to achieve this is a new approach to putting multiple event types in the same topic.
See the format-specific deep dive sections for examples in each of the formats.
APIs¶
Starting with Confluent Platform 5.5.0, two additional endpoints are available, as further described in the Schema Registry API Reference.
This endpoint shows the IDs of schemas that reference the schema with the given subject and version.
GET /subjects/{subject}/versions/{version}/referencedby
This endpoint shows all subject-version pairs where the ID is used.
GET /schemas/ids/{id}/versions
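For illustration, both endpoints can be called with any HTTP client; here is a short sketch using Java 11's built-in client (the host, subject, and schema ID values are placeholders):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SchemaReferenceLookups {
  public static void main(String[] args) throws Exception {
    HttpClient client = HttpClient.newHttpClient();

    // IDs of schemas that reference version 1 of the subject "other".
    HttpRequest referencedBy = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/subjects/other/versions/1/referencedby"))
        .build();
    System.out.println(client.send(referencedBy, HttpResponse.BodyHandlers.ofString()).body());

    // Subject-version pairs in which schema ID 2 is used.
    HttpRequest usedIn = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/schemas/ids/2/versions"))
        .build();
    System.out.println(client.send(usedIn, HttpResponse.BodyHandlers.ofString()).body());
  }
}
```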
Schema Format Extensibility¶
By default, Schema Registry loads schema plugins for Avro, Protobuf, and JSON Schema. When using the REST API, specify the schema type as AVRO, PROTOBUF, or JSON, respectively.
You can create custom schema plugins by implementing the SchemaProvider and ParsedSchema interfaces. To load a custom schema plugin into Schema Registry, place the JARs for the plugin on the CLASSPATH, then use the following Schema Registry configuration property to specify the comma-separated list of additional plugin provider classes to be used:
schema.providers=com.acme.MySchemaProvider
Do not include the schema plugins for AVRO, PROTOBUF, or JSON, since Schema Registry always loads them.
Subject Name Strategy¶
A serializer registers a schema in Schema Registry under a subject
name, which defines a namespace in the registry:
- Compatibility checks are per subject
- Versions are tied to subjects
- When schemas evolve, they are still associated with the same subject but get a new schema ID and version
Overview¶
The subject name depends on the subject name strategy. The three supported strategies are:
Strategy | Description |
---|---|
TopicNameStrategy | Derives subject name from topic name. (This is the default.) |
RecordNameStrategy | Derives subject name from record name, and provides a way to group logically related events that may have different data structures under a subject. |
TopicRecordNameStrategy | Derives the subject name from topic and record name, as a way to group logically related events that may have different data structures under a subject. |
Note
The full class names for the above strategies consist of the strategy name prefixed by io.confluent.kafka.serializers.subject.
Group by Topic or Other Relationships¶
The default naming strategy (TopicNameStrategy) names the schema based on the topic name and implicitly requires that all messages in the same topic conform to the same schema, otherwise a new record type could break compatibility checks on the topic. This is a good strategy for scenarios where grouping messages by topic name makes sense, such as aggregating logged activities or stream processing website comment threads.
The non-default naming strategies (RecordNameStrategy and TopicRecordNameStrategy) support schema management for use cases where grouping by topic isn't optimal, for example when a single topic has records that use multiple schemas. This is useful when your data represents a time-ordered sequence of events and the messages have different data structures. In this case, it is more useful to keep together an ordered sequence of related messages that play a part in a chain of events, regardless of topic names. For example, a financial service that tracks a customer account might include opening checking and savings accounts, making a deposit, then a withdrawal, applying for a loan, getting approval, and so forth.
How the Naming Strategies Work¶
The following table compares the strategies.
Behavior | TopicNameStrategy | RecordNameStrategy | TopicRecordNameStrategy |
---|---|---|---|
Subject format | <topic name> plus “-key” or “-value” depending on configuration | <fully-qualified record name> | <topic name>-<fully-qualified record name>
Unique subject per topic | Yes | No | Yes |
Schema Registry checks compatibility across all schemas in a topic | Yes | No, checks compatibility of any occurrences of the same record name across all topics. | Yes, moreover, different topics may contain mutually incompatible versions of the same record name, since the compatibility check is scoped to a particular record name within a particular topic. |
Multiple topics can have records with the same schema | Yes | Yes | Yes |
Multiple subjects can have schemas with the same schema ID, if schema is identical | Yes | Yes | Yes |
A single topic can have multiple schemas for the same record type, i.e. schema evolution | Yes | Yes | Yes |
A single topic can have multiple record types | Not generally, because a new record type could break Schema Registry compatibility checks done on the topic | Yes | Yes
Requires client application to change setting | No, because it is already the default for all clients | Yes | Yes |
The same subject can be reused for replicated topics that have been renamed, i.e., Replicator configured with topic.rename.format | No, requires manual subject registration with new topic name | Yes | No, requires manual subject registration with new topic name |
Limitations¶
- The Go client currently has no Schema Registry integration.
- ksqlDB uses only the default TopicNameStrategy, and does not currently support multiple schemas in a single topic.
- Confluent Control Center options to view and edit schemas through the user interface are available only for schemas that use the default TopicNameStrategy.
Configuration Details¶
The Kafka serializers and deserializers default to using <topicName>-key and <topicName>-value as the corresponding subject name when registering or retrieving the schema.
This behavior can be modified by using the following configs:
key.subject.name.strategy
Determines how to construct the subject name under which the key schema is registered with the Schema Registry.
Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-key is used as subject. Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and if used may have some performance degradation.
- Type: class
- Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
- Importance: medium
value.subject.name.strategy
Determines how to construct the subject name under which the value schema is registered with Schema Registry.
Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-value is used as subject. Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and if used may have some performance degradation.
- Type: class
- Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
- Importance: medium
The other available options that can be configured out of the box include:
io.confluent.kafka.serializers.subject.RecordNameStrategy
- For any Avro record type that is published to Kafka, registers the schema in the registry under the fully-qualified record name (regardless of the topic). This strategy allows a topic to contain a mixture of different record types, since no intra-topic compatibility checking is performed. Instead, checks compatibility of any occurrences of the same record name across all topics.
io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- For any Avro record type that is published to Kafka topic <topicName>, registers the schema in the registry under the subject name <topicName>-<recordName>, where <recordName> is the fully-qualified Avro record name. This strategy allows a topic to contain a mixture of different record types, since no intra-topic compatibility checking is performed. Moreover, different topics may contain mutually incompatible versions of the same record name, since the compatibility check is scoped to a particular record name within a particular topic.
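As a sketch of how these settings are applied on a client (property names are taken from this section; the addresses are placeholders), the strategies are passed to the serializer through the producer or consumer configuration:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class SubjectNameStrategyConfig {
  public static Properties producerProps() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put("schema.registry.url", "http://localhost:8081");
    // Default is TopicNameStrategy (<topic>-key / <topic>-value); override for the value schema:
    props.put("value.subject.name.strategy",
        "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
    // The key schema can keep the default, or use a different strategy:
    props.put("key.subject.name.strategy",
        "io.confluent.kafka.serializers.subject.TopicNameStrategy");
    return props;
  }
}
```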
Compatibility Checks¶
For Schema Evolution and Compatibility, the following compatibility levels can be defined for all schema formats:
- BACKWARD
- FORWARD
- FULL
- BACKWARD_TRANSITIVE
- FORWARD_TRANSITIVE
- FULL_TRANSITIVE
- NONE
One of the methods in ParsedSchema is isBackwardCompatible(ParsedSchema previousSchema). As long as a schema type uses this method to define backward compatibility, the other types of compatibility can be derived from it.
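As a rough sketch of how this is surfaced to code, the snippet below compares two Avro schemas using the AvroSchema implementation of ParsedSchema from the Schema Registry client libraries (the schema strings are illustrative):

```java
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;

public class BackwardCompatibilityCheck {
  public static void main(String[] args) {
    // v2 adds a field with a default, which is a backward compatible change under Avro's rules.
    ParsedSchema v1 = new AvroSchema(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"}]}");
    ParsedSchema v2 = new AvroSchema(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\",\"default\":0}]}");

    // Can data written with v1 still be read when v2 is the registered schema?
    System.out.println(v2.isBackwardCompatible(v1)); // expected: true
  }
}
```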
The rules for Avro are detailed in the Avro specification under Schema Resolution.
The rules for Protobuf backward compatibility are derived from the Protobuf language specification. These are as follows:
- Fields can be added. All fields in Protobuf are optional, by default. If you specify defaults, these will be used for backward compatibility.
- Fields can be removed. A field number can be reused by a new field of the same type. A field number cannot be reused by a new field of a different type.
- Types int32, uint32, int64, uint64, and bool are compatible (can be swapped in the same field).
- Types sint32 and sint64 are compatible (can be swapped in the same field).
- Types string and bytes are compatible (can be swapped in the same field).
- Types fixed32 and sfixed32 are compatible (can be swapped in the same field).
- Types fixed64 and sfixed64 are compatible (can be swapped in the same field).
- Type enum is compatible with int32, uint32, int64, and uint64 (can be swapped in the same field).
- Changing a single value into a member of a new oneof is compatible.
The rules for JSON Schema backward compatibility are a bit more involved and so appear in the last section in the JSON Schema deep dive, under JSON Schema Compatibility Rules.
Command-Line Utilities and JSON Encoding of Messages¶
Both Avro and Protobuf provide options to use human-readable JSON or a storage-efficient binary format to encode the messages of either schema format, as described in the respective specifications.
The command line utilities (as well as REST Proxy and Confluent Control Center) make use of these JSON encodings.
To start the Protobuf command line producer:
kafka-protobuf-console-producer --broker-list localhost:9092 --topic t1 --property value.schema='syntax = "proto3"; message Foo { string f1 = 1; }'
To start the Protobuf command line consumer (at a separate terminal):
kafka-protobuf-console-consumer --topic t1 --bootstrap-server localhost:9092
You can now send JSON messages in the form: { "f1": "some-value" }.
Likewise, to start the JSON Schema command line producer:
kafka-json-schema-console-producer --broker-list localhost:9092 --topic t2 --property value.schema='{"type":"object","properties":{"f1":{"type":"string"}}}'
To start JSON Schema command line consumer:
kafka-json-schema-console-consumer --topic t2 --bootstrap-server localhost:9092
You can send JSON messages of the form { "f1": "some-value" }.
The producers can also be passed references as either <key.refs> or <value.refs>, for example:
--property value.refs='[ { "name": "myName", "subject": "mySubject", "version": 1 } ]'
More examples of using these command line utilities are provided in the “Test Drive ..” sections for each of the formats.
Basic Auth Security for Producers and Consumers¶
Schema Registry supports the ability to authenticate requests using Basic Auth headers. You can send the Basic Auth headers by setting the following configuration in your producer or consumer example.
basic.auth.credentials.source¶
Specify how to pick the credentials for the Basic Auth header. The supported values are URL, USER_INFO, and SASL_INHERIT.
- Type: string
- Default: “URL”
- Importance: medium
URL - The user info is configured as part of the schema.registry.url config in the form of http://<username>:<password>@sr-host:<sr-port>
USER_INFO - The user info is configured using the below configuration.
basic.auth.user.info
Specify the user info for Basic Auth in the form of {username}:{password}
- Type: password
- Default: “”
- Importance: medium
SASL_INHERIT - Inherit the settings used by the Kafka client to communicate with the broker using SASL SCRAM or SASL PLAIN.
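For example, a producer or consumer that talks to a Schema Registry protected by Basic Auth might carry configuration like the following sketch (the URL and credentials are placeholders):

```java
import java.util.Properties;

public class BasicAuthConfigExample {
  public static Properties schemaRegistryAuthProps() {
    Properties props = new Properties();
    props.put("schema.registry.url", "https://sr-host:8081");
    // USER_INFO takes the credentials from basic.auth.user.info;
    // URL would instead read them from the schema.registry.url itself,
    // and SASL_INHERIT reuses the Kafka client's SASL settings.
    props.put("basic.auth.credentials.source", "USER_INFO");
    props.put("basic.auth.user.info", "my-user:my-password");
    return props;
  }
}
```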
Wire Format¶
In most cases, you can use the serializers and formatter directly and not worry about the details of how messages are mapped to bytes. However, if you’re working with a language that Confluent has not developed serializers for, or simply want a deeper understanding of how the Confluent Platform works, here is more detail on how data is mapped to low-level bytes.
The wire format currently has only a couple of components:
Bytes | Area | Description |
---|---|---|
0 | Magic Byte | Confluent serialization format version number; currently always 0. |
1-4 | Schema ID | 4-byte schema ID as returned by Schema Registry. |
5-… | Data | Serialized data for the specified schema format (for example, binary encoding for Avro or Protocol Buffers). The only exception is raw bytes, which will be written directly without any special encoding. |
Important
- All components are encoded with big-endian ordering; that is, standard network byte order.
- The wire format applies to both Kafka message keys and message values.
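As an illustration of the layout described above, here is a small sketch that peels the header off a raw message key or value by direct byte inspection (this is not an official API, just a reading of the table):

```java
import java.nio.ByteBuffer;

public class WireFormatPeek {
  // Inspects the Confluent wire format header of a raw Kafka message key or value.
  public static void describe(byte[] payload) {
    ByteBuffer buffer = ByteBuffer.wrap(payload); // ByteBuffer is big-endian by default (network byte order)
    byte magicByte = buffer.get();                // byte 0: currently always 0
    if (magicByte != 0) {
      throw new IllegalArgumentException("Unknown magic byte: " + magicByte);
    }
    int schemaId = buffer.getInt();               // bytes 1-4: schema ID as returned by Schema Registry
    byte[] data = new byte[buffer.remaining()];   // bytes 5-...: serialized data in the schema format
    buffer.get(data);
    System.out.println("schema id = " + schemaId + ", serialized payload bytes = " + data.length);
  }
}
```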
Compatibility Guarantees across Confluent Platform Versions¶
The serialization format used by Confluent Platform serializers is guaranteed to be stable over major releases. No changes are made without advance warning. This is critical because the serialization format affects how keys are mapped across partitions. Since many applications depend on keys with the same logical format being routed to the same physical partition, it is usually important that the physical byte format of serialized data does not change unexpectedly for an application. Even the smallest modification can result in records with the same logical key being routed to different partitions, because messages are routed to partitions based on the hash of the key.
To prevent variation even as the serializers are updated with new formats, the serializers are very conservative when updating output formats. To guarantee stability for clients, Confluent Platform and its serializers ensure the following:
- The format (including magic byte) will not change without significant warning over multiple Confluent Platform major releases. Although the default may change infrequently to allow adoption of new features by default, this will be done very conservatively and with at least one major release between changes, during which the relevant changes will result in user-facing warnings to make sure users are not caught off guard by the need for transition. Significant, compatibility-affecting changes will guarantee at least 1 major release of warning and 2 major releases before an incompatible change is made.
- Within the version specified by the magic byte, the format will never change in any backwards-incompatible way. Any changes made will be fully backward compatible with documentation in release notes and at least one version of warning provided if it introduces a new serialization feature that requires additional downstream support.
- Deserialization will be supported over multiple major releases. This does not guarantee indefinite support, but deserialization of any earlier formats will be supported indefinitely as long as there is no notified reason for incompatibility.
For more information about compatibility or support, reach out to the community mailing list.
Next Steps¶
Suggested Reading¶
- Blog post: Confluent Platform Now Supports Protobuf, JSON Schema, and Custom Formats
- Podcast: Introducing JSON and Protobuf Support with product manager, David Araujo, and engineer, Tushar Thole
- Blog post: Putting Several Event Types in the Same Topic – Revisited
- Using Kafka Connect with Schema Registry
- Kafka Connect converters
- Configuring Key and Value Converters
- Schema Registry and Confluent Cloud