Apache Kafka Python Client

Confluent, a leading developer and maintainer of Apache Kafka®, offers confluent-kafka-python on GitHub. This Python client provides a high-level producer, consumer, and AdminClient that are compatible with Kafka brokers (version 0.8 or later), Confluent Cloud, and Confluent Platform. For release notes, see the changelog in the same repository.

For a step-by-step guide on building a Python client application for Kafka, see Getting Started with Apache Kafka and Python.

Python Client installation

The client is available on PyPI and can be installed using pip:

pip install confluent-kafka

You can install it globally, or within a virtualenv. If you want to install a FIPS-compliant client, see FIPS Compliance.

Note

The confluent-kafka Python package is a binding on top of the C client librdkafka. It comes bundled with a pre-built version of librdkafka that does not include GSSAPI/Kerberos support. For information on how to install a version that supports GSSAPI, see the installation instructions.

Python Client demo code

For a step-by-step tutorial on using the Python client, including code samples for the producer and consumer, see this guide.

There are also further examples including how to produce and consume Avro data with Schema Registry.

Kafka Producer

Initialization

The Producer is configured using a dictionary in the examples below.

If you are running Kafka locally, you can initialize the Producer as shown below.

from confluent_kafka import Producer
import socket

conf = {'bootstrap.servers': 'host1:9092,host2:9092',
        'client.id': socket.gethostname()}

producer = Producer(conf)

If you are connecting to a Kafka cluster in Confluent Cloud, you need to provide credentials for access. The example below shows using a cluster API key and secret.

from confluent_kafka import Producer
import socket

conf = {'bootstrap.servers': 'pkc-abcd85.us-west-2.aws.confluent.cloud:9092',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<CLUSTER_API_KEY>',
        'sasl.password': '<CLUSTER_API_SECRET>',
        'client.id': socket.gethostname()}

producer = Producer(conf)
  • For information on the available configuration properties, refer to the API Documentation.
  • For a step-by-step tutorial on using the Python client, including code samples for the producer and consumer, see this guide.
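Hardcoding the API key and secret is rarely desirable outside a demo. As a minimal sketch, the helper below builds the same Confluent Cloud configuration dictionary from environment variables. The function name and the variable names KAFKA_BOOTSTRAP_SERVERS, KAFKA_API_KEY, and KAFKA_API_SECRET are assumptions made for illustration; the client does not look them up itself.

```python
import os
import socket

# Hypothetical variable names; use whatever your deployment defines.
REQUIRED_VARS = ("KAFKA_BOOTSTRAP_SERVERS", "KAFKA_API_KEY", "KAFKA_API_SECRET")


def cloud_producer_config(env=None):
    """Build a Confluent Cloud producer config from environment variables."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {
        "bootstrap.servers": env["KAFKA_BOOTSTRAP_SERVERS"],
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": env["KAFKA_API_KEY"],
        "sasl.password": env["KAFKA_API_SECRET"],
        "client.id": socket.gethostname(),
    }
```

The returned dictionary can be passed directly to the Producer constructor, for example Producer(cloud_producer_config()).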

Asynchronous writes

To initiate sending a message to Kafka, call the produce method, passing in the message value (which may be None) and optionally a key, partition, and callback. The produce call completes immediately and does not return a value. A BufferError is raised if the message cannot be enqueued because librdkafka’s local produce queue is full; other failures raise a KafkaException.

producer.produce(topic, key="key", value="value")
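In confluent-kafka-python, a full local queue surfaces as Python's built-in BufferError. One common way to react is to serve delivery reports with poll() so that acknowledged messages free up space, and then retry. The helper below is a sketch of that idea, not part of the client API; its name, the retry limit, and the poll interval are assumptions.

```python
def produce_with_backpressure(producer, topic, value, key=None,
                              max_attempts=5, poll_seconds=0.5):
    """Enqueue one message, waiting for queue space if the local queue is full.

    producer is expected to behave like confluent_kafka.Producer.
    Returns the number of attempts that were needed.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            producer.produce(topic, key=key, value=value)
            return attempt
        except BufferError:
            # Local queue is full: serve delivery reports so that
            # acknowledged messages free up space, then try again.
            producer.poll(poll_seconds)
    raise BufferError("local queue still full after %d attempts" % max_attempts)
```

With a real producer you would call it as produce_with_backpressure(producer, topic, value="value", key="key"). Whether to retry, drop, or block indefinitely is an application decision.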

To receive notification of delivery success or failure, you can pass a callback parameter. This can be any callable, for example, a lambda, function, bound method, or callable object. Although the produce() method enqueues the message immediately for batching, compression, and transmission to the broker, no delivery notification events are propagated until poll() or flush() is invoked.

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))

producer.produce(topic, key="key", value="value", callback=acked)

# Wait up to 1 second for events. Callbacks will be invoked during
# this method call if the message is acknowledged.
producer.poll(1)
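Because the callback can be any callable, a bound method is a convenient way to keep state between delivery reports. The class below is a sketch under that assumption; DeliveryTracker and its attribute names are hypothetical and not part of the client.

```python
class DeliveryTracker:
    """Counts delivery reports; pass tracker.on_delivery as the callback."""

    def __init__(self):
        self.delivered = 0
        self.failed = []

    def on_delivery(self, err, msg):
        if err is not None:
            # Keep enough context to log or retry the failure later.
            self.failed.append((msg.topic(), msg.key(), str(err)))
        else:
            self.delivered += 1
```

A typical use would be tracker = DeliveryTracker(), then producer.produce(topic, value="value", callback=tracker.on_delivery), followed by producer.poll(0) or producer.flush() so that the callback actually runs.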

Synchronous writes

The Python client provides a flush() method which can be used to make writes synchronous. This is typically a bad idea since it effectively limits throughput to the broker round trip time, but may be justified in some cases.

producer.produce(topic, key="key", value="value")
producer.flush()

Typically, flush() should be called prior to shutting down the producer to ensure all outstanding/queued/in-flight messages are delivered.
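When called with a timeout, flush() returns the number of messages that are still queued or in flight, which lets a shutdown routine detect undelivered messages. The helper below is a minimal sketch; its name and the default timeout are assumptions.

```python
def close_producer(producer, timeout=10.0):
    """Flush before shutdown and report whether everything was delivered.

    producer is expected to behave like confluent_kafka.Producer, whose
    flush(timeout) returns the number of messages still in the queue.
    """
    remaining = producer.flush(timeout)
    if remaining > 0:
        print("%d message(s) were not delivered within %.1f s"
              % (remaining, timeout))
    return remaining == 0
```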

Kafka Consumer

Initialization

The Consumer is configured using a dictionary in the examples below. If you are running Kafka locally, you can initialize the Consumer as shown below.

from confluent_kafka import Consumer

conf = {'bootstrap.servers': 'host1:9092,host2:9092',
        'group.id': 'foo',
        'auto.offset.reset': 'earliest'}

consumer = Consumer(conf)

If you are connecting to a Kafka cluster in Confluent Cloud, you need to provide credentials for access. The example below shows using a cluster API key and secret.

from confluent_kafka import Consumer

conf = {'bootstrap.servers': 'pkc-abcd85.us-west-2.aws.confluent.cloud:9092',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<CLUSTER_API_KEY>',
        'sasl.password': '<CLUSTER_API_SECRET>',
        'group.id': 'foo',
        'auto.offset.reset': 'earliest'}

consumer = Consumer(conf)

The group.id property is mandatory and specifies which consumer group the consumer is a member of. The auto.offset.reset property specifies what offset the consumer should start reading from in the event there are no committed offsets for a partition, or the committed offset is invalid (perhaps due to log truncation).
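The effect of auto.offset.reset can be pictured with a small decision function. This is a toy model of the rule described above, not code from the client or from librdkafka; the function name and its parameters are invented for illustration.

```python
def resolve_start_offset(committed, low, high, reset_policy="latest"):
    """Toy model of where a consumer starts reading one partition.

    committed: last committed offset, or None if the group has none.
    low, high: first retained offset and the next offset to be written.
    """
    if committed is not None and low <= committed <= high:
        return committed      # valid committed offset: resume there
    if reset_policy == "earliest":
        return low            # start from the oldest retained record
    if reset_policy == "latest":
        return high           # only read records produced from now on
    # Models the "error" policy: no usable offset and no fallback.
    raise ValueError("no valid offset and reset policy is %r" % reset_policy)
```

For example, a group with no committed offset and the earliest policy starts at the low offset, while a committed offset that falls below the low offset (the log was truncated) is treated as invalid and the policy applies.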

The local example below shows enable.auto.commit configured to false in the consumer. The default value is True.

from confluent_kafka import Consumer

conf = {'bootstrap.servers': 'host1:9092,host2:9092',
        'group.id': 'foo',
        'enable.auto.commit': 'false',
        'auto.offset.reset': 'earliest'}

consumer = Consumer(conf)
  • For information on the available configuration properties, refer to the API Documentation.
  • For a step-by-step tutorial on using the Python client, including code samples for the producer and consumer, see this guide.

Python Client code examples

Basic poll loop

A typical Kafka consumer application is centered around a consume loop, which repeatedly calls the poll method to retrieve, one by one, records that the consumer has efficiently pre-fetched behind the scenes. Before entering the consume loop, you’ll typically use the subscribe method to specify which topics should be fetched from:

import sys

from confluent_kafka import KafkaError, KafkaException

running = True

def basic_consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                # msg_process is the application's own handler (not defined here).
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

def shutdown():
    # Rebind the module-level flag. Without "global", the assignment would
    # only create a local variable and the loop would never stop.
    global running
    running = False

The poll timeout is hard-coded to 1 second. If no record is received before this timeout expires, Consumer.poll() returns None, and the loop simply polls again.

Note that you should always call Consumer.close() after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If not closed properly, the broker will trigger the rebalance only after the session timeout has expired.
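One way to make sure close() is reached when the process is interrupted is to stop the loop from a signal handler. The sketch below uses a threading.Event instead of a plain flag; the function names are hypothetical, error handling is reduced to skipping error events, and consumer is expected to behave like confluent_kafka.Consumer.

```python
import signal
import threading

stop_requested = threading.Event()


def request_stop(signum=None, frame=None):
    """Signal handler: ask the consume loop to finish its current iteration."""
    stop_requested.set()


def install_signal_handlers():
    # signal.signal() must be called from the main thread.
    signal.signal(signal.SIGINT, request_stop)
    signal.signal(signal.SIGTERM, request_stop)


def run_until_stopped(consumer, topics, handle):
    """Poll until a stop is requested, then always close the consumer."""
    try:
        consumer.subscribe(topics)
        while not stop_requested.is_set():
            msg = consumer.poll(timeout=1.0)
            if msg is None or msg.error():
                continue  # real code should inspect msg.error() here
            handle(msg)
    finally:
        # Leaves the group right away instead of waiting for the
        # session timeout to expire.
        consumer.close()
```

Because the poll timeout is one second, the loop notices the stop request at most about one second after the signal arrives.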

Synchronous commits

The simplest and most reliable way to manually commit offsets is to set the asynchronous parameter of the Consumer.commit() method to False. This method also accepts two mutually exclusive keyword parameters: offsets, which explicitly lists the offsets to commit for each assigned topic partition, and message, which commits the offset relative to a Message object returned by poll().

MIN_COMMIT_COUNT = 100  # illustrative value; tune for your workload

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    consumer.commit(asynchronous=False)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

In this example, a synchronous commit is triggered every MIN_COMMIT_COUNT messages. The asynchronous flag controls whether the call is asynchronous; here it is False, so commit() blocks until the broker responds. You could also trigger the commit when a timeout expires to ensure that the committed position is updated regularly.
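Committing on a count or on a timeout, whichever comes first, can be kept out of the loop body with a small helper. The class below is a sketch of that idea; CommitPolicy, its method names, and its default limits are assumptions, not part of the client.

```python
import time


class CommitPolicy:
    """Decide when to commit: every max_count messages or max_seconds."""

    def __init__(self, max_count=100, max_seconds=5.0, clock=time.monotonic):
        self.max_count = max_count
        self.max_seconds = max_seconds
        self._clock = clock
        self._count = 0
        self._last_commit = clock()

    def record(self):
        """Call once per processed message; returns True when a commit is due."""
        self._count += 1
        elapsed = self._clock() - self._last_commit
        return self._count >= self.max_count or elapsed >= self.max_seconds

    def committed(self):
        """Call after a successful commit to restart both limits."""
        self._count = 0
        self._last_commit = self._clock()
```

Inside the consume loop, the count check would become: if policy.record(), call consumer.commit(asynchronous=False) and then policy.committed(). Note that this sketch only evaluates the timeout when a message arrives.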

Delivery guarantees

In the previous example, you get “at least once” delivery since the commit follows the message processing. By changing the order, however, you can get “at most once” delivery, but you must handle commit failures carefully: a message should be processed only if its commit succeeded.

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                consumer.commit(asynchronous=False)
                msg_process(msg)

    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

For simplicity in this example, Consumer.commit() is used prior to processing the message. Committing on every message would produce a lot of overhead in practice. A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded.
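The batch approach described above might look like the following sketch. It relies on Consumer.consume(), which returns a list of messages, and on a synchronous commit() raising an exception when the commit fails. The function name, the batch size, and the decision to skip error events silently are assumptions made to keep the example short.

```python
def consume_batch_at_most_once(consumer, handle, batch_size=100, timeout=1.0):
    """Fetch a batch, commit it, and only then process it.

    consumer is expected to behave like confluent_kafka.Consumer.
    Returns the number of messages handed to handle.
    """
    batch = consumer.consume(num_messages=batch_size, timeout=timeout)
    # Error events are dropped here; real code should inspect them.
    records = [msg for msg in batch if msg.error() is None]
    if not records:
        return 0
    # Commits the current position, which is past the whole batch.
    # If this raises, nothing below runs, so nothing is processed.
    consumer.commit(asynchronous=False)
    for msg in records:
        handle(msg)
    return len(records)
```

If processing fails after the commit, the affected messages are not redelivered, which is exactly the “at most once” trade-off.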

Asynchronous commits

def consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    consumer.commit(asynchronous=True)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

In this example, the consumer sends the request and returns immediately by using asynchronous commits. The asynchronous parameter to commit() is changed to True. The value is passed in explicitly, but asynchronous commits are the default if the parameter is not included.

The API gives you a callback which is invoked when the commit either succeeds or fails. The commit callback can be any callable and can be passed as a configuration parameter to the consumer constructor.

from confluent_kafka import Consumer

def commit_completed(err, partitions):
    if err:
        print(str(err))
    else:
        print("Committed partition offsets: " + str(partitions))

conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'group.id': "foo",
        'auto.offset.reset': 'earliest',
        'on_commit': commit_completed}

consumer = Consumer(conf)
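The partitions argument of the commit callback is a list of TopicPartition objects, each of which can carry its own error. The helper below is a sketch of how a callback might separate successes from failures; the function name and the shape of the returned tuples are assumptions.

```python
def summarize_commit(err, partitions):
    """Split the arguments of an on_commit callback into two lists.

    partitions is expected to hold TopicPartition-like objects exposing
    .topic, .partition, .offset and .error.
    Returns (committed, failed).
    """
    if err is not None:
        # The whole request failed, so no partition was committed.
        return [], [(p.topic, p.partition, str(err)) for p in partitions]
    committed, failed = [], []
    for p in partitions:
        if p.error is not None:
            failed.append((p.topic, p.partition, str(p.error)))
        else:
            committed.append((p.topic, p.partition, p.offset))
    return committed, failed
```

A callback registered as on_commit could call summarize_commit(err, partitions) and log the failed list. The callback itself is only served while the application keeps calling poll().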

FIPS Compliance

Confluent tested FIPS compliance for the client using OpenSSL 3.0. To use the client in FIPS-compliant mode, Confluent recommends using OpenSSL 3.0. Older versions of OpenSSL have not been verified (although they may work). Confluent tested communication for FIPS compliance between clients and the following endpoints:

  • Kafka brokers
  • Schema Registry

Kafka broker and Schema Registry

Kafka broker
    To communicate with the Kafka broker, the client uses the librdkafka library, which uses OpenSSL. The steps below configure OpenSSL to operate in FIPS mode for client communication with the broker.
Schema Registry
    To communicate with Schema Registry, the client uses Python’s standard libraries, which use the operating system’s native SSL/TLS library. For FIPS-compliant communication between Schema Registry and the client, do not use the steps that follow. Instead, make the SSL/TLS library FIPS compliant. If the native SSL/TLS library is OpenSSL (the default for Python), then use the steps in the OpenSSL readme to make OpenSSL FIPS compliant.

FIPS-compliant Kafka broker communication

For FIPS-compliant communication with the broker, there are two ways to approach client installation:

  • Use prebuilt wheels
  • Build librdkafka and the client both from source

Use prebuilt wheels

If you install this client through prebuilt wheels using pip install confluent_kafka, OpenSSL 3.0 is already statically linked with the librdkafka shared library. To enable this client to communicate with the Kafka cluster using the OpenSSL FIPS provider and FIPS-approved algorithms, you must enable the FIPS provider. You can find the steps to enable the FIPS provider in the section Use FIPS provider.

Note

You should also enable the FIPS provider (using the same steps) if you install this client from source using pip install confluent_kafka --no-binary :all: with a prebuilt librdkafka in which OpenSSL is statically linked.

Build librdkafka and client both from source

When you build librdkafka from source, it links dynamically to the OpenSSL installed on the system unless you explicitly request static linking. If the system-installed OpenSSL is already working in FIPS mode, you can skip directly to the section Client configuration to enable FIPS provider and enable the fips provider.

If you don’t have OpenSSL working in FIPS mode, use the steps in the section Use FIPS provider to make the OpenSSL on your system FIPS compliant, and then enable the fips provider. Once OpenSSL is working in FIPS mode and the fips provider is enabled, librdkafka and the Python client use FIPS-approved algorithms for communication between the client and the Kafka cluster.

Use FIPS provider

To use the FIPS provider, you must have the FIPS module available on your system. Plug the module into OpenSSL, and then configure OpenSSL to use the module.

You can plug the FIPS provider into OpenSSL two ways:

  • Put the module in the default module folder of OpenSSL.
  • Point to the module with the environment variable OPENSSL_MODULES. For example: OPENSSL_MODULES="/path/to/fips/module/lib/folder/".

After you plug the FIPS provider module into OpenSSL, you must configure OpenSSL to use the module. Once again, you have two options:

  • Modify the default configuration file to include the FIPS-related config.

  • Create a new configuration file and point to it using the environment variable OPENSSL_CONF. For example: OPENSSL_CONF="/path/to/fips/enabled/openssl/config/openssl.cnf".

    For an example of OpenSSL configuration file, see: Enable FIPS provider with OpenSSL.

Note

You need to specify both OPENSSL_MODULES and OPENSSL_CONF environment variables when installing the client from pre-built wheels or when OpenSSL is statically linked to librdkafka.
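Exporting both variables in the shell or service definition is the most direct route. If you prefer to set them from Python, the sketch below shows one way, on the assumption that it runs before the client library is imported so that OpenSSL sees the values when it initializes. The function names are hypothetical, and the paths you pass in are your own.

```python
import os


def fips_environment(modules_dir, openssl_conf):
    """Return the two OpenSSL variables described above as a dict."""
    return {
        "OPENSSL_MODULES": modules_dir,  # folder containing the FIPS module
        "OPENSSL_CONF": openssl_conf,    # openssl.cnf that enables the provider
    }


def apply_fips_environment(modules_dir, openssl_conf, environ=None):
    """Set both variables; call this before importing confluent_kafka."""
    environ = os.environ if environ is None else environ
    environ.update(fips_environment(modules_dir, openssl_conf))
```

A call would look like apply_fips_environment("/path/to/fips/module/lib/folder/", "/path/to/fips/enabled/openssl/config/openssl.cnf"), placed at the very top of the program.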

Build FIPS provider module

This section provides a high-level overview of how to build the FIPS provider module. To find the official steps to generate the FIPS provider module, see: README-FIPS doc.

To build the FIPS provider module:

  1. Clone OpenSSL from: OpenSSL Github Repo.

  2. Use git checkout to check out a FIPS-compliant version of OpenSSL 3.0. The latest version may not be FIPS-compliant. At the time of this writing, v3.0.8 (tagged as openssl-3.0.8) is the current tested FIPS-compliant version.

  3. Run: ./Configure enable-fips.

  4. Run: make install_fips.

    Inside the providers folder, two files are generated. Use these files with OpenSSL:

    • FIPS module (fips.dylib on macOS, fips.so on Linux, and fips.dll on Windows)
    • FIPS config (fipsmodule.cnf)
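After the build, a quick check that both files exist can save a confusing failure later. The sketch below maps the platform to the module file name listed above; the function names are hypothetical, and treating every platform other than macOS and Windows as using fips.so is an assumption.

```python
import sys
from pathlib import Path

# File names from the list above; other platforms fall back to fips.so.
FIPS_MODULE_NAMES = {"darwin": "fips.dylib", "win32": "fips.dll"}


def expected_fips_files(providers_dir, platform=sys.platform):
    """Return the paths of the FIPS module and its config file."""
    module = FIPS_MODULE_NAMES.get(platform, "fips.so")
    base = Path(providers_dir)
    return [base / module, base / "fipsmodule.cnf"]


def missing_fips_files(providers_dir, platform=sys.platform):
    """Return the expected files that are not present on disk."""
    return [path for path in expected_fips_files(providers_dir, platform)
            if not path.is_file()]
```

An empty result from missing_fips_files("providers") means both build outputs are in place.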

Reference FIPS provider in OpenSSL

When installing from source, you can dynamically plug the FIPS module built above into OpenSSL by putting the FIPS module into the default OpenSSL module folder. Look for something like: ...lib/ossl-modules/.

For the default locations of OpenSSL on various operating systems, see the SSL section of the Introduction to librdkafka - the Apache Kafka C/C++ client library.

You can also point to this module with the environment variable OPENSSL_MODULES.

For example: OPENSSL_MODULES="/path/to/fips/module/lib/folder/".

Enable FIPS provider with OpenSSL

To enable FIPS in OpenSSL, you must include fipsmodule.cnf in the openssl.cnf file. See the following openssl.cnf example:

config_diagnostics = 1
openssl_conf = openssl_init

.include /usr/local/ssl/fipsmodule.cnf

[openssl_init]
providers = provider_sect
alg_section = algorithm_sect

[provider_sect]
fips = fips_sect

[algorithm_sect]
default_properties = fips=yes
.
.
.

The fipsmodule.cnf file includes fips_sect which OpenSSL requires to enable FIPS.

Some algorithms have different implementations in the FIPS provider and in other providers. If you load two different providers, such as default and fips, either implementation could be used. To make sure you fetch only the FIPS-compliant version of an algorithm, set the default property fips=yes in the config file.

Client configuration to enable FIPS provider

OpenSSL also requires some non-cryptographic algorithms. These algorithms are not included in the FIPS provider, so you need to use the base provider in conjunction with the fips provider. The base provider comes with OpenSSL by default. You must enable the base provider in the client configuration.

To make the client (consumer, producer, or admin client) FIPS compliant, you must enable only the fips and base providers in the client using the ssl.providers configuration property.

Configure the property this way: 'ssl.providers': 'fips,base'.
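In practice, the property is one more entry in the configuration dictionary passed to the client. The helper below is a small sketch that adds it to an existing configuration without modifying the original; the function name is hypothetical.

```python
def with_fips_providers(conf):
    """Return a copy of a client config restricted to the fips and base providers."""
    fips_conf = dict(conf)
    fips_conf["ssl.providers"] = "fips,base"
    return fips_conf
```

For example, Producer(with_fips_providers(conf)) creates a producer from an existing conf dictionary with the two providers enabled.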

API documentation

For details, see the Python Client API documentation.