#!/usr/bin/env python
#
# Copyright 2016-2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Avro schema registry module: Deals with encoding and decoding of messages with avro schemas
"""
from confluent_kafka import Producer, Consumer
from confluent_kafka.avro.error import ClientError
from confluent_kafka.avro.load import load, loads # noqa
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer import (SerializerError, # noqa
KeySerializerError,
ValueSerializerError)
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
class AvroProducer(Producer):
    """
    Kafka Producer client which does avro schema encoding to messages.
    Handles schema registration, Message serialization.

    Constructor takes below parameters.

    :param dict config: Config parameters containing url for schema registry (``schema.registry.url``)
                        and the standard Kafka client configuration (``bootstrap.servers`` et.al).
                        Keys starting with ``schema.registry`` are handed to the schema registry
                        client (prefix removed); the remaining keys go to the underlying Producer.
    :param str default_key_schema: Optional default avro schema for key
    :param str default_value_schema: Optional default avro schema for value
    :param schema_registry: Optional ready-made schema registry client. When omitted, a
                            ``CachedSchemaRegistryClient`` is created from the ``schema.registry.*`` settings.
    :raises ValueError: If ``schema_registry`` is passed together with ``schema.registry.url``
    """

    def __init__(self, config, default_key_schema=None,
                 default_value_schema=None, schema_registry=None, **kwargs):
        # Registry settings: every "schema.registry*" key, with the "schema.registry." prefix removed.
        sr_conf = {key.replace("schema.registry.", ""): value
                   for key, value in config.items() if key.startswith("schema.registry")}

        if sr_conf.get("basic.auth.credentials.source") == 'SASL_INHERIT':
            # Fallback to plural 'mechanisms' for backward compatibility
            sr_conf['sasl.mechanism'] = config.get('sasl.mechanism', config.get('sasl.mechanisms', ''))
            sr_conf['sasl.username'] = config.get('sasl.username', '')
            sr_conf['sasl.password'] = config.get('sasl.password', '')

        # Keep a value supplied as ``schema.registry.auto.register.schemas``; only when it is
        # absent fall back to the un-prefixed key, defaulting to True.
        sr_conf.setdefault('auto.register.schemas', config.get('auto.register.schemas', True))

        # Everything else belongs to the Kafka producer. The un-prefixed ``auto.register.schemas``
        # key is consumed above as a registry setting, so it is not forwarded to the producer.
        ap_conf = {key: value
                   for key, value in config.items()
                   if not key.startswith("schema.registry") and key != 'auto.register.schemas'}

        if schema_registry is None:
            schema_registry = CachedSchemaRegistryClient(sr_conf)
        elif sr_conf.get("url", None) is not None:
            # An explicit client and a registry url are two competing ways to pick the registry.
            raise ValueError("Cannot pass schema_registry along with schema.registry.url config")

        super(AvroProducer, self).__init__(ap_conf, **kwargs)
        self._serializer = MessageSerializer(schema_registry)
        self._key_schema = default_key_schema
        self._value_schema = default_value_schema

    def produce(self, **kwargs):
        """
        Asynchronously sends message to Kafka by encoding with specified or default avro schema.

        All arguments must be passed by keyword.

        :param str topic: topic name
        :param object value: An object to serialize
        :param str value_schema: Avro schema for value (defaults to ``default_value_schema``)
        :param object key: An object to serialize
        :param str key_schema: Avro schema for key (defaults to ``default_key_schema``)

        Plus any other parameters accepted by confluent_kafka.Producer.produce

        :raises ClientError: If ``topic`` is missing or empty
        :raises ValueSerializerError: If a value is given but no value schema is available
        :raises KeySerializerError: If a key is given but no key schema is available
        :raises SerializerError: On serialization failure
        :raises BufferError: If producer queue is full.
        :raises KafkaException: For other produce failures.
        """
        # get schemas from kwargs if defined
        key_schema = kwargs.pop('key_schema', self._key_schema)
        value_schema = kwargs.pop('value_schema', self._value_schema)

        topic = kwargs.pop('topic', None)
        if not topic:
            raise ClientError("Topic name not specified.")

        value = kwargs.pop('value', None)
        key = kwargs.pop('key', None)

        # A None value or key is passed through to the base producer without encoding.
        if value is not None:
            if not value_schema:
                raise ValueSerializerError("Avro schema required for values")
            value = self._serializer.encode_record_with_schema(topic, value_schema, value)

        if key is not None:
            if not key_schema:
                raise KeySerializerError("Avro schema required for key")
            # The trailing True tells the serializer this record is a key.
            key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)

        # Whatever is left in kwargs is forwarded untouched to Producer.produce.
        super(AvroProducer, self).produce(topic, value, key, **kwargs)
class AvroConsumer(Consumer):
    """
    Kafka Consumer client which does avro schema decoding of messages.
    Handles message deserialization.

    Constructor takes below parameters

    :param dict config: Config parameters containing url for schema registry (``schema.registry.url``)
                        and the standard Kafka client configuration (``bootstrap.servers`` et.al)
    :param schema_registry: Optional ready-made schema registry client. When omitted, a
                            ``CachedSchemaRegistryClient`` is created from the ``schema.registry.*`` settings.
    :param schema reader_key_schema: a reader schema for the message key
    :param schema reader_value_schema: a reader schema for the message value
    :raises ValueError: For invalid configurations
    """

    def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None, **kwargs):
        # Split the caller's config in one pass: registry settings (prefix removed)
        # on one side, plain Kafka consumer settings on the other.
        registry_conf = {}
        consumer_conf = {}
        for name, setting in config.items():
            if name.startswith("schema.registry"):
                registry_conf[name.replace("schema.registry.", "")] = setting
            else:
                consumer_conf[name] = setting

        if registry_conf.get("basic.auth.credentials.source") == 'SASL_INHERIT':
            # Fallback to plural 'mechanisms' for backward compatibility
            mechanism = config.get('sasl.mechanism', config.get('sasl.mechanisms', ''))
            registry_conf.update({
                'sasl.mechanism': mechanism,
                'sasl.username': config.get('sasl.username', ''),
                'sasl.password': config.get('sasl.password', ''),
            })

        if schema_registry is not None:
            # An explicit client and a registry url are two competing ways to pick the registry.
            if registry_conf.get("url") is not None:
                raise ValueError("Cannot pass schema_registry along with schema.registry.url config")
        else:
            schema_registry = CachedSchemaRegistryClient(registry_conf)

        super(AvroConsumer, self).__init__(consumer_conf, **kwargs)
        self._serializer = MessageSerializer(schema_registry, reader_key_schema, reader_value_schema)

    def poll(self, timeout=None):
        """
        This is an overridden method from confluent_kafka.Consumer class. This handles message
        deserialization using avro schema

        ``None`` results and messages carrying an error are returned as received, without decoding.

        :param float timeout: Poll timeout in seconds (default: indefinite)
        :returns: message object whose non-None value and key have been replaced by their decoded form
        :rtype: Message
        :raises SerializerError: If the value or key cannot be decoded
        """
        # The base class is called with -1 when no timeout was given.
        wait = -1 if timeout is None else timeout

        message = super(AvroConsumer, self).poll(wait)
        if message is None or message.error():
            return message

        try:
            raw_value = message.value()
            if raw_value is not None:
                message.set_value(self._serializer.decode_message(raw_value, is_key=False))

            raw_key = message.key()
            if raw_key is not None:
                message.set_key(self._serializer.decode_message(raw_key, is_key=True))
        except SerializerError as e:
            # Re-raise with the message coordinates so the failing record can be located.
            raise SerializerError("Message deserialization failed for message at {} [{}] offset {}: {}".format(
                message.topic(),
                message.partition(),
                message.offset(),
                e))

        return message