Protobuf Schema Serializer and Deserializer for Schema Registry on Confluent Platform

This document describes how to use Protocol Buffers (Protobuf) with the Apache Kafka® Java client and console tools.

By design, the Confluent Schema Registry-based Protobuf serializer does not include the message schema in the payload. Instead, it writes a magic byte and the schema ID, followed by message indexes, and finally the normal binary encoding of the Protobuf data. You can choose whether or not to embed a schema inline, which allows for cases where you may want to communicate the schema offline, with headers, or some other way. This is in contrast to other systems, such as Hadoop, that always include the schema with the message data. To learn more, see Wire format.
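As a rough illustration of the framing described above, the following sketch reads the schema ID from the start of a serialized payload. It assumes the layout documented under Wire format (a zero magic byte followed by a 4-byte big-endian schema ID). The class and method names are hypothetical, and the sketch does not decode the message indexes or the Protobuf data that follow.

```java
import java.nio.ByteBuffer;

class WireFormatHeader {
    // Assumption: the magic byte is 0, as described in the Wire format documentation.
    static final byte MAGIC_BYTE = 0x0;

    /** Returns the schema ID stored in the four bytes after the magic byte. */
    static int readSchemaId(byte[] payload) {
        if (payload.length < 5) {
            throw new IllegalArgumentException("Payload too short for magic byte and schema ID");
        }
        ByteBuffer buffer = ByteBuffer.wrap(payload); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Hypothetical payload: magic byte, schema ID 2, and one byte standing in for the rest.
        byte[] payload = {0, 0, 0, 0, 2, 0};
        System.out.println(readSchemaId(payload)); // prints 2
    }
}
```

A check like this can help when debugging a topic, because it shows which registered schema a record claims to use without deserializing the record.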

Protobuf serializer

Plug the KafkaProtobufSerializer into KafkaProducer to send messages of Protobuf type to Kafka.

When providing an instance of a Protobuf generated class to the serializer, the serializer can register the Protobuf schema, and all referenced schemas. For referenced schemas, by default the serializer will register each referenced schema under a subject with the same name as the reference. For example, if the main schema references “google/protobuf/timestamp.proto”, then the timestamp schema will be registered under a subject named “google/protobuf/timestamp.proto”. This behavior can be customized by providing a custom implementation of ReferenceSubjectNameStrategy to the serializer using the reference.subject.name.strategy configuration, as described under Configuration details for schema formats.

The interface looks like this:

public interface ReferenceSubjectNameStrategy extends Configurable
{
 /**
  * For a given reference name, topic, and message, returns the subject name under which the
  * referenced schema should be registered in the schema registry.
  *
  * @param refName The name of the reference.
  * @param topic The Kafka topic name to which the message is being published.
  * @param isKey True when encoding a message key, false for a message value.
  * @param schema The referenced schema.
  * @return The subject name under which the referenced schema should be registered.
  */
 String subjectName(String refName, String topic, boolean isKey, ParsedSchema schema);
}

Two implementations are available for the above interface:

  1. io.confluent.kafka.serializers.subject.DefaultReferenceSubjectNameStrategy - This is the default. It simply returns the reference name as the subject name.
  2. io.confluent.kafka.serializers.subject.QualifiedReferenceSubjectNameStrategy - Given a reference name, replace slashes with dots, and remove the .proto suffix to obtain the subject name. For example, mypackage/myfile.proto becomes mypackage.myfile.
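The naming rules of the two strategies can be sketched with plain string handling. This is not the Confluent implementation, only a stand-alone illustration of the behavior described above; the class and method names are hypothetical.

```java
class ReferenceSubjectNameSketch {
    /** Default behavior: the reference name is used as the subject name unchanged. */
    static String defaultSubject(String refName) {
        return refName;
    }

    /** Qualified behavior: drop the .proto suffix, then replace slashes with dots. */
    static String qualifiedSubject(String refName) {
        String suffix = ".proto";
        String name = refName.endsWith(suffix)
            ? refName.substring(0, refName.length() - suffix.length())
            : refName;
        return name.replace('/', '.');
    }

    public static void main(String[] args) {
        System.out.println(defaultSubject("google/protobuf/timestamp.proto"));
        System.out.println(qualifiedSubject("mypackage/myfile.proto")); // prints mypackage.myfile
    }
}
```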

For example, suppose you have the following schema.

syntax = "proto3";
package com.acme;

import "other.proto";

message MyRecord {
  string f1 = 1;
  OtherRecord f2 = 2;
}

The above schema references other.proto, which looks like this:

syntax = "proto3";
package com.acme;

message OtherRecord {
  int32 other_id = 1;
}

The code below creates an instance of the MyRecord class that is generated by the Protobuf compiler. The Kafka producer is configured to serialize the MyRecord instance with the Protobuf serializer.

Tip

The examples below use the default hostname and port for the Kafka bootstrap server (localhost:9092) and Schema Registry (localhost:8081).

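import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

// Configure the producer to serialize keys as strings and values as Protobuf.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://localhost:8081");

Producer<String, MyRecord> producer = new KafkaProducer<>(props);

// Build a MyRecord instance that embeds an OtherRecord.
String topic = "testproto";
String key = "testkey";
OtherRecord otherRecord = OtherRecord.newBuilder()
  .setOtherId(123).build();
MyRecord myrecord = MyRecord.newBuilder()
  .setF1("value1").setF2(otherRecord).build();

ProducerRecord<String, MyRecord> record
      = new ProducerRecord<>(topic, key, myrecord);
// Wait for the send to complete, then release the producer's resources.
producer.send(record).get();
producer.close();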

While serializing the Protobuf instance to Kafka, the above code will automatically register two schemas to Schema Registry, one for MyRecord and another for OtherRecord, to two different subjects. (The default behavior of automatically registering schemas can be disabled by passing the property auto.register.schemas=false to the serializer).

Now, use the REST endpoints to examine the schemas that were registered. First, you can see which subjects were used by using the following command.

curl http://localhost:8081/subjects

Here is the expected output:

["testproto-value", "other.proto"]

The subject for the top-level schema is determined by a SubjectNameStrategy, which defaults to suffixing the topic with either -key or -value. The subjects for referenced schemas are determined by a ReferenceSubjectNameStrategy, which defaults to the name used in the import statement. Both strategies can be customized.
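The default subject derivation described above can be sketched as follows. The class and method names are hypothetical, and the sketch only mirrors the default naming rules (topic plus -key or -value for the top-level schema, import name for each reference) rather than calling the actual strategy classes.

```java
import java.util.ArrayList;
import java.util.List;

class DefaultSubjectsSketch {
    /** Default top-level rule: suffix the topic with -key or -value. */
    static String topLevelSubject(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    /** Subjects expected after auto-registration of one schema and its imports. */
    static List<String> expectedSubjects(String topic, boolean isKey, List<String> imports) {
        List<String> subjects = new ArrayList<>();
        subjects.add(topLevelSubject(topic, isKey));
        subjects.addAll(imports); // default reference rule: the import name is the subject
        return subjects;
    }
}
```

For the example above, expectedSubjects("testproto", false, ...) with the single import other.proto yields testproto-value and other.proto, which matches the subjects returned by the REST call.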

In the schemas below, note the new schemaType field, which was added in Confluent Platform 5.5. Also, the top-level schema has a new references field that refers to other.proto.

Type the following command to view the testproto-value schema.

curl http://localhost:8081/subjects/testproto-value/versions/1

Here is the expected output:

{
  "subject": "testproto-value",
  "version": 1,
  "id": 2,
  "schemaType": "PROTOBUF",
  "references": [
    {
      "name": "other.proto",
      "subject": "other.proto",
      "version": 1
    }
  ],
  "schema": "syntax = \"proto3\";\npackage com.acme;\n\nimport \"other.proto\";\n\nmessage MyRecord {\n  string f1 = 1;\n  .com.acme.OtherRecord f2 = 2;\n}\n"
}

Note that the testproto-value schema ID is 2, and the other.proto schema ID is 1.

Type the following command to view the other.proto schema.

curl http://localhost:8081/subjects/other.proto/versions/1

Here is the expected output:

{
  "subject": "other.proto",
  "version": 1,
  "id": 1,
  "schemaType": "PROTOBUF",
  "schema": "syntax = \"proto3\";\npackage com.acme;\n\nmessage OtherRecord {\n  int32 other_id = 1;\n}\n"
}

Now, try to delete the referenced schema, other.proto.

curl -X DELETE http://localhost:8081/subjects/other.proto/versions/1

You will get the following error because Schema Registry prevents you from creating a dangling reference, which would be the case if the referenced schema were deleted before the top-level schema.

{"error_code":42206,"message":"One or more references exist to the schema {magic=1,keytype=SCHEMA,subject=other.proto,version=1}."}

You know that the schema with ID 2 references the schema you just tried to delete, but you can also get that information from Schema Registry directly with this command:

curl http://localhost:8081/subjects/other.proto/versions/1/referencedby

The above command returns the ID of the testproto-value schema (ID 2) that references your other.proto schema:

[2]

A schema can exist in multiple subjects, so you must delete the schema in each subject where it occurs before the referenced schema can be deleted. Here is how to determine exactly where a schema ID is used:

curl http://localhost:8081/schemas/ids/2/versions

The above command returns the following, showing that in this case schema ID 2 is used by only one subject and version:

[{"subject":"testproto-value","version":1}]

Delete the top-level schema (testproto-value) that is referencing other.proto:

curl -X DELETE http://localhost:8081/subjects/testproto-value/versions/1

The expected output is simply 1.

Now you can delete the referenced schema, and the command will succeed.

curl -X DELETE http://localhost:8081/subjects/other.proto/versions/1

The expected output is simply 1.
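The deletion order in this walkthrough can be modeled with a small in-memory sketch. This is a toy model, not Schema Registry code: it tracks subjects instead of schema IDs and versions, and the class and method names are hypothetical. It only illustrates why the referenced schema cannot be deleted until every schema that references it is gone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ReferenceGuardSketch {
    // Maps each registered subject to the subjects it references.
    private final Map<String, Set<String>> referencesBySubject = new LinkedHashMap<>();

    void register(String subject, String... referencedSubjects) {
        referencesBySubject.put(subject, new LinkedHashSet<>(Arrays.asList(referencedSubjects)));
    }

    /** Lists the subjects whose schemas reference the given subject. */
    List<String> referencedBy(String subject) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : referencesBySubject.entrySet()) {
            if (entry.getValue().contains(subject)) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    /** Deletes a subject only if it exists and nothing references it. */
    boolean delete(String subject) {
        if (!referencesBySubject.containsKey(subject) || !referencedBy(subject).isEmpty()) {
            return false;
        }
        referencesBySubject.remove(subject);
        return true;
    }

    public static void main(String[] args) {
        ReferenceGuardSketch registry = new ReferenceGuardSketch();
        registry.register("other.proto");
        registry.register("testproto-value", "other.proto");
        System.out.println(registry.delete("other.proto"));     // false: still referenced
        System.out.println(registry.delete("testproto-value")); // true
        System.out.println(registry.delete("other.proto"));     // true: no references remain
    }
}
```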

Protobuf deserializer

Plug KafkaProtobufDeserializer into KafkaConsumer to receive messages of any Protobuf type from Kafka.

Tip

The examples below use the default hostname and port for the Kafka bootstrap server (localhost:9092) and Schema Registry (localhost:8081).

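import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import com.google.protobuf.Message;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Configure the consumer to deserialize keys as strings and values as Protobuf.
Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

String topic = "testproto";
final Consumer<String, Message> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

try {
  while (true) {
    // poll(Duration) replaces the deprecated poll(long) overload.
    ConsumerRecords<String, Message> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, Message> record : records) {
      System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
    }
  }
} finally {
  consumer.close();
}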

When deserializing a Protobuf payload, the KafkaProtobufDeserializer can behave in these ways:

  • If given a specific.protobuf.key.type or specific.protobuf.value.type, the deserializer uses the specified type to perform deserialization.
  • The previous configuration will not work for RecordNameStrategy when there is more than one Message in the Protobuf definition. In this case, you must also specify either java_outer_classname or java_multiple_files = true in the original Protobuf file. This allows the deserializer to derive the Java type from the schema to deserialize the Protobuf payload.
  • Finally, if no type is provided or no type can be derived, the deserializer uses the schema to return an instance of a Protobuf DynamicMessage.

Similar to how the Avro deserializer can return an instance of a specific Avro record type or a GenericRecord, the Protobuf deserializer can return an instance of a specific Protobuf message type or a DynamicMessage.

If the Protobuf deserializer cannot determine a specific type, then a generic type is returned.

One way to return a specific type is to use an explicit property. For the Protobuf deserializer, you can configure the property KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE or KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_KEY_TYPE.

To allow the Protobuf deserializer to work with topics with heterogeneous types, you must provide additional information in the schema. Configure the deserializer with derive.type=true, and then specify either java_outer_classname or java_multiple_files=true in the schema.
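The two ways of obtaining a specific type can be sketched as consumer properties. The property keys specific.protobuf.value.type and derive.type come from the text above; the class name com.acme.MyRecord is a hypothetical generated class (assumed to be compiled with java_multiple_files = true), and the helper names are illustrative only.

```java
import java.util.Properties;

class DeserializerConfigSketch {
    /** Option 1: name the generated class to return for message values. */
    static Properties specificValueType(String generatedClassName) {
        Properties props = new Properties();
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("specific.protobuf.value.type", generatedClassName);
        return props;
    }

    /** Option 2: let the deserializer derive the Java type from the schema. */
    static Properties derivedType() {
        Properties props = new Properties();
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("derive.type", "true");
        return props;
    }

    public static void main(String[] args) {
        // com.acme.MyRecord is a placeholder for your own generated class.
        System.out.println(specificValueType("com.acme.MyRecord"));
        System.out.println(derivedType());
    }
}
```

If neither option is set, or the type cannot be derived, the deserializer returns a DynamicMessage as described above.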

Here is a summary of specific and generic return types for each schema format.


  • Avro: the specific type is a generated class that implements org.apache.avro.specific.SpecificRecord; the generic type is org.apache.avro.generic.GenericRecord.
  • Protobuf: the specific type is a generated class that implements com.google.protobuf.Message; the generic type is com.google.protobuf.DynamicMessage.
  • JSON Schema: the specific type is a Java class (that is compatible with Jackson serialization); the generic type is com.fasterxml.jackson.databind.JsonNode.

Configure expiry time for client-side schema caches

The following format-agnostic configuration options for cache expiry time are available on both the serializer and deserializer:

  • latest.cache.size - The maximum size for caches holding latest schemas
  • latest.cache.ttl.sec - The time to live (TTL) in seconds for caches holding latest schemas, or -1 for no TTL
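As a minimal sketch, the two cache options can be added to the same Properties object that configures a serializer or deserializer. The property keys are the ones listed above; the values shown in main (500 entries, a 300-second TTL) and the helper name are illustrative assumptions, not recommended settings.

```java
import java.util.Properties;

class LatestCacheConfigSketch {
    /** Builds client properties with explicit limits for the latest-schema caches. */
    static Properties withLatestCache(int maxSize, int ttlSeconds) {
        Properties props = new Properties();
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("latest.cache.size", Integer.toString(maxSize));
        // A TTL of -1 means cached entries never expire.
        props.put("latest.cache.ttl.sec", Integer.toString(ttlSeconds));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withLatestCache(500, 300));
    }
}
```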

Test drive Protobuf schema

To get started with Protobuf, you can use the command line producer and consumer for Protobuf. Similar to Avro, Protobuf defines both a binary serialization format and a JSON serialization format. This allows you to use JSON when human-readability is desired, and the more efficient binary format to store data in topics.

The command line producer and consumer are useful for understanding how the built-in Protobuf schema support works on Confluent Platform.

When you incorporate the serializer and deserializer into the code for your own producers and consumers, messages and associated schemas are processed the same way as they are on the console producers and consumers.

The suggested consumer commands include the --from-beginning flag so that you capture the messages even if you do not run the consumer immediately after running the producer. If you leave off the --from-beginning flag, the consumer reads only messages produced after it starts.

The examples below include a few minimal configs. For full property references, see Configurations reference.

Prerequisites

  • Prerequisites to run these examples are generally the same as those described for the Schema Registry Tutorial with the exception of Maven, which is not needed here. Also, Confluent Platform version 5.5.0 or later is required here.
  • The following examples use the default Schema Registry URL value (localhost:8081). The examples show how to configure this inline by supplying the URL as an argument to the --property flag in the command line arguments of the producer and consumer (--property schema.registry.url=<address of your schema registry>). Alternatively, you could set this property in $CONFLUENT_HOME/etc/kafka/server.properties, and not have to include it in the producer and consumer commands. For example: confluent.schema.registry.url=http://localhost:8081
  • These examples make use of the kafka-protobuf-console-producer and kafka-protobuf-console-consumer, which are located in $CONFLUENT_HOME/bin.

Create and use schemas

  1. Start Confluent Platform using the following command:

    confluent local services start
    

    Tip

    • Alternatively, you can simply run confluent local services schema-registry start which also starts Kafka and ZooKeeper as dependencies. This demo does not directly reference the other services, such as Connect and Control Center. That said, you may want to run the full stack anyway to further explore, for example, how the topics and messages display on Control Center. To learn more about confluent local, see Quick Start for Confluent Platform and confluent local in the Confluent CLI command reference.
    • The confluent local commands run in the background so you can re-use this command window. Separate sessions are required for the producer and consumer.
  2. Verify registered schema types.

    Schema Registry supports arbitrary schema types. You should verify which schema types are currently registered with Schema Registry.

    To do so, type the following command (assuming you use the default URL and port for Schema Registry, localhost:8081):

    curl http://localhost:8081/schemas/types
    

    The response will be one or more of the following. If additional schema format plugins are installed, these will also be available.

    ["JSON", "PROTOBUF", "AVRO"]
    

    Alternatively, use the curl --silent flag, and pipe the command through jq (curl --silent http://localhost:8081/schemas/types | jq) to get nicely formatted output:

    [
      "JSON",
      "PROTOBUF",
      "AVRO"
    ]
    
  3. Use the producer to send Protobuf records in JSON as the message value.

    The new topic, transactions-proto, will be created as a part of this producer command if it does not already exist. This command starts a producer, and creates a schema for the transactions-proto topic. The schema has two fields, id and amount.

    kafka-protobuf-console-producer --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 --topic transactions-proto \
    --property value.schema='syntax = "proto3"; message MyRecord { string id = 1; float amount = 2;}'
    

    Tip

    The producer does not show a > prompt, just a blank line at which to type producer messages.

  4. Type the following command in the shell, and hit return.

    { "id":"1000", "amount":500 }
    

    The command line Protobuf producer will convert the JSON object to a Protobuf message (using the schema specified in <value.schema>) and then use an underlying serializer to serialize the message to the Kafka topic transactions-proto.

  5. Open a new terminal window, and use the consumer to read from topic transactions-proto and get the value of the message in JSON.

    kafka-protobuf-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic transactions-proto
    

    You should see the following in the console.

    {"id":"1000","amount":500.0}
    

    The underlying deserializer will create a Protobuf message, and then serialize the message to a JSON format for readability.

    Leave this consumer running.

  6. Register a new schema version under the same subject by adding a new field, customer_id.

    Since the default subject level compatibility is BACKWARD, you must add the new field as “optional” in order for it to be compatible with the previous version.

    Open a new terminal window (or use Ctrl+C to shut down the previous producer), and run the following command to start a new producer:

    kafka-protobuf-console-producer --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 --topic transactions-proto \
    --property value.schema='syntax = "proto3"; message MyRecord {string id = 1; float amount = 2; string customer_id=3;}'
    
  7. Type the following into this producer, and hit return:

    { "id":"1001", "amount":700, "customer_id":"1221"}
    
  8. Return to your running consumer to read from topic transactions-proto and get the new message.

    You should see the new output added to the original.

    {"id":"1000","amount":500.0}
    {"id":"1001","amount":700.0,"customerId":"1221"}
    

    (If by chance you closed the original consumer, just restart it using the same command shown in step 5.)

  9. In another shell, use curl commands to examine the schema that was registered with Schema Registry.

    curl http://localhost:8081/subjects/transactions-proto-value/versions/1/schema

    Here is the expected output, showing the id and amount fields added in version 1 of the schema:

    syntax = "proto3";
    
    message MyRecord {
      string id = 1;
      float amount = 2;
    }
    

    To view version 2:

    curl http://localhost:8081/subjects/transactions-proto-value/versions/2/schema
    

    Output for version 2 will include the customer_id field:

    syntax = "proto3";
    
    message MyRecord {
      string id = 1;
      float amount = 2;
      string customer_id = 3;
    }
    
  10. Run this command to view the schema in more detail. (The command as shown is piped through jq with curl download messages suppressed for more readable output.)

    curl --silent -X GET http://localhost:8081/subjects/transactions-proto-value/versions/latest | jq
    

    Your output should resemble:

    {
      "subject": "transactions-proto-value",
      "version": 2,
      "id": 3,
      "schemaType": "PROTOBUF",
      "schema": "syntax = \"proto3\";\n\nmessage MyRecord {\n  string id = 1;\n  float amount = 2;\n  string customer_id = 3;\n}\n"
    }
    
  11. Use Confluent Control Center to examine schemas and messages.

    Messages that were successfully produced also show on Control Center (http://localhost:9021/) in Topics > <topicName> > Messages. You may have to select a partition or jump to a timestamp to see messages sent earlier. (For timestamp, type in a number, which will default to partition 1/Partition: 0, and press return. To get the message view shown here, select the cards icon on the upper right.)

    [Screenshot: Protobuf messages in Control Center]

    Schemas you create are available on the Schemas tab for the selected topic.

    [Screenshot: Protobuf schema in Control Center]
  12. Run shutdown and cleanup tasks.

    • You can stop the consumer and producer with Ctrl+C in their respective command windows.
    • To stop Confluent Platform, type confluent local services stop.
    • If you would like to clear out existing data (topics, schemas, and messages) before starting again with another test, type confluent local destroy.
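In step 8, the consumer prints customerId even though the schema declares the field as customer_id. This follows the proto3 JSON mapping, which by default renders field names in lowerCamelCase. The conversion can be sketched as follows; the class and method names are hypothetical, and the sketch covers only the common snake_case pattern.

```java
class JsonNameSketch {
    /** Converts a snake_case Protobuf field name to its default lowerCamelCase JSON name. */
    static String toJsonName(String fieldName) {
        StringBuilder out = new StringBuilder();
        boolean upperNext = false;
        for (char c : fieldName.toCharArray()) {
            if (c == '_') {
                upperNext = true; // drop the underscore and capitalize what follows
            } else if (upperNext) {
                out.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(toJsonName("customer_id")); // prints customerId
    }
}
```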

Configurations reference

The following configuration properties are available for producers and consumers. These are not specific to a particular schema format, but applicable to any Kafka producers and consumers.

Adding security credentials

The test drive examples show how to use the producer and consumer console clients as serializers and deserializers by passing Schema Registry properties on the command line and in config files. In addition to examples given in the “Test Drives”, you can pass truststore and keystore credentials for the Schema Registry, as described in Additional configurations for HTTPS. Here is an example for the producer on Confluent Platform:

kafka-protobuf-console-producer --bootstrap-server localhost:9093 --topic myTopic \
--producer.config ~/etc/kafka/producer.properties \
--property value.schema='syntax = "proto3"; message MyRecord {string id = 1; float amount = 2; string customer_id=3;}' \
--property schema.registry.url=https://localhost:8081 \
--property schema.registry.ssl.truststore.location=/etc/kafka/security/schema.registry.client.truststore.jks \
--property schema.registry.ssl.truststore.password=myTrustStorePassword

Kafka producer configurations

A complete reference of producer configuration properties is available in Kafka Producer Configurations.

Kafka consumer configurations

A complete reference of consumer configuration properties is available in Kafka Consumer Configurations.

Schema Registry configuration options

A complete reference for Schema Registry configuration is available in the Confluent Platform documentation at Schema Registry Configuration Options.

Using Schema Registry with Connect

If you are using serializers and deserializers with Kafka Connect, you will need information on key and value converters. To learn more, see Configuring key and value converters in the Connect documentation.

Schema references in Protobuf

Confluent Platform provides full support for the notion of schema references, the ability of a schema to refer to other schemas.

Tip

Schema references are also supported in Confluent Cloud on Avro, Protobuf, and JSON Schema formats. On the Confluent CLI, you can use the --refs <file> flag on confluent schema-registry schema create to reference another schema.

If you give the KafkaProtobufSerializer an instance of a generated Protobuf class, it can automatically register all referenced schemas.

The easiest way to manually register referenced schemas is with the Schema Registry Maven Plugin for Confluent Platform. Alternatively, you can use the Confluent Cloud APIs or Confluent Platform APIs to manually register referenced schemas.

This Protobuf example imports a schema for the order subject (defined in the file Order.proto) and its references for the product and customer subjects.

<plugin>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-maven-plugin</artifactId>
    <version>5.5.0</version>
    <configuration>
        <schemaRegistryUrls>
            <param>http://192.168.99.100:8081</param>
        </schemaRegistryUrls>
        <subjects>
            <order>src/main/protobuf/Order.proto</order>
            <product>src/main/protobuf/Product.proto</product>
            <customer>src/main/protobuf/Customer.proto</customer>
        </subjects>
        <schemaTypes>
            <order>PROTOBUF</order>
            <product>PROTOBUF</product>
            <customer>PROTOBUF</customer>
        </schemaTypes>
        <references>
            <order>
                <reference>
                    <name>Product.proto</name>
                    <subject>product</subject>
                </reference>
                <reference>
                    <name>Customer.proto</name>
                    <subject>customer</subject>
                </reference>
            </order>
        </references>
    </configuration>
    <goals>
        <goal>register</goal>
    </goals>
</plugin>

The schema for the order subject, defined in the Order.proto file, might look like:

syntax = "proto3";

package io.confluent.examples.generated_sources.protobuf;

import "Product.proto";
import "Customer.proto";

message Order {
    int32 order_id = 1;
    string order_date = 2;
    int32 order_amount = 3;
    repeated Product products = 4;
    Customer customer = 5;
}

And the schema for the customer subject, defined in Customer.proto, might look like:

syntax = "proto3";

package io.confluent.examples.generated_sources.protobuf;

message Customer {
    int64 customer_id = 1;
    string customer_name = 2;
    string customer_email = 3;
    string customer_address = 4;
}

For backward compatibility reasons, both "schemaType" and "references" are optional. If "schemaType" is omitted, it is assumed to be AVRO.
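When registering through the REST API instead of the Maven plugin, these fields travel in the JSON body posted to /subjects/{subject}/versions. The following sketch builds such a body with plain string handling; the class and helper names are hypothetical, and the escaping covers only the characters that commonly appear in .proto text. It shows that "schemaType" and "references" are simply left out when not needed.

```java
import java.util.List;

class RegisterBodySketch {
    /** Quotes a value as a JSON string, escaping backslashes, quotes, and newlines. */
    static String jsonString(String raw) {
        return "\"" + raw.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n") + "\"";
    }

    /** Builds one entry of the "references" array. */
    static String reference(String name, String subject, int version) {
        return "{\"name\":" + jsonString(name)
            + ",\"subject\":" + jsonString(subject)
            + ",\"version\":" + version + "}";
    }

    /** Builds the request body; a null schemaType or an empty reference list is omitted. */
    static String body(String schemaType, List<String> references, String schema) {
        StringBuilder json = new StringBuilder("{");
        if (schemaType != null) {
            json.append("\"schemaType\":").append(jsonString(schemaType)).append(",");
        }
        if (!references.isEmpty()) {
            json.append("\"references\":[").append(String.join(",", references)).append("],");
        }
        json.append("\"schema\":").append(jsonString(schema)).append("}");
        return json.toString();
    }
}
```

For the order subject above, the body would carry "schemaType" set to PROTOBUF and two references, one for Product.proto and one for Customer.proto.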

To learn more about reusing existing Protobuf message definitions by importing (referencing) them, see Using Other Message Types in the Protobuf documentation.

Multiple event types in the same topic

In addition to providing a way for one schema to call other schemas, schema references can be used to efficiently combine multiple event types in the same topic and still maintain subject-topic constraints.

In Protobuf, this is accomplished as follows:

  • Use the default subject naming strategy, TopicNameStrategy, which uses the topic name to determine the subject to be used for schema lookups, and helps to enforce subject-topic constraints.

  • Wrap a oneof in a message to define a list of schema references, for example:

    syntax = "proto3";
    
    package io.confluent.examples.proto;
    
    import "Customer.proto";
    import "Product.proto";
    import "Order.proto";
    
    message AllTypes {
        oneof oneof_type {
            Customer customer = 1;
            Product product = 2;
            Order order = 3;
        }
    }
    

    When the schema is registered, send an array of reference versions. For example:

    [
      {
        "name": "Customer.proto",
        "subject": "customer",
        "version": 1
      },
      {
        "name": "Product.proto",
        "subject": "product",
        "version": 1
      },
      {
        "name": "Order.proto",
        "subject": "order",
        "version": 1
      }
    ]
    

    Tip

    • In Protobuf, a top-level oneof is not permitted, which is why you must wrap the oneof in a message.
    • One advantage of wrapping the oneof with a message is that auto-registration of the top-level schema will work properly by default (unlike Avro or JSON Schema, which require additional configuration for this use case). In the case of Protobuf, all referenced schemas will also be auto-registered, recursively.
    • By default, oneof messages have their field names suffixed with an index (for example _0). You can disable this suffix by configuring generate.index.for.unions (added in Confluent Platform 7.3.0). To learn more about using this option, see Protobuf in Using Kafka Connect with Schema Registry.

Handling null values with wrappers

Protobuf does not allow setting null values on any object field. Null values will cause a null pointer exception.

As a workaround, you can import google/protobuf/wrappers.proto and set the wrapper.for.nullables converter property, which specifies whether or not to use Protobuf wrapper types for nullable fields. The default boolean value is false. If set to true, nullable fields use the wrapper types described on GitHub in protocolbuffers/protobuf, and in the google.protobuf package documentation.

For example, you can replace a non-nullable name that defaults to an empty string (""):

message GreetRequest {
  string name = 1;
}

with a nullable name, using a wrapper class from wrappers.proto (in this case, StringValue):

message GreetRequest {
  google.protobuf.StringValue name = 1;
}

In Confluent Platform, setting wrapper.for.nullables to true in the Protobuf converter causes the following results:

  1. When converting from a Connect schema to a Protobuf schema, an optional Connect field converts to a Protobuf field using a wrapper class.
  2. When converting from Protobuf schema to Connect schema, a Protobuf field using a wrapper class converts to an optional Connect field.

To learn more, see Tomas Basham’s post on Protocol Buffers and Optional Fields, and the description of “Wrapper types” for null values in the table under JSON Mapping in the Protocol Buffers Developer Guide.

Tip

Alternatively, Protocol Buffers v3 (proto3) does support optional primitive fields, which are similar to “nullable” types. To learn more about this strategy, see the Google documentation on Optional Primitive Fields and Protocol Buffer types.

Protobuf schema compatibility rules

Compatibility rules support schema evolution and the ability of downstream consumers to handle data encoded with old and new schemas. The rules overlap across formats, especially between Protobuf and Avro. The exception is backward compatibility, where the Protobuf rules differ from the Avro rules.

In general, it’s recommended to use BACKWARD_TRANSITIVE with Protobuf, as adding new message types is not forward compatible.
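To see why adding a field (such as customer_id in the MyRecord test drive example) is safe for consumers on the newer schema, it helps to look at how a Protobuf reader treats the encoded bytes. The following sketch hand-encodes and decodes the Protobuf payload only (without the Schema Registry header) using the standard Protobuf tag and wire-type encoding. It is a simplified, hypothetical reader, not the generated-code parser and not Schema Registry's compatibility checker, and it supports only single-byte tags and lengths.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

class CompatibilitySketch {
    /** Reader-side view of MyRecord version 2: id = 1, amount = 2, customer_id = 3. */
    static final class MyRecordV2 {
        String id = "";
        float amount = 0f;
        String customerId = ""; // proto3 default when the field is absent
    }

    static void writeString(ByteArrayOutputStream out, int fieldNumber, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        out.write((fieldNumber << 3) | 2); // tag with wire type 2 (length-delimited)
        out.write(bytes.length);           // one-byte length: short strings only
        out.write(bytes, 0, bytes.length);
    }

    static void writeFloat(ByteArrayOutputStream out, int fieldNumber, float value) {
        out.write((fieldNumber << 3) | 5); // tag with wire type 5 (32-bit)
        byte[] bits = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putFloat(value).array();
        out.write(bits, 0, bits.length);
    }

    static int readVarint(ByteBuffer in) {
        int result = 0;
        for (int shift = 0; ; shift += 7) {
            byte b = in.get();
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
    }

    static String readString(ByteBuffer in) {
        byte[] bytes = new byte[readVarint(in)];
        in.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Decodes known fields, leaves absent fields at their defaults, skips unknown fields. */
    static MyRecordV2 decode(byte[] payload) {
        ByteBuffer in = ByteBuffer.wrap(payload).order(ByteOrder.LITTLE_ENDIAN);
        MyRecordV2 record = new MyRecordV2();
        while (in.hasRemaining()) {
            int tag = readVarint(in);
            int fieldNumber = tag >>> 3;
            int wireType = tag & 7;
            if (fieldNumber == 1 && wireType == 2) {
                record.id = readString(in);
            } else if (fieldNumber == 2 && wireType == 5) {
                record.amount = in.getFloat();
            } else if (fieldNumber == 3 && wireType == 2) {
                record.customerId = readString(in);
            } else {
                skip(in, wireType);
            }
        }
        return record;
    }

    static void skip(ByteBuffer in, int wireType) {
        switch (wireType) {
            case 0: readVarint(in); break;
            case 1: in.position(in.position() + 8); break;
            case 2: in.position(in.position() + readVarint(in)); break;
            case 5: in.position(in.position() + 4); break;
            default: throw new IllegalArgumentException("Unsupported wire type " + wireType);
        }
    }
}
```

Decoding a version 1 payload (id and amount only) with this reader leaves customerId at its default empty string, which is the situation BACKWARD compatibility is meant to guarantee. A payload that carries an additional, unknown field number is still readable because the reader skips it based on its wire type.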

For more detail on Protobuf compatibility rules, including backward compatibility, see:

Limitations and developer notes

  • The console consumer does not currently support Protobuf Any type. Any is an advanced Protobuf feature that requires the Protobuf type to be registered with a TypeRegistry. The ability to pass additional types to the kafka-protobuf-console-consumer is not supported at this time.
  • The Confluent kafka-protobuf-serializer works with Google Protobuf v.3. Google Protobuf v.4 is currently not supported.
  • The “import public” feature of protobuf2 and protobuf3 is not supported in Java, which means that it is not supported in the Schema Registry tools, so transitively included dependencies will not resolve as you might expect. This is not a limitation of Schema Registry, but a limitation of Protobuf itself that can look like a failure in Schema Registry.
  • Do not upload Google/Protobuf dependencies into Schema Registry. Although you might naturally expect the tooling to automatically exclude any “dangerous” files which ought not be imported, this is not the case.