Source code for confluent_kafka.serialization

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import struct as _struct
from enum import Enum
from typing import Any, List, Optional

from confluent_kafka._types import HeadersType
from confluent_kafka.error import KafkaException

__all__ = [
    'Deserializer',
    'IntegerDeserializer',
    'IntegerSerializer',
    'DoubleDeserializer',
    'DoubleSerializer',
    'StringDeserializer',
    'StringSerializer',
    'MessageField',
    'SerializationContext',
    'SerializationError',
    'Serializer',
]


[docs] class MessageField(str, Enum): """ Enum like object for identifying Message fields. Attributes: KEY (str): Message key VALUE (str): Message value """ NONE = 'none' KEY = 'key' VALUE = 'value' def __str__(self) -> str: return str(self.value)
[docs] class SerializationContext(object): """ SerializationContext provides additional context to the serializer/deserializer about the data it's serializing/deserializing. Args: topic (str): Topic data is being produce to or consumed from. field (MessageField): Describes what part of the message is being serialized. headers (list): List of message header tuples. Defaults to None. """ def __init__(self, topic: str, field: MessageField, headers: Optional[HeadersType] = None) -> None: self.topic = topic self.field = field self.headers = headers
[docs] class SerializationError(KafkaException): """Generic error from serializer package""" pass
[docs] class Serializer(object): """ Extensible class from which all Serializer implementations derive. Serializers instruct Kafka clients on how to convert Python objects to bytes. See built-in implementations, listed below, for an example of how to extend this class. Note: This class is not directly instantiable. The derived classes must be used instead. The following implementations are provided by this module. Note: Unless noted elsewhere all numeric types are signed and serialization is big-endian. .. list-table:: :header-rows: 1 * - Name - Type - Binary Format * - DoubleSerializer - float - IEEE 764 binary64 * - IntegerSerializer - int - int32 * - StringSerializer - unicode - unicode(encoding) """ __slots__: List[str] = []
[docs] def __call__(self, obj: Any, ctx: Optional[SerializationContext] = None) -> Optional[bytes]: """ Converts obj to bytes. Args: obj (object): object to be serialized ctx (SerializationContext): Metadata pertaining to the serialization operation Raises: SerializerError if an error occurs during serialization Returns: bytes if obj is not None, otherwise None """ raise NotImplementedError
[docs] class Deserializer(object): """ Extensible class from which all Deserializer implementations derive. Deserializers instruct Kafka clients on how to convert bytes to objects. See built-in implementations, listed below, for an example of how to extend this class. Note: This class is not directly instantiable. The derived classes must be used instead. The following implementations are provided by this module. Note: Unless noted elsewhere all numeric types are signed and serialization is big-endian. .. list-table:: :header-rows: 1 * - Name - Type - Binary Format * - DoubleDeserializer - float - IEEE 764 binary64 * - IntegerDeserializer - int - int32 * - StringDeserializer - unicode - unicode(encoding) """ __slots__: List[str] = []
[docs] def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Any: """ Convert bytes to object Args: value (bytes): bytes to be deserialized ctx (SerializationContext): Metadata pertaining to the serialization operation Raises: SerializerError if an error occurs during deserialization Returns: object if data is not None, otherwise None """ raise NotImplementedError
[docs] class DoubleSerializer(Serializer): """ Serializes float to IEEE 764 binary64. See Also: `DoubleSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleSerializer.html>`_ """ # noqa: E501
[docs] def __call__(self, obj: Optional[float], ctx: Optional[SerializationContext] = None) -> Optional[bytes]: """ Args: obj (object): object to be serialized ctx (SerializationContext): Metadata pertaining to the serialization operation Note: None objects are represented as Kafka Null. Raises: SerializerError if an error occurs during serialization. Returns: IEEE 764 binary64 bytes if obj is not None, otherwise None """ if obj is None: return None try: return _struct.pack('>d', obj) except _struct.error as e: raise SerializationError(str(e))
[docs] class DoubleDeserializer(Deserializer): """ Deserializes float to IEEE 764 binary64. See Also: `DoubleDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleDeserializer.html>`_ """ # noqa: E501
[docs] def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[float]: """ Deserializes float from IEEE 764 binary64 bytes. Args: value (bytes): bytes to be deserialized ctx (SerializationContext): Metadata pertaining to the serialization operation Raises: SerializerError if an error occurs during deserialization. Returns: float if data is not None, otherwise None """ if value is None: return None try: return _struct.unpack('>d', value)[0] except _struct.error as e: raise SerializationError(str(e))
[docs] class IntegerSerializer(Serializer): """ Serializes int to int32 bytes. See Also: `IntegerSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerSerializer.html>`_ """ # noqa: E501
[docs] def __call__(self, obj: Optional[int], ctx: Optional[SerializationContext] = None) -> Optional[bytes]: """ Serializes int as int32 bytes. Args: obj (object): object to be serialized ctx (SerializationContext): Metadata pertaining to the serialization operation Note: None objects are represented as Kafka Null. Raises: SerializerError if an error occurs during serialization Returns: int32 bytes if obj is not None, else None """ if obj is None: return None try: return _struct.pack('>i', obj) except _struct.error as e: raise SerializationError(str(e))
[docs] class IntegerDeserializer(Deserializer): """ Deserializes int to int32 bytes. See Also: `IntegerDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerDeserializer.html>`_ """ # noqa: E501
[docs] def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[int]: """ Deserializes int from int32 bytes. Args: value (bytes): bytes to be deserialized ctx (SerializationContext): Metadata pertaining to the serialization operation Raises: SerializerError if an error occurs during deserialization. Returns: int if data is not None, otherwise None """ if value is None: return None try: return _struct.unpack('>i', value)[0] except _struct.error as e: raise SerializationError(str(e))
[docs] class StringSerializer(Serializer): """ Serializes unicode to bytes per the configured codec. Defaults to ``utf_8``. Note: None objects are represented as Kafka Null. Args: codec (str, optional): encoding scheme. Defaults to utf_8 See Also: `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_ `StringSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringSerializer.html>`_ """ # noqa: E501 def __init__(self, codec: str = 'utf_8') -> None: self.codec = codec
[docs] def __call__(self, obj: Optional[str], ctx: Optional[SerializationContext] = None) -> Optional[bytes]: """ Serializes a str(py2:unicode) to bytes. Compatibility Note: Python 2 str objects must be converted to unicode objects. Python 3 all str objects are already unicode objects. Args: obj (object): object to be serialized ctx (SerializationContext): Metadata pertaining to the serialization operation Raises: SerializerError if an error occurs during serialization. Returns: serialized bytes if obj is not None, otherwise None """ if obj is None: return None try: return obj.encode(self.codec) except _struct.error as e: raise SerializationError(str(e))
[docs] class StringDeserializer(Deserializer): """ Deserializes a str(py2:unicode) from bytes. Args: codec (str, optional): encoding scheme. Defaults to utf_8 See Also: `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_ `StringDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringDeserializer.html>`_ """ # noqa: E501 def __init__(self, codec: str = 'utf_8') -> None: self.codec = codec
[docs] def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[str]: """ Serializes unicode to bytes per the configured codec. Defaults to ``utf_8``. Compatibility Note: Python 2 str objects must be converted to unicode objects by the application prior to using this serializer. Python 3 all str objects are already unicode objects. Args: value (bytes): bytes to be deserialized ctx (SerializationContext): Metadata pertaining to the serialization operation Raises: SerializerError if an error occurs during deserialization. Returns: unicode if data is not None, otherwise None """ if value is None: return None try: return value.decode(self.codec) except _struct.error as e: raise SerializationError(str(e))