.. _serdes_and_formatter_protobuf:

Protobuf Schema Serializer and Deserializer
===========================================

.. important::

   Support for Protocol Buffers (Protobuf) is available in preview. Preview features are
   components of |cp| introduced to gain early feedback from customers. Preview features
   can be used for evaluation and non-production testing purposes, or to provide feedback
   to Confluent.

This document describes how to use `Protocol Buffers (Protobuf) <https://developers.google.com/protocol-buffers>`__ with the |ak-tm| Java client and console tools.

.. _referenced-schemas:

Referenced Schemas
------------------

Protobuf supports the notion of `referenced schemas`: the ability of a schema to refer to other schemas. Referenced schemas can also be registered automatically; if you give the ``KafkaProtobufSerializer`` an instance of a generated Protobuf class, it can automatically register all referenced schemas.

The easiest way to manually register referenced schemas is with the |sr| Maven Plugin. Alternatively, you can use the :ref:`REST APIs <schemaregistry_api>` to manually register referenced schemas.

The following example registers a schema for the ``order`` subject and its references for the ``product`` and ``customer`` subjects.

::

    <plugin>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-schema-registry-maven-plugin</artifactId>
        <version>5.5.0</version>
        <configuration>
            <schemaRegistryUrls>
                <param>http://192.168.99.100:8081</param>
            </schemaRegistryUrls>
            <subjects>
                <order>src/main/protobuf/Order.proto</order>
                <product>src/main/protobuf/Product.proto</product>
                <customer>src/main/protobuf/Customer.proto</customer>
            </subjects>
            <schemaTypes>
                <order>PROTOBUF</order>
                <product>PROTOBUF</product>
                <customer>PROTOBUF</customer>
            </schemaTypes>
            <references>
                <order>
                    <reference>
                        <name>Product.proto</name>
                        <subject>product</subject>
                    </reference>
                    <reference>
                        <name>Customer.proto</name>
                        <subject>customer</subject>
                    </reference>
                </order>
            </references>
        </configuration>
        <goals>
            <goal>register</goal>
        </goals>
    </plugin>

The schema for the ``order`` subject might look like:

::

    syntax = "proto3";
    package io.confluent.examples.generated_sources.protobuf;

    import "Product.proto";
    import "Customer.proto";

    message Order {
        int32 order_id = 1;
        string order_date = 2;
        int32 order_amount = 3;
        repeated Product products = 4;
        Customer customer = 5;
    }

And the schema for the ``customer`` subject might look like:

::

    syntax = "proto3";
    package io.confluent.examples.generated_sources.protobuf;

    message Customer {
        int64 customer_id = 1;
        string customer_name = 2;
        string customer_email = 3;
        string customer_address = 4;
    }

For backward compatibility reasons, both ``"schemaType"`` and ``"references"`` are optional. If ``"schemaType"`` is omitted, it is assumed to be AVRO.

.. include:: ../includes/json-protobuf-endpoints.rst

.. _sr-serializer-protobuf:

Protobuf Serializer
-------------------

Plug the ``KafkaProtobufSerializer`` into ``KafkaProducer`` to send messages of Protobuf type to |ak|.

When given an instance of a Protobuf generated class, the serializer can register the Protobuf schema and all of its referenced schemas. By default, the serializer registers each referenced schema under a subject with the same name as the reference. For example, if the main schema references ``google/protobuf/timestamp.proto``, then the timestamp schema is registered under a subject named ``google/protobuf/timestamp.proto``. This behavior can be customized by providing a custom ``ReferenceSubjectNameStrategy`` to the serializer. The interface looks like this:

::

    public interface ReferenceSubjectNameStrategy extends Configurable {

      /**
       * For a given reference name, topic, and message, returns the subject name under which the
       * referenced schema should be registered in the schema registry.
       *
       * @param refName The name of the reference.
       * @param topic The Kafka topic name to which the message is being published.
       * @param isKey True when encoding a message key, false for a message value.
       * @param schema The referenced schema.
       * @return The subject name under which the referenced schema should be registered.
       */
      String subjectName(String refName, String topic, boolean isKey, ParsedSchema schema);
    }
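As an illustration of customizing this behavior, here is a minimal sketch of a strategy that registers each referenced schema under a topic-prefixed subject. The class name and prefixing scheme are hypothetical, and the import paths assume the usual package layout of the Confluent serializer artifacts:

::

    import io.confluent.kafka.schemaregistry.ParsedSchema;
    import io.confluent.kafka.serializers.subject.strategy.ReferenceSubjectNameStrategy;

    import java.util.Map;

    /**
     * Hypothetical strategy: registers each referenced schema under
     * "<topic>-<reference name>" (for example, "testproto-other.proto")
     * instead of under the reference name alone.
     */
    public class TopicPrefixedReferenceStrategy implements ReferenceSubjectNameStrategy {

      @Override
      public void configure(Map<String, ?> configs) {
        // No configuration is needed for this sketch.
      }

      @Override
      public String subjectName(String refName, String topic, boolean isKey, ParsedSchema schema) {
        return topic + "-" + refName;
      }
    }

Assuming the serializer exposes its standard ``reference.subject.name.strategy`` configuration key, you would plug the strategy in with ``props.put("reference.subject.name.strategy", TopicPrefixedReferenceStrategy.class.getName());``.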
For example, suppose you have the following schema.

::

    syntax = "proto3";
    package com.acme;

    import "other.proto";

    message MyRecord {
        string f1 = 1;
        OtherRecord f2 = 2;
    }

The above schema references ``other.proto``, which looks like this:

::

    syntax = "proto3";
    package com.acme;

    message OtherRecord {
        int32 other_id = 1;
    }

The code below creates an instance of the ``MyRecord`` class that is generated by the Protobuf compiler. The |ak| producer is configured to serialize the ``MyRecord`` instance with the Protobuf serializer.

.. tip:: The examples below use the default address and port for the |ak| bootstrap server (``localhost:9092``) and |sr| (``localhost:8081``).

::

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
    props.put("schema.registry.url", "http://127.0.0.1:8081");

    Producer<String, MyRecord> producer = new KafkaProducer<>(props);

    String topic = "testproto";
    String key = "testkey";

    OtherRecord otherRecord = OtherRecord.newBuilder()
        .setOtherId(123).build();
    MyRecord myrecord = MyRecord.newBuilder()
        .setF1("value1").setF2(otherRecord).build();
    ProducerRecord<String, MyRecord> record = new ProducerRecord<>(topic, key, myrecord);
    producer.send(record).get();
    producer.close();

While serializing the Protobuf instance to |ak|, the above code automatically registers two schemas with |sr|, one for ``MyRecord`` and another for ``OtherRecord``, under two different subjects. (The default behavior of automatically registering schemas can be disabled by passing the property ``auto.register.schemas=false`` to the serializer.)

Now, use the REST endpoints to examine the schemas that were registered. First, see which subjects were used by running the following command:

::

    curl http://localhost:8081/subjects

Here is the expected output:

::

    ["testproto-value", "other.proto"]

The subject for the top-level schema is determined by a ``SubjectNameStrategy``, which defaults to suffixing the topic with either ``-key`` or ``-value``. The subjects for referenced schemas are determined by a ``ReferenceSubjectNameStrategy``, which defaults to the name used in the import statement. Both strategies can be customized.

In the schemas below, note the new ``schemaType`` field, which was added in |cp| 5.5. Also, the top-level schema has a new ``references`` field that refers to ``other.proto``.

Type the following command to view the ``testproto-value`` schema.

::

    curl http://localhost:8081/subjects/testproto-value/versions/1

Here is the expected output:

::

    {
      "subject": "testproto-value",
      "version": 1,
      "id": 2,
      "schemaType": "PROTOBUF",
      "references": [
        {
          "name": "other.proto",
          "subject": "other.proto",
          "version": 1
        }
      ],
      "schema": "syntax = \"proto3\";\npackage com.acme;\n\nimport \"other.proto\";\n\nmessage MyRecord {\n  string f1 = 1;\n  .com.acme.OtherRecord f2 = 2;\n}\n"
    }

Note that the ``testproto-value`` schema ID is ``2``, and the ``other.proto`` schema ID is ``1``.
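As a side note, the same top-level schema could have been registered manually through the REST API by including a ``references`` array in the request body, mirroring the response shown above. A minimal sketch (the referenced schema must already be registered under the ``other.proto`` subject, which the ``references`` array names by subject and version):

::

    curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      --data '{
        "schemaType": "PROTOBUF",
        "references": [{"name": "other.proto", "subject": "other.proto", "version": 1}],
        "schema": "syntax = \"proto3\";\npackage com.acme;\n\nimport \"other.proto\";\n\nmessage MyRecord {\n  string f1 = 1;\n  OtherRecord f2 = 2;\n}\n"
      }' \
      http://localhost:8081/subjects/testproto-value/versions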
Type the following command to view the ``other.proto`` schema.

::

    curl http://localhost:8081/subjects/other.proto/versions/1

Here is the expected output:

::

    {
      "subject": "other.proto",
      "version": 1,
      "id": 1,
      "schemaType": "PROTOBUF",
      "schema": "syntax = \"proto3\";\npackage com.acme;\n\nmessage OtherRecord {\n  int32 other_id = 1;\n}\n"
    }

Now, try to delete the referenced schema, ``other.proto``.

::

    curl -X DELETE http://localhost:8081/subjects/other.proto/versions/1

You will get the following error, because |sr| prevents you from creating a dangling reference, which would be the case if the referenced schema were deleted before the top-level schema.

::

    {"error_code":42206,"message":"One or more references exist to the schema {magic=1,keytype=SCHEMA,subject=other.proto,version=1}."}

You know that the schema with ID ``2`` references the schema you just tried to delete, but you can also get that information from |sr| directly with this command:

::

    curl http://localhost:8081/subjects/other.proto/versions/1/referencedby

The above command returns the ID of the ``testproto-value`` schema (ID ``2``) that references your ``other.proto`` schema:

::

    [2]

A schema can exist in multiple subjects, so you must delete the schema in each subject where it occurs before the referenced schema can be deleted. Here is how to determine exactly where a schema ID is used:

::

    curl http://localhost:8081/schemas/ids/2/versions

The above command returns the following, showing that in this case there is only one schema that references ``other.proto``:

::

    [{"subject":"testproto-value","version":1}]

Delete the top-level schema (``testproto-value``) that references ``other.proto``:

::

    curl -X DELETE http://localhost:8081/subjects/testproto-value/versions/1

The expected output is simply ``1``.

Now you can delete the referenced schema, and the command succeeds.

::

    curl -X DELETE http://localhost:8081/subjects/other.proto/versions/1

The expected output is simply ``1``.

.. _sr-deserializer-protobuf:

Protobuf Deserializer
---------------------

Plug ``KafkaProtobufDeserializer`` into ``KafkaConsumer`` to receive messages of any Protobuf type from |ak|.

.. tip:: The examples below use the default address and port for the |ak| bootstrap server (``localhost:9092``) and |sr| (``localhost:8081``).

::

    Properties props = new Properties();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
    props.put("schema.registry.url", "http://localhost:8081");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    String topic = "testproto";
    final Consumer<String, MyRecord> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));

    try {
      while (true) {
        ConsumerRecords<String, MyRecord> records = consumer.poll(100);
        for (ConsumerRecord<String, MyRecord> record : records) {
          System.out.printf("offset = %d, key = %s, value = %s \n",
              record.offset(), record.key(), record.value());
        }
      }
    } finally {
      consumer.close();
    }

When deserializing a Protobuf payload, the ``KafkaProtobufDeserializer`` can behave in these ways:

- If given a ``specific.protobuf.key.type`` or ``specific.protobuf.value.type``, the deserializer uses the specified type to perform deserialization.

- The previous configuration will not work for ``RecordNameStrategy``, where more than one type of Protobuf message may reside in the same topic. In this case, you can configure the deserializer with ``derive.type=true``, but you must also specify either ``java_outer_classname`` or ``java_multiple_files = true`` in the original Protobuf file. This allows the deserializer to derive the Java type from the schema and deserialize the Protobuf payload.

- Finally, if no type is provided or no type can be derived, the deserializer uses the schema to return an instance of a Protobuf ``DynamicMessage``.

Similar to how the Avro deserializer can return an instance of a specific Avro record type or a ``GenericRecord``, the Protobuf deserializer can return an instance of a specific Protobuf message type or a ``DynamicMessage``. If the Protobuf deserializer cannot determine a specific type, then a generic type is returned.

One way to return a specific type is to use an explicit property. For the Protobuf deserializer, you can configure the property ``KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE`` or ``KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_KEY_TYPE``.
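For example, the consumer configuration above can be extended so that the deserializer returns ``MyRecord`` instances directly rather than ``DynamicMessage``. This is a minimal sketch, assuming ``MyRecord`` is the class generated by the Protobuf compiler for the schema used earlier:

::

    // Ask the deserializer for the generated MyRecord class rather than a
    // DynamicMessage; the constant resolves to "specific.protobuf.value.type".
    props.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE,
        MyRecord.class.getName());

    final Consumer<String, MyRecord> consumer = new KafkaConsumer<>(props);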
To allow the Protobuf deserializer to work with topics that contain heterogeneous types, you must provide additional information in the schema. Configure the deserializer with ``derive.type=true``, and then specify either ``java_outer_classname`` or ``java_multiple_files=true`` in the schema.
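For illustration, here is a minimal sketch of a schema carrying these options; the outer class name is hypothetical:

::

    syntax = "proto3";
    package com.acme;

    // Either option lets the deserializer derive a Java type when it is
    // configured with derive.type=true: name a single wrapper class, or
    // generate one file (and one class) per message.
    option java_outer_classname = "OtherRecordProtos";
    // option java_multiple_files = true;

    message OtherRecord {
        int32 other_id = 1;
    }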
.. include:: ../includes/deserialize-return-types-per-format.rst

.. _sr-test-drive-protobuf:

Test Drive Protobuf Schema
--------------------------

To get started with Protobuf, you can use the command line producer and consumer for Protobuf. Similar to Avro, Protobuf defines both a binary serialization format and a JSON serialization format. This allows you to use JSON when human-readability is desired, and the more efficient binary format to store data in topics.

.. include:: ../includes/serdes-formatter-prereqs.rst

These examples make use of the ``kafka-protobuf-console-producer`` and ``kafka-protobuf-console-consumer``, which are located in ``$CONFLUENT_HOME/bin``. The command line producer and consumer are useful for understanding how the built-in Protobuf schema support works on |cp|.

.. include:: ../includes/serdes-console-tools-overview.rst

#. Start |cp| using the following command:

   .. code:: bash

      confluent local start

   .. include:: ../includes/sr-start-confluent-tips.rst

#. Verify registered schema types.

   .. include:: ../includes/verify-registered-schema-types.rst

#. Use the producer to send Protobuf records in JSON as the message value. The new topic, ``t1-p``, will be created as a part of this producer command if it does not already exist.

   ::

      kafka-protobuf-console-producer --broker-list localhost:9092 \
        --property schema.registry.url=http://localhost:8081 --topic t1-p \
        --property value.schema='syntax = "proto3"; message MyRecord { string f1 = 1; }'

   .. tip:: The current Protobuf specific producer does `not` show a ``>`` prompt, just a blank line at which to type producer messages.

#. Type the following command in the shell, and hit return.

   ::

      {"f1": "value1-p"}

   The command line Protobuf producer converts the JSON object to a Protobuf message (using the schema specified in ``value.schema``) and then uses an underlying serializer to serialize the message to the |ak| topic ``t1-p``.

#. Use the consumer to read from topic ``t1-p`` and get the value of the message in JSON.

   ::

      kafka-protobuf-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic t1-p --property schema.registry.url=http://localhost:8081

   You should see the following in the console.

   ::

      {"f1": "value1-p"}

   The underlying deserializer creates a Protobuf message, and then serializes it to JSON format for readability.

#. In another shell, use `curl <https://curl.se/>`__ commands to examine the schema that was registered with |sr|.

   ::

      curl http://localhost:8081/subjects/t1-p-value/versions/1/schema

   Here is the expected output:

   ::

      syntax = "proto3";

      message MyRecord {
        string f1 = 1;
      }

#. Run this curl command to view the schema in more detail. For more readable output, you can pipe it through `jq <https://stedolan.github.io/jq/>`__ (with ``curl`` download messages suppressed).

   ::

      curl --silent -X GET http://localhost:8081/subjects/t1-p-value/versions/latest | jq

   Here is the expected output of the above command:

   .. code:: bash

      {
        "subject": "t1-p-value",
        "version": 1,
        "id": 21,
        "schemaType": "PROTOBUF",
        "schema": "syntax = \"proto3\";\n\nmessage MyRecord {\n  string f1 = 1;\n}\n"
      }

#. Use |c3| to examine schemas and messages.

   .. include:: ../includes/serdes-c3-view-messages.rst

   .. figure:: ../../images/serdes-protobuf-c3-messages.png
      :align: center

   Schemas you create are available on the **Schemas** tab for the selected topic.

   .. figure:: ../../images/serdes-protobuf-c3-schema.png
      :align: center

#. .. include:: ../includes/cp-local-examples-cleanup.rst

.. _sr-protobuf-schema-compatibility:

Protobuf Schema Compatibility Rules
-----------------------------------

Compatibility rules support :ref:`schema evolution <schema_evolution_and_compatibility>` and the ability of downstream consumers to handle data encoded with old and new schemas. There is some overlap in these rules across formats, especially for Protobuf and Avro, with the exception of Protobuf backward compatibility, which differs between the two. For more detail on Protobuf compatibility rules, including backward compatibility, see :ref:`sr-serdes-schemas-compatibility-checks` in the overview.

Suggested Reading
-----------------

- Developer examples of |ak| client producers and consumers, with and without Avro, are on GitHub in :devx-examples:`examples/clients|clients/`.
- :ref:`schemaregistry_api`
- `Protocol Buffers (Protobuf) <https://developers.google.com/protocol-buffers>`__
- Blog post: Getting Started with Protobuf in Confluent Cloud
- Schema Registry and Confluent Cloud