#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020-2022 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import sys
import base64
import struct
import warnings
from collections import deque
from google.protobuf.message import DecodeError
from google.protobuf.message_factory import MessageFactory
from . import (_MAGIC_BYTE,
reference_subject_name_strategy,
topic_subject_name_strategy,)
from .schema_registry_client import (Schema,
SchemaReference)
from confluent_kafka.serialization import SerializationError
# Convert an int to bytes (inverse of ord())
# Python3.chr() -> Unicode
# Python2.chr() -> str(alias for bytes)
if sys.version > '3':
def _bytes(v):
"""
Convert int to bytes
Args:
v (int): The int to convert to bytes.
"""
return bytes((v,))
else:
def _bytes(v):
"""
Convert int to bytes
Args:
v (int): The int to convert to bytes.
"""
return chr(v)
class _ContextStringIO(io.BytesIO):
"""
Wrapper to allow use of StringIO via 'with' constructs.
"""
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
return False
def _create_index_array(msg_desc):
"""
Creates an index array specifying the location of msg_desc in
the referenced FileDescriptor.
Args:
msg_desc (MessageDescriptor): Protobuf MessageDescriptor
Returns:
list of int: Protobuf MessageDescriptor index array.
Raises:
ValueError: If the message descriptor is malformed.
"""
msg_idx = deque()
# Walk the nested MessageDescriptor tree up to the root.
current = msg_desc
found = False
while current.containing_type is not None:
previous = current
current = previous.containing_type
# find child's position
for idx, node in enumerate(current.nested_types):
if node == previous:
msg_idx.appendleft(idx)
found = True
break
if not found:
raise ValueError("Nested MessageDescriptor not found")
# Add the index of the root MessageDescriptor in the FileDescriptor.
found = False
for idx, msg_type_name in enumerate(msg_desc.file.message_types_by_name):
if msg_type_name == current.name:
msg_idx.appendleft(idx)
found = True
break
if not found:
raise ValueError("MessageDescriptor not found in file")
return list(msg_idx)
def _schema_to_str(file_descriptor):
"""
Base64 encode a FileDescriptor
Args:
file_descriptor (FileDescriptor): FileDescriptor to encode.
Returns:
str: Base64 encoded FileDescriptor
"""
return base64.standard_b64encode(file_descriptor.serialized_pb).decode('ascii')
class ProtobufSerializer(object):
    """
    Serializer for Protobuf Message derived classes. Serialization format is Protobuf,
    with Confluent Schema Registry framing.

    Configuration properties:

    ``auto.register.schemas`` (bool)
        If True, automatically register the configured schema with Confluent
        Schema Registry if it has not previously been associated with the
        relevant subject (determined via subject.name.strategy).

        Defaults to True.

        Raises SchemaRegistryError if the schema was not registered against
        the subject, or could not be successfully registered.

    ``normalize.schemas`` (bool)
        Whether to normalize schemas, which will transform schemas to have a
        consistent format, including ordering properties and references.

        Defaults to False.

    ``use.latest.version`` (bool)
        Whether to use the latest subject version for serialization.

        WARNING: There is no check that the latest schema is backwards
        compatible with the object being serialized.

        Defaults to False. Cannot be enabled together with
        ``auto.register.schemas``.

    ``skip.known.types`` (bool)
        Whether or not to skip known types (dependencies whose file name
        starts with ``google/protobuf/``) when resolving schema dependencies.

        Defaults to False.

    ``subject.name.strategy`` (callable)
        Callable(SerializationContext, str) -> str

        Defines how Schema Registry subject names are constructed. Standard
        naming strategies are defined in the confluent_kafka.schema_registry
        namespace.

        Defaults to topic_subject_name_strategy.

    ``reference.subject.name.strategy`` (callable)
        Callable(SerializationContext, FileDescriptor) -> str

        Defines how Schema Registry subject names for schema references are
        constructed. It is called with each dependency's FileDescriptor.

        Defaults to reference_subject_name_strategy.

    ``use.deprecated.format`` (bool)
        Specifies whether the Protobuf serializer should serialize message
        indexes without zig-zag encoding. This option must be explicitly
        configured as older and newer Protobuf producers are incompatible.
        If the consumers of the topic being produced to are using
        confluent-kafka-python <1.8 then this property must be set to True
        until all old consumers have been upgraded.

        Warning: This configuration property will be removed in a future
        version of the client.

    Schemas are registered against subject names in Confluent Schema Registry that
    define a scope in which the schemas can be evolved. By default, the subject name
    is formed by concatenating the topic name with the message field (key or value)
    separated by a hyphen.

    i.e. {topic name}-{message field}

    Alternative naming strategies may be configured with the property
    ``subject.name.strategy``.

    Supported subject name strategies:

    - ``topic_subject_name_strategy`` (default): ``{topic name}-{message field}``
    - ``topic_record_subject_name_strategy``: ``{topic name}-{record name}``
    - ``record_subject_name_strategy``: ``{record name}``

    See `Subject name strategy <https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy>`_ for additional details.

    Args:
        msg_type (GeneratedProtocolMessageType): Protobuf Message type.

        schema_registry_client (SchemaRegistryClient): Schema Registry
            client instance.

        conf (dict): ProtobufSerializer configuration.

    See Also:
        `Protobuf API reference <https://googleapis.dev/python/protobuf/latest/google/protobuf.html>`_

    """  # noqa: E501
    __slots__ = ['_auto_register', '_normalize_schemas', '_use_latest_version', '_skip_known_types',
                 '_registry', '_known_subjects', '_msg_class', '_index_array', '_schema', '_schema_id',
                 '_ref_reference_subject_func', '_subject_name_func', '_use_deprecated_format',
                 '_subject_schema_ids']

    _default_conf = {
        'auto.register.schemas': True,
        'normalize.schemas': False,
        'use.latest.version': False,
        'skip.known.types': False,
        'subject.name.strategy': topic_subject_name_strategy,
        'reference.subject.name.strategy': reference_subject_name_strategy,
        'use.deprecated.format': False,
    }

    def __init__(self, msg_type, schema_registry_client, conf=None):
        # Require use.deprecated.format to be explicitly configured
        # since the old and new index formats are incompatible.
        if conf is None or 'use.deprecated.format' not in conf:
            raise RuntimeError(
                "ProtobufSerializer: the 'use.deprecated.format' configuration "
                "property must be explicitly set due to backward incompatibility "
                "with older confluent-kafka-python Protobuf producers and consumers. "
                "See the release notes for more details")

        # conf is known to be non-None here.
        conf_copy = self._default_conf.copy()
        conf_copy.update(conf)

        self._auto_register = conf_copy.pop('auto.register.schemas')
        if not isinstance(self._auto_register, bool):
            raise ValueError("auto.register.schemas must be a boolean value")

        self._normalize_schemas = conf_copy.pop('normalize.schemas')
        if not isinstance(self._normalize_schemas, bool):
            raise ValueError("normalize.schemas must be a boolean value")

        self._use_latest_version = conf_copy.pop('use.latest.version')
        if not isinstance(self._use_latest_version, bool):
            raise ValueError("use.latest.version must be a boolean value")
        if self._use_latest_version and self._auto_register:
            raise ValueError("cannot enable both use.latest.version and auto.register.schemas")

        self._skip_known_types = conf_copy.pop('skip.known.types')
        if not isinstance(self._skip_known_types, bool):
            raise ValueError("skip.known.types must be a boolean value")

        self._use_deprecated_format = conf_copy.pop('use.deprecated.format')
        if not isinstance(self._use_deprecated_format, bool):
            raise ValueError("use.deprecated.format must be a boolean value")
        if self._use_deprecated_format:
            warnings.warn("ProtobufSerializer: the 'use.deprecated.format' "
                          "configuration property, and the ability to use the "
                          "old incorrect Protobuf serializer heading format "
                          "introduced in confluent-kafka-python v1.4.0, "
                          "will be removed in an upcoming release in 2021 Q2. "
                          "Please migrate your Python Protobuf producers and "
                          "consumers to 'use.deprecated.format':False as "
                          "soon as possible")

        self._subject_name_func = conf_copy.pop('subject.name.strategy')
        if not callable(self._subject_name_func):
            raise ValueError("subject.name.strategy must be callable")

        self._ref_reference_subject_func = conf_copy.pop(
            'reference.subject.name.strategy')
        if not callable(self._ref_reference_subject_func):
            raise ValueError("reference.subject.name.strategy must be callable")

        if len(conf_copy) > 0:
            raise ValueError("Unrecognized properties: {}"
                             .format(", ".join(conf_copy.keys())))

        self._registry = schema_registry_client
        # Schema ID used by the most recent __call__ (kept for compatibility).
        self._schema_id = None
        self._known_subjects = set()
        # Schema ID resolved for every subject seen so far. One serializer may
        # serve several subjects (e.g. several topics with the topic subject
        # name strategy) and each may resolve to a different ID, notably
        # with use.latest.version.
        self._subject_schema_ids = {}
        self._msg_class = msg_type

        descriptor = msg_type.DESCRIPTOR
        self._index_array = _create_index_array(descriptor)
        self._schema = Schema(_schema_to_str(descriptor.file),
                              schema_type='PROTOBUF')

    @staticmethod
    def _write_varint(buf, val, zigzag=True):
        """
        Writes val to buf, either using zigzag or uvarint encoding.

        Args:
            buf (BytesIO): buffer to write to.
            val (int): integer to be encoded.
            zigzag (bool): whether to encode in zigzag or uvarint encoding
        """
        if zigzag:
            val = (val << 1) ^ (val >> 63)
        # Constrain to 64 bits (two's complement). Without this a negative
        # value with zigzag disabled would loop forever, since Python ints
        # have unbounded sign extension. 64-bit inputs are otherwise unaffected.
        val &= 0xFFFFFFFFFFFFFFFF
        while (val & ~0x7f) != 0:
            buf.write(_bytes((val & 0x7f) | 0x80))
            val >>= 7
        buf.write(_bytes(val))

    @staticmethod
    def _encode_varints(buf, ints, zigzag=True):
        """
        Encodes an index array onto buf: its length followed by each element.

        Args:
            buf (BytesIO): buffer to write to.
            ints ([int]): ints to be encoded (must be non-empty).
            zigzag (bool): whether to encode in zigzag or uvarint encoding
        """
        assert len(ints) > 0
        # The root element at the 0 position does not need a length prefix:
        # a single 0 byte stands for the index array [0].
        if ints == [0]:
            buf.write(_bytes(0x00))
            return

        ProtobufSerializer._write_varint(buf, len(ints), zigzag=zigzag)

        for value in ints:
            ProtobufSerializer._write_varint(buf, value, zigzag=zigzag)

    def _resolve_dependencies(self, ctx, file_desc):
        """
        Resolves and optionally registers schema references recursively.

        Args:
            ctx (SerializationContext): Serialization context.

            file_desc (FileDescriptor): file descriptor to traverse.

        Returns:
            list of SchemaReference: One reference per (non-skipped) direct
            dependency of ``file_desc``.
        """
        schema_refs = []
        for dep in file_desc.dependencies:
            if self._skip_known_types and dep.name.startswith("google/protobuf/"):
                continue
            # Depth-first: a dependency's own references are needed to
            # build its Schema.
            dep_refs = self._resolve_dependencies(ctx, dep)
            subject = self._ref_reference_subject_func(ctx, dep)
            schema = Schema(_schema_to_str(dep),
                            references=dep_refs,
                            schema_type='PROTOBUF')
            if self._auto_register:
                self._registry.register_schema(subject, schema)

            reference = self._registry.lookup_schema(subject, schema)
            # schema_refs are per file descriptor
            schema_refs.append(SchemaReference(dep.name,
                                               subject,
                                               reference.version))
        return schema_refs

    def _resolve_schema_id(self, ctx, subject, message):
        """
        Determines the schema ID to frame messages for ``subject`` with.

        Uses the subject's latest version when ``use.latest.version`` is set;
        otherwise resolves references, then registers (``auto.register.schemas``)
        or looks up the configured schema.

        Args:
            ctx (SerializationContext): Serialization context.
            subject (str): Schema Registry subject name.
            message (Message): Message being serialized.

        Returns:
            int: The schema ID.
        """
        if self._use_latest_version:
            return self._registry.get_latest_version(subject).schema_id

        self._schema.references = self._resolve_dependencies(
            ctx, message.DESCRIPTOR.file)

        if self._auto_register:
            return self._registry.register_schema(subject,
                                                  self._schema,
                                                  self._normalize_schemas)
        return self._registry.lookup_schema(
            subject, self._schema, self._normalize_schemas).schema_id

    def __call__(self, message, ctx):
        """
        Serializes an instance of a class derived from Protobuf Message, and prepends
        it with Confluent Schema Registry framing.

        Args:
            message (Message): An instance of a class derived from Protobuf Message.

            ctx (SerializationContext): Metadata relevant to the serialization
                operation.

        Raises:
            ValueError: If message is not an instance of the configured
                Message type.

            SchemaRegistryError: presumably propagated from the Schema
                Registry client when registration or lookup fails.

        Returns:
            None if message is None, else a byte array containing the Protobuf
            serialized message with Confluent Schema Registry framing.
        """

        if message is None:
            return None

        if not isinstance(message, self._msg_class):
            raise ValueError("message must be of type {} not {}"
                             .format(self._msg_class, type(message)))

        subject = self._subject_name_func(ctx,
                                          message.DESCRIPTOR.full_name)

        # Resolve each subject once and remember its own schema ID, so that
        # alternating between subjects never reuses another subject's ID.
        if subject not in self._subject_schema_ids:
            self._subject_schema_ids[subject] = self._resolve_schema_id(
                ctx, subject, message)
            self._known_subjects.add(subject)
        self._schema_id = self._subject_schema_ids[subject]

        with _ContextStringIO() as fo:
            # Write the magic byte and schema ID in network byte order
            # (big endian)
            fo.write(struct.pack('>bI', _MAGIC_BYTE, self._schema_id))
            # write the index array that specifies the message descriptor
            # of the serialized data.
            self._encode_varints(fo, self._index_array,
                                 zigzag=not self._use_deprecated_format)
            # write the serialized data itself
            fo.write(message.SerializeToString())
            return fo.getvalue()
class ProtobufDeserializer(object):
    """
    Deserializer for Protobuf serialized data with Confluent Schema Registry framing.

    Args:
        message_type (Message derived type): Protobuf Message type.

        conf (dict): Configuration dictionary.

    ProtobufDeserializer configuration properties:

    ``use.deprecated.format`` (bool)
        Specifies whether the Protobuf deserializer should deserialize message
        indexes without zig-zag encoding. This option must be explicitly
        configured as older and newer Protobuf producers are incompatible.
        If Protobuf messages in the topic to consume were produced with
        confluent-kafka-python <1.8 then this property must be set to True
        until all old messages have been processed and producers have been
        upgraded.

        Warning: This configuration property will be removed in a future
        version of the client.

    See Also:
        `Protobuf API reference <https://googleapis.dev/python/protobuf/latest/google/protobuf.html>`_
    """
    __slots__ = ['_msg_class', '_index_array', '_use_deprecated_format']

    _default_conf = {
        'use.deprecated.format': False,
    }

    def __init__(self, message_type, conf=None):

        # Require use.deprecated.format to be explicitly configured
        # during a transitionary period since old/new format are
        # incompatible.
        if conf is None or 'use.deprecated.format' not in conf:
            raise RuntimeError(
                "ProtobufDeserializer: the 'use.deprecated.format' configuration "
                "property must be explicitly set due to backward incompatibility "
                "with older confluent-kafka-python Protobuf producers and consumers. "
                "See the release notes for more details")

        # conf is known to be non-None here.
        conf_copy = self._default_conf.copy()
        conf_copy.update(conf)

        self._use_deprecated_format = conf_copy.pop('use.deprecated.format')
        if not isinstance(self._use_deprecated_format, bool):
            raise ValueError("use.deprecated.format must be a boolean value")
        if self._use_deprecated_format:
            warnings.warn("ProtobufDeserializer: the 'use.deprecated.format' "
                          "configuration property, and the ability to use the "
                          "old incorrect Protobuf serializer heading format "
                          "introduced in confluent-kafka-python v1.4.0, "
                          "will be removed in an upcoming release in 2022 Q2. "
                          "Please migrate your Python Protobuf producers and "
                          "consumers to 'use.deprecated.format':False as "
                          "soon as possible")

        descriptor = message_type.DESCRIPTOR
        self._index_array = _create_index_array(descriptor)
        self._msg_class = MessageFactory().GetPrototype(descriptor)

    @staticmethod
    def _decode_varint(buf, zigzag=True):
        """
        Decodes a single varint from a buffer.

        Args:
            buf (BytesIO): buffer to read from
            zigzag (bool): decode as zigzag or uvarint

        Returns:
            int: decoded varint

        Raises:
            EOFError: if buffer is empty
        """

        value = 0
        shift = 0
        try:
            while True:
                i = ProtobufDeserializer._read_byte(buf)

                # Low 7 bits carry payload; the high bit flags continuation.
                value |= (i & 0x7f) << shift
                shift += 7
                if not (i & 0x80):
                    break

            if zigzag:
                value = (value >> 1) ^ -(value & 1)

            return value

        except EOFError:
            raise EOFError("Unexpected EOF while reading index")

    @staticmethod
    def _read_byte(buf):
        """
        Read one byte from buf and return it as an int (via ord()).

        Args:
            buf (BytesIO): The buffer to read from.

        Returns:
            int: The byte value, 0-255.

        Raises:
            EOFError: If buf has no more data.
        """
        i = buf.read(1)
        if i == b'':
            raise EOFError("Unexpected EOF encountered")
        return ord(i)

    @staticmethod
    def _read_index_array(buf, zigzag=True):
        """
        Read an index array from buf that specifies the message
        descriptor of interest in the file descriptor.

        Args:
            buf (BytesIO): The buffer to read from.
            zigzag (bool): decode varints as zigzag (True) or uvarint (False).

        Returns:
            list of int: The index array.

        Raises:
            EOFError: If buf ends before the index array is complete.
            DecodeError: If the encoded array length is negative or
                larger than 100000.
        """
        size = ProtobufDeserializer._decode_varint(buf, zigzag=zigzag)
        if size < 0 or size > 100000:
            raise DecodeError("Invalid Protobuf msgidx array length")

        # A length of 0 is the serializer's shorthand for the index array [0].
        if size == 0:
            return [0]

        msg_index = []
        for _ in range(size):
            msg_index.append(ProtobufDeserializer._decode_varint(buf,
                                                                 zigzag=zigzag))

        return msg_index

    def __call__(self, data, ctx):
        """
        Deserialize a serialized protobuf message with Confluent Schema Registry
        framing.

        Args:
            data (bytes): Serialized protobuf message with Confluent Schema
                Registry framing.

            ctx (SerializationContext): Metadata relevant to the serialization
                operation.

        Returns:
            Message: Protobuf Message instance, or None if data is None.

        Raises:
            SerializationError: If there was an error reading the Confluent framing
                data (including a truncated or invalid message index array), or
                parsing the protobuf serialized message.
        """

        if data is None:
            return None

        # SR wire protocol + msg_index length
        if len(data) < 6:
            raise SerializationError("Expecting data framing of length 6 bytes or "
                                     "more but total data size is {} bytes. This "
                                     "message was not produced with a Confluent "
                                     "Schema Registry serializer".format(len(data)))

        with _ContextStringIO(data) as payload:
            magic, schema_id = struct.unpack('>bI', payload.read(5))
            if magic != _MAGIC_BYTE:
                raise SerializationError("Unknown magic byte. This message was "
                                         "not produced with a Confluent "
                                         "Schema Registry serializer")

            # Protobuf Messages are self-describing; no need to query schema.
            # The index array is still consumed to reach the payload; surface
            # framing errors as SerializationError per this method's contract.
            try:
                self._read_index_array(payload,
                                       zigzag=not self._use_deprecated_format)
            except (EOFError, DecodeError) as e:
                raise SerializationError(str(e))

            msg = self._msg_class()
            try:
                msg.ParseFromString(payload.read())
            except DecodeError as e:
                raise SerializationError(str(e))

            return msg