Protobuf Schema Serializer and Deserializer
This document describes how to use Protocol Buffers (Protobuf) with the Apache Kafka® Java client and console tools.
Protobuf Serializer
Plug KafkaProtobufSerializer into KafkaProducer to send Protobuf messages to Kafka.
When you give the serializer an instance of a Protobuf generated class, it can register the Protobuf schema and all referenced schemas. By default, the serializer registers each referenced schema under a subject with the same name as the reference. For example, if the main schema references “google/protobuf/timestamp.proto”, the timestamp schema is registered under a subject named “google/protobuf/timestamp.proto”.
You can customize this behavior by passing a custom implementation of ReferenceSubjectNameStrategy to the serializer through the reference.subject.name.strategy configuration, as described under Configuration Details for schema formats.
The interface looks like this:
public interface ReferenceSubjectNameStrategy extends Configurable {

  /**
   * For a given reference name, topic, and message, returns the subject name under which the
   * referenced schema should be registered in the schema registry.
   *
   * @param refName The name of the reference.
   * @param topic The Kafka topic name to which the message is being published.
   * @param isKey True when encoding a message key, false for a message value.
   * @param schema The referenced schema.
   * @return The subject name under which the referenced schema should be registered.
   */
  String subjectName(String refName, String topic, boolean isKey, ParsedSchema schema);
}
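As a rough illustration of what a custom strategy might compute, the sketch below contrasts the default naming (the subject is the reference name) with a scheme that scopes each reference subject to the topic. It is plain Java with no Schema Registry dependency: the ReferenceNaming interface is a hypothetical stand-in that mirrors the subjectName signature without the ParsedSchema argument, and both the "orders" topic and the <topic>-<key|value>-<refName> scheme are assumptions chosen for the example, not Confluent defaults.

```java
/** Hypothetical stand-in for ReferenceSubjectNameStrategy, without the ParsedSchema argument. */
interface ReferenceNaming {
    String subjectName(String refName, String topic, boolean isKey);
}

final class ReferenceNamingSketch {
    /** Mirrors the default described above: the subject is the reference name itself. */
    static final ReferenceNaming DEFAULT = (refName, topic, isKey) -> refName;

    /** Illustrative custom scheme: scope the reference subject to the topic and key/value role. */
    static final ReferenceNaming TOPIC_SCOPED =
        (refName, topic, isKey) -> topic + "-" + (isKey ? "key" : "value") + "-" + refName;

    public static void main(String[] args) {
        String ref = "google/protobuf/timestamp.proto";
        // "orders" is a made-up topic name used only for this illustration.
        System.out.println(DEFAULT.subjectName(ref, "orders", false));
        System.out.println(TOPIC_SCOPED.subjectName(ref, "orders", false));
    }
}
```

A real implementation would implement the interface shown above, including the configure method inherited from Configurable, and would be named in the reference.subject.name.strategy property.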
For example, suppose you have the following schema.
syntax = "proto3";
package com.acme;
import "other.proto";
message MyRecord {
string f1 = 1;
OtherRecord f2 = 2;
}
The above schema references other.proto, which looks like this:
syntax = "proto3";
package com.acme;
message OtherRecord {
int32 other_id = 1;
}
The code below creates an instance of the MyRecord class that is generated by the Protobuf compiler. The Kafka producer is configured to serialize the MyRecord instance with the Protobuf serializer.
Tip
The examples below use the default address and port for the Kafka bootstrap server (localhost:9092) and Schema Registry (localhost:8081).
// Configure the producer to use the Protobuf serializer for message values.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://localhost:8081");

Producer<String, MyRecord> producer = new KafkaProducer<String, MyRecord>(props);

String topic = "testproto";
String key = "testkey";

// Build the referenced message first, then the top-level message that embeds it.
OtherRecord otherRecord = OtherRecord.newBuilder()
    .setOtherId(123).build();
MyRecord myrecord = MyRecord.newBuilder()
    .setF1("value1").setF2(otherRecord).build();

ProducerRecord<String, MyRecord> record
    = new ProducerRecord<String, MyRecord>(topic, key, myrecord);
// Block until the send completes, then release producer resources.
producer.send(record).get();
producer.close();
While serializing the Protobuf instance to Kafka, the above code automatically registers two schemas with Schema Registry, one for MyRecord and another for OtherRecord, under two different subjects. (You can disable automatic schema registration by passing the property auto.register.schemas=false to the serializer.)
Now, use the REST endpoints to examine the schemas that were registered. First, you can see which subjects were used by using the following command.
curl http://localhost:8081/subjects
Here is the expected output:
["testproto-value", "other.proto"]
The subject for the top-level schema is determined by a SubjectNameStrategy, which defaults to suffixing the topic with either -key or -value. The subjects for referenced schemas are determined by a ReferenceSubjectNameStrategy, which defaults to the name used in the import statement. Both strategies can be customized.
In the schemas below, note the new schemaType field, which was added in Confluent Platform 5.5. Also, the top-level schema has a new references field that refers to other.proto.
Type the following command to view the testproto-value schema.
curl http://localhost:8081/subjects/testproto-value/versions/1
Here is the expected output:
{
  "subject": "testproto-value",
  "version": 1,
  "id": 2,
  "schemaType": "PROTOBUF",
  "references": [
    {
      "name": "other.proto",
      "subject": "other.proto",
      "version": 1
    }
  ],
  "schema": "syntax = \"proto3\";\npackage com.acme;\n\nimport \"other.proto\";\n\nmessage MyRecord {\n string f1 = 1;\n .com.acme.OtherRecord f2 = 2;\n}\n"
}
Note that the testproto-value schema ID is 2, and the other.proto schema ID is 1.
Type the following command to view the other.proto schema.
curl http://localhost:8081/subjects/other.proto/versions/1
Here is the expected output:
{
"subject": "other.proto",
"version": 1,
"id": 1,
"schemaType": "PROTOBUF",
"schema": "syntax = \"proto3\";\npackage com.acme;\n\nmessage OtherRecord {\n int32 other_id = 1;\n}\n"
}
Now, try to delete the referenced schema, other.proto.
curl -X DELETE http://localhost:8081/subjects/other.proto/versions/1
You will get the following error because Schema Registry prevents you from creating a dangling reference, which would be the case if the referenced schema was deleted before the top-level schema.
{"error_code":42206,"message":"One or more references exist to the schema {magic=1,keytype=SCHEMA,subject=other.proto,version=1}."}
You know that the schema with ID 2 references the schema you just tried to delete, but you can also get that information from Schema Registry directly with this command:
curl http://localhost:8081/subjects/other.proto/versions/1/referencedby
The above command returns the ID of the testproto-value schema (ID 2) that references your other.proto schema:
[2]
A schema can exist in multiple subjects, so you must delete the schema in each subject where it occurs before the referenced schema can be deleted. Here is how to determine exactly where a schema ID is used:
curl http://localhost:8081/schemas/ids/2/versions
The above command returns the following, showing that in this case schema ID 2 (the only schema that references other.proto) is used in just one subject and version:
[{"subject":"testproto-value","version":1}]
Delete the top-level schema (testproto-value) that is referencing other.proto:
curl -X DELETE http://localhost:8081/subjects/testproto-value/versions/1
The expected output is simply 1.
Now you can delete the referenced schema, and the command will succeed.
curl -X DELETE http://localhost:8081/subjects/other.proto/versions/1
The expected output is simply 1.
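The two lookups used in this walk-through (which schemas reference a given version, and where a schema ID is used) can also be issued from Java. The following is a minimal sketch using the JDK's built-in HTTP client (Java 11 or later) against the endpoints shown in the curl commands above. The class and method names are made up for illustration, and the code assumes Schema Registry is running at the default localhost:8081.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

final class ReferenceLookupSketch {
    // Default Schema Registry address used throughout the examples above.
    private static final String BASE_URL = "http://localhost:8081";

    /** GET /subjects/{subject}/versions/{version}/referencedby */
    static URI referencedByUri(String subject, int version) {
        // Encode the subject so names such as google/protobuf/timestamp.proto stay one path segment.
        // URLEncoder targets form encoding, so this is adequate only for subjects without spaces.
        String encoded = URLEncoder.encode(subject, StandardCharsets.UTF_8);
        return URI.create(BASE_URL + "/subjects/" + encoded + "/versions/" + version + "/referencedby");
    }

    /** GET /schemas/ids/{id}/versions */
    static URI versionsForIdUri(int schemaId) {
        return URI.create(BASE_URL + "/schemas/ids/" + schemaId + "/versions");
    }

    /** Sends a GET request and returns the raw response body. */
    static String get(URI uri) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        // Which schema IDs reference version 1 of other.proto?
        System.out.println(get(referencedByUri("other.proto", 1)));
        // In which subjects and versions is schema ID 2 used?
        System.out.println(get(versionsForIdUri(2)));
    }
}
```

Running the lookups before issuing a DELETE lets a cleanup script remove the referencing versions first and the referenced schema last, which is the order Schema Registry enforces.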
Protobuf Deserializer
Plug KafkaProtobufDeserializer into KafkaConsumer to receive messages of any Protobuf type from Kafka.
Tip
The examples below use the default address and port for the Kafka bootstrap server (localhost:9092) and Schema Registry (localhost:8081).
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
// Start from the earliest offset when the group has no committed position.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

String topic = "testproto";
final Consumer<String, Message> consumer = new KafkaConsumer<String, Message>(props);
consumer.subscribe(Arrays.asList(topic));

try {
  while (true) {
    // poll(Duration) replaces the deprecated poll(long) overload.
    ConsumerRecords<String, Message> records = consumer.poll(java.time.Duration.ofMillis(100));
    for (ConsumerRecord<String, Message> record : records) {
      System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
    }
  }
} finally {
  consumer.close();
}
When deserializing a Protobuf payload, the KafkaProtobufDeserializer can behave in these ways:
- If given a specific.protobuf.key.type or specific.protobuf.value.type, the deserializer uses the specified type to perform deserialization.
- The previous configuration will not work for RecordNameStrategy, where more than one type of Protobuf message can appear in a topic. For that case, set derive.type=true. You must also specify either java_outer_classname or java_multiple_files = true in the original Protobuf file. This allows the deserializer to derive the Java type from the schema to deserialize the Protobuf payload.
- Finally, if no type is provided or no type can be derived, the deserializer uses the schema to return an instance of a Protobuf DynamicMessage.
Similar to how the Avro deserializer can return an instance of a specific Avro record type or a GenericRecord, the Protobuf deserializer can return an instance of a specific Protobuf message type or a DynamicMessage.
If the Protobuf deserializer cannot determine a specific type, then a generic type is returned.
One way to return a specific type is to use an explicit property. For the Protobuf deserializer, you can configure the property KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE or KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_KEY_TYPE.
To allow the Protobuf deserializer to work with topics that contain heterogeneous types, you must provide additional information in the schema. Configure the deserializer with derive.type=true, and then specify either java_outer_classname or java_multiple_files=true in the schema.
Here is a summary of specific and generic return types for each schema format.
| | Avro | Protobuf | JSON Schema |
|---|---|---|---|
| Specific type | Generated class that implements org.apache.avro.SpecificRecord | Generated class that extends com.google.protobuf.Message | Java class (that is compatible with Jackson serialization) |
| Generic type | org.apache.avro.GenericRecord | com.google.protobuf.DynamicMessage | com.fasterxml.jackson.databind.JsonNode |
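To make the two ways of obtaining a specific type more concrete, here is a small sketch that builds consumer properties for each case using the plain property names from the text (specific.protobuf.value.type and derive.type). It uses only java.util.Properties, so it does not depend on the Kafka client libraries. The class name com.acme.MyRecord is an assumption: it presumes the MyRecord schema is compiled with java_multiple_files = true and no java_package option.

```java
import java.util.Properties;

final class DeserializerConfigSketch {
    /** Base consumer settings shared by both variants. */
    static Properties base() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "group1");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");
        return props;
    }

    /** One message type per topic: name the generated class explicitly. */
    static Properties specificType(String generatedClassName) {
        Properties props = base();
        props.put("specific.protobuf.value.type", generatedClassName);
        return props;
    }

    /** Mixed message types: ask the deserializer to derive the class from the schema. */
    static Properties derivedType() {
        Properties props = base();
        props.put("derive.type", "true");
        return props;
    }

    public static void main(String[] args) {
        // com.acme.MyRecord is an assumed generated class name, not taken from the original text.
        System.out.println(specificType("com.acme.MyRecord"));
        System.out.println(derivedType());
    }
}
```

With neither property set, the same consumer would receive DynamicMessage values, the generic type listed in the table.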
Test Drive Protobuf Schema
To get started with Protobuf, you can use the command line producer and consumer for Protobuf. Similar to Avro, Protobuf defines both a binary serialization format and a JSON serialization format. This allows you to use JSON when human-readability is desired, and the more efficient binary format to store data in topics.
Note
- Prerequisites to run these examples are generally the same as those described for the Schema Registry Tutorial with the exception of Maven, which is not needed here. Also, Confluent Platform version 5.5.0 or later is required here.
- The following examples use the default Schema Registry URL value (localhost:8081). The examples show how to configure this inline by supplying the URL as an argument to the --property flag in the command line arguments of the producer and consumer (--property schema.registry.url=<address of your schema registry>). Alternatively, you could set this property in $CONFLUENT_HOME/etc/kafka/server.properties, and not have to include it in the producer and consumer commands. For example: confluent.schema.registry.url=http://localhost:8081
These examples make use of the kafka-protobuf-console-producer and kafka-protobuf-console-consumer, which are located in $CONFLUENT_HOME/bin.
The command line producer and consumer are useful for understanding how the built-in Protobuf schema support works on Confluent Platform.
When you incorporate the serializer and deserializer into the code for your own producers and consumers, messages and associated schemas are processed the same way as they are on the console producers and consumers.
The suggested consumer commands include the --from-beginning flag to be sure you capture the messages even if you don’t run the consumer immediately after running the producer. If you leave off the --from-beginning flag, the consumer starts at the end of the topic and reads only messages produced during its current session.
Start Confluent Platform using the following command:
confluent local services start
Tip
- Alternatively, you can simply run confluent local services schema-registry start, which also starts kafka and zookeeper as dependencies. This demo does not directly reference the other services, such as Connect and Control Center. That said, you may want to run the full stack anyway to further explore, for example, how the topics and messages display on Control Center. To learn more about confluent local, see Quick Start for Confluent Platform (Local install) and confluent local in the Confluent CLI command reference.
- The confluent local commands run in the background so you can re-use this command window. Separate sessions are required for the producer and consumer.
Verify registered schema types.
Starting with Confluent Platform 5.5.0, Schema Registry supports arbitrary schema types. You should verify which schema types are currently registered with Schema Registry.
To do so, type the following command (assuming you use the default URL and port for Schema Registry, localhost:8081):
curl http://localhost:8081/schemas/types
The response will be one or more of the following. If additional schema format plugins are installed, these will also be available.
["JSON", "PROTOBUF", "AVRO"]
Alternatively, use the curl --silent flag, and pipe the command through jq (curl --silent http://localhost:8081/schemas/types | jq) to get nicely formatted output:
[
  "JSON",
  "PROTOBUF",
  "AVRO"
]
Use the producer to send Protobuf records in JSON as the message value.
The new topic, t1-p, will be created as a part of this producer command if it does not already exist.
kafka-protobuf-console-producer --broker-list localhost:9092 \
  --property schema.registry.url=http://localhost:8081 --topic t1-p \
  --property value.schema='syntax = "proto3"; message MyRecord { string f1 = 1; }'
Tip
The current Protobuf-specific producer does not show a > prompt, just a blank line at which to type producer messages.
Type the following message in the shell, and hit return.
{"f1": "value1-p"}
The command line Protobuf producer will convert the JSON object to a Protobuf message (using the schema specified in <value.schema>) and then use an underlying serializer to serialize the message to the Kafka topic t1-p.
Use the consumer to read from topic t1-p and get the value of the message in JSON.
kafka-protobuf-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic t1-p --property schema.registry.url=http://localhost:8081
You should see the following in the console.
{"f1": "value1-p"}
The underlying deserializer will create a Protobuf message, and then serialize the message to a JSON format for readability.
In another shell, use curl commands to examine the schema that was registered with Schema Registry.
curl http://localhost:8081/subjects/t1-p-value/versions/1/schema
Here is the expected output:
syntax = "proto3"; message MyRecord { string f1 = 1; }
Run this curl command to view the schema in more detail. For more readable output, you can pipe it through jq (with curl download messages suppressed).
curl --silent -X GET http://localhost:8081/subjects/t1-p-value/versions/latest | jq
Here is the expected output of the above command:
{
  "subject": "t1-p-value",
  "version": 1,
  "id": 21,
  "schemaType": "PROTOBUF",
  "schema": "syntax = \"proto3\";\n\nmessage MyRecord {\n string f1 = 1;\n}\n"
}
Use Confluent Control Center to examine schemas and messages.
Messages that were successfully produced also show on Control Center (http://localhost:9021/) in Topics > <topicName> > Messages. You may have to select a partition or jump to a timestamp to see messages sent earlier. (For timestamp, type in a number, which will default to partition 1/Partition: 0, and press return. To switch to the card view of messages, select the cards icon on the upper right.)
Schemas you create are available on the Schemas tab for the selected topic.
Run shutdown and cleanup tasks.
- You can stop the consumer and producer with Ctrl-C in their respective command windows.
- To stop Confluent Platform, type confluent local services stop.
- If you would like to clear out existing data (topics, schemas, and messages) before starting again with another test, type confluent local destroy.
Schema References in Protobuf
Confluent Platform provides full support for the notion of schema references, the ability of a schema to refer to other schemas.
Tip
Schema references are also supported in Confluent Cloud on Avro, Protobuf, and JSON Schema formats. On the Confluent Cloud CLI, you can use the --refs <file> flag on ccloud schema-registry schema create to reference another schema.
If you give the KafkaProtobufSerializer an instance of a generated Protobuf class, it can automatically register all referenced schemas.
The easiest way to manually register referenced schemas is with the Schema Registry Maven Plugin. Alternatively, you can use the REST APIs to manually register referenced schemas.
This Protobuf example registers a schema for the order subject along with its references to the product and customer subjects.
<plugin>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-maven-plugin</artifactId>
  <version>5.5.0</version>
  <configuration>
    <schemaRegistryUrls>
      <param>http://192.168.99.100:8081</param>
    </schemaRegistryUrls>
    <subjects>
      <order>src/main/protobuf/Order.proto</order>
      <product>src/main/protobuf/Product.proto</product>
      <customer>src/main/protobuf/Customer.proto</customer>
    </subjects>
    <schemaTypes>
      <order>PROTOBUF</order>
      <product>PROTOBUF</product>
      <customer>PROTOBUF</customer>
    </schemaTypes>
    <references>
      <order>
        <reference>
          <name>Product.proto</name>
          <subject>product</subject>
        </reference>
        <reference>
          <name>Customer.proto</name>
          <subject>customer</subject>
        </reference>
      </order>
    </references>
  </configuration>
  <goals>
    <goal>register</goal>
  </goals>
</plugin>
The schema for the order subject might look like:
syntax = "proto3";
package io.confluent.examples.generated_sources.protobuf;
import "Product.proto";
import "Customer.proto";
message Order {
int32 order_id = 1;
string order_date = 2;
int32 order_amount = 3;
repeated Product products = 4;
Customer customer = 5;
}
And the schema for the customer subject might look like:
syntax = "proto3";
package io.confluent.examples.generated_sources.protobuf;
message Customer {
int64 customer_id = 1;
string customer_name = 2;
string customer_email = 3;
string customer_address = 4;
}
For backward compatibility reasons, both "schemaType" and "references" are optional. If "schemaType" is omitted, it is assumed to be AVRO.
Multiple Event Types in the Same Topic
In addition to providing a way for one schema to reference other schemas, schema references can be used to efficiently combine multiple event types in the same topic and still maintain subject-topic constraints.
In Protobuf, this is accomplished as follows:
- Use the default subject naming strategy, TopicNameStrategy, which uses the topic name to determine the subject to be used for schema lookups, and helps to enforce subject-topic constraints.
- Wrap a oneof in a message to define a list of schema references, for example:
syntax = "proto3";
package io.confluent.examples.proto;
import "Customer.proto";
import "Product.proto";
import "Order.proto";
message AllTypes {
  oneof oneof_type {
    Customer customer = 1;
    Product product = 2;
    Order order = 3;
  }
}
- When the schema is registered, send an array of reference versions. For example:
[
  { "name": "Customer.proto", "subject": "customer", "version": 1 },
  { "name": "Product.proto", "subject": "product", "version": 1 },
  { "name": "Order.proto", "subject": "order", "version": 1 }
]
Tip
- In Protobuf, top-level oneofs are not permitted, which is why you must wrap the oneof in a message.
- One advantage of wrapping the oneof with a message is that auto-registration of the top-level schema will work properly by default (unlike Avro or JSON Schema, which require additional configuration for this use case). In the case of Protobuf, all referenced schemas will also be auto-registered, recursively.
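If you register the wrapper schema manually through the REST API instead of relying on auto-registration, the reference array travels in the same JSON request body as the schema text. The sketch below only assembles that body for the AllTypes example; it does not send it. It assumes the body would be posted to the /subjects/<subject>/versions endpoint of Schema Registry, with the subject chosen by TopicNameStrategy (for example, <topic>-value). The class and helper names are hypothetical, and the escaping helper handles only the characters that occur in this schema text.

```java
import java.util.List;

final class RegisterWithReferencesSketch {
    /** Escapes backslashes, quotes, and newlines; not a general-purpose JSON encoder. */
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
    }

    /** One entry of the "references" array: import name, subject, and version. */
    static String reference(String name, String subject, int version) {
        return "{\"name\":\"" + escape(name) + "\",\"subject\":\"" + escape(subject)
            + "\",\"version\":" + version + "}";
    }

    /** Full request body: schema type, references, and the schema text as a JSON string. */
    static String requestBody(String schema, List<String> references) {
        return "{\"schemaType\":\"PROTOBUF\",\"references\":[" + String.join(",", references)
            + "],\"schema\":\"" + escape(schema) + "\"}";
    }

    public static void main(String[] args) {
        String schema = String.join("\n",
            "syntax = \"proto3\";",
            "package io.confluent.examples.proto;",
            "import \"Customer.proto\";",
            "import \"Product.proto\";",
            "import \"Order.proto\";",
            "message AllTypes {",
            "  oneof oneof_type {",
            "    Customer customer = 1;",
            "    Product product = 2;",
            "    Order order = 3;",
            "  }",
            "}");
        List<String> references = List.of(
            reference("Customer.proto", "customer", 1),
            reference("Product.proto", "product", 1),
            reference("Order.proto", "order", 1));
        System.out.println(requestBody(schema, references));
    }
}
```

The referenced subjects (customer, product, and order) must already exist at the given versions before the wrapper schema is registered, otherwise the references cannot be resolved.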
Handling Null Values with Wrappers
Protobuf does not allow setting null values on any object field. Null values will cause a null pointer exception.
As a workaround, you can import google/protobuf/wrappers.proto and use the wrapper.for.nullables converter property, which specifies whether or not to use Protobuf wrapper types for nullable fields. The default boolean value is false. If set to true, nullable fields use the wrapper types described on GitHub in protocolbuffers/protobuf, and in the google.protobuf package documentation.
For example, you can replace a non-nullable name that defaults to an empty string (""):
message GreetRequest {
  string name = 1;
}
with a nullable name, using a wrapper class from wrappers.proto (in this case, StringValue):
message GreetRequest {
  google.protobuf.StringValue name = 1;
}
In Confluent Platform, setting wrapper.for.nullables to true in the Protobuf converter causes the following results:
- When converting from a Connect schema to a Protobuf schema, an optional Connect field converts to a Protobuf field using a wrapper class.
- When converting from Protobuf schema to Connect schema, a Protobuf field using a wrapper class converts to an optional Connect field.
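As a hedged sketch of how this might be wired into a Kafka Connect connector, the snippet below builds the converter-related entries of a connector configuration as a plain map. The property name follows the spelling used in the workaround paragraph (wrapper.for.nullables) and is passed with the standard value.converter. prefix. The class and method names are made up for illustration, and you should confirm the property name against the converter version you run.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NullableWrapperConverterConfig {
    /** Converter-related entries of a connector configuration (other connector settings omitted). */
    static Map<String, String> valueConverterConfig(String schemaRegistryUrl) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("value.converter", "io.confluent.connect.protobuf.ProtobufConverter");
        config.put("value.converter.schema.registry.url", schemaRegistryUrl);
        // Property name taken from the text above; the default is false.
        config.put("value.converter.wrapper.for.nullables", "true");
        return config;
    }

    public static void main(String[] args) {
        valueConverterConfig("http://localhost:8081")
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```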
To learn more, see Tomas Basham’s post on Protocol Buffers and Optional Fields, and the description of “Wrapper types” for null values in the table under JSON Mapping in the Protocol Buffers Developer Guide.
Protobuf Schema Compatibility Rules
Compatibility rules support schema evolution and the ability of downstream consumers to handle data encoded with old and new schemas. There is some overlap in these rules across formats, especially for Protobuf and Avro, with the exception of Protobuf backward compatibility, which differs between the two.
For more detail on Protobuf compatibility rules, including backward compatibility, see Compatibility Checks in the overview.
Suggested Reading
- Examples of Kafka client producers and consumers, with and without Avro, are documented at Code Examples for Apache Kafka.
- Schema Registry API Reference
- Schema Registry API Usage Examples
- Quick Start for Schema Management on Confluent Cloud
- Protocol Buffers (Protobuf) Developer’s Guide
- Blog post: Getting Started with Protobuf in Confluent Cloud
- Blog post: Understanding Protobuf Compatibility