Apache Kafka Python Client
Confluent, a leading developer and maintainer of Apache Kafka®, offers confluent-kafka-python on GitHub. This Python client provides a high-level producer, consumer, and AdminClient that are compatible with Kafka brokers (version 0.8 or later), Confluent Cloud, and Confluent Platform. For release updates, see the changelog in the same repository.
For a step-by-step guide on building a Python client application for Kafka, see Getting Started with Apache Kafka and Python.
The confluent-kafka-python package is a binding on top of the C client, librdkafka. For an overview of the librdkafka client library, see Introduction to librdkafka client library.
For information about the configuration of the Confluent Kafka Python client, see Kafka Client Configuration.
Python Client installation
The client is available on PyPI and can be installed using pip:
pip install confluent-kafka
You can install it globally, or within a virtualenv. If you want to install a FIPS-compliant client, see FIPS Compliance.
Note
The confluent-kafka-python package comes bundled with a pre-built version of librdkafka which does not include GSSAPI/Kerberos support. For information about how to install a version that supports GSSAPI, see the installation instructions.
Python Client demo code
For a step-by-step tutorial using the Python client, including code samples for the producer and consumer, see this guide.
There are also further examples, including how to produce and consume Avro data with Schema Registry.
Kafka Producer
Initialization
The Producer is configured using a dictionary in the examples below.
If you are running Kafka locally, you can initialize the Producer as shown below.
from confluent_kafka import Producer
import socket
conf = {'bootstrap.servers': 'host1:9092,host2:9092',
        'client.id': socket.gethostname()}
producer = Producer(conf)
If you are connecting to a Kafka cluster in Confluent Cloud, you need to provide credentials for access. The example below shows using a cluster API key and secret.
from confluent_kafka import Producer
import socket
conf = {'bootstrap.servers': 'pkc-abcd85.us-west-2.aws.confluent.cloud:9092',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<CLUSTER_API_KEY>',
        'sasl.password': '<CLUSTER_API_SECRET>',
        'client.id': socket.gethostname()}
producer = Producer(conf)
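Rather than hard-coding the API key and secret, you may prefer to read them from the environment. The sketch below builds the same Confluent Cloud configuration from environment variables; the variable names KAFKA_BOOTSTRAP_SERVERS, KAFKA_API_KEY, and KAFKA_API_SECRET and the helper name cloud_producer_config are hypothetical choices for illustration, not names defined by the client.

```python
import os
import socket
from typing import Mapping, Optional


def cloud_producer_config(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build a Confluent Cloud producer config from environment variables.

    The variable names are hypothetical; rename them to match your deployment.
    """
    env = os.environ if env is None else env
    required = ("KAFKA_BOOTSTRAP_SERVERS", "KAFKA_API_KEY", "KAFKA_API_SECRET")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {
        'bootstrap.servers': env["KAFKA_BOOTSTRAP_SERVERS"],
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': env["KAFKA_API_KEY"],
        'sasl.password': env["KAFKA_API_SECRET"],
        'client.id': socket.gethostname(),
    }
```

With confluent_kafka installed, the result can be passed straight to the constructor: producer = Producer(cloud_producer_config()).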
- For information on the available configuration properties, refer to the API Documentation.
- For a step-by-step tutorial using the Python client, including code samples for the producer and consumer, see this guide.
Asynchronous writes
To initiate sending a message to Kafka, call the produce() method, passing in the topic name, the message value (which may be None), and optionally a key, partition, and callback. The produce() call completes immediately and does not return a value. If the message cannot be enqueued because librdkafka’s local produce queue is full, a BufferError is raised; other errors are raised as KafkaException.
producer.produce(topic, key="key", value="value")
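In the Python client, a full local queue is reported by raising BufferError. A common pattern is to serve delivery events with poll(), so that acknowledged messages leave the queue, and then retry. The sketch below assumes a producer object exposing the produce() and poll() methods described here; the helper name produce_with_backpressure and the max_retries limit are illustrative assumptions.

```python
def produce_with_backpressure(producer, topic, value, key=None,
                              callback=None, max_retries=5):
    """Enqueue one message, waiting for queue space if the local queue is full.

    Returns the number of retries that were needed. Re-raises BufferError
    if the queue is still full after max_retries retries.
    """
    kwargs = {"key": key, "value": value}
    if callback is not None:
        kwargs["callback"] = callback
    for attempt in range(max_retries + 1):
        try:
            producer.produce(topic, **kwargs)
            return attempt
        except BufferError:
            if attempt == max_retries:
                raise
            # Serve delivery callbacks for up to 1 second; delivered
            # messages are removed from the queue, freeing up space.
            producer.poll(1)
```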
To receive notification of delivery success or failure, you can pass a callback parameter. This can be any callable, for example, a lambda, function, bound method, or callable object. Although the produce() method enqueues the message immediately for batching, compression, and transmission to the broker, no delivery notification events are propagated until poll() is invoked.
def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))
producer.produce(topic, key="key", value="value", callback=acked)
# Wait up to 1 second for events. Callbacks will be invoked during
# this method call if the message is acknowledged.
producer.poll(1)
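Because the callback can be any callable, a small callable object is a convenient way to keep delivery statistics instead of printing them. The DeliveryTracker class below is a hypothetical helper, not part of confluent_kafka; it relies only on the (err, msg) callback arguments and the Message methods topic(), partition(), and offset().

```python
class DeliveryTracker:
    """Callable delivery-report handler that records outcomes (hypothetical helper)."""

    def __init__(self):
        self.delivered = 0
        self.failures = []          # (topic, error text) for failed messages
        self.last_position = None   # (topic, partition, offset) of last success

    def __call__(self, err, msg):
        # Invoked from producer.poll() or producer.flush() with the
        # delivery result of a single message.
        if err is not None:
            self.failures.append((msg.topic(), str(err)))
        else:
            self.delivered += 1
            self.last_position = (msg.topic(), msg.partition(), msg.offset())
```

Usage follows the same shape as acked: create tracker = DeliveryTracker(), pass callback=tracker to produce(), then inspect tracker.delivered and tracker.failures after poll() or flush().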
Synchronous writes
The Python client provides a flush() method that can be used to make writes synchronous. Calling it after every message is typically a bad idea because it effectively limits throughput to the broker round-trip time, but it may be justified in some cases.
producer.produce(topic, key="key", value="value")
producer.flush()
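If you do need a synchronous send, you usually also want to know whether the message was actually delivered. One way, sketched below for a producer with the produce() and flush() methods described here, is to capture the delivery result in a callback and check it after flush(). The produce_sync name and the use of RuntimeError (chosen to keep the sketch free of extra imports) are illustrative assumptions.

```python
def produce_sync(producer, topic, value, key=None, timeout=10.0):
    """Produce one message and block until its delivery report arrives.

    Raises RuntimeError if delivery failed or was not confirmed in time.
    """
    result = {}

    def on_delivery(err, msg):
        # Called from flush() once the broker acknowledges (or rejects)
        # the message.
        result["err"] = err
        result["msg"] = msg

    producer.produce(topic, key=key, value=value, callback=on_delivery)
    remaining = producer.flush(timeout)  # messages still queued after timeout
    if "err" not in result:
        raise RuntimeError("delivery not confirmed; %d message(s) still queued"
                           % remaining)
    if result["err"] is not None:
        raise RuntimeError("delivery failed: %s" % result["err"])
    return result["msg"]
```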
Typically, flush() should be called prior to shutting down the producer to ensure all outstanding/queued/in-flight messages are delivered.
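flush() accepts an optional timeout in seconds and returns the number of messages still in the queue, so a shutdown routine can report anything that was not delivered. A minimal sketch; the close_producer name and the 30-second default are assumptions.

```python
import logging

log = logging.getLogger(__name__)


def close_producer(producer, timeout=30.0):
    """Flush outstanding messages before exit; return how many remain."""
    remaining = producer.flush(timeout)
    if remaining > 0:
        # These messages were neither delivered nor reported as failed
        # within the timeout; they are lost when the process exits.
        log.warning("%d message(s) were not delivered before shutdown",
                    remaining)
    return remaining
```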
Kafka Consumer
Initialization
The Consumer is configured using a dictionary in the examples below. If you are running Kafka locally, you can initialize the Consumer as shown below.
from confluent_kafka import Consumer
conf = {'bootstrap.servers': 'host1:9092,host2:9092',
        'group.id': 'foo',
        'auto.offset.reset': 'smallest'}
consumer = Consumer(conf)
If you are connecting to a Kafka cluster in Confluent Cloud, you need to provide credentials for access. The example below shows using a cluster API key and secret.
from confluent_kafka import Consumer
conf = {'bootstrap.servers': 'pkc-abcd85.us-west-2.aws.confluent.cloud:9092',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<CLUSTER_API_KEY>',
        'sasl.password': '<CLUSTER_API_SECRET>',
        'group.id': 'foo',
        'auto.offset.reset': 'smallest'}
consumer = Consumer(conf)
The group.id property is mandatory and specifies which consumer group the consumer is a member of. The auto.offset.reset property specifies what offset the consumer should start reading from in the event there are no committed offsets for a partition, or the committed offset is invalid (perhaps due to log truncation).
The local example below shows enable.auto.commit configured to false in the consumer. The default value is true.
from confluent_kafka import Consumer
conf = {'bootstrap.servers': 'host1:9092,host2:9092',
        'group.id': 'foo',
        'enable.auto.commit': 'false',
        'auto.offset.reset': 'earliest'}
consumer = Consumer(conf)
- For information on the available configuration properties, refer to the API Documentation.
- For a step-by-step tutorial using the Python client, including code samples for the producer and consumer, see this guide.
Python Client code examples
Basic poll loop
A typical Kafka consumer application is centered around a consume loop, which repeatedly calls the poll() method to retrieve records one by one that have been efficiently pre-fetched by the consumer behind the scenes. Before entering the consume loop, you’ll typically use the subscribe() method to specify which topics should be fetched from:
import sys
from confluent_kafka import KafkaError, KafkaException
running = True
def basic_consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                # msg_process() is your application's processing function.
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
def shutdown():
    # Without 'global', this would only set a local variable.
    global running
    running = False
The poll timeout is hard-coded to 1 second. If no records are received before this timeout expires, Consumer.poll() returns None.
Note that you should always call Consumer.close() after you are finished using the consumer. Doing so ensures that active sockets are closed and internal state is cleaned up. It also triggers a group rebalance immediately, which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If the consumer is not closed properly, the broker triggers the rebalance only after the session timeout has expired.
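The shutdown() function above has to be called from somewhere, typically a signal handler. The sketch below swaps the module-level running flag for a threading.Event and installs handlers for SIGINT and SIGTERM; run_until_stopped and its handle callback are hypothetical names, and the consumer is assumed to be already subscribed and to expose the poll() and close() methods described above.

```python
import signal
import threading

stop_event = threading.Event()


def request_stop(signum=None, frame=None):
    # Signature matches what signal.signal() passes to its handlers.
    stop_event.set()


def install_stop_handlers():
    # signal.signal() must be called from the main thread.
    signal.signal(signal.SIGINT, request_stop)
    signal.signal(signal.SIGTERM, request_stop)


def run_until_stopped(consumer, handle, timeout=1.0):
    """Poll until request_stop() is called, then close the consumer.

    handle(msg) is responsible for checking msg.error().
    """
    try:
        while not stop_event.is_set():
            msg = consumer.poll(timeout=timeout)
            if msg is None:
                continue  # no record arrived within the timeout
            handle(msg)
    finally:
        consumer.close()
```

Using an Event rather than a plain global avoids the need for the global statement and lets other threads request a stop as well.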
Synchronous commits
The simplest and most reliable way to manually commit offsets is to set the asynchronous parameter of the Consumer.commit() method to False. This method can also accept the mutually exclusive keyword parameters offsets, which explicitly lists the offsets for each assigned topic partition, and message, which commits offsets relative to a Message object returned by poll().
def consume_loop(consumer, topics):
    # Uses the imports, running flag, and msg_process() from the basic poll
    # loop above; MIN_COMMIT_COUNT is an application-chosen batch size.
    try:
        consumer.subscribe(topics)
        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    consumer.commit(asynchronous=False)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
In this example, a synchronous commit is triggered every MIN_COMMIT_COUNT messages. The asynchronous flag controls whether this call is asynchronous. You could also trigger the commit on expiration of a timeout to ensure that the committed position is updated regularly.
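The count-based and timeout-based triggers can be combined. The CommitTrigger class below is a hypothetical helper, not part of the client; it decides when a commit is due and takes an injectable clock so that it can be tested without waiting.

```python
import time


class CommitTrigger:
    """Signal a commit every min_count messages or every max_interval seconds."""

    def __init__(self, min_count, max_interval, clock=time.monotonic):
        self.min_count = min_count
        self.max_interval = max_interval
        self.clock = clock
        self.pending = 0
        self.last_commit = clock()

    def record(self):
        """Call after each processed message; returns True if a commit is due."""
        self.pending += 1
        return self.due()

    def due(self):
        if self.pending == 0:
            return False  # nothing new to commit
        return (self.pending >= self.min_count or
                self.clock() - self.last_commit >= self.max_interval)

    def committed(self):
        """Call after a successful commit to reset the counters."""
        self.pending = 0
        self.last_commit = self.clock()
```

Inside the loop, replace the modulo check with: if trigger.record(): consumer.commit(asynchronous=False) followed by trigger.committed(). Calling trigger.due() when poll() returns None as well lets an idle consumer still commit its last processed messages once the interval has passed.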
Delivery guarantees
Default guarantee
The following are the default settings of the Python client. With these settings, the default delivery guarantee of the Python client is “None”:
'enable.auto.commit': 'true'
'enable.auto.offset.store': 'true'
Because auto commits are performed in a background thread, these settings may result in the offset for the latest message being committed before the application has finished processing the message. If the application were to crash or exit before finishing processing, and the offset had already been auto-committed, the next incarnation of the consumer application would start at the next message, effectively missing the message that was being processed when the application crashed. You can lose data or get duplicates.
“At least once” guarantee
To achieve “at least once” guarantee, configure the following settings:
'enable.auto.commit': 'true'
'enable.auto.offset.store': 'false'
To avoid the data loss possible in the default “None guaranteed” mode above, the application can disable the automatic offset store and manually store offsets after processing, using Consumer.store_offsets() (the Python binding of librdkafka’s rd_kafka_offsets_store()). This gives the application fine-grained control over when a message is committed. The latest stored offset is automatically committed every auto.commit.interval.ms.
For this guarantee option, you should store the offset only after processing the message successfully.
Note: Only offsets greater than the current offset are committed. For example, if the latest committed offset was 10 and the application stores offset 9 with store_offsets(), that offset will not be committed.
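Put together, an “at least once” loop stores each offset only after the message has been processed and lets the background auto commit pick it up. A minimal sketch, assuming a consumer created with 'enable.auto.commit': 'true', 'enable.auto.offset.store': 'false', and enable.partition.eof left at its default (false). process and should_run are hypothetical application callbacks, and RuntimeError stands in for the KafkaException used in the examples above so the sketch runs without extra imports.

```python
def at_least_once_loop(consumer, topics, process, should_run):
    """Process messages, storing each offset only after processing succeeds.

    Stored offsets are committed in the background every
    auto.commit.interval.ms.
    """
    consumer.subscribe(topics)
    try:
        while should_run():
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                # The examples above raise KafkaException here.
                raise RuntimeError(str(msg.error()))
            process(msg)
            # Only reached if process() did not raise, so a crash before this
            # line leads to redelivery of the message rather than its loss.
            consumer.store_offsets(message=msg)
    finally:
        consumer.close()
```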
In the example in the previous section, you get “at least once” delivery since the commit follows the message processing. By changing the order and committing synchronously before processing, you can get “at most once” delivery, but you must handle commit failures carefully.
def consume_loop(consumer, topics):
    # Uses the imports, running flag, and msg_process() from the basic poll loop.
    try:
        consumer.subscribe(topics)
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                # Commit before processing: "at most once" delivery.
                consumer.commit(asynchronous=False)
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
For simplicity in this example, Consumer.commit() is used prior to processing the message. Committing on every message would produce a lot of overhead in practice. A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded.
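The batching approach can be sketched with Consumer.consume(), which returns a list of up to num_messages messages. The sketch commits synchronously before processing, so if commit() raises, no message in the batch is processed. It assumes an already subscribed consumer with the default enable.auto.offset.store=true; process_batch, should_run, and the batch size of 100 are hypothetical, and RuntimeError stands in for KafkaException on message errors.

```python
def at_most_once_batches(consumer, process_batch, should_run, batch_size=100):
    """Commit each batch synchronously, then process it.

    No message is processed twice, but messages can be lost if processing
    fails after the commit has succeeded.
    """
    try:
        while should_run():
            msgs = consumer.consume(num_messages=batch_size, timeout=1.0)
            if not msgs:
                continue
            errors = [m for m in msgs if m.error()]
            if errors:
                raise RuntimeError(str(errors[0].error()))
            # With automatic offset storing, this commits the offsets that
            # follow the returned messages; it raises if the commit fails.
            consumer.commit(asynchronous=False)
            process_batch(msgs)
    finally:
        consumer.close()
```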
Asynchronous commits
def consume_loop(consumer, topics):
    # Uses the imports, running flag, and msg_process() from the basic poll
    # loop above; MIN_COMMIT_COUNT is an application-chosen batch size.
    try:
        consumer.subscribe(topics)
        msg_count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    consumer.commit(asynchronous=True)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
In this example, the consumer sends the request and returns immediately by using asynchronous commits. The asynchronous parameter to commit() is changed to True. The value is passed in explicitly, but asynchronous commits are the default if the parameter is not included.
The API gives you a callback that is invoked when the commit either succeeds or fails. The commit callback can be any callable and is passed to the consumer constructor as the on_commit configuration parameter.
from confluent_kafka import Consumer
def commit_completed(err, partitions):
    if err:
        print(str(err))
    else:
        print("Committed partition offsets: " + str(partitions))
# Topic-level properties such as auto.offset.reset can be set at the top
# level, as in the earlier examples; default.topic.config is deprecated.
conf = {'bootstrap.servers': "host1:9092,host2:9092",
        'group.id': "foo",
        'auto.offset.reset': 'smallest',
        'on_commit': commit_completed}
consumer = Consumer(conf)
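The partitions argument is a list of TopicPartition objects, each with topic, partition, offset, and error attributes, so a callback can report which individual partitions failed to commit. A sketch; the logger name and the failed_partitions helper are assumptions.

```python
import logging

log = logging.getLogger("commits")


def failed_partitions(partitions):
    """Return (topic, partition, error) for each partition whose commit failed."""
    return [(tp.topic, tp.partition, tp.error)
            for tp in partitions if tp.error is not None]


def commit_completed(err, partitions):
    # err is set when the commit request as a whole failed; individual
    # partitions can also carry their own error.
    if err is not None:
        log.error("Commit failed: %s", err)
    for topic, partition, error in failed_partitions(partitions):
        log.error("Commit failed for %s [%d]: %s", topic, partition, error)
```

Pass it exactly as before: 'on_commit': commit_completed in the consumer configuration.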
FIPS Compliance
Confluent tested FIPS compliance for the client using OpenSSL 3.0. To use the client in FIPS-compliant mode, Confluent recommends using OpenSSL 3.0. Older versions of OpenSSL have not been verified (although they may work). Confluent tested communication for FIPS compliance between clients and the following endpoints:
- Kafka brokers
- Schema Registry
Kafka broker and Schema Registry
- Kafka broker
- To communicate with the Kafka broker, the client uses the librdkafka library, which uses OpenSSL. The steps below configure OpenSSL to operate in FIPS mode for client communication with the broker.
- Schema Registry
- To communicate with Schema Registry, the client uses the standard libraries in Python, which use the operating system’s native SSL/TLS library. For FIPS-compliant communication between Schema Registry and the client, do not use the steps that follow. Instead, make the SSL/TLS library FIPS compliant. If the native SSL/TLS library is OpenSSL (the default for Python), then use the steps in the OpenSSL readme to make OpenSSL FIPS compliant.
FIPS-compliant Kafka broker communication
For FIPS-compliant communication with the broker, there are two ways to approach client installation:
- Use prebuilt wheels
- Build librdkafka and the client both from source
Use prebuilt wheels
If you install this client through prebuilt wheels using pip install confluent_kafka, OpenSSL 3.0 is already statically linked with the librdkafka shared library. To enable this client to communicate with the Kafka cluster using the OpenSSL FIPS provider and FIPS-approved algorithms, you must enable the FIPS provider. You can find steps to enable the FIPS provider in the section Use FIPS provider.
Note
You should enable the FIPS provider (using the same steps) if you install this client from source using pip install confluent_kafka --no-binary :all: with a prebuilt librdkafka in which OpenSSL is statically linked.
Build librdkafka and client both from source
When you build librdkafka from source, librdkafka dynamically links to the OpenSSL present on the system unless static linking is explicitly used while building. If the system-installed OpenSSL is already working in FIPS mode, you can jump directly to the section Client configuration to enable FIPS provider and enable the fips provider.
If you don’t have OpenSSL working in FIPS mode, use the steps in the section Use FIPS provider to make the OpenSSL on your system FIPS compliant, and then enable the fips provider. Once you have OpenSSL working in FIPS mode and the fips provider enabled, librdkafka and the Python client use FIPS-approved algorithms for communication between the client and the Kafka cluster.
Use FIPS provider
To use the FIPS provider, you must have the FIPS module available on your system. Plug the module into OpenSSL, and then configure OpenSSL to use the module.
You can plug the FIPS provider into OpenSSL in two ways:
- Put the module in the default module folder of OpenSSL.
- Point to the module with the environment variable OPENSSL_MODULES. For example: OPENSSL_MODULES="/path/to/fips/module/lib/folder/"
After you plug the FIPS provider module into OpenSSL, you must configure OpenSSL to use the module. Once again, you have two options:
- Modify the default configuration file to include the FIPS-related config.
- Create a new configuration file and point to it using the environment variable OPENSSL_CONF. For example: OPENSSL_CONF="/path/to/fips/enabled/openssl/config/openssl.cnf"
For an example of an OpenSSL configuration file, see Enable FIPS provider with OpenSSL.
Note
You need to specify both the OPENSSL_MODULES and OPENSSL_CONF environment variables when installing the client from pre-built wheels or when OpenSSL is statically linked to librdkafka.
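Because OpenSSL reads these variables when it initializes, a simple way to make sure they are in place is to set them in the environment of the process that runs your client. The sketch below builds such an environment; fips_env is a hypothetical helper, and the paths you pass are placeholders for your FIPS module folder and configuration file.

```python
import os


def fips_env(modules_dir, openssl_conf, base=None):
    """Return a copy of the environment with OPENSSL_MODULES and OPENSSL_CONF set."""
    env = dict(os.environ if base is None else base)
    env["OPENSSL_MODULES"] = modules_dir
    env["OPENSSL_CONF"] = openssl_conf
    return env
```

For example, a launcher could start the client application with subprocess.run([sys.executable, "app.py"], env=fips_env("/path/to/fips/module/lib/folder/", "/path/to/fips/enabled/openssl/config/openssl.cnf"), check=True), where app.py stands for your own script.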
Build FIPS provider module
This section provides a high-level overview of how to build the FIPS provider module. For the official steps to generate the FIPS provider module, see the README-FIPS doc.
To build the FIPS provider module:
- Clone OpenSSL from the OpenSSL Github Repo.
- Use git checkout to check out a FIPS-compliant version of OpenSSL 3.0. The latest version may not be FIPS-compliant. At the time of this writing, v3.0.8 (tagged as v3.0.8) is the current tested FIPS-compliant version.
- Run: ./Configure enable-fips
- Run: make install_fips
- Inside the providers folder, two files are generated. Use these files with OpenSSL:
  - FIPS module (fips.dylib in Mac, fips.so in Linux, and fips.dll in Windows)
  - FIPS config (fipsmodule.cnf)
Reference FIPS provider in OpenSSL
When installing from source, you can dynamically plug the FIPS module built above into OpenSSL by putting the FIPS module into the default OpenSSL module folder. Look for something like: ...lib/ossl-modules/.
For the default locations of OpenSSL on various operating systems, see the SSL section of the Introduction to librdkafka - the Apache Kafka C/C++ client library.
You can also point to this module with the environment variable OPENSSL_MODULES. For example: OPENSSL_MODULES="/path/to/fips/module/lib/folder/"
Enable FIPS provider with OpenSSL
To enable FIPS in OpenSSL, you must include fipsmodule.cnf in the openssl.cnf file. See the following openssl.cnf example:
config_diagnostics = 1
openssl_conf = openssl_init
.include /usr/local/ssl/fipsmodule.cnf
[openssl_init]
providers = provider_sect
alg_section = algorithm_sect
[provider_sect]
fips = fips_sect
[algorithm_sect]
default_properties = fips=yes
.
.
.
The fipsmodule.cnf file includes fips_sect, which OpenSSL requires to enable FIPS.
Some algorithms might have different implementations in the FIPS provider and in other providers. If you load two different providers, such as default and fips, either implementation could be used. To make sure you fetch only the FIPS-compliant version of an algorithm, set the fips=yes default property in the config file.
Client configuration to enable FIPS provider
OpenSSL also requires some non-cryptographic algorithms. These algorithms are not included in the FIPS provider, so you need to use the base provider in conjunction with the fips provider. The base provider comes with OpenSSL by default. You must enable the base provider in the client configuration.
To make the client (consumer, producer, or admin client) FIPS compliant, you must enable only the fips and base providers in the client using the ssl.providers configuration property.
Configure the property this way: 'ssl.providers': 'fips,base'.
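Applied to the configuration dictionaries shown earlier, this amounts to adding one key. The helper below is a hypothetical convenience that returns a copy of an existing producer, consumer, or admin client configuration with ssl.providers set to 'fips,base', and refuses to silently replace a different value.

```python
def with_fips_providers(conf):
    """Return a copy of conf with 'ssl.providers' set to 'fips,base'."""
    existing = conf.get('ssl.providers')
    if existing not in (None, 'fips,base'):
        raise ValueError("ssl.providers is already set to %r" % existing)
    fips_conf = dict(conf)  # leave the caller's dictionary unchanged
    fips_conf['ssl.providers'] = 'fips,base'
    return fips_conf
```

For example, Producer(with_fips_providers(conf)) with the Confluent Cloud conf from the producer section.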
Suggested resources
- Free course: Apache Kafka for Python Developers
- Free course: Apache Kafka 101
- Hands-on Developer tutorial
- Blog: Getting Started with Apache Kafka and Python
- Blog: Integrating Apache Kafka With Python Asyncio Web Applications
- Generating the FIPS module and config file
- How to use the FIPS Module
- librdkafka SSL Information