from confluent_kafka.cimpl import Producer as _ProducerImpl
from .serialization import (MessageField,
from .error import (KeySerializationError,

[docs]class SerializingProducer(_ProducerImpl): """ A high level Kafka producer with serialization capabilities. `This class is experimental and likely to be removed, or subject to incompatible API changes in future versions of the library. To avoid breaking changes on upgrading, we recommend using serializers directly.` Derived from the :py:class:`Producer` class, overriding the :py:func:`Producer.produce` method to add serialization capabilities. Additional configuration properties: +-------------------------+---------------------+-----------------------------------------------------+ | Property Name | Type | Description | +=========================+=====================+=====================================================+ | | | Callable(obj, SerializationContext) -> bytes | | ``key.serializer`` | callable | | | | | Serializer used for message keys. | +-------------------------+---------------------+-----------------------------------------------------+ | | | Callable(obj, SerializationContext) -> bytes | | ``value.serializer`` | callable | | | | | Serializer used for message values. | +-------------------------+---------------------+-----------------------------------------------------+ Serializers for string, integer and double (:py:class:`StringSerializer`, :py:class:`IntegerSerializer` and :py:class:`DoubleSerializer`) are supplied out-of-the-box in the ``confluent_kafka.serialization`` namespace. Serializers for Protobuf, JSON Schema and Avro (:py:class:`ProtobufSerializer`, :py:class:`JSONSerializer` and :py:class:`AvroSerializer`) with Confluent Schema Registry integration are supplied out-of-the-box in the ``confluent_kafka.schema_registry`` namespace. See Also: - The :ref:`Configuration Guide <pythonclient_configuration>` for in depth information on how to configure the client. - ` <>`_ for a comprehensive set of configuration properties. - ` <>`_ for detailed information on the statistics provided by stats_cb - The :py:class:`Producer` class for inherited methods. Args: conf (producer): SerializingProducer configuration. """ # noqa E501 def __init__(self, conf): conf_copy = conf.copy() self._key_serializer = conf_copy.pop('key.serializer', None) self._value_serializer = conf_copy.pop('value.serializer', None) super(SerializingProducer, self).__init__(conf_copy)
[docs] def produce(self, topic, key=None, value=None, partition=-1, on_delivery=None, timestamp=0, headers=None): """ Produce a message. This is an asynchronous operation. An application may use the ``on_delivery`` argument to pass a function (or lambda) that will be called from :py:func:`SerializingProducer.poll` when the message has been successfully delivered or permanently fails delivery. Note: Currently message headers are not supported on the message returned to the callback. The ``msg.headers()`` will return None even if the original message had headers set. Args: topic (str): Topic to produce message to. key (object, optional): Message payload key. value (object, optional): Message payload value. partition (int, optional): Partition to produce to, else the configured built-in partitioner will be used. on_delivery (callable(KafkaError, Message), optional): Delivery report callback. Called as a side effect of :py:func:`SerializingProducer.poll` or :py:func:`SerializingProducer.flush` on successful or failed delivery. timestamp (float, optional): Message timestamp (CreateTime) in milliseconds since Unix epoch UTC (requires broker >= Default value is current time. headers (dict, optional): Message headers. The header key must be a str while the value must be binary, unicode or None. (Requires broker version >= Raises: BufferError: if the internal producer message queue is full. (``queue.buffering.max.messages`` exceeded). If this happens the application should call :py:func:`SerializingProducer.Poll` and try again. KeySerializationError: If an error occurs during key serialization. ValueSerializationError: If an error occurs during value serialization. KafkaException: For all other errors """ ctx = SerializationContext(topic, MessageField.KEY, headers) if self._key_serializer is not None: try: key = self._key_serializer(key, ctx) except Exception as se: raise KeySerializationError(se) ctx.field = MessageField.VALUE if self._value_serializer is not None: try: value = self._value_serializer(value, ctx) except Exception as se: raise ValueSerializationError(se) super(SerializingProducer, self).produce(topic, value, key, headers=headers, partition=partition, timestamp=timestamp, on_delivery=on_delivery)