Source code for confluent_kafka.schema_registry.protobuf

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import io
import sys
import base64
import struct
from collections import deque

from google.protobuf.message import DecodeError
from google.protobuf.message_factory import MessageFactory

from . import (_MAGIC_BYTE,
               reference_subject_name_strategy,
               topic_subject_name_strategy,)
from .schema_registry_client import (Schema,
                                     SchemaReference)
from confluent_kafka.serialization import SerializationError

# Converts an int to bytes (opposite of ord)
# Python3.chr() -> Unicode
# Python2.chr() -> str(alias for bytes)
if sys.version > '3':
    def _bytes(b):
        """
        Convert int to bytes

        Args:
            b (int): int to format as bytes.

        """
        return bytes((b,))
else:
    def _bytes(b):
        """
        Convert int to bytes

        Args:
            b (int): int to format as bytes.

        """
        return chr(b)


class _ContextStringIO(io.BytesIO):
    """
    Wrapper to allow use of StringIO via 'with' constructs.

    """

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
        return False


def _create_msg_index(msg_desc):
    """
    Maps the location of msg_desc within a FileDescriptor.

    Args:
        msg_desc (MessageDescriptor): Protobuf MessageDescriptor

    Returns:
        [int]: Protobuf MessageDescriptor index

    Raises:
        ValueError: If the message descriptor is malformed.

    """
    msg_idx = deque()
    current = msg_desc
    found = False
    # Traverse tree upwardly it's root
    while current.containing_type is not None:
        previous = current
        current = previous.containing_type
        # find child's position
        for idx, node in enumerate(current.nested_types):
            if node == previous:
                msg_idx.appendleft(idx)
                found = True
                break
        if not found:
            raise ValueError("Nested MessageDescriptor not found")

    found = False
    # find root's position in protofile
    for idx, msg_type_name in enumerate(msg_desc.file.message_types_by_name):
        if msg_type_name == current.name:
            msg_idx.appendleft(idx)
            found = True
            break
    if not found:
        raise ValueError("MessageDescriptor not found in file")

    # The root element at the 0 position does not need a length prefix.
    if len(msg_idx) == 1 and msg_idx[0] == 0:
        return [0]

    msg_idx.appendleft(len(msg_idx))
    return list(msg_idx)


def _schema_to_str(proto_file):
    """
    Base64 encodes a FileDescriptor

    Args:
        proto_file (FileDescriptor): FileDescriptor to encode.

    Returns:
        str: Base64 encoded FileDescriptor

    """
    return base64.standard_b64encode(proto_file.serialized_pb).decode('ascii')


class ProtobufSerializer(object):
    """
    ProtobufSerializer serializes objects in the Confluent Schema Registry
    binary format for Protobuf.

    ProtobufSerializer configuration properties:

    +-------------------------------------+----------+------------------------------------------------------+
    | Property Name                       | Type     | Description                                          |
    +=====================================+==========+======================================================+
    |                                     |          | Registers schemas automatically if not               |
    | ``auto.register.schemas``           | bool     | previously associated with a particular subject.     |
    |                                     |          | Defaults to True.                                    |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Callable(SerializationContext, str) -> str           |
    |                                     |          |                                                      |
    | ``subject.name.strategy``           | callable | Instructs the ProtobufSerializer on how to construct |
    |                                     |          | Schema Registry subject names.                       |
    |                                     |          | Defaults to topic_subject_name_strategy.             |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Callable(SerializationContext, str) -> str           |
    |                                     |          |                                                      |
    | ``reference.subject.name.strategy`` | callable | Instructs the ProtobufSerializer on how to construct |
    |                                     |          | Schema Registry subject names for Schema References  |
    |                                     |          | Defaults to reference_subject_name_strategy          |
    +-------------------------------------+----------+------------------------------------------------------+

    Schemas are registered to namespaces known as Subjects which define how a
    schema may evolve over time. By default the subject name is formed by
    concatenating the topic name with the message field separated by a hyphen.

    i.e. {topic name}-{message field}

    Alternative naming strategies may be configured with the property
    ``subject.name.strategy``.

    Supported subject name strategies

    +--------------------------------------+------------------------------+
    | Subject Name Strategy                | Output Format                |
    +======================================+==============================+
    | topic_subject_name_strategy(default) | {topic name}-{message field} |
    +--------------------------------------+------------------------------+
    | topic_record_subject_name_strategy   | {topic name}-{record name}   |
    +--------------------------------------+------------------------------+
    | record_subject_name_strategy         | {record name}                |
    +--------------------------------------+------------------------------+

    See `Subject name strategy <https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy>`_ for additional details.

    Args:
        msg_type (GeneratedProtocolMessageType): Protobuf Message type.

        schema_registry_client (SchemaRegistryClient): Schema Registry
            client instance.

        conf (dict): ProtobufSerializer configuration.

    Raises:
        ValueError: If a configuration value has the wrong type or an
            unrecognized property is supplied.

    See Also:
        `Protobuf API reference <https://googleapis.dev/python/protobuf/latest/google/protobuf.html>`_

    """  # noqa: E501
    __slots__ = ['_auto_register', '_registry', '_known_subjects',
                 '_msg_class', '_msg_index', '_schema', '_schema_id',
                 '_ref_reference_subject_func', '_subject_name_func']

    # default configuration
    _default_conf = {
        'auto.register.schemas': True,
        'subject.name.strategy': topic_subject_name_strategy,
        'reference.subject.name.strategy': reference_subject_name_strategy
    }

    def __init__(self, msg_type, schema_registry_client, conf=None):
        # Start from the defaults and overlay caller-supplied settings; each
        # recognized key is popped so that leftovers can be reported below.
        conf_copy = self._default_conf.copy()
        if conf is not None:
            conf_copy.update(conf)

        self._auto_register = conf_copy.pop('auto.register.schemas')
        if not isinstance(self._auto_register, bool):
            raise ValueError("auto.register.schemas must be a boolean value")

        self._subject_name_func = conf_copy.pop('subject.name.strategy')
        if not callable(self._subject_name_func):
            raise ValueError("subject.name.strategy must be callable")

        self._ref_reference_subject_func = conf_copy.pop(
            'reference.subject.name.strategy')
        if not callable(self._ref_reference_subject_func):
            # Name the property that actually failed validation.
            raise ValueError("reference.subject.name.strategy must be"
                             " callable")

        if len(conf_copy) > 0:
            raise ValueError("Unrecognized properties: {}"
                             .format(", ".join(conf_copy.keys())))

        self._registry = schema_registry_client
        self._schema_id = None
        # Avoid calling registry if schema is known to be registered
        self._known_subjects = set()
        self._msg_class = msg_type

        descriptor = msg_type.DESCRIPTOR
        self._msg_index = _create_msg_index(descriptor)
        self._schema = Schema(_schema_to_str(descriptor.file),
                              schema_type='PROTOBUF')

    @staticmethod
    def _encode_uvarints(buf, ints):
        """
        Encodes each int as a uvarint onto buf

        Args:
            buf (BytesIO): buffer to write to.

            ints ([int]): ints to be encoded.

        """
        for value in ints:
            # Emit 7 bits at a time, least-significant group first; the high
            # bit of a byte signals that more bytes follow.
            while (value & ~0x7f) != 0:
                buf.write(_bytes((value & 0x7f) | 0x80))
                value >>= 7
            buf.write(_bytes(value))

    def _resolve_dependencies(self, ctx, file_desc):
        """
        Resolves and optionally registers schema references recursively.

        Args:
            ctx (SerializationContext): Serialization context.

            file_desc (FileDescriptor): file descriptor to traverse.

        Returns:
            [SchemaReference]: one reference per direct dependency of
            file_desc.

        """
        schema_refs = []
        for dep in file_desc.dependencies:
            # Dependencies of the dependency are resolved first so they can
            # be attached to its schema.
            dep_refs = self._resolve_dependencies(ctx, dep)
            subject = self._ref_reference_subject_func(ctx, dep)
            schema = Schema(_schema_to_str(dep),
                            references=dep_refs,
                            schema_type='PROTOBUF')
            if self._auto_register:
                self._registry.register_schema(subject, schema)

            # The lookup supplies the version needed by the reference.
            reference = self._registry.lookup_schema(subject, schema)
            # schema_refs are per file descriptor
            schema_refs.append(SchemaReference(dep.name,
                                               subject,
                                               reference.version))
        return schema_refs

    def __call__(self, message_type, ctx):
        """
        Serializes a Protobuf Message to the Confluent Schema Registry
        Protobuf binary format.

        Args:
            message_type (Message): Protobuf message instance.

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation.

        Note:
            None objects are represented as Kafka Null.

        Raises:
            ValueError: If message_type is not an instance of the message
                type this serializer was created for.

            Errors raised by the Schema Registry client are not caught here.

        Returns:
            bytes: Confluent Schema Registry formatted Protobuf bytes

        """
        if message_type is None:
            return None

        if not isinstance(message_type, self._msg_class):
            raise ValueError("message must be of type {} not {}"
                             .format(self._msg_class, type(message_type)))

        subject = self._subject_name_func(ctx,
                                          message_type.DESCRIPTOR.full_name)

        if subject not in self._known_subjects:
            self._schema.references = self._resolve_dependencies(
                ctx, message_type.DESCRIPTOR.file)

            if self._auto_register:
                self._schema_id = self._registry.register_schema(
                    subject, self._schema)
            else:
                self._schema_id = self._registry.lookup_schema(
                    subject, self._schema).schema_id

            # Record the subject only after the registry call succeeded so
            # that later messages for it skip the registry entirely.
            self._known_subjects.add(subject)

        with _ContextStringIO() as fo:
            # Write the magic byte and schema ID in network byte order
            # (big endian)
            fo.write(struct.pack('>bI', _MAGIC_BYTE, self._schema_id))
            # write the record index to the buffer
            self._encode_uvarints(fo, self._msg_index)
            # write the record itself
            fo.write(message_type.SerializeToString())
            return fo.getvalue()
class ProtobufDeserializer(object):
    """
    ProtobufDeserializer decodes bytes written in the Schema Registry
    Protobuf format to an object.

    Args:
        message_type (GeneratedProtocolMessageType): Protobuf Message type.

    See Also:
        `Protobuf API reference <https://googleapis.dev/python/protobuf/latest/google/protobuf.html>`_

    """  # noqa: E501
    __slots__ = ['_msg_class', '_msg_index']

    def __init__(self, message_type):
        descriptor = message_type.DESCRIPTOR
        self._msg_index = _create_msg_index(descriptor)
        self._msg_class = MessageFactory().GetPrototype(descriptor)

    @staticmethod
    def _decode_uvarint(buf):
        """
        Decodes a single uvarint from a buffer.

        Args:
            buf (BytesIO): buffer to read from

        Returns:
            int: decoded uvarint

        Raises:
            EOFError: if the buffer runs out before the uvarint is complete

        """
        result = 0
        bit_offset = 0
        try:
            more = True
            while more:
                byte = ProtobufDeserializer._read_byte(buf)
                # The low 7 bits carry payload, least-significant group
                # first; the high bit flags a continuation byte.
                result |= (byte & 0x7f) << bit_offset
                bit_offset += 7
                more = bool(byte & 0x80)
        except EOFError:
            raise EOFError("Unexpected EOF while reading index")
        return result

    @staticmethod
    def _read_byte(buf):
        """
        Returns int representation for a byte.

        Args:
            buf (BytesIO): buffer to read from

        Raises:
            EOFError: if no byte is left in the buffer

        .. _ord:
            https://docs.python.org/2/library/functions.html#ord

        """
        chunk = buf.read(1)
        if chunk != b'':
            return ord(chunk)
        raise EOFError("Unexpected EOF encountered")

    @staticmethod
    def _decode_index(buf):
        """
        Extracts message index from Schema Registry Protobuf formatted bytes.

        Args:
            buf (BytesIO): byte buffer

        Returns:
            [int]: Protobuf Message index; the first element is the decoded
            element count, followed by that many decoded positions.

        """
        decode = ProtobufDeserializer._decode_uvarint
        count = decode(buf)
        return [count] + [decode(buf) for _ in range(count)]

    def __call__(self, value, ctx):
        """
        Deserializes Schema Registry formatted Protobuf to Protobuf Message.

        Args:
            value (bytes): Confluent Schema Registry formatted Protobuf bytes.

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation.

        Returns:
            Message: Protobuf Message instance, or None when value is None.

        Raises:
            SerializationError: If the payload is shorter than 6 bytes, does
                not start with the expected magic byte, or cannot be parsed
                as the expected Message type.

            EOFError: If the payload ends inside the message index.

        """
        if value is None:
            return None

        # SR wire protocol + msg_index length
        if len(value) < 6:
            raise SerializationError("Message too small. This message was not"
                                     " produced with a Confluent"
                                     " Schema Registry serializer")

        with _ContextStringIO(value) as payload:
            header = payload.read(5)
            # The schema id (second field) is unpacked but not needed.
            magic = struct.unpack('>bI', header)[0]
            if magic != _MAGIC_BYTE:
                raise SerializationError("Unknown magic byte. This message was"
                                         " not produced with a Confluent"
                                         " Schema Registry serializer")

            # Protobuf Messages are self-describing; no need to query schema
            # Move the reader cursor past the index
            ProtobufDeserializer._decode_index(payload)
            msg = self._msg_class()
            try:
                msg.ParseFromString(payload.read())
            except DecodeError as e:
                raise SerializationError(str(e))

            return msg