Source code for confluent_kafka.schema_registry.protobuf

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import io
import sys
import base64
import struct
import warnings
from collections import deque

from google.protobuf.message import DecodeError
from google.protobuf.message_factory import MessageFactory

from . import (_MAGIC_BYTE,
               reference_subject_name_strategy,
               topic_subject_name_strategy,)
from .schema_registry_client import (Schema,
                                     SchemaReference)
from confluent_kafka.serialization import SerializationError

# Converts an int to bytes (opposite of ord)
# Python3.chr() -> Unicode
# Python2.chr() -> str(alias for bytes)
if sys.version > '3':
    def _bytes(b):
        """
        Convert int to bytes

        Args:
            b (int): int to format as bytes.

        """
        return bytes((b,))
else:
    def _bytes(b):
        """
        Convert int to bytes

        Args:
            b (int): int to format as bytes.

        """
        return chr(b)


class _ContextStringIO(io.BytesIO):
    """
    Wrapper to allow use of StringIO via 'with' constructs.

    """

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
        return False


def _create_msg_index(msg_desc):
    """
    Maps the location of msg_desc within a FileDescriptor.

    Args:
        msg_desc (MessageDescriptor): Protobuf MessageDescriptor

    Returns:
        [int]: Protobuf MessageDescriptor index

    Raises:
        ValueError: If the message descriptor is malformed.

    """
    msg_idx = deque()
    current = msg_desc
    found = False
    # Traverse tree upwardly it's root
    while current.containing_type is not None:
        previous = current
        current = previous.containing_type
        # find child's position
        for idx, node in enumerate(current.nested_types):
            if node == previous:
                msg_idx.appendleft(idx)
                found = True
                break
        if not found:
            raise ValueError("Nested MessageDescriptor not found")

    found = False
    # find root's position in protofile
    for idx, msg_type_name in enumerate(msg_desc.file.message_types_by_name):
        if msg_type_name == current.name:
            msg_idx.appendleft(idx)
            found = True
            break
    if not found:
        raise ValueError("MessageDescriptor not found in file")

    return list(msg_idx)


def _schema_to_str(proto_file):
    """
    Base64 encodes a FileDescriptor

    Args:
        proto_file (FileDescriptor): FileDescriptor to encode.

    Returns:
        str: Base64 encoded FileDescriptor

    """
    return base64.standard_b64encode(proto_file.serialized_pb).decode('ascii')


class ProtobufSerializer(object):
    """
    ProtobufSerializer serializes objects in the Confluent Schema Registry
    binary format for Protobuf.

    ProtobufSerializer configuration properties:

    +-------------------------------------+----------+------------------------------------------------------+
    | Property Name                       | Type     | Description                                          |
    +=====================================+==========+======================================================+
    |                                     |          | Registers schemas automatically if not               |
    | ``auto.register.schemas``           | bool     | previously associated with a particular subject.     |
    |                                     |          | Defaults to True.                                    |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Whether to use the latest subject version for        |
    | ``use.latest.version``              | bool     | serialization.                                       |
    |                                     |          | WARNING: There is no check that the latest           |
    |                                     |          | schema is backwards compatible with the object       |
    |                                     |          | being serialized.                                    |
    |                                     |          | Defaults to False.                                   |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Whether to skip known types when resolving schema    |
    | ``skip.known.types``                | bool     | dependencies.                                        |
    |                                     |          | Defaults to False.                                   |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Callable(SerializationContext, str) -> str           |
    |                                     |          |                                                      |
    | ``subject.name.strategy``           | callable | Instructs the ProtobufSerializer on how to construct |
    |                                     |          | Schema Registry subject names.                       |
    |                                     |          | Defaults to topic_subject_name_strategy.             |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Callable(SerializationContext, str) -> str           |
    |                                     |          |                                                      |
    | ``reference.subject.name.strategy`` | callable | Instructs the ProtobufSerializer on how to construct |
    |                                     |          | Schema Registry subject names for Schema References  |
    |                                     |          | Defaults to reference_subject_name_strategy          |
    +-------------------------------------+----------+------------------------------------------------------+
    | ``use.deprecated.format``           | bool     | Specifies whether the Protobuf serializer should     |
    |                                     |          | serialize message indexes without zig-zag encoding.  |
    |                                     |          | This option must be explicitly configured as older   |
    |                                     |          | and newer Protobuf producers are incompatible.       |
    |                                     |          | If the consumers of the topic being produced to are  |
    |                                     |          | using confluent-kafka-python <1.8 then this property |
    |                                     |          | must be set to True until all old consumers have     |
    |                                     |          | been upgraded.                                       |
    |                                     |          | Warning: This configuration property will be removed |
    |                                     |          | in a future version of the client.                   |
    +-------------------------------------+----------+------------------------------------------------------+

    Schemas are registered to namespaces known as Subjects which define how a
    schema may evolve over time. By default the subject name is formed by
    concatenating the topic name with the message field separated by a hyphen.

    i.e. {topic name}-{message field}

    Alternative naming strategies may be configured with the property
    ``subject.name.strategy``.

    Supported subject name strategies

    +--------------------------------------+------------------------------+
    | Subject Name Strategy                | Output Format                |
    +======================================+==============================+
    | topic_subject_name_strategy(default) | {topic name}-{message field} |
    +--------------------------------------+------------------------------+
    | topic_record_subject_name_strategy   | {topic name}-{record name}   |
    +--------------------------------------+------------------------------+
    | record_subject_name_strategy         | {record name}                |
    +--------------------------------------+------------------------------+

    See `Subject name strategy <https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy>`_ for additional details.

    Args:
        msg_type (GeneratedProtocolMessageType): Protobuf Message type.

        schema_registry_client (SchemaRegistryClient): Schema Registry
            client instance.

        conf (dict): ProtobufSerializer configuration.

    Raises:
        RuntimeError: If ``use.deprecated.format`` is not explicitly present
            in conf.

        ValueError: If a configuration value has the wrong type, if both
            ``use.latest.version`` and ``auto.register.schemas`` are enabled,
            or if conf contains unrecognized properties.

    See Also:
        `Protobuf API reference <https://googleapis.dev/python/protobuf/latest/google/protobuf.html>`_

    """  # noqa: E501
    __slots__ = ['_auto_register', '_use_latest_version', '_skip_known_types',
                 '_registry', '_known_subjects',
                 '_msg_class', '_msg_index', '_schema', '_schema_id',
                 '_ref_reference_subject_func', '_subject_name_func',
                 '_use_deprecated_format']
    # default configuration
    _default_conf = {
        'auto.register.schemas': True,
        'use.latest.version': False,
        'skip.known.types': False,
        'subject.name.strategy': topic_subject_name_strategy,
        'reference.subject.name.strategy': reference_subject_name_strategy,
        'use.deprecated.format': False,
    }

    def __init__(self, msg_type, schema_registry_client, conf=None):
        # Require use.deprecated.format to be explicitly configured since the
        # old and new message index encodings are incompatible.
        if conf is None or 'use.deprecated.format' not in conf:
            raise RuntimeError(
                "ProtobufSerializer: the 'use.deprecated.format' configuration "
                "property must be explicitly set due to backward incompatibility "
                "with older confluent-kafka-python Protobuf producers and consumers. "
                "See the release notes for more details")

        # handle configuration: start from the defaults and overlay the
        # caller's values (conf is known to be a dict-like at this point).
        conf_copy = self._default_conf.copy()
        conf_copy.update(conf)

        self._auto_register = conf_copy.pop('auto.register.schemas')
        if not isinstance(self._auto_register, bool):
            raise ValueError("auto.register.schemas must be a boolean value")

        self._use_latest_version = conf_copy.pop('use.latest.version')
        if not isinstance(self._use_latest_version, bool):
            raise ValueError("use.latest.version must be a boolean value")
        if self._use_latest_version and self._auto_register:
            raise ValueError("cannot enable both use.latest.version and auto.register.schemas")

        self._skip_known_types = conf_copy.pop('skip.known.types')
        if not isinstance(self._skip_known_types, bool):
            raise ValueError("skip.known.types must be a boolean value")

        self._use_deprecated_format = conf_copy.pop('use.deprecated.format')
        if not isinstance(self._use_deprecated_format, bool):
            raise ValueError("use.deprecated.format must be a boolean value")
        # Warn only when the deprecated (non zig-zag) format is actually in
        # use; that is the configuration scheduled for removal.
        if self._use_deprecated_format:
            warnings.warn("ProtobufSerializer: the 'use.deprecated.format' "
                          "configuration property, and the ability to use the "
                          "old incorrect Protobuf serializer heading format "
                          "introduced in confluent-kafka-python v1.4.0, "
                          "will be removed in an upcoming release in 2021 Q2. "
                          "Please migrate your Python Protobuf producers and "
                          "consumers to 'use.deprecated.format':False as "
                          "soon as possible")

        self._subject_name_func = conf_copy.pop('subject.name.strategy')
        if not callable(self._subject_name_func):
            raise ValueError("subject.name.strategy must be callable")

        self._ref_reference_subject_func = conf_copy.pop(
            'reference.subject.name.strategy')
        if not callable(self._ref_reference_subject_func):
            raise ValueError("reference.subject.name.strategy must be callable")

        # Anything left over was not a known property.
        if len(conf_copy) > 0:
            raise ValueError("Unrecognized properties: {}"
                             .format(", ".join(conf_copy.keys())))

        self._registry = schema_registry_client
        self._schema_id = None
        # Avoid calling registry if schema is known to be registered
        self._known_subjects = set()
        self._msg_class = msg_type

        descriptor = msg_type.DESCRIPTOR
        self._msg_index = _create_msg_index(descriptor)
        self._schema = Schema(_schema_to_str(descriptor.file),
                              schema_type='PROTOBUF')

    @staticmethod
    def _write_varint(buf, val, zigzag=True):
        """
        Writes val to buf, either using zigzag or uvarint encoding.

        Args:
            buf (BytesIO): buffer to write to.
            val (int): integer to be encoded.
            zigzag (bool): whether to encode in zigzag or uvarint encoding

        """
        if zigzag:
            val = (val << 1) ^ (val >> 63)

        # Emit 7 bits at a time, least significant group first; the high bit
        # of each byte flags that more bytes follow.
        while (val & ~0x7f) != 0:
            buf.write(_bytes((val & 0x7f) | 0x80))
            val >>= 7
        buf.write(_bytes(val))

    @staticmethod
    def _encode_varints(buf, ints, zigzag=True):
        """
        Encodes the length of ints followed by each int as a varint onto buf.

        Args:
            buf (BytesIO): buffer to write to.
            ints ([int]): ints to be encoded.
            zigzag (bool): whether to encode in zigzag or uvarint encoding

        """
        assert len(ints) > 0
        # The root element at the 0 position does not need a length prefix.
        if ints == [0]:
            buf.write(_bytes(0x00))
            return

        ProtobufSerializer._write_varint(buf, len(ints), zigzag=zigzag)

        for value in ints:
            ProtobufSerializer._write_varint(buf, value, zigzag=zigzag)

    def _resolve_dependencies(self, ctx, file_desc):
        """
        Resolves and optionally registers schema references recursively.

        Args:
            ctx (SerializationContext): Serialization context.

            file_desc (FileDescriptor): file descriptor to traverse.

        Returns:
            [SchemaReference]: one reference per dependency of file_desc that
            was not skipped.

        """
        schema_refs = []
        for dep in file_desc.dependencies:
            if self._skip_known_types and dep.name.startswith("google/protobuf/"):
                continue
            # Resolve the dependency's own dependencies first so they can be
            # attached to its schema.
            dep_refs = self._resolve_dependencies(ctx, dep)
            subject = self._ref_reference_subject_func(ctx, dep)
            schema = Schema(_schema_to_str(dep),
                            references=dep_refs,
                            schema_type='PROTOBUF')
            if self._auto_register:
                self._registry.register_schema(subject, schema)

            reference = self._registry.lookup_schema(subject, schema)
            # schema_refs are per file descriptor
            schema_refs.append(SchemaReference(dep.name,
                                               subject,
                                               reference.version))
        return schema_refs

    def __call__(self, message_type, ctx):
        """
        Serializes a Protobuf Message to the Confluent Schema Registry
        Protobuf binary format.

        Args:
            message_type (Message): Protobuf message instance.

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation.

        Note:
            None objects are represented as Kafka Null.

        Raises:
            ValueError: if message_type is not an instance of the message
                class this serializer was created for.

        Returns:
            bytes: Confluent Schema Registry formatted Protobuf bytes

        """
        if message_type is None:
            return None

        if not isinstance(message_type, self._msg_class):
            raise ValueError("message must be of type {} not {}"
                             .format(self._msg_class, type(message_type)))

        subject = self._subject_name_func(ctx,
                                          message_type.DESCRIPTOR.full_name)

        # Only talk to the registry the first time a subject is seen.
        if subject not in self._known_subjects:
            if self._use_latest_version:
                latest_schema = self._registry.get_latest_version(subject)
                self._schema_id = latest_schema.schema_id

            else:
                self._schema.references = self._resolve_dependencies(
                    ctx, message_type.DESCRIPTOR.file)

                if self._auto_register:
                    self._schema_id = self._registry.register_schema(subject,
                                                                     self._schema)
                else:
                    self._schema_id = self._registry.lookup_schema(
                        subject, self._schema).schema_id

            self._known_subjects.add(subject)

        with _ContextStringIO() as fo:
            # Write the magic byte and schema ID in network byte order
            # (big endian)
            fo.write(struct.pack('>bI', _MAGIC_BYTE, self._schema_id))
            # write the record index to the buffer
            self._encode_varints(fo, self._msg_index,
                                 zigzag=not self._use_deprecated_format)
            # write the record itself
            fo.write(message_type.SerializeToString())
            return fo.getvalue()
class ProtobufDeserializer(object):
    """
    ProtobufDeserializer decodes bytes written in the Schema Registry
    Protobuf format to an object.

    Args:
        message_type (GeneratedProtocolMessageType): Protobuf Message type.
        conf (dict): Configuration dictionary.

    ProtobufDeserializer configuration properties:

    +-------------------------------------+----------+------------------------------------------------------+
    | Property Name                       | Type     | Description                                          |
    +=====================================+==========+======================================================+
    | ``use.deprecated.format``           | bool     | Specifies whether the Protobuf deserializer should   |
    |                                     |          | deserialize message indexes without zig-zag encoding.|
    |                                     |          | This option must be explicitly configured as older   |
    |                                     |          | and newer Protobuf producers are incompatible.       |
    |                                     |          | If Protobuf messages in the topic to consume were    |
    |                                     |          | produced with confluent-kafka-python <1.8 then this  |
    |                                     |          | property must be set to True until all old messages  |
    |                                     |          | have been processed and producers have been upgraded.|
    |                                     |          | Warning: This configuration property will be removed |
    |                                     |          | in a future version of the client.                   |
    +-------------------------------------+----------+------------------------------------------------------+

    Raises:
        RuntimeError: If ``use.deprecated.format`` is not explicitly present
            in conf.

        ValueError: If ``use.deprecated.format`` is not a boolean.

    See Also:
        `Protobuf API reference <https://googleapis.dev/python/protobuf/latest/google/protobuf.html>`_

    """  # noqa: E501

    __slots__ = ['_msg_class', '_msg_index', '_use_deprecated_format']

    # default configuration
    _default_conf = {
        'use.deprecated.format': False,
    }

    def __init__(self, message_type, conf=None):

        # Require use.deprecated.format to be explicitly configured
        # during a transitionary period since old/new format are
        # incompatible.
        if conf is None or 'use.deprecated.format' not in conf:
            raise RuntimeError(
                "ProtobufDeserializer: the 'use.deprecated.format' configuration "
                "property must be explicitly set due to backward incompatibility "
                "with older confluent-kafka-python Protobuf producers and consumers. "
                "See the release notes for more details")

        # handle configuration: start from the defaults and overlay the
        # caller's values (conf is known to be a dict-like at this point).
        conf_copy = self._default_conf.copy()
        conf_copy.update(conf)

        self._use_deprecated_format = conf_copy.pop('use.deprecated.format')
        if not isinstance(self._use_deprecated_format, bool):
            raise ValueError("use.deprecated.format must be a boolean value")
        # Warn only when the deprecated (non zig-zag) format is actually in
        # use; that is the configuration scheduled for removal.
        if self._use_deprecated_format:
            warnings.warn("ProtobufDeserializer: the 'use.deprecated.format' "
                          "configuration property, and the ability to use the "
                          "old incorrect Protobuf serializer heading format "
                          "introduced in confluent-kafka-python v1.4.0, "
                          "will be removed in an upcoming release in 2022 Q2. "
                          "Please migrate your Python Protobuf producers and "
                          "consumers to 'use.deprecated.format':False as "
                          "soon as possible")

        descriptor = message_type.DESCRIPTOR
        self._msg_index = _create_msg_index(descriptor)
        self._msg_class = MessageFactory().GetPrototype(descriptor)

    @staticmethod
    def _decode_varint(buf, zigzag=True):
        """
        Decodes a single varint from a buffer.

        Args:
            buf (BytesIO): buffer to read from
            zigzag (bool): decode as zigzag or uvarint

        Returns:
            int: decoded varint

        Raises:
            EOFError: if buffer is empty

        """
        value = 0
        shift = 0
        try:
            # Accumulate 7 payload bits per byte, least significant group
            # first, until a byte without the continuation bit is read.
            while True:
                i = ProtobufDeserializer._read_byte(buf)

                value |= (i & 0x7f) << shift
                shift += 7
                if not (i & 0x80):
                    break

            if zigzag:
                value = (value >> 1) ^ -(value & 1)

            return value

        except EOFError:
            raise EOFError("Unexpected EOF while reading index")

    @staticmethod
    def _read_byte(buf):
        """
        Returns int representation for a byte.

        Args:
            buf (BytesIO): buffer to read from

        Returns:
            int: ordinal of the next byte in buf.

        Raises:
            EOFError: if no byte could be read from buf.

        .. _ord:
            https://docs.python.org/2/library/functions.html#ord
        """
        i = buf.read(1)
        if i == b'':
            raise EOFError("Unexpected EOF encountered")
        return ord(i)

    @staticmethod
    def _decode_index(buf, zigzag=True):
        """
        Extracts message index from Schema Registry Protobuf formatted bytes.

        Args:
            buf (BytesIO): byte buffer
            zigzag (bool): decode the index varints as zigzag or uvarint

        Returns:
            [int]: Protobuf Message index.

        Raises:
            DecodeError: if the encoded index length is negative or larger
                than 100000.

            EOFError: if buf ends before the index is complete.

        """
        size = ProtobufDeserializer._decode_varint(buf, zigzag=zigzag)
        if size < 0 or size > 100000:
            raise DecodeError("Invalid Protobuf msgidx array length")

        # A zero length is the compact form of the index [0].
        if size == 0:
            return [0]

        msg_index = []
        for _ in range(size):
            msg_index.append(ProtobufDeserializer._decode_varint(buf,
                                                                 zigzag=zigzag))

        return msg_index

    def __call__(self, value, ctx):
        """
        Deserializes Schema Registry formatted Protobuf to Protobuf Message.

        Args:
            value (bytes): Confluent Schema Registry formatted Protobuf bytes.

            ctx (SerializationContext): Metadata pertaining to the serialization
                operation.

        Returns:
            Message: Protobuf Message instance, or None if value is None.

        Raises:
            SerializationError: If value is too short, does not start with
                the expected magic byte, or the Protobuf payload cannot be
                parsed.

        """
        if value is None:
            return None

        # SR wire protocol + msg_index length
        if len(value) < 6:
            raise SerializationError("Message too small. This message was not"
                                     " produced with a Confluent"
                                     " Schema Registry serializer")

        with _ContextStringIO(value) as payload:
            magic, schema_id = struct.unpack('>bI', payload.read(5))
            if magic != _MAGIC_BYTE:
                raise SerializationError("Unknown magic byte. This message was"
                                         " not produced with a Confluent"
                                         " Schema Registry serializer")

            # Protobuf Messages are self-describing; no need to query schema
            # Move the reader cursor past the index
            _ = self._decode_index(payload, zigzag=not self._use_deprecated_format)
            msg = self._msg_class()
            try:
                msg.ParseFromString(payload.read())
            except DecodeError as e:
                raise SerializationError(str(e))

            return msg