.. _python_client:

|ak| |python|
=============

Confluent develops and maintains **confluent-kafka-python**, a |python| client for |ak-tm| that provides a high-level Producer, Consumer and AdminClient compatible with all |ak| brokers >= v0.8, |ccloud| and |cp|.

.. _installation_python_client:

|python| installation
---------------------

The client is available on `PyPI <https://pypi.org/project/confluent-kafka/>`__ and can be installed using ``pip``:

.. codewithvars:: bash

   pip install confluent-kafka

You can install it globally, or within a `virtualenv <https://virtualenv.pypa.io/en/latest/>`__.

.. note::

   The confluent-kafka |python| package is a binding on top of the C client `librdkafka <https://github.com/confluentinc/librdkafka>`__. It comes bundled with a pre-built version of librdkafka which does not include GSSAPI/Kerberos support. For information on how to install a version that supports GSSAPI, see the `installation instructions <https://github.com/confluentinc/confluent-kafka-python#installation>`__.

.. _producer_python_client:

|ak| Producer
-------------

Initialization
~~~~~~~~~~~~~~

The Producer is configured using a dictionary as follows:

.. sourcecode:: python

   from confluent_kafka import Producer
   import socket

   conf = {'bootstrap.servers': "host1:9092,host2:9092",
           'client.id': socket.gethostname()}

   producer = Producer(conf)

For information on the available configuration properties, refer to the `API Documentation <https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html>`__.

Asynchronous writes
~~~~~~~~~~~~~~~~~~~

To initiate sending a message to |ak|, call the ``produce`` method, passing in the message value (which may be ``None``) and optionally a key, partition, and callback. The produce call completes immediately and does not return a value. A ``KafkaException`` will be thrown if the message could not be enqueued due to librdkafka's local produce queue being full.

.. sourcecode:: python

   producer.produce(topic, key="key", value="value")

To receive notification of delivery success or failure, you can pass a ``callback`` parameter. This can be any callable, for example, a lambda, function, bound method, or callable object. Although the ``produce()`` method enqueues the message immediately for batching, compression and transmission to the broker, no delivery notification events will be propagated until ``poll()`` is invoked.

.. sourcecode:: python

   def acked(err, msg):
       if err is not None:
           print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
       else:
           print("Message produced: %s" % (str(msg)))

   producer.produce(topic, key="key", value="value", callback=acked)

   # Wait up to 1 second for events. Callbacks will be invoked during
   # this method call if the message is acknowledged.
   producer.poll(1)

Synchronous writes
~~~~~~~~~~~~~~~~~~

The |python| client provides a ``flush()`` method which can be used to make writes synchronous. This is typically a bad idea since it effectively limits throughput to the broker round trip time, but may be justified in some cases.

.. sourcecode:: python

   producer.produce(topic, key="key", value="value")
   producer.flush()

Typically, ``flush()`` should be called prior to shutting down the producer to ensure all outstanding/queued/in-flight messages are delivered.
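Putting the producer pieces together, a minimal, shutdown-safe sketch serves delivery callbacks with ``poll(0)`` while producing and calls ``flush()`` once at the end. The broker address ``localhost:9092``, the topic name ``mytopic``, and the ``client.id`` value below are illustrative assumptions, not requirements of the client:

.. sourcecode:: python

   from confluent_kafka import Producer

   # Assumed broker address and client id; adjust for your environment.
   conf = {'bootstrap.servers': "localhost:9092",
           'client.id': "example-producer"}

   producer = Producer(conf)

   def acked(err, msg):
       if err is not None:
           print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
       else:
           print("Message produced: %s" % (str(msg)))

   for i in range(10):
       producer.produce("mytopic", key=str(i), value="value-%d" % i,
                        callback=acked)
       # Serve delivery callbacks from earlier produce() calls
       # without blocking.
       producer.poll(0)

   # Wait for all outstanding messages to be delivered (or to fail),
   # invoking the remaining delivery callbacks.
   producer.flush()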
.. _consumer_python_client:

|ak| Consumer
-------------

Initialization
~~~~~~~~~~~~~~

The Consumer is configured using a dictionary as follows:

.. sourcecode:: python

   from confluent_kafka import Consumer

   conf = {'bootstrap.servers': "host1:9092,host2:9092",
           'group.id': "foo",
           'auto.offset.reset': 'smallest'}

   consumer = Consumer(conf)

The ``group.id`` property is mandatory and specifies which consumer group the consumer is a member of. The ``auto.offset.reset`` property specifies what offset the consumer should start reading from in the event there are no committed offsets for a partition, or the committed offset is invalid (perhaps due to log truncation).

For information on the available configuration properties, refer to the `API Documentation <https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html>`__.

|python| code examples
~~~~~~~~~~~~~~~~~~~~~~

Basic poll loop
^^^^^^^^^^^^^^^

A typical |ak| consumer application is centered around a consume loop, which repeatedly calls the ``poll`` method to retrieve, one-by-one, records that the consumer has efficiently pre-fetched behind the scenes. Before entering the consume loop, you'll typically use the ``subscribe`` method to specify which topics should be fetched from:

.. sourcecode:: python

   import sys

   from confluent_kafka import KafkaError, KafkaException

   running = True

   def basic_consume_loop(consumer, topics):
       try:
           consumer.subscribe(topics)

           while running:
               msg = consumer.poll(timeout=1.0)
               if msg is None: continue

               if msg.error():
                   if msg.error().code() == KafkaError._PARTITION_EOF:
                       # End of partition event
                       sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                        (msg.topic(), msg.partition(), msg.offset()))
                   elif msg.error():
                       raise KafkaException(msg.error())
               else:
                   # Application-specific message processing.
                   msg_process(msg)
       finally:
           # Close down consumer to commit final offsets.
           consumer.close()

   def shutdown():
       global running
       running = False

The poll timeout is hard-coded to 1 second. If no records are received before this timeout expires, ``Consumer.poll()`` returns ``None``.

Note that you should always call ``Consumer.close()`` after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately, which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If not closed properly, the broker will trigger the rebalance only after the session timeout has expired.

Synchronous commits
^^^^^^^^^^^^^^^^^^^

The simplest and most reliable way to manually commit offsets is by setting the ``asynchronous`` parameter of the ``Consumer.commit()`` method call to ``False``. This method can also accept the mutually exclusive keyword parameters ``offsets``, to explicitly list the offsets for each assigned topic partition, and ``message``, which will commit offsets relative to a ``Message`` object returned by ``poll()``.

.. sourcecode:: python

   def consume_loop(consumer, topics):
       try:
           consumer.subscribe(topics)

           msg_count = 0
           while running:
               msg = consumer.poll(timeout=1.0)
               if msg is None: continue

               if msg.error():
                   if msg.error().code() == KafkaError._PARTITION_EOF:
                       # End of partition event
                       sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                        (msg.topic(), msg.partition(), msg.offset()))
                   elif msg.error():
                       raise KafkaException(msg.error())
               else:
                   msg_process(msg)
                   msg_count += 1
                   if msg_count % MIN_COMMIT_COUNT == 0:
                       consumer.commit(asynchronous=False)
       finally:
           # Close down consumer to commit final offsets.
           consumer.close()

In this example, a synchronous commit is triggered every ``MIN_COMMIT_COUNT`` messages. The ``asynchronous`` flag controls whether this call is asynchronous. You could also trigger the commit on expiration of a timeout to ensure that the committed position is updated regularly; a sketch of this timer-based variant follows below.
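The timer-based variant is not spelled out above; as a rough sketch, it could look like the following. It reuses the ``running`` flag and the application-specific ``msg_process`` helper from the earlier examples, while ``COMMIT_INTERVAL`` and ``timed_commit_loop`` are illustrative names, not part of the client API:

.. sourcecode:: python

   import time

   COMMIT_INTERVAL = 5.0  # seconds; a hypothetical tuning constant

   def timed_commit_loop(consumer, topics):
       try:
           consumer.subscribe(topics)

           last_commit = time.time()
           uncommitted = False
           while running:
               msg = consumer.poll(timeout=1.0)
               if msg is not None and not msg.error():
                   msg_process(msg)   # application-specific processing
                   uncommitted = True

               # Commit synchronously once the interval has elapsed, but
               # only if something new was consumed since the last commit
               # (committing with no consumed offsets raises an error).
               if uncommitted and time.time() - last_commit >= COMMIT_INTERVAL:
                   consumer.commit(asynchronous=False)
                   last_commit = time.time()
                   uncommitted = False
       finally:
           # Close down consumer to commit final offsets.
           consumer.close()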
Delivery guarantees
^^^^^^^^^^^^^^^^^^^

In the previous example, you get "at least once" delivery since the commit follows the message processing. By changing the order, however, you can get "at most once" delivery, but you must be careful to handle commit failures.

.. sourcecode:: python

   def consume_loop(consumer, topics):
       try:
           consumer.subscribe(topics)

           while running:
               msg = consumer.poll(timeout=1.0)
               if msg is None: continue

               if msg.error():
                   if msg.error().code() == KafkaError._PARTITION_EOF:
                       # End of partition event
                       sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                        (msg.topic(), msg.partition(), msg.offset()))
                   elif msg.error():
                       raise KafkaException(msg.error())
               else:
                   # Commit before processing for "at most once" semantics.
                   consumer.commit(asynchronous=False)
                   msg_process(msg)
       finally:
           # Close down consumer to commit final offsets.
           consumer.close()

For simplicity in this example, ``Consumer.commit()`` is used prior to processing the message. Committing on every message would produce a lot of overhead in practice. A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded.

Asynchronous commits
^^^^^^^^^^^^^^^^^^^^

.. sourcecode:: python

   def consume_loop(consumer, topics):
       try:
           consumer.subscribe(topics)

           msg_count = 0
           while running:
               msg = consumer.poll(timeout=1.0)
               if msg is None: continue

               if msg.error():
                   if msg.error().code() == KafkaError._PARTITION_EOF:
                       # End of partition event
                       sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                        (msg.topic(), msg.partition(), msg.offset()))
                   elif msg.error():
                       raise KafkaException(msg.error())
               else:
                   msg_process(msg)
                   msg_count += 1
                   if msg_count % MIN_COMMIT_COUNT == 0:
                       consumer.commit(asynchronous=True)
       finally:
           # Close down consumer to commit final offsets.
           consumer.close()

In this example, the consumer sends the commit request and returns immediately by using asynchronous commits. The ``asynchronous`` parameter to ``commit()`` is changed to ``True``. The value is passed in explicitly, but asynchronous commits are the default if the parameter is not included.

The API gives you a callback which is invoked when the commit either succeeds or fails. The commit callback can be any callable and can be passed as a configuration parameter to the consumer constructor.

.. sourcecode:: python

   from confluent_kafka import Consumer

   def commit_completed(err, partitions):
       if err:
           print(str(err))
       else:
           print("Committed partition offsets: " + str(partitions))

   conf = {'bootstrap.servers': "host1:9092,host2:9092",
           'group.id': "foo",
           'auto.offset.reset': 'smallest',
           'on_commit': commit_completed}

   consumer = Consumer(conf)
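Commit callbacks, like delivery reports on the producer side, are served from within ``poll()``. To see ``commit_completed`` fire, the consumer configured above can be run through the asynchronous-commit loop from the previous section; the topic name ``mytopic`` below is an assumption for illustration:

.. sourcecode:: python

   # Run the consumer configured above through the asynchronous-commit
   # loop shown earlier; commit_completed() is invoked from inside
   # poll() as each commit request completes.
   try:
       consume_loop(consumer, ["mytopic"])
   except KeyboardInterrupt:
       pass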