Protobuf Schema Serializer and Deserializer for Schema Registry on Confluent Platform¶
This document describes how to use Protocol Buffers (Protobuf) with the Apache Kafka® Java client and console tools.
The Confluent Schema Registry based Protobuf serializer, by design, does not include the message schema; but rather, includes the schema ID (in addition to a magic byte) followed by message indexes, and finally the normal binary encoding of the data in the protobuf-payload. You can choose whether or not to embed a schema inline; allowing for cases where you may want to communicate the schema offline, with headers, or some other way. This is in contrast to other systems, such as Hadoop, that always include the schema with the message data. To learn more, see Wire format.
Protobuf serializer¶
Plug the KafkaProtobufSerializer
into KafkaProducer
to send messages of Protobuf type to Kafka.
When providing an instance of a Protobuf generated class to the serializer, the serializer can
register the Protobuf schema, and all referenced schemas. For referenced schemas,
by default the serializer will register each referenced schema under a subject with the same name
as the reference. For example, if the main schema references “google/protobuf/timestamp.proto
”,
then the timestamp schema will be registered under a subject named “google/protobuf/timestamp.proto
”.
This behavior can be customized by providing a custom implementation of ReferenceSubjectNameStrategy
to the serializer using the reference.subject.name.strategy
configuration, as described under
Configuration details for schema formats.
The interface looks like this:
public interface ReferenceSubjectNameStrategy extends Configurable
{
/**
* For a given reference name, topic, and message, returns the subject name under which the
* referenced schema should be registered in the schema registry.
*
* @param refName The name of the reference.
* @param topic The Kafka topic name to which the message is being published.
* @param isKey True when encoding a message key, false for a message value.
* @param schema The referenced schema.
* @return The subject name under which the referenced schema should be registered.
*/
String subjectName(String refName, String topic, boolean isKey, ParsedSchema schema);
}
Two implementations are available for the above interface:
io.confluent.kafka.serializers.subject.DefaultReferenceSubjectNameStrategy
- This is the default. It simply returns the reference name as the subject name.io.confluent.kafka.serializers.subject.QualifiedReferenceSubjectNameStrategy
- Given a reference name, replace slashes with dots, and remove the.proto
suffix to obtain the subject name. For example,mypackage/myfile.proto
becomesmypackage.myfile
.
For example, suppose you have the following schema.
syntax = "proto3";
package com.acme;
import "other.proto";
message MyRecord {
string f1 = 1;
OtherRecord f2 = 2;
}
The above schema references other.proto
, which looks like this:
syntax = "proto3";
package com.acme;
message OtherRecord {
int32 other_id = 1;
}
The code below creates an instance of the MyRecord
class that is generated by the Protobuf compiler.
The Kafka producer is configured to serialize the MyRecord
instance with the Protobuf serializer.
Tip
The examples below use the default hostname and port for the Kafka bootstrap server (localhost:9092
) and Schema Registry (localhost:8081
).
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://127.0.0.1:8081");
Producer<String, MyRecord> producer = new KafkaProducer<String, MyRecord>(props);
String topic = "testproto";
String key = "testkey";
OtherRecord otherRecord = OtherRecord.newBuilder()
.setOtherId(123).build();
MyRecord myrecord = MyRecord.newBuilder()
.setF1("value1").setF2(otherRecord).build();
ProducerRecord<String, MyRecord> record
= new ProducerRecord<String, MyRecord>(topic, key, myrecord);
producer.send(record).get();
producer.close();
While serializing the Protobuf instance to Kafka, the above code will
automatically register two schemas to Schema Registry, one for MyRecord
and
another for OtherRecord
, to two different subjects. (The default behavior of
automatically registering schemas can be disabled by passing the property
auto.register.schemas=false
to the serializer).
Now, use the REST endpoints to examine the schemas that were registered. First, you can see which subjects were used by using the following command.
curl http://localhost:8081/subjects
Here is the expected output:
["testproto-value", "other.proto"]
The subject for the top-level schema is determined by a SubjectNameStrategy
,
which defaults to suffixing the topic with either -key
or -value
. The
subjects for referenced schemas are determined by a ReferenceSubjectNameStrategy
,
which defaults to the name used in the import statement. Both strategies can be customized.
In the schemas below, note that the new schemaType
field that, which is added for Confluent Platform 5.5.
Also, the top-level schema has a new references
field that refers to other.proto
.
Type the following command to view the testproto-value
schema.
curl http://localhost:8081/subjects/testproto-value/versions/1
Here is the expected output:
{
"subject": "testproto-value",
"version": 1,
"id": 2,
"schemaType": "PROTOBUF",
"references": [
{
"name": "other.proto",
"subject": "other.proto",
"version": 1
}
],
"schema": "syntax = \"proto3\";\npackage com.acme;\n\nimport \"other.proto\";\n\nmessage MyRecord {\n string f1 = 1;\n .com.acme.OtherRecord f2 = 2;\n}\n"
}
Note that the testproto-value
schema ID is 2
, and the other.proto
schema ID is 1
.
Type the following command to view the other.proto
schema.
curl http://localhost:8081/subjects/other.proto/versions/1
Here is the expected output:
{
"subject": "other.proto",
"version": 1,
"id": 1,
"schemaType": "PROTOBUF",
"schema": "syntax = \"proto3\";\npackage com.acme;\n\nmessage OtherRecord {\n int32 other_id = 1;\n}\n"
}
Now, try to delete the referenced schema, other.proto
.
curl -X DELETE http://localhost:8081/subjects/other.proto/versions/1
You will get the following error because Schema Registry prevents you from creating a dangling reference, which would be the case if the referenced schema was deleted before the top-level schema.
{"error_code":42206,"message":"One or more references exist to the schema {magic=1,keytype=SCHEMA,subject=other.proto,version=1}."}
You know that the schema with ID 2
references the schema you just tried to delete,
but you can also get that information from Schema Registry directly with this command:
curl http://localhost:8081/subjects/other.proto/versions/1/referencedby
The above command returns the ID of the testproto-value
schema (ID 2
)
that references your other.proto
schema:
[2]
A schema can exist in multiple subjects, so you must delete the schema in each subject where it occurs before the referenced schema can deleted. Here is how to determine exactly where a schema ID is used:
curl http://localhost:8081/schemas/ids/2/versions
The above command returns the following, showing that in this case there is only one schema that references other.proto
:
[{"subject":"testproto-value","version":1}]
Delete the top-level schema (testproto-value
) that is referencing other.proto
:
curl -X DELETE http://localhost:8081/subjects/testproto-value/versions/1
The expected output is simply 1
.
Now you can delete the referenced schema, and the command will succeed.
curl -X DELETE http://localhost:8081/subjects/other.proto/versions/1
The expected output is simply 1
.
Protobuf deserializer¶
Plug KafkaProtobufDeserializer
into KafkaConsumer
to receive messages of any Protobuf type from Kafka.
Tip
The examples below use the default hostname and port for the Kafka bootstrap server (localhost:9092
) and Schema Registry (localhost:8081
).
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
String topic = "testproto";
final Consumer<String, Message> consumer = new KafkaConsumer<String, Message>(props);
consumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<String, Message> records = consumer.poll(100);
for (ConsumerRecord<String, Message> record : records) {
System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
When deserializing a Protobuf payload, the KafkaProtobufDeserializer
can behave in these ways:
- If given a
specific.protobuf.key.type
orspecific.protobuf.value.type
, the deserializer uses the specified type to perform deserialization. - The previous configuration will not work for
RecordNameStrategy
when there is more than one Message in the Protobuf definition. In this case, you must also specify eitherjava_outer_classname
orjava_multiple_files = true
in the original Protobuf file. This allows the deserializer to derive the Java type from the schema to deserialize the Protobuf payload. - Finally, if no type is provided or no type can be derived, the deserializer uses the schema to return an instance of a Protobuf
DynamicMessage
.
Similar to how the Avro deserializer can return an instance of a specific Avro
record type or a GenericRecord
, the Protobuf deserializer can return an instance
of a specific Protobuf message type or a DynamicMessage
.
If the Protobuf deserializer cannot determine a specific type, then a generic type is returned.
One way to return a specific type is to use an explicit property.
For the Protobuf deserializer, you can configure the property
KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE
or
KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_KEY_TYPE
.
In order to allow the Protobuf deserializer to work with topics with heterogeneous types,
you must provide additional information to the schema. Configure the deserializer with
derive.type=true
, and then specify either java_outer_classname
or java_multiple_files=true
in the schema.
Here is a summary of specific and generic return types for each schema format.
Avro | Protobuf | JSON Schema | |
---|---|---|---|
Specific type | Generated class that implements org.apache.avro.SpecificRecord | Generated class that extends com.google.protobuf.Message | Java class (that is compatible with Jackson serialization) |
Generic type | org.apache.avro.GenericRecord | com.google.protobuf.DynamicMessage | com.fasterxml.jackson.databind.JsonNode |
Configure expiry time for client-side schema caches¶
The following format-agnostic configuration options for cache expiry time are available on both the serializer and deserializer:
latest.cache.size
- The maximum size for caches holding latest schemaslatest.cache.ttl.sec
- The time to live (TTL) in seconds for caches holding latest schemas, or-1
for no TTL
Test drive Protobuf schema¶
To get started with Protobuf, you can use the command line producer and consumer for Protobuf. Similar to Avro, Protobuf defines both a binary serialization format and a JSON serialization format. This allows you to use JSON when human-readability is desired, and the more efficient binary format to store data in topics.
The command line producer and consumer are useful for understanding how the built-in Protobuf schema support works on Confluent Platform.
When you incorporate the serializer and deserializer into the code for your own producers and consumers, messages and associated schemas are processed the same way as they are on the console producers and consumers.
The suggested consumer commands include a flag to read --from-beginning
to
be sure you capture the messages even if you don’t run the consumer immediately
after running the producer. If you leave off the --from-beginning
flag, the
consumer will read only the last message produced during its current session.
The examples below include a few minimal configs. For full property references, see Configurations reference.
Prerequisites¶
- Prerequisites to run these examples are generally the same as those described for the Schema Registry Tutorial with the exception of Maven, which is not needed here. Also, Confluent Platform version 5.5.0 or later is required here.
- The following examples use the default Schema Registry URL value (
localhost:8081
). The examples show how to configure this inline by supplying the URL as an argument to the--property
flag in the command line arguments of the producer and consumer (--property schema.registry.url=<address of your schema registry>
). Alternatively, you could set this property in$CONFLUENT_HOME/etc/kafka/server.properties
, and not have to include it in the producer and consumer commands. For example:confluent.schema.registry.url=http://localhost:8081
- These examples make use of the
kafka-avro-console-producer
andkafka-avro-console-consumer
, which are located in$CONFLUENT_HOME/bin
.
Confluent Cloud prerequisites are:
- A Confluent Cloud account
- Permission to create a topic and schema in a cluster in Confluent Cloud
- Stream Governance Package enabled
- API key and secret for Confluent Cloud cluster (
$APIKEY
,$APISECRET
) - API key and secret for Schema Registry (
$SR_APIKEY
,$SR_APISECRET
) - Schema Registry endpoint URL (
$SCHEMA_REGISTRY_URL
) - Cluster ID (
$CLUSTER_ID
) - Schema registry cluster ID (
$SR_CLUSTER_ID
)
The examples assume that API keys, secrets, cluster IDs, and API endpoints are
stored in persistent environment variables wherever possible, and refer to them
as such. You can store these in shell variables if your setup is temporary. If
you want to return to this environment and cluster for future work, consider
storing them in a profile (such as .zsh
, .bashrc
, or powershell.exe
profiles).
The following steps provide guidelines on these prerequisites specific to these examples. To learn more general information, see Manage Clusters.
Log in to Confluent Cloud:
confluent login
Create a Kafka cluster in Confluent Cloud
confluent kafka cluster create <name> [flags]
For example:
confluent kafka cluster create quickstart_cluster --cloud "aws" --region "us-west-2"
Your output will include a cluster ID (in the form of
lkc-xxxxxx
), show the cluster name and cluster type (in this case, “Basic”), and endpoints. Take note of the cluster ID, and store it in an environment variable such as$CLUSTER_ID
.Get an API key and secret for the cluster:
confluent api-key create --resource $CLUSTER_ID
Store the API key and secret for your cluster in a safe place, such as shell environment variables:
$APIKEY
,$APISECRET
View Stream Governance packages and Schema Registry endpoint URL.
A Stream Governance package was enabled as a part of creating the environment.
To view governance packages, use the Confluent CLI command confluent environment list:
confluent environment list
Your output will show the environment ID, name, and associated Stream Governance packages.
To view the Stream Governance API endpoint URL, use the command confluent schema-registry cluster describe:
confluent schema-registry cluster describe
Your output will show the Schema Registry cluster ID in the form of
lsrc-xxxxxx
) and endpoint URL, which is also available to you in Cloud Console on the right side panel under “Stream Governance API” in the environment. Store these in environment variables:$SR_CLUSTER_ID
and$SCHEMA_REGISTRY_URL
.
Create a Schema Registry API key, using the Schema Registry cluster ID (
$SR_CLUSTER_ID
) from the previous step as the resource ID.confluent api-key create --resource $SR_CLUSTER_ID
Store the API key and secret for your Schema Registry in a safe place, such as shell environment variables:
$SR_APIKEY
and$SR_APISECRET
Create and use schemas¶
Start Confluent Platform using the following command:
confluent local services start
Tip
- Alternatively, you can simply run
confluent local services schema-registry start
which also startskafka
andzookeeper
as dependencies. This demo does not directly reference the other services, such as Connect and Control Center. That said, you may want to run the full stack anyway to further explore, for example, how the topics and messages display on Control Center. To learn more aboutconfluent local
, see Quick Start for Confluent Platform and confluent local in the Confluent CLI command reference. - The
confluent local
commands run in the background so you can re-use this command window. Separate sessions are required for the producer and consumer.
- Alternatively, you can simply run
Verify registered schema types.
Schema Registry supports arbitrary schema types. You should verify which schema types are currently registered with Schema Registry.
To do so, type the following command (assuming you use the default URL and port for Schema Registry,
localhost:8081
):curl http://localhost:8081/schemas/types
The response will be one or more of the following. If additional schema format plugins are installed, these will also be available.
["JSON", "PROTOBUF", "AVRO"]
Alternatively, use the curl
--silent
flag, and pipe the command through jq (curl --silent http://localhost:8081/schemas/types | jq
) to get nicely formatted output:"JSON", "PROTOBUF", "AVRO"
Use the producer to send Protobuf records in JSON as the message value.
The new topic,
transactions-proto
, will be created as a part of this producer command if it does not already exist. This command starts a producer, and creates a schema for the transactions-avro topic. The schema has two fields,id
andamount
.kafka-protobuf-console-producer --bootstrap-server localhost:9092 \ --property schema.registry.url=http://localhost:8081 --topic transactions-proto \ --property value.schema='syntax = "proto3"; message MyRecord { string id = 1; float amount = 2;}'
Tip
The producer does not show a
>
prompt, just a blank line at which to type producer messages.Type the following command in the shell, and hit return.
{ "id":"1000", "amount":500 }
The command line Protobuf producer will convert the JSON object to a Protobuf message (using the schema specified in
<value.schema>
) and then use an underlying serializer to serialize the message to the Kafka topictransactions-proto
.Open a new terminal window, and use the consumer to read from topic
transactions-proto
and get the value of the message in JSON.kafka-protobuf-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic transactions-proto
You should see following in the console.
{"id":"1000","amount":500.0
The underlying deserializer will create a Protobuf message, and then serialize the message to a JSON format for readability.
Leave this consumer running.
Register a new schema version under the same subject by adding a new field,
customer_id
.Since the default subject level compatibility is BACKWARD, you must add the new field as “optional” in order for it to be compatible with the previous version.
Open a new terminal window (or use Ctrl+C to shut down the previous producer), and run the following command to start a new producer:
kafka-protobuf-console-producer --bootstrap-server localhost:9092 \ --property schema.registry.url=http://localhost:8081 --topic transactions-proto \ --property value.schema='syntax = "proto3"; message MyRecord {string id = 1; float amount = 2; string customer_id=3;}'
Type the following into this producer, and hit return:
{ "id":"1001", "amount":700, "customer_id":"1221"}
Return to your running consumer to read from topic
transactions-proto
and get the new message.You should see the new output added to the original.
{"id":"1000","amount":500.0} {"id":"1001","amount":700.0,"customerId":"1221"
(If by chance you closed the original consumer, just restart it using the same command shown in step 5.)
In another shell, use curl commands to examine the schema that was registered with Schema Registry.
curl http://localhost:8081/subjects/transactions-proto-value/versions/2/schema
Here is the expected output, showing the
id
andamount
fields added in version 1 of the schema:syntax = "proto3"; message MyRecord { string id = 1; float amount = 2; string customer_id = 3; }
To view version 2:
curl http://localhost:8081/subjects/transactions-proto-value/versions/2/schema
Output for version 2 will include the
customer_id
field:syntax = "proto3"; message MyRecord { string id = 1; float amount = 2; string customer_id = 3; }
Run this command to view the schema in more detail. (The command as shown is piped through
jq
withcurl
download messages suppressed for more readable output.)curl --silent -X GET http://localhost:8081/subjects/transactions-proto-value/versions/latest | jq
Your output should resemble:
"subject": "transactions-proto-value", "version": 2, "id": 3, "schemaType": "PROTOBUF", "schema": "syntax = \"proto3\";\n\nmessage MyRecord {\n string id = 1;\n float amount = 2;\n string customer_id = 3;\n}\n"
Use Confluent Control Center to examine schemas and messages.
Messages that were successfully produced also show on Control Center (http://localhost:9021/) in Topics > <topicName> > Messages. You may have to select a partition or jump to a timestamp to see messages sent earlier. (For timestamp, type in a number, which will default to partition
1/Partition: 0
, and press return. To get the message view shown here, select the cards icon on the upper right.)Schemas you create are available on the Schemas tab for the selected topic.
Run shutdown and cleanup tasks.
- You can stop the consumer and producer with Ctl-C in their respective command windows.
- To stop Confluent Platform, type
confluent local services stop
. - If you would like to clear out existing data (topics, schemas, and messages) before starting again with another test, type
confluent local destroy
.
Create a Kafka topic:
confluent kafka topic create transactions-protobuf --cluster $CLUSTER_ID
Copy the following schema and store it in a file called
schema.txt
:syntax = "proto3"; message MyRecord { string id = 1; float amount = 2; }
Run the following command to create a producer with the schema created in the previous step:
confluent kafka topic produce transactions-protobuf \ --cluster $CLUSTER_ID \ --schema "<full path to file>/schema.txt" --schema-registry-endpoint $SCHEMA_REGISTRY_URL \ --schema-registry-api-key $SR_APIKEY \ --schema-registry-api-secret $SR_APISECRET \ --api-key $APIKEY --api-secret $APISECRET \ --value-format "protobuf"
Your output should resemble:
Successfully registered schema with ID 100001 Starting Kafka Producer. Use Ctrl-C or Ctrl-D to exit.
Tip
- You must provide the full path to the schema file even if it resides in the current directory.
- The examples assume you are using the latest version of the Confluent CLI,
where the deprecated
--sr-endpoint
,--sr-api-key
, and--sr-api-secret
string have been superseded by the new--schema-registry-endpoint
,--schema-registry-api-key
, and--schema-registry-api-secret
, respectively. If the examples don’t work because these flags aren’t recognized, you must either update to the new CLI, or use the deprecated flags.
Type the following command in the shell, and hit return.
{ "id":"1000", "amount":500 }
Open another terminal and run a consumer to read from topic
transactions-protobuf
and get the value of the message in JSON:confluent kafka topic consume transactions-json \ --cluster $CLUSTER_ID \ --from-beginning \ --value-format "protobuf" \ --schema-registry-endpoint $SCHEMA_REGISTRY_URL \ --schema-registry-api-key $SR_APIKEY \ --schema-registry-api-secret $SR_APISECRET \ --api-key $APIKEY --api-secret $APISECRET
Your output should be:
{"id":"1000","amount":500}
Register a new schema version under the same subject by adding a new field,
customer_id
.syntax = "proto3"; message MyRecord { string id = 1; float amount = 2; string customer_id=3; }
Open another terminal, and run the following command:
confluent kafka topic produce transactions-protobuf \ --cluster $CLUSTER_ID \ --schema "<full path to file>/schema.txt" --schema-registry-endpoint $SCHEMA_REGISTRY_URL \ --schema-registry-api-key $SR_APIKEY \ --schema-registry-api-secret $SR_APISECRET \ --api-key $APIKEY --api-secret $APISECRET \ --value-format "protobuf"
Your output should resemble:
Successfully registered schema with ID 100002 Starting Kafka Producer. Use Ctrl-C or Ctrl-D to exit.
Type the following into your producer, and hit return:
{ "id":"1001", "amount":700, "customer_id":"1221"}
Switch to the terminal with your running consumer to read from topic
transactions-avro
and get the new message.You should see the new output added to the original.:
{"id":"1000","amount":500} {"id":"1001","amount":700,"customerId":"1221"}
(If by chance you closed the original consumer, just restart it using the same command shown in step 5.)
View the schemas that were registered with Schema Registry as versions 1 and 2.
confluent schema-registry schema describe --subject transactions-protobuf-value --version 1
Your output should be similar to the following, showing the
id
andamount
fields added in version 1 of the schema:Schema ID: 100001 Type: PROTOBUF Schema: syntax = "proto3"; message MyRecord { string id = 1; float amount = 2; }
To view version 2:
confluent schema-registry schema describe --subject transactions-protobuf-value --version 2
Output for version 2 will include the
customer_id
field:Schema ID: 100002 Type: PROTOBUF Schema: syntax = "proto3"; message MyRecord { string id = 1; float amount = 2; string customer_id = 3; }
Run shutdown and cleanup tasks.
- You can stop the consumer and producer with Ctl-C in their respective command windows.
- If you were using shell environment variables and want to keep them for later, remember to store them in a safe, persistent location.
- You can remove topics, clusters, and environments from the command line or from the Confluent Cloud Console.
Configurations reference¶
The following configuration properties are available for producers and consumers. These are not specific to a particular schema format, but applicable to any Kafka producers and consumers.
Adding security credentials¶
The test drive examples show how to use the producer and consumer console clients as serializers and deserializers by passing Schema Registry properties on the command line and in config files. In addition to examples given in the “Test Drives”, you can pass truststore and keystore credentials for the Schema Registry, as described in Additional configurations for HTTPS. Here is an example for the producer on Confluent Platform:
protobuf-console-producer --broker-list localhost:9093 --topic myTopic \
--producer.config ~/ect/kafka/producer.properties\
--property value.schema='syntax = "proto3"; message MyRecord {string id = 1; float amount = 2; string customer_id=3;}' \
--property schema.registry.url=https://localhost:8081 \
--property schema.registry.ssl.truststore.location=/etc/kafka/security/schema.registry.client.truststore.jks \
--property schema.registry.ssl.truststore.password=myTrustStorePassword
Kafka producer configurations¶
A complete reference of producer configuration properties is available in Kafka Producer Configurations.
Kafka consumer configurations¶
A complete reference of consumer configuration properties is available in Kafka Consumer Configurations.
Schema Registry configuration options¶
A complete reference for Schema Registry configuration is available in the Confluent Platform documentation at Schema Registry Configuration Options.
Using Schema Registry with Connect¶
If you are using serializers and deserializers with Kafka Connect, you will need information on key and value converters. To learn more, see Configuring key and value converters. in the Connect documentation.
Schema references in Protobuf¶
Confluent Platform provides full support for the notion of schema references, the ability of a schema to refer to other schemas.
Tip
Schema references are also supported in Confluent Cloud on Avro, Protobuf, and JSON Schema formats. On the Confluent CLI, you can use the --refs <file>
flag on confluent schema-registry schema create to reference another schema.
If you give the KafkaProtobufSerializer
an instance of a generated Protobuf
class, it can automatically register all referenced schemas.
The easiest way to manually register referenced schemas is with the Schema Registry Maven Plugin for Confluent Platform. Alternatively, you can use the Confluent Cloud APIs or Confluent Platform APIs to manually register referenced schemas.
This Protobuf example imports a schema for the order
subject (defined in the file Order.proto
) and its references for the product
and customer
subject.
<plugin>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-maven-plugin</artifactId>
<version>5.5.0</version>
<configuration>
<schemaRegistryUrls>
<param>http://192.168.99.100:8081</param>
</schemaRegistryUrls>
<subjects>
<order>src/main/protobuf/Order.proto</order>
<product>src/main/protobuf/Product.proto</product>
<customer>src/main/protobuf/Customer.proto</customer>
</subjects>
<schemaTypes>
<order>PROTOBUF</order>
<product>PROTOBUF</product>
<customer>PROTOBUF</customer>
</schemaTypes>
<references>
<order>
<reference>
<name>Product.proto</name>
<subject>product</subject>
</reference>
<reference>
<name>Customer.proto</name>
<subject>customer</subject>
</reference>
</order>
</references>
</configuration>
<goals>
<goal>register</goal>
</goals>
</plugin>
The schema for the order
subject, defined in the Order.proto
file, might look like:
syntax = "proto3";
package io.confluent.examples.generated_sources.protobuf;
import "Product.proto";
import "Customer.proto";
message Order {
int32 order_id = 1;
string order_date = 2;
int32 order_amount = 3;
repeated Product products = 4;
Customer customer = 5;
}
And the schema for the customer
, defined in Customer.proto
, subject might look like:
syntax = "proto3";
package io.confluent.examples.generated_sources.protobuf;
message Customer {
int64 customer_id = 1;
string customer_name = 2;
string customer_email = 3;
string customer_address = 4;
}
For backward compatibility reasons, both "schemaType"
and "references"
are optional. If "schemaType"
is omitted, it is assumed to be AVRO.
To learn more about reusing existing Protobuf message definitions by importing (referencing) them, see Using Other Message Types in the Protobuf documentation.
Multiple event types in the same topic¶
In addition to providing a way for one schema to call other schemas, schema references can be used to efficiently combine multiple event types in the same topic and still maintain subject-topic constraints.
In Protobuf, this is accomplished as follows:
Use the default subject naming strategy,
TopicNameStrategy
, which uses the topic name to determine the subject to be used for schema lookups, and helps to enforce subject-topic constraints.Wrap a
oneOf
in a message to define a list of schema references, for example:syntax = "proto3"; package io.confluent.examples.proto; import "Customer.proto"; import "Product.proto"; import "Order.proto"; message AllTypes { oneof oneof_type { Customer customer = 1; Product product = 2; Order order = 3; } }
When the schema is registered, send an array of reference versions. For example:
[ { "name": "Customer.proto", "subject": "customer", "version": 1 }, { "name": "Product.proto", "subject": "product", "version": 1 }, { "name": "Order.proto", "subject": "order", "version": 1 } ]
Tip
- In Protobuf, top-level
oneOfs
are not permitted, which is why you must wrap theoneOf
in a message. - One advantage of wrapping the
oneOf
with a message is that auto-registration of the top-level schema will work properly by default (unlike Avro or JSON Schema, which require additional configuration for this use case). In the case of Protobuf, all referenced schemas will also be auto-registered, recursively. - By default,
oneOf
messages have their field names suffixed with an index (for example_0
). You can disable this suffix by configuringgenerate.index.for.unions
(added in Confluent Platform 7.3.0). To learn more about using this option, see Protobuf in Using Kafka Connect with Schema Registry.
- In Protobuf, top-level
Handling null values with wrappers¶
Protobuf does not allow setting null values on any object field. Null values will cause a null pointer exception.
As a workaround, you can import google/protobuf/wrappers.proto
and use the wrapper.for.nullables
converter, which specifies
whether or not to use Protobuf wrapper types for nullable fields. The default boolean value is false
. If set to true
, nullable fields use the wrapper types described on GitHub in
protocolbuffers/protobuf, and in the
google.protobuf package documentation.
For example, you can replace a non-nullable name that defaults to an empty string (""
):
message GreetRequest {
string name = 1;
}
with a nullable name, using a wrapper class from wrappers.proto
(in this case, StringValue):
message GreetRequest {
google.protobuf.StringValue name = 1;
}
In Confluent Platform, using the flag wrappers.for.nullable
in the Protobuf converter causes the following results:
- When converting from a Connect schema to a Protobuf schema, an optional Connect field converts to a Protobuf field using a wrapper class.
- When converting from Protobuf schema to Connect schema, a Protobuf field using a wrapper class converts to an optional Connect field.
To learn more, see Tomas Basham’s post on Protocol Buffers and Optional Fields, and the description of “Wrapper types” for null values in the table under JSON Mapping in the Protocol Buffers Developer Guide.
Tip
Alternatively, Protocol Buffers v3 (proto3) does support optional primitive fields, which are similar to “nullable” types. To learn more about this strategy, see the Google documentation on Optional Primitive Fields and Protocol Buffer types.
Protobuf schema compatibility rules¶
Compatibility rules support schema evolution and the ability of downstream consumers to handle data encoded with old and new schemas. There is some overlap in these rules across formats, especially for Protobuf and Avro, with the exception of Protobuf backward compatibility, which differs between the two.
In general, it’s recommended to use BACKWARD_TRANSITIVE with Protobuf, as adding new message types is not forward compatible.
For more detail on Protobuf compatibility rules, including backward compatibility, see:
Limitations and developer notes¶
- The console consumer does not currently support Protobuf
Any
type.Any
is an advanced Protobuf feature that requires the Protobuf type to be registered with a TypeRegistry. The ability to pass additional types to thekafka-protobuf-console-consumer
is not supported at this time. - The Confluent
kafka-protobuf-serializer
works with Google Protobuf v.3. Google Protobuf v.4 is currently not supported. - The “import public” feature of protobuf2 and protobuf3 is not supported in the Java, which means that it is not supported in the Schema Registry tools, so transitively included dependencies will not resolve as you might expect. This is not a limitation of Schema Registry, but a limitation of Protobuf itself that can look like a failure in Schema Registry.
- Do not upload Google/Protobuf dependencies into Schema Registry. Although you might naturally expect the tooling to automatically exclude any “dangerous” files which ought not be imported, this is not the case.