Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

Kafka Python Client

Confluent develops and maintains confluent-kafka-python, a Python Client for Apache Kafka® that provides a high-level Producer, Consumer and AdminClient compatible with all Kafka brokers >= v0.8, Confluent Cloud and Confluent Platform.

Python Client installation

The client is available on PyPI and can be installed using pip:

pip install confluent-kafka

You can install it globally, or within a virtualenv.

Note

The confluent-kafka Python package is a binding on top of the C client librdkafka. It comes bundled with a pre-built version of librdkafka that does not include GSSAPI/Kerberos support. For information on how to install a version that supports GSSAPI, see the installation instructions.

Python Client example code

For Hello World examples of Kafka clients in Python, see Python. All examples include a producer and consumer that can connect to any Kafka cluster running on-premises or in Confluent Cloud. They also include examples of how to produce and consume Avro data with Schema Registry.

Kafka Producer

Initialization

The Producer is configured using a dictionary as follows:

from confluent_kafka import Producer
import socket

conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'client.id': socket.gethostname()}

producer = Producer(conf)

For information on the available configuration properties, refer to the API Documentation.

Asynchronous writes

To initiate sending a message to Kafka, call the produce method, passing in the message value (which may be None) and optionally a key, partition, and callback. The produce call returns immediately and does not return a value. A BufferError is raised if the message cannot be enqueued because librdkafka’s local produce queue is full; other failures raise a KafkaException.

producer.produce(topic, key="key", value="value")
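The client raises BufferError when the local produce queue is full, so a common pattern is to serve delivery events with poll() and then retry. The following is a minimal sketch of that pattern, not part of the client API: produce_with_retry and its max_retries and wait parameters are hypothetical names, and producer is assumed to be a confluent_kafka.Producer (or any object with the same produce() and poll() methods).

```python
def produce_with_retry(producer, topic, value, key=None, max_retries=3, wait=1.0):
    """Enqueue one message, draining the local queue if it is full.

    Hypothetical helper: `producer` is assumed to behave like
    confluent_kafka.Producer. Returns the number of retries that were needed.
    """
    for attempt in range(max_retries + 1):
        try:
            producer.produce(topic, key=key, value=value)
            return attempt
        except BufferError:
            if attempt == max_retries:
                # Give up and let the caller decide what to do.
                raise
            # Serve delivery callbacks so that acknowledged messages are
            # removed from the local queue, freeing space for the retry.
            producer.poll(wait)
```

Retrying a bounded number of times keeps a stalled broker connection from blocking the caller indefinitely; the final BufferError is re-raised unchanged.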

To receive notification of delivery success or failure, pass a callback parameter. This can be any callable, for example, a lambda, function, bound method, or callable object. Although the produce() method enqueues the message immediately for batching, compression, and transmission to the broker, no delivery notification events are propagated until poll() or flush() is invoked.

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))

producer.produce(topic, key="key", value="value", callback=acked)

# Wait up to 1 second for events. Callbacks will be invoked during
# this method call if the message is acknowledged.
producer.poll(1)
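Since the callback can be a callable object, delivery results can be accumulated in instance state rather than printed. The sketch below illustrates this; DeliveryTracker is a hypothetical class name, and it relies only on the (err, msg) callback signature shown above.

```python
class DeliveryTracker:
    """Callable object that counts delivery successes and failures.

    Hypothetical helper for illustration; not part of confluent-kafka-python.
    """

    def __init__(self):
        self.delivered = 0
        self.failed = 0

    def __call__(self, err, msg):
        # The client invokes the callback with (err, msg);
        # err is None when the message was delivered.
        if err is not None:
            self.failed += 1
        else:
            self.delivered += 1
```

An instance can be passed as callback=tracker to produce(); its counters are updated while poll() or flush() serves the delivery events.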

Synchronous writes

The Python client provides a flush() method which can be used to make writes synchronous. This is typically a bad idea since it effectively limits throughput to the broker round trip time, but may be justified in some cases.

producer.produce(topic, key="key", value="value")
producer.flush()

Typically, flush() should be called prior to shutting down the producer to ensure all outstanding/queued/in-flight messages are delivered.
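A shutdown routine along these lines could look like the following sketch. It assumes producer behaves like confluent_kafka.Producer, whose flush(timeout) returns the number of messages still in the queue when the timeout expires; shutdown_producer and the 10-second default are hypothetical choices for illustration.

```python
def shutdown_producer(producer, timeout=10.0):
    """Flush outstanding messages and report how many remain undelivered.

    Hypothetical helper: `producer` is assumed to behave like
    confluent_kafka.Producer.
    """
    # flush() blocks until the queue is empty or the timeout expires and
    # returns the number of messages that are still queued.
    remaining = producer.flush(timeout)
    if remaining > 0:
        print("%d message(s) were not delivered within %.1f s" % (remaining, timeout))
    return remaining
```

Bounding the flush with a timeout prevents shutdown from hanging when the brokers are unreachable, at the cost of possibly dropping the remaining messages.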

Kafka Consumer

Initialization

The Consumer is configured using a dictionary as follows:

from confluent_kafka import Consumer

conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'group.id': "foo",
        'auto.offset.reset': 'earliest'}

consumer = Consumer(conf)

The group.id property is mandatory and specifies which consumer group the consumer is a member of. The auto.offset.reset property specifies what offset the consumer should start reading from in the event there are no committed offsets for a partition, or the committed offset is invalid (perhaps due to log truncation).

The following example sets enable.auto.commit to False in the consumer configuration. The default value is True.

from confluent_kafka import Consumer

conf = {'bootstrap.servers': 'host1:9092,host2:9092',
        'group.id': "foo",
        'enable.auto.commit': False,
        'auto.offset.reset': 'earliest'}

consumer = Consumer(conf)

For information on the available configuration properties, refer to the API Documentation.

Python Client code examples

Basic poll loop

A typical Kafka consumer application is centered around a consume loop, which repeatedly calls the poll method to retrieve, one by one, records that the consumer has efficiently pre-fetched behind the scenes. Before entering the consume loop, you’ll typically use the subscribe method to specify which topics should be fetched from:

import sys

from confluent_kafka import KafkaError, KafkaException

running = True

def basic_consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                # msg_process is the application's own message handler.
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

def shutdown():
    # Without the global declaration, the assignment would only create a
    # local variable and the consume loop would never stop.
    global running
    running = False

The poll timeout is hard-coded to 1 second. If no record is received before this timeout expires, Consumer.poll() returns None.

Note that you should always call Consumer.close() after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If not closed properly, the broker will trigger the rebalance only after the session timeout has expired.
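One way to make sure close() always runs is to drive the loop from a threading.Event that a signal handler or another thread can set. This is a minimal sketch under stated assumptions: consume_until_stopped, handle, and stop_event are hypothetical names, consumer is assumed to behave like confluent_kafka.Consumer, and error events are surfaced as a plain RuntimeError to keep the block free of client imports.

```python
import threading

# Set this event (for example from a signal handler) to stop the loop.
stop_event = threading.Event()

def consume_until_stopped(consumer, topics, handle, stop=stop_event):
    """Poll until `stop` is set, then always close the consumer.

    Hypothetical helper: `consumer` is assumed to behave like
    confluent_kafka.Consumer and `handle` is the application's
    per-message callback.
    """
    try:
        consumer.subscribe(topics)
        while not stop.is_set():
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue  # poll timed out; check the stop flag again
            if msg.error():
                # Simplification for this sketch: treat every error event
                # as fatal instead of inspecting its error code.
                raise RuntimeError(str(msg.error()))
            handle(msg)
    finally:
        # Runs on normal exit and on exceptions, so the group rebalances
        # immediately instead of waiting for the session timeout.
        consumer.close()
```

In an application, the event could be wired to Ctrl+C with signal.signal(signal.SIGINT, lambda signum, frame: stop_event.set()); because the poll timeout is 1 second, the loop notices the flag within about that time.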

Synchronous commits

The simplest and most reliable way to manually commit offsets is to set the asynchronous parameter of Consumer.commit() to False. The method also accepts two mutually exclusive keyword parameters: offsets, which explicitly lists the offsets to commit for each assigned topic partition, and message, which commits offsets relative to a Message object returned by poll().

MIN_COMMIT_COUNT = 100  # illustrative value; tune for your workload

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    consumer.commit(asynchronous=False)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

In this example, a synchronous commit is triggered every MIN_COMMIT_COUNT messages. The asynchronous flag controls whether this call is asynchronous. You could also trigger the commit on expiration of a timeout to ensure that the committed position is updated regularly.
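The timeout-based variant mentioned above can be sketched with a small timer object. CommitTimer and maybe_commit are hypothetical names introduced for illustration; consumer is assumed to behave like confluent_kafka.Consumer, and the clock parameter exists only so that the timer can be exercised without waiting.

```python
import time

class CommitTimer:
    """Tracks whether a commit is due based on elapsed time."""

    def __init__(self, interval_s, clock=time.monotonic):
        self.interval_s = interval_s
        self.clock = clock
        self.last_commit = clock()

    def due(self):
        return self.clock() - self.last_commit >= self.interval_s

    def reset(self):
        self.last_commit = self.clock()


def maybe_commit(consumer, timer):
    """Commit synchronously if the interval has elapsed.

    Returns True if a commit was issued, False otherwise.
    """
    if not timer.due():
        return False
    consumer.commit(asynchronous=False)
    timer.reset()
    return True
```

Calling maybe_commit(consumer, timer) right after msg_process(msg) in the loop keeps the commit tied to processed messages; committing when nothing has been consumed yet may raise an error, so the call is best placed on that path rather than after an empty poll.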

Delivery guarantees

In the previous example, you get “at least once” delivery since the commit follows the message processing. By changing the order, however, you can get “at most once” delivery, but you must handle commit failures carefully: a message should be processed only if its commit succeeded.

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                consumer.commit(asynchronous=False)
                msg_process(msg)

    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

For simplicity in this example, Consumer.commit() is used prior to processing the message. Committing on every message would produce a lot of overhead in practice. A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded.
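The batched approach can be sketched as follows, using the client's Consumer.consume(num_messages, timeout) method to fetch several messages at once. at_most_once_batch, handle, and the default batch size are hypothetical, and consumer is assumed to behave like confluent_kafka.Consumer. For brevity, error events in the batch are simply dropped; a real application would inspect them as in the loops above.

```python
def at_most_once_batch(consumer, handle, batch_size=100, timeout=1.0):
    """Fetch a batch, commit it, then process it only if the commit succeeded.

    Hypothetical helper. Returns the number of messages processed.
    """
    batch = consumer.consume(num_messages=batch_size, timeout=timeout)
    # Simplification for this sketch: ignore error events in the batch.
    records = [m for m in batch if not m.error()]
    if not records:
        return 0
    # A failed synchronous commit raises an exception (KafkaException in
    # confluent-kafka-python). It is deliberately not caught here, so no
    # message in the batch is processed unless the commit went through.
    consumer.commit(asynchronous=False)
    for m in records:
        handle(m)
    return len(records)
```

Letting the commit exception propagate leaves the decision to the caller, for example shutting down so that the uncommitted batch is fetched again after a restart.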

Asynchronous commits

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    consumer.commit(asynchronous=True)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

In this example, the consumer sends the request and returns immediately by using asynchronous commits. The asynchronous parameter to commit() is changed to True. The value is passed in explicitly, but asynchronous commits are the default if the parameter is not included.

The API gives you a callback which is invoked when the commit either succeeds or fails. The commit callback can be any callable and can be passed as a configuration parameter to the consumer constructor.

from confluent_kafka import Consumer

def commit_completed(err, partitions):
    if err:
        print(str(err))
    else:
        print("Committed partition offsets: " + str(partitions))

conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'group.id': "foo",
        'auto.offset.reset': 'earliest',
        'on_commit': commit_completed}

consumer = Consumer(conf)
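A commit can succeed overall while individual partitions report errors, so a callback may want to look at each entry in partitions. The sketch below assumes each entry exposes the topic, partition, offset, and error attributes of the client's TopicPartition class; describe_commit and log_commit_result are hypothetical names.

```python
def describe_commit(err, partitions):
    """Return one human-readable line per partition in a commit result."""
    if err is not None:
        return ["commit failed: %s" % err]
    lines = []
    for tp in partitions:
        if tp.error is not None:
            lines.append("%s [%d]: error %s" % (tp.topic, tp.partition, tp.error))
        else:
            lines.append("%s [%d]: committed offset %d" % (tp.topic, tp.partition, tp.offset))
    return lines


def log_commit_result(err, partitions):
    # Matches the (err, partitions) signature expected for on_commit.
    for line in describe_commit(err, partitions):
        print(line)
```

Passing 'on_commit': log_commit_result in the configuration dictionary would register it in the same way as commit_completed above. Keeping the formatting in a separate function that returns strings makes the logic easy to check without a running consumer.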