Protobuf Schema Serializer and Deserializer

This document describes how to use Protocol Buffers (Protobuf) with the Apache Kafka® Java client and console tools.

Protobuf Serializer

Plug the KafkaProtobufSerializer into KafkaProducer to send messages of Protobuf type to Kafka.

When providing an instance of a Protobuf generated class to the serializer, the serializer can register the Protobuf schema, and all referenced schemas. For referenced schemas, by default the serializer will register each referenced schema under a subject with the same name as the reference. For example, if the main schema references “google/protobuf/timestamp.proto”, then the timestamp schema will be registered under a subject named “google/protobuf/timestamp.proto”. This behavior can be customized by providing a custom ReferenceSubjectNameStrategy to the serializer.

The interface looks like this:

public interface ReferenceSubjectNameStrategy extends Configurable
{
 /**
  * For a given reference name, topic, and message, returns the subject name under which the
  * referenced schema should be registered in the schema registry.
  *
  * @param refName The name of the reference.
  * @param topic The Kafka topic name to which the message is being published.
  * @param isKey True when encoding a message key, false for a message value.
  * @param schema The referenced schema.
  * @return The subject name under which the referenced schema should be registered.
  */
 String subjectName(String refName, String topic, boolean isKey, ParsedSchema schema);
}
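As an illustration of what a custom strategy might compute, the sketch below scopes each referenced schema to the topic that uses it. To keep it runnable without the Confluent libraries, it shows only the naming logic as a plain static method. In practice the same logic would live in the subjectName(...) method of a class implementing the interface above (ignoring the ParsedSchema argument), alongside the configure(...) method that Configurable requires. The class name TopicScopedReferenceNames and the naming scheme itself are hypothetical.

```java
// Hypothetical naming scheme: scope each referenced schema to the topic that uses it.
// This is only the naming logic; it does not implement ReferenceSubjectNameStrategy.
final class TopicScopedReferenceNames {

  // Mirrors the first three parameters of subjectName(...) in the interface above.
  static String subjectName(String refName, String topic, boolean isKey) {
    String role = isKey ? "key" : "value";
    return topic + "-" + role + "-ref-" + refName;
  }

  public static void main(String[] args) {
    // prints "testproto-value-ref-other.proto"
    System.out.println(subjectName("other.proto", "testproto", false));
  }
}
```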

For example, suppose you have the following schema.

syntax = "proto3";
package com.acme;

import "other.proto";

message MyRecord {
  string f1 = 1;
  OtherRecord f2 = 2;
}

The above schema references other.proto, which looks like this:

syntax = "proto3";
package com.acme;

message OtherRecord {
  int32 other_id = 1;
}

The code below creates an instance of the MyRecord class that is generated by the Protobuf compiler. The Kafka producer is configured to serialize the MyRecord instance with the Protobuf serializer.

Tip

The examples below use the default address and port for the Kafka bootstrap server (localhost:9092) and Schema Registry (localhost:8081).

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://127.0.0.1:8081");

Producer<String, MyRecord> producer = new KafkaProducer<String, MyRecord>(props);

String topic = "testproto";
String key = "testkey";
OtherRecord otherRecord = OtherRecord.newBuilder()
  .setOtherId(123).build();
MyRecord myrecord = MyRecord.newBuilder()
  .setF1("value1").setF2(otherRecord).build();

ProducerRecord<String, MyRecord> record
      = new ProducerRecord<String, MyRecord>(topic, key, myrecord);
producer.send(record).get();
producer.close();

While serializing the Protobuf instance to Kafka, the above code will automatically register two schemas to Schema Registry, one for MyRecord and another for OtherRecord, to two different subjects. (The default behavior of automatically registering schemas can be disabled by passing the property auto.register.schemas=false to the serializer).

Now, use the REST endpoints to examine the schemas that were registered. First, you can see which subjects were used by using the following command.

curl http://localhost:8081/subjects

Here is the expected output:

["testproto-value", "other.proto"]

The subject for the top-level schema is determined by a SubjectNameStrategy, which defaults to suffixing the topic with either -key or -value. The subjects for referenced schemas are determined by a ReferenceSubjectNameStrategy, which defaults to the name used in the import statement. Both strategies can be customized.
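The two default rules described above can be sketched in plain Java. This is a simplified illustration of the naming rules, not the library's implementation; the class and method names are made up for this example.

```java
// Simplified illustration of the default subject naming rules.
final class DefaultSubjectNames {

  // Default top-level subject: the topic name suffixed with -key or -value.
  static String topLevelSubject(String topic, boolean isKey) {
    return topic + (isKey ? "-key" : "-value");
  }

  // Default reference subject: the name used in the import statement, unchanged.
  static String referenceSubject(String importName) {
    return importName;
  }

  public static void main(String[] args) {
    System.out.println(topLevelSubject("testproto", false)); // prints "testproto-value"
    System.out.println(referenceSubject("other.proto"));     // prints "other.proto"
  }
}
```

These are the two subjects returned by the /subjects call in the example above.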

In the schemas below, note the new schemaType field, which was added in Confluent Platform 5.5. Also, the top-level schema has a new references field that refers to other.proto.

Type the following command to view the testproto-value schema.

curl http://localhost:8081/subjects/testproto-value/versions/1

Here is the expected output:

{
  "subject": "testproto-value",
  "version": 1,
  "id": 2,
  "schemaType": "PROTOBUF",
  "references": [
    {
      "name": "other.proto",
      "subject": "other.proto",
      "version": 1
    }
  ],
  "schema": "syntax = \"proto3\";\npackage com.acme;\n\nimport \"other.proto\";\n\nmessage MyRecord {\n  string f1 = 1;\n  .com.acme.OtherRecord f2 = 2;\n}\n"
}

Note that the testproto-value schema ID is 2, and the other.proto schema ID is 1.

Type the following command to view the other.proto schema.

curl http://localhost:8081/subjects/other.proto/versions/1

Here is the expected output:

{
  "subject": "other.proto",
  "version": 1,
  "id": 1,
  "schemaType": "PROTOBUF",
  "schema": "syntax = \"proto3\";\npackage com.acme;\n\nmessage OtherRecord {\n  int32 other_id = 1;\n}\n"
}

Now, try to delete the referenced schema, other.proto.

curl -X DELETE http://localhost:8081/subjects/other.proto/versions/1

You will get the following error because Schema Registry prevents you from creating a dangling reference, which would be the case if the referenced schema was deleted before the top-level schema.

{"error_code":42206,"message":"One or more references exist to the schema {magic=1,keytype=SCHEMA,subject=other.proto,version=1}."}

You know that the schema with ID 2 references the schema you just tried to delete, but you can also get that information from Schema Registry directly with this command:

curl http://localhost:8081/subjects/other.proto/versions/1/referencedby

The above command returns the ID of the testproto-value schema (ID 2) that references your other.proto schema:

[2]
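When calling these endpoints from code rather than curl, keep in mind that a subject such as google/protobuf/timestamp.proto contains slashes. These generally need to be percent-encoded so that the subject is treated as a single path segment. The following is a minimal sketch, assuming the default Schema Registry address used above; the RegistryPaths helper is hypothetical and only builds the URL string.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds Schema Registry URLs for a subject.
final class RegistryPaths {

  // Percent-encode a subject for use as one path segment.
  // URLEncoder targets form encoding, so "+" (an encoded space) is rewritten to "%20".
  static String encodeSubject(String subject) {
    return URLEncoder.encode(subject, StandardCharsets.UTF_8).replace("+", "%20");
  }

  // URL of the referencedby endpoint for a given subject and version.
  static String referencedBy(String baseUrl, String subject, int version) {
    return baseUrl + "/subjects/" + encodeSubject(subject)
        + "/versions/" + version + "/referencedby";
  }

  public static void main(String[] args) {
    System.out.println(referencedBy("http://localhost:8081", "other.proto", 1));
    System.out.println(encodeSubject("google/protobuf/timestamp.proto"));
  }
}
```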

A schema can exist in multiple subjects, so you must delete the schema in each subject where it occurs before the referenced schema can be deleted. Here is how to determine exactly where a schema ID is used:

curl http://localhost:8081/schemas/ids/2/versions

The above command returns the following, showing that in this case schema ID 2, the schema that references other.proto, is used in only one subject:

[{"subject":"testproto-value","version":1}]

Delete the top-level schema (testproto-value) that is referencing other.proto:

curl -X DELETE http://localhost:8081/subjects/testproto-value/versions/1

The expected output is simply 1.

Now you can delete the referenced schema, and the command will succeed.

curl -X DELETE http://localhost:8081/subjects/other.proto/versions/1

The expected output is simply 1.
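The ordering constraint in this walkthrough generalizes: a schema can be deleted only after every schema that references it is gone. The sketch below computes one safe deletion order from a map of each subject to the subjects it references. It works at the subject level and ignores versions for simplicity, and the DeletionOrder class is a hypothetical helper, not part of Schema Registry.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: order subjects so that referencing schemas are deleted first.
final class DeletionOrder {

  // references maps each subject to the subjects it references.
  static List<String> safeOrder(Map<String, List<String>> references) {
    Set<String> remaining = new LinkedHashSet<>(references.keySet());
    for (List<String> refs : references.values()) {
      remaining.addAll(refs);
    }
    List<String> order = new ArrayList<>();
    while (!remaining.isEmpty()) {
      String next = null;
      for (String candidate : remaining) {
        if (!isReferenced(candidate, remaining, references)) {
          next = candidate;
          break;
        }
      }
      if (next == null) {
        throw new IllegalStateException("cyclic references among " + remaining);
      }
      order.add(next);
      remaining.remove(next);
    }
    return order;
  }

  // True if some other remaining subject still references the candidate.
  private static boolean isReferenced(
      String candidate, Set<String> remaining, Map<String, List<String>> references) {
    for (String other : remaining) {
      if (!other.equals(candidate)
          && references.getOrDefault(other, List.of()).contains(candidate)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // prints "[testproto-value, other.proto]"
    System.out.println(safeOrder(Map.of("testproto-value", List.of("other.proto"))));
  }
}
```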

Protobuf Deserializer

Plug KafkaProtobufDeserializer into KafkaConsumer to receive messages of any Protobuf type from Kafka.

Tip

The examples below use the default address and port for the Kafka bootstrap server (localhost:9092) and Schema Registry (localhost:8081).

Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

String topic = "testproto";
final Consumer<String, Message> consumer = new KafkaConsumer<String, Message>(props);
consumer.subscribe(Arrays.asList(topic));

try {
  while (true) {
    ConsumerRecords<String, Message> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, Message> record : records) {
      System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
    }
  }
} finally {
  consumer.close();
}

When deserializing a Protobuf payload, the KafkaProtobufDeserializer can behave in these ways:

  • If given a specific.protobuf.key.type or specific.protobuf.value.type, the deserializer uses the specified type to perform deserialization.
  • The previous configuration will not work for RecordNameStrategy, where more than one type of Protobuf message can exist in a topic. In this case, set derive.type=true. You must also specify either java_outer_classname or java_multiple_files = true in the original Protobuf file. This allows the deserializer to derive the Java type from the schema to deserialize the Protobuf payload.
  • Finally, if no type is provided or no type can be derived, the deserializer uses the schema to return an instance of a Protobuf DynamicMessage.

Similar to how the Avro deserializer can return an instance of a specific Avro record type or a GenericRecord, the Protobuf deserializer can return an instance of a specific Protobuf message type or a DynamicMessage.

If the Protobuf deserializer cannot determine a specific type, then a generic type is returned.

One way to return a specific type is to use an explicit property. For the Protobuf deserializer, you can configure the property KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE or KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_KEY_TYPE.

To allow the Protobuf deserializer to work with topics that contain heterogeneous types, you must provide additional information in the schema. Configure the deserializer with derive.type=true, and then specify either java_outer_classname or java_multiple_files=true in the schema.
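The two configuration routes can be sketched with plain string keys, which avoids needing the Confluent classes on the classpath. The keys are the property names quoted in this section. The DeserializerTypeConfig helper is hypothetical, and the returned properties are assumed to be merged into the consumer configuration shown earlier.

```java
import java.util.Properties;

// Hypothetical helper that builds the type-related deserializer settings.
final class DeserializerTypeConfig {

  // Return one specific generated class for every message value.
  // Pass the generated class name, for example MyRecord.class.getName().
  static Properties specificValueType(String generatedClassName) {
    Properties props = new Properties();
    props.put("specific.protobuf.value.type", generatedClassName);
    return props;
  }

  // Let the deserializer derive the Java class from the schema, which suits
  // topics that carry more than one message type.
  static Properties derivedType() {
    Properties props = new Properties();
    props.put("derive.type", "true");
    return props;
  }

  public static void main(String[] args) {
    // "com.acme.MyRecord" is a placeholder class name for this example.
    System.out.println(specificValueType("com.acme.MyRecord"));
    System.out.println(derivedType());
  }
}
```

If neither setting is supplied, the deserializer falls back to returning a DynamicMessage, as described above.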

Here is a summary of specific and generic return types for each schema format.


Format        Specific type                                                              Generic type
Avro          Generated class that implements org.apache.avro.specific.SpecificRecord   org.apache.avro.generic.GenericRecord
Protobuf      Generated class that implements com.google.protobuf.Message               com.google.protobuf.DynamicMessage
JSON Schema   Java class (that is compatible with Jackson serialization)                com.fasterxml.jackson.databind.JsonNode

Test Drive Protobuf Schema

To get started with Protobuf, you can use the command line producer and consumer for Protobuf. Similar to Avro, Protobuf defines both a binary serialization format and a JSON serialization format. This allows you to use JSON when human-readability is desired, and the more efficient binary format to store data in topics.

Note

  • Prerequisites to run these examples are generally the same as those described for the Schema Registry Tutorial with the exception of Maven, which is not needed here. Also, Confluent Platform version 5.5.0 or later is required here.
  • The following examples use the default Schema Registry URL value (localhost:8081). The examples show how to configure this inline by supplying the URL as an argument to the --property flag in the command line arguments of the producer and consumer (--property schema.registry.url=<address of your schema registry>). Alternatively, you could set this property in $CONFLUENT_HOME/etc/kafka/server.properties, and not have to include it in the producer and consumer commands. For example: confluent.schema.registry.url=http://localhost:8081

These examples make use of the kafka-protobuf-console-producer and kafka-protobuf-console-consumer, which are located in $CONFLUENT_HOME/bin.

The command line producer and consumer are useful for understanding how the built-in Protobuf schema support works on Confluent Platform.

When you incorporate the serializer and deserializer into the code for your own producers and consumers, messages and associated schemas are processed the same way as they are on the console producers and consumers.

The suggested consumer commands include the --from-beginning flag to be sure you capture the messages even if you don’t run the consumer immediately after running the producer. If you leave off the --from-beginning flag, the consumer reads only messages produced after it starts.

  1. Start Confluent Platform using the following command:

    confluent local start
    

    Tip

    • Alternatively, you can simply run confluent local schema-registry, which also starts Kafka and ZooKeeper as dependencies. This demo does not directly reference the other services, such as Connect and Control Center. That said, you may want to run the full stack anyway to further explore, for example, how the topics and messages display on Control Center. To learn more about confluent local, see Confluent Platform Quick Start (Local) and confluent local in the Confluent CLI command reference.
    • The confluent local commands run in the background so you can re-use this command window. Separate sessions are required for the producer and consumer.
  2. Verify registered schema types.

    Starting with Confluent Platform 5.5.0, Schema Registry now supports arbitrary schema types. You should verify which schema types are currently registered with Schema Registry.

    To do so, type the following command (assuming you use the default URL and port for Schema Registry, localhost:8081):

    curl http://localhost:8081/schemas/types
    

    The response will be one or more of the following. If additional schema format plugins are installed, these will also be available.

    ["JSON", "PROTOBUF", "AVRO"]
    

    Alternatively, use the curl --silent flag, and pipe the command through jq (curl --silent http://localhost:8081/schemas/types | jq) to get nicely formatted output:

    [
      "JSON",
      "PROTOBUF",
      "AVRO"
    ]
    
  3. Use the producer to send Protobuf records in JSON as the message value.

    The new topic, t1-p, will be created as a part of this producer command if it does not already exist.

    kafka-protobuf-console-producer --broker-list localhost:9092 \
    --property schema.registry.url=http://localhost:8081 --topic t1-p \
    --property value.schema='syntax = "proto3"; message MyRecord { string f1 = 1; }'
    

    Tip

    The current Protobuf-specific producer does not show a > prompt, just a blank line at which to type producer messages.

  4. Type the following message in the producer shell, and press Return.

    {"f1": "value1-p"}
    

    The command line Protobuf producer will convert the JSON object to a Protobuf message (using the schema specified in <value.schema>) and then use an underlying serializer to serialize the message to the Kafka topic t1-p.

  5. Use the consumer to read from topic t1-p and get the value of the message in JSON.

    kafka-protobuf-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic t1-p --property schema.registry.url=http://localhost:8081
    

    You should see the following in the console.

    {"f1": "value1-p"}
    

    The underlying deserializer will create a Protobuf message, and then serialize the message to a JSON format for readability.

  6. In another shell, use curl commands to examine the schema that was registered with Schema Registry.

    curl http://localhost:8081/subjects/t1-p-value/versions/1/schema
    

    Here is the expected output:

    syntax = "proto3";
    
    message MyRecord {
      string f1 = 1;
    }
    
  7. Run this curl command to view the schema in more detail. For more readable output, you can pipe it through jq (with curl download messages suppressed).

    curl --silent -X GET http://localhost:8081/subjects/t1-p-value/versions/latest | jq
    

    Here is the expected output of the above command:

    {
      "subject": "t1-p-value",
      "version": 1,
      "id": 21,
      "schemaType": "PROTOBUF",
      "schema": "syntax = \"proto3\";\n\nmessage MyRecord {\n  string f1 = 1;\n}\n"
    }
    
  8. Use Confluent Control Center to examine schemas and messages.

    Messages that were successfully produced also show on Control Center (http://localhost:9021/) in Topics > <topicName> > Messages. You may have to select a partition or jump to a timestamp to see messages sent earlier. (For timestamp, type in a number, which will default to partition 1/Partition: 0, and press return. To get the message view shown here, select the cards icon on the upper right.)

    ../../_images/serdes-protobuf-c3-messages.png

    Schemas you create are available on the Schemas tab for the selected topic.

    ../../_images/serdes-protobuf-c3-schema.png
  9. Run shutdown and cleanup tasks.

    • You can stop the consumer and producer with Ctrl-C in their respective command windows.
    • To stop Confluent Platform, type confluent local stop.
    • If you would like to clear out existing data (topics, schemas, and messages) before starting again with another test, type confluent local destroy.

Schema References in Protobuf

Confluent Platform provides full support for the notion of schema references, the ability of a schema to refer to other schemas.

Tip

Schema references are also supported in Confluent Cloud on Avro, Protobuf, and JSON Schema formats. On the Confluent Cloud CLI, you can use the --refs <file> flag on ccloud schema-registry schema create to reference another schema.

If you give the KafkaProtobufSerializer an instance of a generated Protobuf class, it can automatically register all referenced schemas.

The easiest way to manually register referenced schemas is with the Schema Registry Maven Plugin. Alternatively, you can use the REST APIs to manually register referenced schemas.

The following Maven plugin configuration registers a Protobuf schema for the order subject, along with its references to the schemas in the product and customer subjects.

<plugin>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-maven-plugin</artifactId>
    <version>5.5.0</version>
    <configuration>
        <schemaRegistryUrls>
            <param>http://192.168.99.100:8081</param>
        </schemaRegistryUrls>
        <subjects>
            <order>src/main/protobuf/Order.proto</order>
            <product>src/main/protobuf/Product.proto</product>
            <customer>src/main/protobuf/Customer.proto</customer>
        </subjects>
        <schemaTypes>
            <order>PROTOBUF</order>
            <product>PROTOBUF</product>
            <customer>PROTOBUF</customer>
        </schemaTypes>
        <references>
            <order>
                <reference>
                    <name>Product.proto</name>
                    <subject>product</subject>
                </reference>
                <reference>
                    <name>Customer.proto</name>
                    <subject>customer</subject>
                </reference>
            </order>
        </references>
    </configuration>
    <goals>
        <goal>register</goal>
    </goals>
</plugin>

The schema for the order subject might look like:

syntax = "proto3";

package io.confluent.examples.generated_sources.protobuf;

import "Product.proto";
import "Customer.proto";

message Order {
    int32 order_id = 1;
    string order_date = 2;
    int32 order_amount = 3;
    repeated Product products = 4;
    Customer customer = 5;
}

And the schema for the customer subject might look like:

syntax = "proto3";

package io.confluent.examples.generated_sources.protobuf;

message Customer {
    int64 customer_id = 1;
    string customer_name = 2;
    string customer_email = 3;
    string customer_address = 4;
}

For backward compatibility reasons, both "schemaType" and "references" are optional. If "schemaType" is omitted, it is assumed to be AVRO.
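For the REST route, the request body sent to POST /subjects/{subject}/versions carries the same pieces of information as the plugin configuration: the schema text, its schemaType, and its references. The sketch below assembles such a body by hand to show its shape. The RegisterRequest helper and its minimal escaping are assumptions made for illustration, as is the reference version of 1; a real client would normally use a JSON library.

```java
// Hypothetical helper that assembles a schema registration request body.
final class RegisterRequest {

  // Minimal JSON string escaping covering backslashes, quotes, and newlines,
  // which is enough for the .proto text used in this example.
  static String jsonEscape(String s) {
    return s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
  }

  // One entry of the "references" array.
  static String reference(String name, String subject, int version) {
    return "{\"name\":\"" + jsonEscape(name) + "\",\"subject\":\""
        + jsonEscape(subject) + "\",\"version\":" + version + "}";
  }

  // Request body for registering a Protobuf schema together with its references.
  static String body(String schema, String... references) {
    return "{\"schemaType\":\"PROTOBUF\",\"references\":["
        + String.join(",", references)
        + "],\"schema\":\"" + jsonEscape(schema) + "\"}";
  }

  public static void main(String[] args) {
    // Shortened stand-in for the Order schema shown above.
    String order = "syntax = \"proto3\";\nimport \"Customer.proto\";\n";
    System.out.println(body(order, reference("Customer.proto", "customer", 1)));
  }
}
```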

Protobuf Schema Compatibility Rules

Compatibility rules support schema evolution and the ability of downstream consumers to handle data encoded with old and new schemas. There is some overlap in these rules across formats, especially for Protobuf and Avro, with the exception of Protobuf backward compatibility, which differs between the two.

For more detail on Protobuf compatibility rules, including backward compatibility, see Compatibility Checks in the overview.

Suggested Reading