#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import json
from typing import Any, Callable, Dict, Optional, Union, cast
from fastavro import schemaless_reader, schemaless_writer
from confluent_kafka.schema_registry import (
RuleMode,
Schema,
SchemaRegistryClient,
dual_schema_id_deserializer,
prefix_schema_id_serializer,
topic_subject_name_strategy,
)
from confluent_kafka.schema_registry.common.avro import (
AVRO_TYPE,
AvroSchema,
_ContextStringIO,
_schema_loads,
get_inline_tags,
parse_schema_with_repo,
transform,
)
from confluent_kafka.schema_registry.common.schema_registry_client import RulePhase
from confluent_kafka.schema_registry.rule_registry import RuleRegistry
from confluent_kafka.schema_registry.serde import (
BaseDeserializer,
BaseSerializer,
ParsedSchemaCache,
SchemaId,
)
from confluent_kafka.serialization import SerializationContext, SerializationError
# Explicit export list for ``from <module> import *``. It deliberately includes
# the underscore-prefixed helper ``_resolve_named_schema``, which star-imports
# would otherwise skip.
__all__ = ["_resolve_named_schema", "AvroSerializer", "AvroDeserializer"]
def _resolve_named_schema(schema: Schema, schema_registry_client: SchemaRegistryClient) -> Dict[str, AvroSchema]:
    """
    Resolves named schemas referenced by the provided schema recursively.

    Each reference is fetched from Schema Registry by subject and version. Its
    own references are resolved first, and the referenced schema is then parsed
    with those transitive named schemas in scope.

    :param schema: Schema to resolve named schemas for.
    :param schema_registry_client: SchemaRegistryClient to use for retrieval.
    :return: named_schemas dict mapping every reference name (including
        transitively referenced names) to its parsed Avro schema.
    :raises TypeError: if a reference is missing its subject, version or name,
        or if a referenced schema has no schema string.
    """
    named_schemas: Dict[str, AvroSchema] = {}
    if schema.references is None:
        return named_schemas

    for ref in schema.references:
        # Validate the reference itself before any registry call. A malformed
        # reference then fails fast, instead of failing after a network round
        # trip and a full recursive resolution.
        if ref.subject is None or ref.version is None:
            raise TypeError("Subject or version cannot be None")
        if ref.name is None:
            raise TypeError("Name cannot be None")

        referenced_schema = schema_registry_client.get_version(ref.subject, ref.version, True)
        if referenced_schema.schema.schema_str is None:
            raise TypeError("Schema string cannot be None")

        # The referenced schema may itself reference other named types. Those
        # must be known before it can be parsed.
        ref_named_schemas = _resolve_named_schema(referenced_schema.schema, schema_registry_client)
        parsed_schema = parse_schema_with_repo(referenced_schema.schema.schema_str, named_schemas=ref_named_schemas)

        named_schemas.update(ref_named_schemas)
        named_schemas[ref.name] = parsed_schema
    return named_schemas
class AvroSerializer(BaseSerializer):
    """
    Serializer that outputs Avro binary encoded data with Confluent Schema Registry framing.

    Configuration properties:

    +-----------------------------------+----------+--------------------------------------------------+
    | Property Name                     | Type     | Description                                      |
    +===================================+==========+==================================================+
    | ``auto.register.schemas``         | bool     | If True, automatically register the configured   |
    |                                   |          | schema with Confluent Schema Registry if it has  |
    |                                   |          | not previously been associated with the relevant |
    |                                   |          | subject (determined via subject.name.strategy).  |
    |                                   |          |                                                  |
    |                                   |          | Defaults to True.                                |
    +-----------------------------------+----------+--------------------------------------------------+
    | ``normalize.schemas``             | bool     | Whether to normalize schemas, which will         |
    |                                   |          | transform schemas to have a consistent format,   |
    |                                   |          | including ordering properties and references.    |
    |                                   |          |                                                  |
    |                                   |          | Defaults to False.                               |
    +-----------------------------------+----------+--------------------------------------------------+
    | ``use.schema.id``                 | int      | Whether to use the given schema ID for           |
    |                                   |          | serialization.                                   |
    |                                   |          |                                                  |
    |                                   |          | Defaults to None.                                |
    +-----------------------------------+----------+--------------------------------------------------+
    | ``use.latest.version``            | bool     | Whether to use the latest subject version for    |
    |                                   |          | serialization.                                   |
    |                                   |          |                                                  |
    |                                   |          | WARNING: There is no check that the latest       |
    |                                   |          | schema is backwards compatible with the object   |
    |                                   |          | being serialized.                                |
    |                                   |          |                                                  |
    |                                   |          | Defaults to False.                               |
    +-----------------------------------+----------+--------------------------------------------------+
    | ``use.latest.with.metadata``      | dict     | Whether to use the latest subject version with   |
    |                                   |          | the given metadata.                              |
    |                                   |          |                                                  |
    |                                   |          | WARNING: There is no check that the latest       |
    |                                   |          | schema is backwards compatible with the object   |
    |                                   |          | being serialized.                                |
    |                                   |          |                                                  |
    |                                   |          | Defaults to None.                                |
    +-----------------------------------+----------+--------------------------------------------------+
    | ``subject.name.strategy``         | callable | Callable(SerializationContext, str) -> str       |
    |                                   |          |                                                  |
    |                                   |          | Defines how Schema Registry subject names are    |
    |                                   |          | constructed. Standard naming strategies are      |
    |                                   |          | defined in the confluent_kafka.schema_registry   |
    |                                   |          | namespace.                                       |
    |                                   |          |                                                  |
    |                                   |          | Defaults to topic_subject_name_strategy.         |
    +-----------------------------------+----------+--------------------------------------------------+
    | ``schema.id.serializer``          | callable | Callable(bytes, SerializationContext, schema_id) |
    |                                   |          | -> bytes                                         |
    |                                   |          |                                                  |
    |                                   |          | Defines how the schema id/guid is serialized.    |
    |                                   |          | Defaults to prefix_schema_id_serializer.         |
    +-----------------------------------+----------+--------------------------------------------------+
    | ``validate.strict``               | bool     | If set to True, an error will be raised if       |
    |                                   |          | records do not contain exactly the same          |
    |                                   |          | fields that the schema states.                   |
    |                                   |          |                                                  |
    |                                   |          | Defaults to False.                               |
    +-----------------------------------+----------+--------------------------------------------------+
    | ``validate.strict.allow.default`` | bool     | If set to True, an error will be raised          |
    |                                   |          | if records do not contain exactly the same       |
    |                                   |          | fields that the schema states, unless it is a    |
    |                                   |          | missing field that has a default value in the    |
    |                                   |          | schema.                                          |
    |                                   |          |                                                  |
    |                                   |          | Defaults to False.                               |
    +-----------------------------------+----------+--------------------------------------------------+

    Schemas are registered against subject names in Confluent Schema Registry that
    define a scope in which the schemas can be evolved. By default, the subject name
    is formed by concatenating the topic name with the message field (key or value)
    separated by a hyphen.

    i.e. {topic name}-{message field}

    Alternative naming strategies may be configured with the property
    ``subject.name.strategy``.

    Supported subject name strategies:

    +--------------------------------------+------------------------------+
    | Subject Name Strategy                | Output Format                |
    +======================================+==============================+
    | topic_subject_name_strategy(default) | {topic name}-{message field} |
    +--------------------------------------+------------------------------+
    | topic_record_subject_name_strategy   | {topic name}-{record name}   |
    +--------------------------------------+------------------------------+
    | record_subject_name_strategy         | {record name}                |
    +--------------------------------------+------------------------------+

    See `Subject name strategy <https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy>`_ for additional details.

    Note:
        Prior to serialization, all values must first be converted to
        a dict instance. This may be handled manually prior to calling
        :py:func:`Producer.produce()` or by registering a `to_dict`
        callable with AvroSerializer.

        See ``avro_producer.py`` in the examples directory for example usage.

    Note:
        Tuple notation can be used to determine which branch of an ambiguous union to take.

        See `fastavro notation <https://fastavro.readthedocs.io/en/latest/writer.html#using-the-tuple-notation-to-specify-which-branch-of-a-union-to-take>`_

    Args:
        schema_registry_client (SchemaRegistryClient): Schema Registry client instance.

        schema_str (str or Schema):
            Avro `Schema Declaration. <https://avro.apache.org/docs/current/spec.html#schemas>`_
            Accepts either a string or a :py:class:`Schema` instance. Note that string
            definitions cannot reference other schemas. For referencing other schemas,
            use a :py:class:`Schema` instance.

        to_dict (callable, optional): Callable(object, SerializationContext) -> dict. Converts object to a dict.

        conf (dict): AvroSerializer configuration.

        rule_conf (dict, optional): Configuration passed to every rule executor's ``configure``.

        rule_registry (RuleRegistry, optional): Source of rule executors. Defaults to the
            global RuleRegistry instance.
    """  # noqa: E501

    __slots__ = [
        '_known_subjects',
        '_parsed_schema',
        '_schema',
        '_schema_id',
        '_schema_name',
        '_to_dict',
        '_parsed_schemas',
        '_strict',
        '_strict_allow_default',
        '_subject_schema_ids',
    ]

    _default_conf = {
        'auto.register.schemas': True,
        'normalize.schemas': False,
        'use.schema.id': None,
        'use.latest.version': False,
        'use.latest.with.metadata': None,
        'subject.name.strategy': topic_subject_name_strategy,
        'schema.id.serializer': prefix_schema_id_serializer,
        'validate.strict': False,
        'validate.strict.allow.default': False,
    }

    def __init_impl(
        self,
        schema_registry_client: SchemaRegistryClient,
        schema_str: Union[str, Schema, None] = None,
        to_dict: Optional[Callable[[object, SerializationContext], dict]] = None,
        conf: Optional[dict] = None,
        rule_conf: Optional[dict] = None,
        rule_registry: Optional[RuleRegistry] = None,
    ):
        super().__init__()
        if isinstance(schema_str, str):
            schema = _schema_loads(schema_str)
        elif isinstance(schema_str, Schema):
            schema = schema_str
        else:
            schema = None

        self._registry = schema_registry_client
        self._schema_id: Optional[SchemaId] = None
        self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
        self._known_subjects: set[str] = set()
        # Schema id obtained (by registration or lookup) for each subject. With one
        # id per subject, a serializer shared across several subjects always frames
        # a message with the id resolved for that message's own subject.
        self._subject_schema_ids: Dict[str, SchemaId] = {}
        self._parsed_schemas = ParsedSchemaCache()

        if to_dict is not None and not callable(to_dict):
            raise ValueError(
                "to_dict must be callable with the signature " "to_dict(object, SerializationContext)->dict"
            )
        self._to_dict = to_dict

        # Merge user configuration over the defaults. Every recognised key is popped,
        # so anything left over afterwards is reported as unrecognised.
        conf_copy = self._default_conf.copy()
        if conf is not None:
            conf_copy.update(conf)

        self._auto_register = cast(bool, conf_copy.pop('auto.register.schemas'))
        if not isinstance(self._auto_register, bool):
            raise ValueError("auto.register.schemas must be a boolean value")

        self._normalize_schemas = cast(bool, conf_copy.pop('normalize.schemas'))
        if not isinstance(self._normalize_schemas, bool):
            raise ValueError("normalize.schemas must be a boolean value")

        self._use_schema_id = cast(Optional[int], conf_copy.pop('use.schema.id'))
        if self._use_schema_id is not None and not isinstance(self._use_schema_id, int):
            raise ValueError("use.schema.id must be an int value")

        self._use_latest_version = cast(bool, conf_copy.pop('use.latest.version'))
        if not isinstance(self._use_latest_version, bool):
            raise ValueError("use.latest.version must be a boolean value")
        if self._use_latest_version and self._auto_register:
            raise ValueError("cannot enable both use.latest.version and auto.register.schemas")

        self._use_latest_with_metadata = cast(Optional[dict], conf_copy.pop('use.latest.with.metadata'))
        if self._use_latest_with_metadata is not None and not isinstance(self._use_latest_with_metadata, dict):
            raise ValueError("use.latest.with.metadata must be a dict value")

        self._subject_name_func = cast(
            Callable[[Optional[SerializationContext], Optional[str]], Optional[str]],
            conf_copy.pop('subject.name.strategy'),
        )
        if not callable(self._subject_name_func):
            raise ValueError("subject.name.strategy must be callable")

        self._schema_id_serializer = cast(
            Callable[[bytes, Optional[SerializationContext], Any], bytes], conf_copy.pop('schema.id.serializer')
        )
        if not callable(self._schema_id_serializer):
            raise ValueError("schema.id.serializer must be callable")

        self._strict = cast(bool, conf_copy.pop('validate.strict'))
        if not isinstance(self._strict, bool):
            raise ValueError("validate.strict must be a boolean value")

        self._strict_allow_default = cast(bool, conf_copy.pop('validate.strict.allow.default'))
        if not isinstance(self._strict_allow_default, bool):
            raise ValueError("validate.strict.allow.default must be a boolean value")

        if len(conf_copy) > 0:
            raise ValueError("Unrecognized properties: {}".format(", ".join(conf_copy.keys())))

        schema_name: Optional[str] = None
        parsed_schema: Optional[AvroSchema] = None
        if schema is not None:
            parsed_schema = self._get_parsed_schema(schema)
            # A union (list) has no valid schema name. That is fine, because the
            # only use of schema_name is to supply the subject name to the
            # registry, and unions should use topic_subject_name_strategy, which
            # discards the schema name anyway.
            #
            # For a dict schema, fall back to the declared "type" when there is no
            # "name". The Avro spec states that primitives have a name equal to their
            # type (i.e. {"type": "string"} is named string), but fastavro does
            # not comply: https://github.com/fastavro/fastavro/issues/415
            if isinstance(parsed_schema, dict) and schema.schema_str is not None:
                schema_dict = json.loads(schema.schema_str)
                schema_name = parsed_schema.get("name", schema_dict.get("type"))

        self._schema = schema
        self._schema_name = schema_name
        self._parsed_schema = parsed_schema

        for rule in self._rule_registry.get_executors():
            rule.configure(self._registry.config() if self._registry else {}, rule_conf if rule_conf else {})

    __init__ = __init_impl

    def __call__(  # type: ignore[override]
        self, obj: object, ctx: Optional[SerializationContext] = None
    ) -> Optional[bytes]:
        return self.__serialize(obj, ctx)

    def __serialize(self, obj: object, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
        """
        Serializes an object to Avro binary format, prepending it with Confluent
        Schema Registry framing.

        Args:
            obj (object): The object instance to serialize.

            ctx (SerializationContext): Metadata pertaining to the serialization operation.

        Raises:
            TypeError or ValueError: If any error occurs serializing obj.
            SchemaRegistryError: If there was an error registering the schema with
                                 Schema Registry, or auto.register.schemas is
                                 false and the schema was not registered.

        Returns:
            bytes: Confluent Schema Registry encoded Avro bytes
        """
        if obj is None:
            return None

        subject = self._subject_name_func(ctx, self._schema_name)
        latest_schema = self._get_reader_schema(subject) if subject else None
        if latest_schema is not None:
            self._schema_id = SchemaId(AVRO_TYPE, latest_schema.schema_id, latest_schema.guid)
        elif subject is not None:
            self._schema_id = self._get_subject_schema_id(subject)

        value: Any
        parsed_schema: Any
        if self._to_dict is not None:
            if ctx is None:
                raise TypeError("SerializationContext cannot be None")
            value = self._to_dict(obj, ctx)
        else:
            value = obj

        if latest_schema is not None and ctx is not None and subject is not None:
            parsed_schema = self._get_parsed_schema(latest_schema.schema)

            def field_transformer(rule_ctx, field_transform, msg):
                return transform(rule_ctx, parsed_schema, msg, field_transform)

            value = self._execute_rules(
                ctx,
                subject,
                RuleMode.WRITE,
                None,
                latest_schema.schema,
                value,
                get_inline_tags(parsed_schema),
                field_transformer,
            )
        else:
            parsed_schema = self._parsed_schema

        with _ContextStringIO() as fo:
            is_bytes = parsed_schema == "bytes" or (
                isinstance(parsed_schema, dict) and parsed_schema.get("type") == "bytes"
            )
            if is_bytes:
                # A plain "bytes" schema is written as the raw value rather than
                # through schemaless_writer. Any bytes-like value is accepted;
                # anything else is expected to be a str and is encoded.
                if isinstance(value, bytes):
                    buffer = value
                elif isinstance(value, (bytearray, memoryview)):
                    buffer = bytes(value)
                else:
                    buffer = value.encode()
            else:
                # write the record to the rest of the buffer
                schemaless_writer(
                    fo, parsed_schema, value, strict=self._strict, strict_allow_default=self._strict_allow_default
                )
                buffer = fo.getvalue()

            if latest_schema is not None and ctx is not None and subject is not None:
                buffer = self._execute_rules_with_phase(
                    ctx, subject, RulePhase.ENCODING, RuleMode.WRITE, None, latest_schema.schema, buffer, None, None
                )

            return self._schema_id_serializer(buffer, ctx, self._schema_id)

    def _get_subject_schema_id(self, subject: str) -> SchemaId:
        """
        Return the schema id of the configured schema under ``subject``.

        On first use of a subject, the configured schema is registered
        (auto.register.schemas) or looked up. The resulting id is cached per
        subject, and the subject is recorded in ``_known_subjects``.

        Args:
            subject (str): Subject the message is being serialized for.

        Returns:
            SchemaId: The id to frame messages for ``subject`` with.
        """
        schema_id = self._subject_schema_ids.get(subject)
        if schema_id is None:
            if self._auto_register:
                registered_schema = self._registry.register_schema_full_response(
                    subject, self._schema, normalize_schemas=self._normalize_schemas
                )
            else:
                registered_schema = self._registry.lookup_schema(
                    subject, self._schema, normalize_schemas=self._normalize_schemas
                )
            schema_id = SchemaId(AVRO_TYPE, registered_schema.schema_id, registered_schema.guid)
            self._subject_schema_ids[subject] = schema_id
            self._known_subjects.add(subject)
        return schema_id

    def _get_parsed_schema(self, schema: Schema) -> AvroSchema:
        """
        Parse ``schema`` with its references resolved, memoizing the result.

        Args:
            schema (Schema): Schema to parse.

        Returns:
            The parsed Avro schema, served from the parsed-schema cache when
            ``schema`` has been parsed before.

        Raises:
            TypeError: If the schema, or its re-loaded form, has no schema string.
        """
        parsed_schema = self._parsed_schemas.get_parsed_schema(schema)
        if parsed_schema is not None:
            return parsed_schema

        named_schemas = _resolve_named_schema(schema, self._registry)
        if schema.schema_str is None:
            raise TypeError("Schema string cannot be None")
        prepared_schema = _schema_loads(schema.schema_str)
        if prepared_schema.schema_str is None:
            raise TypeError("Prepared schema string cannot be None")
        parsed_schema = parse_schema_with_repo(prepared_schema.schema_str, named_schemas=named_schemas)

        self._parsed_schemas.set(schema, parsed_schema)
        return parsed_schema
class AvroDeserializer(BaseDeserializer):
    """
    Deserializer for Avro binary encoded data with Confluent Schema Registry
    framing.

    Configuration properties:

    +-----------------------------+----------+--------------------------------------------------+
    | Property Name               | Type     | Description                                      |
    +=============================+==========+==================================================+
    | ``use.latest.version``      | bool     | Whether to use the latest subject version for    |
    |                             |          | deserialization.                                 |
    |                             |          |                                                  |
    |                             |          | Defaults to False.                               |
    +-----------------------------+----------+--------------------------------------------------+
    | ``use.latest.with.metadata``| dict     | Whether to use the latest subject version with   |
    |                             |          | the given metadata.                              |
    |                             |          |                                                  |
    |                             |          | Defaults to None.                                |
    +-----------------------------+----------+--------------------------------------------------+
    | ``subject.name.strategy``   | callable | Callable(SerializationContext, str) -> str       |
    |                             |          |                                                  |
    |                             |          | Defines how Schema Registry subject names are    |
    |                             |          | constructed. Standard naming strategies are      |
    |                             |          | defined in the confluent_kafka.schema_registry   |
    |                             |          | namespace.                                       |
    |                             |          |                                                  |
    |                             |          | Defaults to topic_subject_name_strategy.         |
    +-----------------------------+----------+--------------------------------------------------+
    | ``schema.id.deserializer``  | callable | Callable(bytes, SerializationContext, schema_id) |
    |                             |          | -> io.BytesIO                                    |
    |                             |          |                                                  |
    |                             |          | Defines how the schema id/guid is deserialized.  |
    |                             |          | Defaults to dual_schema_id_deserializer.         |
    +-----------------------------+----------+--------------------------------------------------+

    Note:
        By default, Avro complex types are returned as dicts. This behavior can
        be overridden by registering a callable ``from_dict`` with the deserializer to
        convert the dicts to the desired type.

        See ``avro_consumer.py`` in the examples directory for example usage.

    Args:
        schema_registry_client (SchemaRegistryClient): Confluent Schema Registry
            client instance.

        schema_str (str, Schema, optional): Avro reader schema declaration. Accepts
            either a string or a :py:class:`Schema` instance. If not provided, the
            writer schema will be used as the reader schema. Note that string
            definitions cannot reference other schemas. For referencing other schemas,
            use a :py:class:`Schema` instance.

        from_dict (callable, optional): Callable(dict, SerializationContext) -> object.
            Converts a dict to an instance of some object.

        return_record_name (bool): If True, when reading a union of records, the result will
            be a tuple where the first value is the name of the record and the second value is
            the record itself. Defaults to False.

        conf (dict, optional): AvroDeserializer configuration.

        rule_conf (dict, optional): Configuration passed to every rule executor's ``configure``.

        rule_registry (RuleRegistry, optional): Source of rule executors. Defaults to the
            global RuleRegistry instance.

    See Also:
        `Apache Avro Schema Declaration <https://avro.apache.org/docs/current/spec.html#schemas>`_

        `Apache Avro Schema Resolution <https://avro.apache.org/docs/1.8.2/spec.html#Schema+Resolution>`_
    """

    __slots__ = ['_reader_schema', '_from_dict', '_return_record_name', '_schema', '_parsed_schemas']

    _default_conf = {
        'use.latest.version': False,
        'use.latest.with.metadata': None,
        'subject.name.strategy': topic_subject_name_strategy,
        'schema.id.deserializer': dual_schema_id_deserializer,
    }

    def __init_impl(
        self,
        schema_registry_client: SchemaRegistryClient,
        schema_str: Union[str, Schema, None] = None,
        from_dict: Optional[Callable[[dict, SerializationContext], object]] = None,
        return_record_name: bool = False,
        conf: Optional[dict] = None,
        rule_conf: Optional[dict] = None,
        rule_registry: Optional[RuleRegistry] = None,
    ):
        super().__init__()
        schema = None
        if schema_str is not None:
            if isinstance(schema_str, str):
                schema = _schema_loads(schema_str)
            elif isinstance(schema_str, Schema):
                schema = schema_str
            else:
                raise TypeError('You must pass either schema string or schema object')

        self._schema = schema
        self._registry = schema_registry_client
        self._rule_registry = rule_registry if rule_registry else RuleRegistry.get_global_instance()
        self._parsed_schemas = ParsedSchemaCache()
        self._use_schema_id = None

        # Merge user configuration over the defaults. Every recognised key is popped,
        # so anything left over afterwards is reported as unrecognised.
        conf_copy = self._default_conf.copy()
        if conf is not None:
            conf_copy.update(conf)

        self._use_latest_version = cast(bool, conf_copy.pop('use.latest.version'))
        if not isinstance(self._use_latest_version, bool):
            raise ValueError("use.latest.version must be a boolean value")

        self._use_latest_with_metadata = cast(Optional[dict], conf_copy.pop('use.latest.with.metadata'))
        if self._use_latest_with_metadata is not None and not isinstance(self._use_latest_with_metadata, dict):
            raise ValueError("use.latest.with.metadata must be a dict value")

        self._subject_name_func = cast(
            Callable[[Optional[SerializationContext], Optional[str]], Optional[str]],
            conf_copy.pop('subject.name.strategy'),
        )
        if not callable(self._subject_name_func):
            raise ValueError("subject.name.strategy must be callable")

        self._schema_id_deserializer = cast(
            Callable[[bytes, Optional[SerializationContext], Any], io.BytesIO], conf_copy.pop('schema.id.deserializer')
        )
        if not callable(self._schema_id_deserializer):
            raise ValueError("schema.id.deserializer must be callable")

        if len(conf_copy) > 0:
            raise ValueError("Unrecognized properties: {}".format(", ".join(conf_copy.keys())))

        # Validate the remaining arguments before parsing the reader schema, which
        # may need to fetch referenced schemas from the registry.
        if from_dict is not None and not callable(from_dict):
            raise ValueError(
                "from_dict must be callable with the signature from_dict(dict, SerializationContext) -> object"
            )
        self._from_dict = from_dict

        self._return_record_name = return_record_name
        if not isinstance(self._return_record_name, bool):
            raise ValueError("return_record_name must be a boolean value")

        self._reader_schema: Optional[AvroSchema] = (
            self._get_parsed_schema(self._schema) if self._schema is not None else None
        )

        for rule in self._rule_registry.get_executors():
            rule.configure(self._registry.config() if self._registry else {}, rule_conf if rule_conf else {})

    __init__ = __init_impl

    def __call__(self, data: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Union[dict, object, None]:
        return self.__deserialize(data, ctx)

    def __deserialize(
        self, data: Optional[bytes], ctx: Optional[SerializationContext] = None
    ) -> Union[dict, object, None]:
        """
        Deserialize Avro binary encoded data with Confluent Schema Registry framing to
        a dict, or object instance according to from_dict, if specified.

        Arguments:
            data (bytes): bytes

            ctx (SerializationContext): Metadata relevant to the serialization
                operation.

        Raises:
            SerializationError: if an error occurs parsing data.

        Returns:
            object: If data is None, then None. Else, a dict, or object instance according
            to from_dict, if specified.
        """  # noqa: E501
        if data is None:
            return None

        # NOTE(review): this check assumes the schema id is framed as a prefix of
        # ``data``. If the configured schema.id.deserializer takes the id from
        # message headers instead, short but valid payloads are rejected here — confirm.
        if len(data) <= 5:
            raise SerializationError(
                "Expecting data framing of length 6 bytes or "
                "more but total data size is {} bytes. This "
                "message was not produced with a Confluent "
                "Schema Registry serializer".format(len(data))
            )

        subject = self._subject_name_func(ctx, None) if ctx else None
        latest_schema = None
        if subject is not None:
            latest_schema = self._get_reader_schema(subject)

        schema_id = SchemaId(AVRO_TYPE)
        payload = self._schema_id_deserializer(data, ctx, schema_id)
        writer_schema_raw = self._get_writer_schema(schema_id, subject)
        writer_schema = self._get_parsed_schema(writer_schema_raw)

        if subject is None and ctx:
            # Retry the subject strategy with the writer schema's name. Only dict
            # schemas can carry a "name"; primitive (str) and union (list) writer
            # schemas have none and must not be queried with .get().
            writer_name = writer_schema.get("name") if isinstance(writer_schema, dict) else None
            subject = self._subject_name_func(ctx, writer_name)
            if subject is not None:
                latest_schema = self._get_reader_schema(subject)

        if ctx is not None and subject is not None:
            payload = self._execute_rules_with_phase(
                ctx, subject, RulePhase.ENCODING, RuleMode.READ, None, writer_schema_raw, payload, None, None
            )
        if isinstance(payload, bytes):
            payload = io.BytesIO(payload)

        # Choose the reader schema in priority order: the latest subject version
        # (with migrations), then the configured reader schema, then the writer schema.
        reader_schema: Optional[AvroSchema]
        if latest_schema is not None and subject is not None:
            migrations = self._get_migrations(subject, writer_schema_raw, latest_schema, None)
            reader_schema_raw = latest_schema.schema
            reader_schema = self._get_parsed_schema(latest_schema.schema)
        elif self._schema is not None:
            migrations = None
            reader_schema_raw = self._schema
            reader_schema = self._reader_schema
        else:
            migrations = None
            reader_schema_raw = writer_schema_raw
            reader_schema = writer_schema

        # A plain "bytes" writer schema is carried as the raw payload.
        is_bytes = writer_schema == "bytes" or (
            isinstance(writer_schema, dict) and writer_schema.get("type") == "bytes"
        )

        if migrations and ctx is not None and subject is not None:
            # Decode with the writer schema alone; the migrations bring the value
            # to the latest schema.
            if is_bytes:
                obj_dict = payload.read()
            else:
                obj_dict = schemaless_reader(payload, writer_schema, None, self._return_record_name)
            obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict)
        else:
            if is_bytes:
                obj_dict = payload.read()
            else:
                obj_dict = schemaless_reader(payload, writer_schema, reader_schema, self._return_record_name)

        if ctx is not None and subject is not None:

            def field_transformer(rule_ctx, field_transform, message):
                return transform(rule_ctx, reader_schema, message, field_transform)

            inline_tags = get_inline_tags(reader_schema) if reader_schema is not None else None
            obj_dict = self._execute_rules(
                ctx, subject, RuleMode.READ, None, reader_schema_raw, obj_dict, inline_tags, field_transformer
            )

        if self._from_dict is not None:
            if ctx is None:
                raise TypeError("SerializationContext cannot be None")
            return self._from_dict(obj_dict, ctx)

        return obj_dict

    def _get_parsed_schema(self, schema: Schema) -> AvroSchema:
        """
        Parse ``schema`` with its references resolved, memoizing the result.

        Args:
            schema (Schema): Schema to parse.

        Returns:
            The parsed Avro schema, served from the parsed-schema cache when
            ``schema`` has been parsed before.

        Raises:
            TypeError: If the schema, or its re-loaded form, has no schema string.
        """
        parsed_schema = self._parsed_schemas.get_parsed_schema(schema)
        if parsed_schema is not None:
            return parsed_schema

        named_schemas = _resolve_named_schema(schema, self._registry)
        if schema.schema_str is None:
            raise TypeError("Schema string cannot be None")
        prepared_schema = _schema_loads(schema.schema_str)
        if prepared_schema.schema_str is None:
            raise TypeError("Prepared schema string cannot be None")
        parsed_schema = parse_schema_with_repo(prepared_schema.schema_str, named_schemas=named_schemas)

        self._parsed_schemas.set(schema, parsed_schema)
        return parsed_schema