Source code for confluent_kafka.serialization

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import struct as _struct
from enum import Enum

from confluent_kafka.error import KafkaException

__all__ = ['Deserializer',
           'IntegerDeserializer',
           'IntegerSerializer',
           'DoubleDeserializer',
           'DoubleSerializer',
           'StringDeserializer',
           'StringSerializer',
           'MessageField',
           'SerializationContext',
           'SerializationError',
           'Serializer']


[docs]class MessageField(str, Enum): """ Enum like object for identifying Message fields. Attributes: KEY (str): Message key VALUE (str): Message value """ NONE = 'none' KEY = 'key' VALUE = 'value' def __str__(self) -> str: return str(self.value)
[docs]class SerializationContext(object): """ SerializationContext provides additional context to the serializer/deserializer about the data it's serializing/deserializing. Args: topic (str): Topic data is being produce to or consumed from. field (MessageField): Describes what part of the message is being serialized. headers (list): List of message header tuples. Defaults to None. """ def __init__(self, topic, field, headers=None): self.topic = topic self.field = field self.headers = headers
[docs]class SerializationError(KafkaException): """Generic error from serializer package""" pass
[docs]class Serializer(object): """ Extensible class from which all Serializer implementations derive. Serializers instruct Kafka clients on how to convert Python objects to bytes. See built-in implementations, listed below, for an example of how to extend this class. Note: This class is not directly instantiable. The derived classes must be used instead. The following implementations are provided by this module. Note: Unless noted elsewhere all numeric types are signed and serialization is big-endian. .. list-table:: :header-rows: 1 * - Name - Type - Binary Format * - DoubleSerializer - float - IEEE 764 binary64 * - IntegerSerializer - int - int32 * - StringSerializer - unicode - unicode(encoding) """ __slots__ = []
[docs] def __call__(self, obj, ctx=None): """ Converts obj to bytes. Args: obj (object): object to be serialized ctx (SerializationContext): Metadata pertaining to the serialization operation Raises: SerializerError if an error occurs during serialization Returns: bytes if obj is not None, otherwise None """ raise NotImplementedError
[docs]class Deserializer(object): """ Extensible class from which all Deserializer implementations derive. Deserializers instruct Kafka clients on how to convert bytes to objects. See built-in implementations, listed below, for an example of how to extend this class. Note: This class is not directly instantiable. The derived classes must be used instead. The following implementations are provided by this module. Note: Unless noted elsewhere all numeric types are signed and serialization is big-endian. .. list-table:: :header-rows: 1 * - Name - Type - Binary Format * - DoubleDeserializer - float - IEEE 764 binary64 * - IntegerDeserializer - int - int32 * - StringDeserializer - unicode - unicode(encoding) """ __slots__ = []
[docs] def __call__(self, value, ctx=None): """ Convert bytes to object Args: value (bytes): bytes to be deserialized ctx (SerializationContext): Metadata pertaining to the serialization operation Raises: SerializerError if an error occurs during deserialization Returns: object if data is not None, otherwise None """ raise NotImplementedError
[docs]class DoubleSerializer(Serializer): """ Serializes float to IEEE 764 binary64. See Also: `DoubleSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleSerializer.html>`_ """ # noqa: E501
[docs] def __call__(self, obj, ctx=None): """ Args: obj (object): object to be serialized ctx (SerializationContext): Metadata pertaining to the serialization operation Note: None objects are represented as Kafka Null. Raises: SerializerError if an error occurs during serialization. Returns: IEEE 764 binary64 bytes if obj is not None, otherwise None """ if obj is None: return None try: return _struct.pack('>d', obj) except _struct.error as e: raise SerializationError(str(e))
[docs]class DoubleDeserializer(Deserializer): """ Deserializes float to IEEE 764 binary64. See Also: `DoubleDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleDeserializer.html>`_ """ # noqa: E501
[docs] def __call__(self, value, ctx=None): """ Deserializes float from IEEE 764 binary64 bytes. Args: value (bytes): bytes to be deserialized ctx (SerializationContext): Metadata pertaining to the serialization operation Raises: SerializerError if an error occurs during deserialization. Returns: float if data is not None, otherwise None """ if value is None: return None try: return _struct.unpack('>d', value)[0] except _struct.error as e: raise SerializationError(str(e))
[docs]class IntegerSerializer(Serializer): """ Serializes int to int32 bytes. See Also: `IntegerSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerSerializer.html>`_ """ # noqa: E501
[docs] def __call__(self, obj, ctx=None): """ Serializes int as int32 bytes. Args: obj (object): object to be serialized ctx (SerializationContext): Metadata pertaining to the serialization operation Note: None objects are represented as Kafka Null. Raises: SerializerError if an error occurs during serialization Returns: int32 bytes if obj is not None, else None """ if obj is None: return None try: return _struct.pack('>i', obj) except _struct.error as e: raise SerializationError(str(e))
[docs]class IntegerDeserializer(Deserializer): """ Deserializes int to int32 bytes. See Also: `IntegerDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerDeserializer.html>`_ """ # noqa: E501
[docs] def __call__(self, value, ctx=None): """ Deserializes int from int32 bytes. Args: value (bytes): bytes to be deserialized ctx (SerializationContext): Metadata pertaining to the serialization operation Raises: SerializerError if an error occurs during deserialization. Returns: int if data is not None, otherwise None """ if value is None: return None try: return _struct.unpack('>i', value)[0] except _struct.error as e: raise SerializationError(str(e))
[docs]class StringSerializer(Serializer): """ Serializes unicode to bytes per the configured codec. Defaults to ``utf_8``. Note: None objects are represented as Kafka Null. Args: codec (str, optional): encoding scheme. Defaults to utf_8 See Also: `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_ `StringSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringSerializer.html>`_ """ # noqa: E501 def __init__(self, codec='utf_8'): self.codec = codec
[docs] def __call__(self, obj, ctx=None): """ Serializes a str(py2:unicode) to bytes. Compatibility Note: Python 2 str objects must be converted to unicode objects. Python 3 all str objects are already unicode objects. Args: obj (object): object to be serialized ctx (SerializationContext): Metadata pertaining to the serialization operation Raises: SerializerError if an error occurs during serialization. Returns: serialized bytes if obj is not None, otherwise None """ if obj is None: return None try: return obj.encode(self.codec) except _struct.error as e: raise SerializationError(str(e))
[docs]class StringDeserializer(Deserializer): """ Deserializes a str(py2:unicode) from bytes. Args: codec (str, optional): encoding scheme. Defaults to utf_8 See Also: `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_ `StringDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringDeserializer.html>`_ """ # noqa: E501 def __init__(self, codec='utf_8'): self.codec = codec
[docs] def __call__(self, value, ctx=None): """ Serializes unicode to bytes per the configured codec. Defaults to ``utf_8``. Compatibility Note: Python 2 str objects must be converted to unicode objects by the application prior to using this serializer. Python 3 all str objects are already unicode objects. Args: value (bytes): bytes to be deserialized ctx (SerializationContext): Metadata pertaining to the serialization operation Raises: SerializerError if an error occurs during deserialization. Returns: unicode if data is not None, otherwise None """ if value is None: return None try: return value.decode(self.codec) except _struct.error as e: raise SerializationError(str(e))