#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020-2022 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
from typing import Any, Callable, List, Optional, Set, Tuple, Union, cast
from google.protobuf import descriptor_pb2, json_format
from google.protobuf.descriptor import Descriptor, FileDescriptor
from google.protobuf.descriptor_pool import DescriptorPool
from google.protobuf.message import DecodeError, Message
from google.protobuf.message_factory import GetMessageClass
from confluent_kafka.schema_registry import (
RuleMode,
Schema,
SchemaReference,
dual_schema_id_deserializer,
prefix_schema_id_serializer,
reference_subject_name_strategy,
topic_subject_name_strategy,
)
from confluent_kafka.schema_registry.common.protobuf import (
PROTOBUF_TYPE,
_bytes,
_ContextStringIO,
_create_index_array,
_init_pool,
_is_builtin,
_schema_to_str,
_str_to_proto,
transform,
)
from confluent_kafka.schema_registry.common.schema_registry_client import RulePhase
from confluent_kafka.schema_registry.rule_registry import RuleRegistry
from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient
from confluent_kafka.schema_registry.serde import (
BaseDeserializer,
BaseSerializer,
ParsedSchemaCache,
SchemaId,
)
from confluent_kafka.serialization import SerializationContext, SerializationError
# Names exported by ``from ... import *``; note that the underscore-prefixed
# ``_resolve_named_schema`` is deliberately listed here as well.
__all__ = ["_resolve_named_schema", "ProtobufSerializer", "ProtobufDeserializer"]
def _resolve_named_schema(
    schema: Schema,
    schema_registry_client: SchemaRegistryClient,
    pool: DescriptorPool,
    visited: Optional[Set[str]] = None,
):
    """
    Resolves named schemas referenced by the provided schema recursively.

    For every (non-builtin) reference of ``schema``, the referenced schema is
    fetched from Schema Registry, its own references are resolved first, and
    the resulting file descriptor is then added to ``pool`` under the
    reference name.

    :param schema: Schema to resolve named schemas for.
    :param schema_registry_client: SchemaRegistryClient to use for retrieval.
    :param pool: DescriptorPool to add resolved schemas to; mutated in place.
    :param visited: Reference names already seen during this traversal. Names
        in this set are skipped, which avoids adding the same file twice and
        stops recursion on reference cycles. Callers normally omit it.
    :return: None. Resolved schemas are added to ``pool`` in place.
    :raises ValueError: If a reference has no name, subject or version, or if
        a referenced schema has no schema string.
    """
    if visited is None:
        visited = set()
    if schema.references is not None:
        for ref in schema.references:
            if ref.name is None:
                raise ValueError("Name cannot be None")
            if _is_builtin(ref.name) or ref.name in visited:
                continue
            # Mark before recursing so a cycle of references terminates.
            visited.add(ref.name)
            if ref.subject is None or ref.version is None:
                raise ValueError("Subject or version cannot be None")
            referenced_schema = schema_registry_client.get_version(ref.subject, ref.version, True, 'serialized')
            if referenced_schema.schema.schema_str is None:
                raise ValueError("Schema string cannot be None")
            # Add this reference's own dependencies before the file that imports them.
            _resolve_named_schema(referenced_schema.schema, schema_registry_client, pool, visited)
            file_descriptor_proto = _str_to_proto(ref.name, referenced_schema.schema.schema_str)
            pool.Add(file_descriptor_proto)
class ProtobufSerializer(BaseSerializer):
    """
    Serializer for Protobuf Message derived classes. Serialization format is Protobuf,
    with Confluent Schema Registry framing.

    Configuration properties:

    +-------------------------------------+----------+------------------------------------------------------+
    | Property Name                       | Type     | Description                                          |
    +=====================================+==========+======================================================+
    |                                     |          | If True, automatically register the configured       |
    | ``auto.register.schemas``           | bool     | schema with Confluent Schema Registry if it has      |
    |                                     |          | not previously been associated with the relevant     |
    |                                     |          | subject (determined via subject.name.strategy).      |
    |                                     |          |                                                      |
    |                                     |          | Defaults to True.                                    |
    |                                     |          |                                                      |
    |                                     |          | Raises SchemaRegistryError if the schema was not     |
    |                                     |          | registered against the subject, or could not be      |
    |                                     |          | successfully registered.                             |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Whether to normalize schemas, which will             |
    | ``normalize.schemas``               | bool     | transform schemas to have a consistent format,       |
    |                                     |          | including ordering properties and references.        |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Whether to use the given schema ID for               |
    | ``use.schema.id``                   | int      | serialization.                                       |
    |                                     |          |                                                      |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Whether to use the latest subject version for        |
    | ``use.latest.version``              | bool     | serialization.                                       |
    |                                     |          |                                                      |
    |                                     |          | WARNING: There is no check that the latest           |
    |                                     |          | schema is backwards compatible with the object       |
    |                                     |          | being serialized.                                    |
    |                                     |          |                                                      |
    |                                     |          | Defaults to False.                                   |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Whether to use the latest subject version with       |
    | ``use.latest.with.metadata``        | dict     | the given metadata.                                  |
    |                                     |          |                                                      |
    |                                     |          | WARNING: There is no check that the latest           |
    |                                     |          | schema is backwards compatible with the object       |
    |                                     |          | being serialized.                                    |
    |                                     |          |                                                      |
    |                                     |          | Defaults to None.                                    |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Whether or not to skip known types when resolving    |
    | ``skip.known.types``                | bool     | schema dependencies.                                 |
    |                                     |          |                                                      |
    |                                     |          | Defaults to True.                                    |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Callable(SerializationContext, str) -> str           |
    |                                     |          |                                                      |
    | ``subject.name.strategy``           | callable | Defines how Schema Registry subject names are        |
    |                                     |          | constructed. Standard naming strategies are          |
    |                                     |          | defined in the confluent_kafka.schema_registry       |
    |                                     |          | namespace.                                           |
    |                                     |          |                                                      |
    |                                     |          | Defaults to topic_subject_name_strategy.             |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Callable(SerializationContext, str) -> str           |
    |                                     |          |                                                      |
    | ``reference.subject.name.strategy`` | callable | Defines how Schema Registry subject names for schema |
    |                                     |          | references are constructed.                          |
    |                                     |          |                                                      |
    |                                     |          | Defaults to reference_subject_name_strategy.         |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Callable(bytes, SerializationContext, schema_id)     |
    |                                     |          | -> bytes                                             |
    |                                     |          |                                                      |
    | ``schema.id.serializer``            | callable | Defines how the schema id/guid is serialized.        |
    |                                     |          | Defaults to prefix_schema_id_serializer.             |
    +-------------------------------------+----------+------------------------------------------------------+

    Schemas are registered against subject names in Confluent Schema Registry that
    define a scope in which the schemas can be evolved. By default, the subject name
    is formed by concatenating the topic name with the message field (key or value)
    separated by a hyphen.

    i.e. {topic name}-{message field}

    Alternative naming strategies may be configured with the property
    ``subject.name.strategy``.

    Supported subject name strategies

    +--------------------------------------+------------------------------+
    | Subject Name Strategy                | Output Format                |
    +======================================+==============================+
    | topic_subject_name_strategy(default) | {topic name}-{message field} |
    +--------------------------------------+------------------------------+
    | topic_record_subject_name_strategy   | {topic name}-{record name}   |
    +--------------------------------------+------------------------------+
    | record_subject_name_strategy         | {record name}                |
    +--------------------------------------+------------------------------+

    See `Subject name strategy <https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy>`_ for additional details.

    Args:
        msg_type (Message): Protobuf Message type.

        schema_registry_client (SchemaRegistryClient): Schema Registry
            client instance.

        conf (dict): ProtobufSerializer configuration.

        rule_conf (dict, optional): Configuration passed to every registered
            rule executor via its ``configure`` method.

        rule_registry (RuleRegistry, optional): Rule registry to use; defaults
            to ``RuleRegistry.get_global_instance()``.

    See Also:
        `Protobuf API reference <https://googleapis.dev/python/protobuf/latest/google/protobuf.html>`_
    """  # noqa: E501

    __slots__ = [
        '_skip_known_types',
        '_known_subjects',
        '_known_schema_ids',
        '_msg_class',
        '_index_array',
        '_schema',
        '_schema_id',
        '_ref_reference_subject_func',
        '_use_deprecated_format',
        '_parsed_schemas',
    ]

    _default_conf = {
        'auto.register.schemas': True,
        'normalize.schemas': False,
        'use.schema.id': None,
        'use.latest.version': False,
        'use.latest.with.metadata': None,
        'skip.known.types': True,
        'subject.name.strategy': topic_subject_name_strategy,
        'reference.subject.name.strategy': reference_subject_name_strategy,
        'schema.id.serializer': prefix_schema_id_serializer,
        'use.deprecated.format': False,
    }

    def __init_impl(
        self,
        msg_type: Message,
        schema_registry_client: SchemaRegistryClient,
        conf: Optional[dict] = None,
        rule_conf: Optional[dict] = None,
        rule_registry: Optional[RuleRegistry] = None,
    ):
        super().__init__()

        # Start from the defaults; user-supplied keys override them.
        conf_copy = self._default_conf.copy()
        if conf is not None:
            conf_copy.update(conf)

        self._auto_register = cast(bool, conf_copy.pop('auto.register.schemas'))
        if not isinstance(self._auto_register, bool):
            raise ValueError("auto.register.schemas must be a boolean value")

        self._normalize_schemas = cast(bool, conf_copy.pop('normalize.schemas'))
        if not isinstance(self._normalize_schemas, bool):
            raise ValueError("normalize.schemas must be a boolean value")

        self._use_schema_id = cast(Optional[int], conf_copy.pop('use.schema.id'))
        if self._use_schema_id is not None and not isinstance(self._use_schema_id, int):
            raise ValueError("use.schema.id must be an int value")

        self._use_latest_version = cast(bool, conf_copy.pop('use.latest.version'))
        if not isinstance(self._use_latest_version, bool):
            raise ValueError("use.latest.version must be a boolean value")
        if self._use_latest_version and self._auto_register:
            raise ValueError("cannot enable both use.latest.version and auto.register.schemas")

        self._use_latest_with_metadata = cast(Optional[dict], conf_copy.pop('use.latest.with.metadata'))
        if self._use_latest_with_metadata is not None and not isinstance(self._use_latest_with_metadata, dict):
            raise ValueError("use.latest.with.metadata must be a dict value")

        self._skip_known_types = cast(bool, conf_copy.pop('skip.known.types'))
        if not isinstance(self._skip_known_types, bool):
            raise ValueError("skip.known.types must be a boolean value")

        self._use_deprecated_format = cast(bool, conf_copy.pop('use.deprecated.format'))
        if not isinstance(self._use_deprecated_format, bool):
            raise ValueError("use.deprecated.format must be a boolean value")
        if self._use_deprecated_format:
            raise ValueError("use.deprecated.format is no longer supported")

        self._subject_name_func = cast(
            Callable[[Optional[SerializationContext], Optional[str]], Optional[str]],
            conf_copy.pop('subject.name.strategy'),
        )
        if not callable(self._subject_name_func):
            raise ValueError("subject.name.strategy must be callable")

        self._ref_reference_subject_func = cast(
            Callable[[Optional[SerializationContext], Any], Optional[str]],
            conf_copy.pop('reference.subject.name.strategy'),
        )
        if not callable(self._ref_reference_subject_func):
            raise ValueError("reference.subject.name.strategy must be callable")

        self._schema_id_serializer = cast(
            Callable[[bytes, Optional[SerializationContext], Any], bytes], conf_copy.pop('schema.id.serializer')
        )
        if not callable(self._schema_id_serializer):
            raise ValueError("schema.id.serializer must be callable")

        # Every known key has been popped; anything left is a typo or unsupported option.
        if len(conf_copy) > 0:
            raise ValueError("Unrecognized properties: {}".format(", ".join(conf_copy.keys())))

        self._registry = schema_registry_client
        self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
        self._schema_id: Optional[SchemaId] = None
        self._known_subjects: set[str] = set()
        # Schema id resolved per subject, so a serializer shared by several
        # subjects frames each message with the id of its own subject.
        self._known_schema_ids: dict[str, SchemaId] = {}
        self._msg_class = msg_type
        self._parsed_schemas = ParsedSchemaCache()

        descriptor = msg_type.DESCRIPTOR
        self._index_array = _create_index_array(descriptor)
        self._schema = Schema(_schema_to_str(descriptor.file), schema_type='PROTOBUF')

        for rule in self._rule_registry.get_executors():
            rule.configure(self._registry.config() if self._registry else {}, rule_conf if rule_conf else {})

    __init__ = __init_impl

    @staticmethod
    def _write_varint(buf: io.BytesIO, val: int, zigzag: bool = True):
        """
        Writes val to buf, either using zigzag or uvarint encoding.

        Args:
            buf (BytesIO): buffer to write to.

            val (int): integer to be encoded.

            zigzag (bool): whether to encode in zigzag or uvarint encoding

        Raises:
            ValueError: if ``zigzag`` is False and ``val`` is negative. An
                unsigned varint cannot represent it, and the encoding loop
                would otherwise never terminate.
        """
        if zigzag:
            # 64-bit zigzag mapping of signed to unsigned: 0->0, -1->1, 1->2, ...
            val = (val << 1) ^ (val >> 63)
        elif val < 0:
            raise ValueError("cannot encode negative value {} as an unsigned varint".format(val))

        # Emit 7 bits at a time, low group first; the high bit marks continuation.
        while (val & ~0x7F) != 0:
            buf.write(_bytes((val & 0x7F) | 0x80))
            val >>= 7
        buf.write(_bytes(val))

    @staticmethod
    def _encode_varints(buf: io.BytesIO, ints: List[int], zigzag: bool = True):
        """
        Encodes a list of ints onto buf as a count followed by each value,
        using zigzag (default) or plain uvarint encoding.

        Args:
            buf (BytesIO): buffer to write to.

            ints ([int]): ints to be encoded; must be non-empty.

            zigzag (bool): whether to encode in zigzag or uvarint encoding
        """
        assert len(ints) > 0
        # The root element at the 0 position does not need a length prefix.
        if ints == [0]:
            buf.write(_bytes(0x00))
            return

        ProtobufSerializer._write_varint(buf, len(ints), zigzag=zigzag)
        for value in ints:
            ProtobufSerializer._write_varint(buf, value, zigzag=zigzag)

    def _resolve_dependencies(self, ctx: SerializationContext, file_desc: FileDescriptor) -> List[SchemaReference]:
        """
        Resolves and optionally registers schema references recursively.

        Args:
            ctx (SerializationContext): Serialization context.

            file_desc (FileDescriptor): file descriptor to traverse.

        Returns:
            list(SchemaReference): references for the direct dependencies of
            ``file_desc``; transitive dependencies are attached to the
            schemas of those dependencies.
        """
        schema_refs = []
        for dep in file_desc.dependencies:
            if self._skip_known_types and _is_builtin(dep.name):
                continue
            # Depth-first: a dependency's own references are needed to build its Schema.
            dep_refs = self._resolve_dependencies(ctx, dep)
            subject = self._ref_reference_subject_func(ctx, dep)
            schema = Schema(_schema_to_str(dep), references=dep_refs, schema_type='PROTOBUF')
            if self._auto_register:
                self._registry.register_schema(subject, schema)

            reference = self._registry.lookup_schema(subject, schema)
            # schema_refs are per file descriptor
            schema_refs.append(SchemaReference(dep.name, subject, reference.version))
        return schema_refs

    def __call__(  # type: ignore[override]
        self, message: Message, ctx: Optional[SerializationContext] = None
    ) -> Optional[bytes]:
        """
        Serializes ``message`` with Confluent Schema Registry framing.

        See ``__serialize`` for details.
        """
        return self.__serialize(message, ctx)

    def __serialize(self, message: Message, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
        """
        Serializes an instance of a class derived from Protobuf Message, and prepends
        it with Confluent Schema Registry framing.

        Args:
            message (Message): An instance of a class derived from Protobuf Message.

            ctx (SerializationContext): Metadata relevant to the serialization.
                operation.

        Raises:
            SerializerError if any error occurs during serialization.

        Returns:
            None if messages is None, else a byte array containing the Protobuf
            serialized message with Confluent Schema Registry framing.
        """
        if message is None:
            return None

        if not isinstance(message, self._msg_class):
            raise ValueError("message must be of type {} not {}".format(self._msg_class, type(message)))

        subject = self._subject_name_func(ctx, message.DESCRIPTOR.full_name) if ctx else None
        latest_schema = None
        if subject is not None:
            latest_schema = self._get_reader_schema(subject, fmt='serialized')

        if latest_schema is not None:
            self._schema_id = SchemaId(PROTOBUF_TYPE, latest_schema.schema_id, latest_schema.guid, self._index_array)
        elif subject is not None and ctx is not None:
            # Register/look up once per subject, then reuse that subject's id.
            schema_id = self._known_schema_ids.get(subject)
            if schema_id is None:
                schema_id = self._register_or_lookup_schema(ctx, subject, message)
                self._known_schema_ids[subject] = schema_id
                self._known_subjects.add(subject)
            self._schema_id = schema_id

        if latest_schema is not None:
            fd_proto, pool = self._get_parsed_schema(latest_schema.schema)
            fd = pool.FindFileByName(fd_proto.name)
            desc = fd.message_types_by_name[message.DESCRIPTOR.name]

            def field_transformer(rule_ctx, field_transform, msg):
                return transform(rule_ctx, desc, msg, field_transform)

            if ctx is not None and subject is not None:
                message = self._execute_rules(
                    ctx, subject, RuleMode.WRITE, None, latest_schema.schema, message, None, field_transformer
                )

        with _ContextStringIO() as fo:
            fo.write(message.SerializeToString())
            if self._schema_id is not None:
                self._schema_id.message_indexes = self._index_array
            buffer = fo.getvalue()

        if latest_schema is not None and ctx is not None and subject is not None:
            buffer = self._execute_rules_with_phase(
                ctx, subject, RulePhase.ENCODING, RuleMode.WRITE, None, latest_schema.schema, buffer, None, None
            )

        return self._schema_id_serializer(buffer, ctx, self._schema_id)

    def _register_or_lookup_schema(self, ctx: SerializationContext, subject: str, message: Message) -> SchemaId:
        """
        Resolves this serializer's schema references, then registers the schema
        under ``subject`` (when ``auto.register.schemas`` is set) or looks it up,
        and returns the resulting SchemaId.
        """
        references = self._resolve_dependencies(ctx, message.DESCRIPTOR.file)
        self._schema = Schema(self._schema.schema_str, self._schema.schema_type, references)

        if self._auto_register:
            registered_schema = self._registry.register_schema_full_response(
                subject, self._schema, normalize_schemas=self._normalize_schemas
            )
        else:
            registered_schema = self._registry.lookup_schema(
                subject, self._schema, normalize_schemas=self._normalize_schemas
            )
        return SchemaId(PROTOBUF_TYPE, registered_schema.schema_id, registered_schema.guid, self._index_array)

    def _get_parsed_schema(self, schema: Schema) -> Tuple[descriptor_pb2.FileDescriptorProto, DescriptorPool]:
        """
        Returns the parsed file descriptor proto for ``schema`` and a
        DescriptorPool containing it plus its resolved references, using
        ``self._parsed_schemas`` as a cache.

        Raises:
            ValueError: if ``schema`` has no schema string.
        """
        result = self._parsed_schemas.get_parsed_schema(schema)
        if result is not None:
            return result

        pool = DescriptorPool()
        _init_pool(pool)
        _resolve_named_schema(schema, self._registry, pool)
        if schema.schema_str is None:
            raise ValueError("Schema string cannot be None")
        fd_proto = _str_to_proto("default", schema.schema_str)
        pool.Add(fd_proto)
        self._parsed_schemas.set(schema, (fd_proto, pool))
        return fd_proto, pool
class ProtobufDeserializer(BaseDeserializer):
    """
    Deserializer for Protobuf serialized data with Confluent Schema Registry framing.

    Args:
        message_type (Message derived type): Protobuf Message type.

        conf (dict): Configuration dictionary.

        schema_registry_client (SchemaRegistryClient, optional): Schema Registry
            client instance. When omitted, payloads are parsed directly as
            ``message_type`` and no writer/reader schema is looked up.

        rule_conf (dict, optional): Configuration passed to every registered
            rule executor via its ``configure`` method.

        rule_registry (RuleRegistry, optional): Rule registry to use; defaults
            to ``RuleRegistry.get_global_instance()``.

    ProtobufDeserializer configuration properties:

    +-------------------------------------+----------+------------------------------------------------------+
    | Property Name                       | Type     | Description                                          |
    +=====================================+==========+======================================================+
    |                                     |          | Whether to use the latest subject version for        |
    | ``use.latest.version``              | bool     | deserialization.                                     |
    |                                     |          |                                                      |
    |                                     |          | Defaults to False.                                   |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Whether to use the latest subject version with       |
    | ``use.latest.with.metadata``        | dict     | the given metadata.                                  |
    |                                     |          |                                                      |
    |                                     |          | Defaults to None.                                    |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Callable(SerializationContext, str) -> str           |
    |                                     |          |                                                      |
    | ``subject.name.strategy``           | callable | Defines how Schema Registry subject names are        |
    |                                     |          | constructed. Standard naming strategies are          |
    |                                     |          | defined in the confluent_kafka.schema_registry       |
    |                                     |          | namespace.                                           |
    |                                     |          |                                                      |
    |                                     |          | Defaults to topic_subject_name_strategy.             |
    +-------------------------------------+----------+------------------------------------------------------+
    |                                     |          | Callable(bytes, SerializationContext, schema_id)     |
    |                                     |          | -> io.BytesIO                                        |
    |                                     |          |                                                      |
    | ``schema.id.deserializer``          | callable | Defines how the schema id/guid is deserialized.      |
    |                                     |          | Defaults to dual_schema_id_deserializer.             |
    +-------------------------------------+----------+------------------------------------------------------+

    See Also:
        `Protobuf API reference <https://googleapis.dev/python/protobuf/latest/google/protobuf.html>`_
    """  # noqa: E501

    __slots__ = ['_msg_class', '_use_deprecated_format', '_parsed_schemas']

    _default_conf = {
        'use.latest.version': False,
        'use.latest.with.metadata': None,
        'subject.name.strategy': topic_subject_name_strategy,
        'schema.id.deserializer': dual_schema_id_deserializer,
        'use.deprecated.format': False,
    }

    def __init_impl(
        self,
        message_type: Message,
        conf: Optional[dict] = None,
        schema_registry_client: Optional[SchemaRegistryClient] = None,
        rule_conf: Optional[dict] = None,
        rule_registry: Optional[RuleRegistry] = None,
    ):
        super().__init__()

        self._registry = schema_registry_client
        self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
        self._parsed_schemas = ParsedSchemaCache()
        # ``use.schema.id`` is not a deserializer option; it is always unset here.
        self._use_schema_id = None

        # Start from the defaults; user-supplied keys override them.
        # NOTE(review): unlike ProtobufSerializer, leftover (unrecognized) keys
        # are not rejected here -- confirm whether that is intentional.
        conf_copy = self._default_conf.copy()
        if conf is not None:
            conf_copy.update(conf)

        self._use_latest_version = cast(bool, conf_copy.pop('use.latest.version'))
        if not isinstance(self._use_latest_version, bool):
            raise ValueError("use.latest.version must be a boolean value")

        self._use_latest_with_metadata = cast(Optional[dict], conf_copy.pop('use.latest.with.metadata'))
        if self._use_latest_with_metadata is not None and not isinstance(self._use_latest_with_metadata, dict):
            raise ValueError("use.latest.with.metadata must be a dict value")

        self._subject_name_func = cast(
            Callable[[Optional[SerializationContext], Optional[str]], Optional[str]],
            conf_copy.pop('subject.name.strategy'),
        )
        if not callable(self._subject_name_func):
            raise ValueError("subject.name.strategy must be callable")

        self._schema_id_deserializer = cast(
            Callable[[bytes, Optional[SerializationContext], Any], io.BytesIO], conf_copy.pop('schema.id.deserializer')
        )
        if not callable(self._schema_id_deserializer):
            raise ValueError("schema.id.deserializer must be callable")

        self._use_deprecated_format = cast(bool, conf_copy.pop('use.deprecated.format'))
        if not isinstance(self._use_deprecated_format, bool):
            raise ValueError("use.deprecated.format must be a boolean value")
        if self._use_deprecated_format:
            raise ValueError("use.deprecated.format is no longer supported")

        descriptor = message_type.DESCRIPTOR
        self._msg_class = GetMessageClass(descriptor)

        for rule in self._rule_registry.get_executors():
            rule.configure(self._registry.config() if self._registry else {}, rule_conf if rule_conf else {})

    __init__ = __init_impl

    def __call__(self, data: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[Message]:
        """
        Deserializes ``data`` into a Protobuf Message.

        See ``__deserialize`` for details.
        """
        return self.__deserialize(data, ctx)

    def __deserialize(self, data: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[Message]:
        """
        Deserialize a serialized protobuf message with Confluent Schema Registry
        framing.

        Args:
            data (bytes): Serialized protobuf message with Confluent Schema
                Registry framing.

            ctx (SerializationContext): Metadata relevant to the serialization
                operation.

        Returns:
            Message: Protobuf Message instance, or None if ``data`` is None.

        Raises:
            SerializerError: If there was an error reading the Confluent framing
                data, or parsing the protobuf serialized message.
        """
        if data is None:
            return None

        subject = self._subject_name_func(ctx, None)
        latest_schema = None
        if subject is not None and self._registry is not None:
            latest_schema = self._get_reader_schema(subject, fmt='serialized')

        # Strip the framing; schema_id is populated with the id/guid and message indexes.
        schema_id = SchemaId(PROTOBUF_TYPE)
        payload = self._schema_id_deserializer(data, ctx, schema_id)
        msg_index = schema_id.message_indexes

        if self._registry is not None:
            writer_schema_raw = self._get_writer_schema(schema_id, subject, fmt='serialized')
            fd_proto, pool = self._get_parsed_schema(writer_schema_raw)
            writer_schema = pool.FindFileByName(fd_proto.name)
            writer_desc = self._get_message_desc(pool, writer_schema, msg_index if msg_index is not None else [])
            if subject is None:
                # Record-name based strategies need the writer's message name.
                subject = self._subject_name_func(ctx, writer_desc.full_name)
                if subject is not None:
                    latest_schema = self._get_reader_schema(subject, fmt='serialized')
        else:
            writer_schema_raw = None
            writer_schema = None

        if ctx is not None and subject is not None:
            payload = self._execute_rules_with_phase(
                ctx, subject, RulePhase.ENCODING, RuleMode.READ, None, writer_schema_raw, payload, None, None
            )
        if isinstance(payload, bytes):
            payload = io.BytesIO(payload)

        reader_schema_raw: Optional[Schema] = None
        if latest_schema is not None and subject is not None and writer_schema_raw is not None:
            migrations = self._get_migrations(subject, writer_schema_raw, latest_schema, None)
            reader_schema_raw = latest_schema.schema
            fd_proto, pool = self._get_parsed_schema(latest_schema.schema)
            reader_schema = pool.FindFileByName(fd_proto.name)
        else:
            migrations = None
            reader_schema_raw = writer_schema_raw
            reader_schema = writer_schema

        if reader_schema is not None:
            # Initialize reader desc to first message in file
            reader_desc = self._get_message_desc(pool, reader_schema, [0])
            # Attempt to find a reader desc with the same name as the writer
            reader_desc = reader_schema.message_types_by_name.get(writer_desc.name, reader_desc)
        else:
            # No schema available: the payload is parsed as the configured
            # message class below, so that class's descriptor is the reader's.
            reader_desc = self._msg_class.DESCRIPTOR

        if migrations and ctx is not None and subject is not None:
            # Decode with the writer schema, migrate as a dict, re-encode as reader.
            msg = GetMessageClass(writer_desc)()
            try:
                msg.ParseFromString(payload.read())
            except DecodeError as e:
                raise SerializationError(str(e))
            obj_dict = json_format.MessageToDict(msg, True)
            obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict)
            msg = GetMessageClass(reader_desc)()
            msg = json_format.ParseDict(obj_dict, msg)
        else:
            # Protobuf Messages are self-describing; no need to query schema
            msg = self._msg_class()
            try:
                msg.ParseFromString(payload.read())
            except DecodeError as e:
                raise SerializationError(str(e))

        def field_transformer(rule_ctx, field_transform, message):
            return transform(rule_ctx, reader_desc, message, field_transform)

        if ctx is not None and subject is not None:
            msg = self._execute_rules(ctx, subject, RuleMode.READ, None, reader_schema_raw, msg, None, field_transformer)

        return msg

    def _get_parsed_schema(self, schema: Schema) -> Tuple[descriptor_pb2.FileDescriptorProto, DescriptorPool]:
        """
        Returns the parsed file descriptor proto for ``schema`` and a
        DescriptorPool containing it plus its resolved references, using
        ``self._parsed_schemas`` as a cache.

        Raises:
            ValueError: if ``schema`` has no schema string.
        """
        result = self._parsed_schemas.get_parsed_schema(schema)
        if result is not None:
            return result

        pool = DescriptorPool()
        _init_pool(pool)
        _resolve_named_schema(schema, self._registry, pool)
        if schema.schema_str is None:
            raise ValueError("Schema string cannot be None")
        fd_proto = _str_to_proto("default", schema.schema_str)
        pool.Add(fd_proto)
        self._parsed_schemas.set(schema, (fd_proto, pool))
        return fd_proto, pool

    def _get_message_desc(self, pool: DescriptorPool, fd: FileDescriptor, msg_index: List[int]) -> Descriptor:
        """
        Returns the message Descriptor in ``fd`` addressed by ``msg_index``
        (top-level index followed by nested-type indexes).

        An empty (or None) index list is treated as ``[0]``, the first
        top-level message; this is what a zero count in the framing denotes
        (the serializer writes ``[0]`` as a single 0x00 byte).
        """
        file_desc_proto = descriptor_pb2.FileDescriptorProto()
        fd.CopyToProto(file_desc_proto)
        full_name, _ = self._get_message_desc_proto("", file_desc_proto, msg_index or [0])
        package = file_desc_proto.package
        qualified_name = package + "." + full_name if package else full_name
        return pool.FindMessageTypeByName(qualified_name)

    def _get_message_desc_proto(
        self,
        path: str,
        desc: Union[descriptor_pb2.FileDescriptorProto, descriptor_pb2.DescriptorProto],
        msg_index: List[int],
    ) -> Tuple[str, descriptor_pb2.DescriptorProto]:
        """
        Walks ``msg_index`` down from ``desc`` and returns the dotted message
        path (without package) together with the selected DescriptorProto.
        ``msg_index`` must be non-empty.
        """
        # A file lists top-level messages; a message lists its nested types.
        if isinstance(desc, descriptor_pb2.FileDescriptorProto):
            candidates = desc.message_type
        else:
            candidates = desc.nested_type
        msg = candidates[msg_index[0]]
        path = path + "." + msg.name if path else msg.name
        if len(msg_index) == 1:
            return path, msg
        return self._get_message_desc_proto(path, msg, msg_index[1:])