Welcome to Confluent’s Apache Kafka Python client documentation¶
confluent_kafka
— Confluent’s Apache Kafka Python client¶
Consumer¶
-
class
confluent_kafka.
Consumer
¶ High-level Kafka Consumer
-
Consumer
(config)¶ Create a new Consumer instance using the provided configuration dict.
Parameters: config (dict) – Configuration properties. At a minimum, bootstrap.servers and group.id should be set.
Special configuration properties:
on_commit: Optional callback that is called when a commit request has succeeded or failed.
-
on_commit
(err, partitions)¶ Parameters: - err (KafkaError) – Commit error object, or None on success.
- partitions (list(TopicPartition)) – List of partitions with their committed offsets or per-partition errors.
-
assign
()¶ -
assign
(partitions)
Set the consumer partition assignment to the provided list of TopicPartition and start consuming.
Parameters: partitions (list(TopicPartition)) – List of topic+partitions and optionally initial offsets to start consuming.
Raises: RuntimeError if called on a closed consumer
-
assignment
()¶ Returns the current partition assignment.
Returns: List of assigned topic+partitions. Return type: list(TopicPartition) Raises: KafkaException Raises: RuntimeError if called on a closed consumer
-
close
()¶ Close down and terminate the Kafka Consumer.
Actions performed:
- Stops consuming
- Commits offsets - except if the consumer property ‘enable.auto.commit’ is set to False
- Leave consumer group
Return type: None Raises: RuntimeError if called on a closed consumer
-
commit
()¶ -
commit
([message=None][, offsets=None][, asynchronous=True])
Commit a message or a list of offsets.
message and offsets are mutually exclusive; if neither is set, the current partition assignment’s offsets are used instead. The consumer relies on your use of this method if you have set ‘enable.auto.commit’ to False.
Parameters: - message (confluent_kafka.Message) – Commit message’s offset+1.
- offsets (list(TopicPartition)) – List of topic+partitions+offsets to commit.
- asynchronous (bool) – Asynchronous commit, return None immediately. If False the commit() call will block until the commit succeeds or fails and the committed offsets will be returned (on success). Note that specific partitions may have failed and the .err field of each partition will need to be checked for success.
Return type: None|list(TopicPartition)
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
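The blocking form of commit() described above can be sketched as follows. This is a minimal illustration, not part of the client: commit_and_check is a hypothetical helper name, `consumer` is assumed to be a confluent_kafka.Consumer, and `message` is assumed to be a Message previously returned by poll() or consume().

```python
def commit_and_check(consumer, message):
    """Synchronously commit message's offset+1 and split the result by outcome.

    Hypothetical helper: `consumer` is assumed to behave like
    confluent_kafka.Consumer and `message` like a consumed Message.
    """
    # With asynchronous=False the call blocks and returns list(TopicPartition).
    result = consumer.commit(message=message, asynchronous=False) or []
    # Individual partitions may have failed, so inspect each .error field.
    committed = [tp for tp in result if tp.error is None]
    failed = [tp for tp in result if tp.error is not None]
    return committed, failed
```

An application might call commit_and_check(consumer, msg) after it has finished processing msg and retry or log the partitions returned in the second list.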
-
-
committed
()¶ -
committed
(partitions[, timeout=None])
Retrieve committed offsets for the list of partitions.
Parameters: - partitions (list(TopicPartition)) – List of topic+partitions to query for stored offsets.
- timeout (float) – Request timeout. (Seconds)
Returns: List of topic+partitions with offset and possibly error set.
Return type: list(TopicPartition)
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
-
-
consume
()¶ -
consume
([num_messages=1][, timeout=-1])
Consumes messages, calls callbacks and returns a list of messages (possibly empty on timeout).
The application must check each returned Message object’s Message.error() method to distinguish between a proper message (error() returns None) and an event or error (see error().code() for specifics).
Parameters: - num_messages (int) – Maximum number of messages to return (default: 1).
- timeout (float) – Maximum time to block waiting for message, event or callback (default: infinite (-1)). (Seconds)
Returns: A list of Message objects (possibly empty on timeout)
Return type: list(Message)
Raises: - RuntimeError – if called on a closed consumer
- KafkaError – in case of internal error
- ValueError – if num_messages > 1M
-
-
get_watermark_offsets
()¶ -
get_watermark_offsets
(partition[, timeout=None][, cached=False])
Retrieve low and high offsets for partition.
Parameters: - partition (TopicPartition) – Topic+partition to return offsets for.
- timeout (float) – Request timeout (when cached=False). (Seconds)
- cached (bool) – Instead of querying the broker, use cached information. Cached values: The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each message fetched from the broker for this partition.
Returns: Tuple of (low,high) on success or None on timeout.
Return type: tuple(int,int)
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
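One common use of the (low, high) tuple is estimating how far a consumer is behind. The sketch below combines get_watermark_offsets() with position(); partition_lag is a hypothetical helper, and the handling of negative (logical) positions is an assumption made for illustration.

```python
def partition_lag(consumer, partition, timeout=5.0):
    """Estimate the number of unread messages for `partition`.

    Hypothetical helper: `consumer` is assumed to behave like
    confluent_kafka.Consumer and `partition` to be an assigned TopicPartition.
    Returns None if the watermark query timed out.
    """
    watermarks = consumer.get_watermark_offsets(partition, timeout=timeout)
    if watermarks is None:
        return None  # no answer within `timeout` seconds
    low, high = watermarks
    # position() returns a list of TopicPartition; offset is last consumed + 1.
    position = consumer.position([partition])[0].offset
    if position < 0:
        # Assumption: a negative (logical) offset means nothing has been
        # consumed yet, so every retained message counts as lag.
        return high - low
    return high - position
```

Passing cached=True to get_watermark_offsets() would avoid the broker round trip at the cost of possibly stale values, as described in the parameter list above.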
-
-
list_topics
()¶ -
list_topics
([topic=None][, timeout=-1])
Request Metadata from cluster. This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.
Parameters: - topic (str) – If specified, only request info about this topic, else return for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified it will be created.
- timeout (float) – Maximum response time before timing out, or -1 for infinite timeout.
Return type: ClusterMetadata
Raises: KafkaException
-
-
offsets_for_times
()¶ -
offsets_for_times
(partitions[, timeout=None])
Look up offsets by timestamp for the given partitions.
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
Parameters: - partitions (list(TopicPartition)) – Topic+partitions with timestamps in the TopicPartition.offset field.
- timeout (float) – Request timeout. (Seconds)
Returns: List of topic+partitions with offset field set and possibly error set.
Return type: list(TopicPartition)
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
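Because the timestamp travels in the offset field, building the query list is easy to get wrong. The following sketch shows one way to do it; offsets_since and its make_partition argument are hypothetical, with make_partition assumed to be confluent_kafka.TopicPartition (or anything with the same (topic, partition, offset) constructor), and timestamps assumed to be milliseconds since the epoch.

```python
import time

def offsets_since(consumer, make_partition, topic, partition_ids, seconds_ago,
                  timeout=10.0):
    """Look up the earliest offsets at or after `seconds_ago` seconds in the past.

    Hypothetical helper: `make_partition` is assumed to be
    confluent_kafka.TopicPartition or a compatible factory.
    """
    # The timestamp (milliseconds since the epoch) goes in the offset field.
    since_ms = int((time.time() - seconds_ago) * 1000)
    query = [make_partition(topic, pid, since_ms) for pid in partition_ids]
    found = consumer.offsets_for_times(query, timeout=timeout)
    # Keep only the partitions whose lookup succeeded.
    return [tp for tp in found if tp.error is None]
```

The returned list can then be handed to assign() to start consuming from those offsets.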
-
pause
()¶ -
pause
(partitions)
Pause consumption for the provided list of partitions.
Parameters: partitions (list(TopicPartition)) – List of topic+partitions to pause. Return type: None Raises: KafkaException -
-
poll
()¶ -
poll
([timeout=None])
Consumes a single message, calls callbacks and returns events.
The application must check the returned Message object’s Message.error() method to distinguish between a proper message (error() returns None) and an event or error (see error().code() for specifics).
Parameters: timeout (float) – Maximum time to block waiting for message, event or callback. (Seconds)
Returns: A Message object or None on timeout
Return type: Message or None
Raises: RuntimeError if called on a closed consumer
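The three possible outcomes of poll() (timeout, event or error, proper message) can be handled as in the minimal sketch below. collect_values is a hypothetical helper, and `consumer` is assumed to behave like a subscribed confluent_kafka.Consumer.

```python
def collect_values(consumer, max_messages, timeout=1.0):
    """Poll until `max_messages` proper messages arrive or a poll times out.

    Hypothetical helper: `consumer` is assumed to behave like
    confluent_kafka.Consumer. Returns (values, errors).
    """
    values = []
    errors = []
    while len(values) < max_messages:
        msg = consumer.poll(timeout)
        if msg is None:
            break  # timeout: nothing arrived within `timeout` seconds
        if msg.error() is not None:
            # An event or error rather than a proper message.
            errors.append(msg.error())
            continue
        values.append(msg.value())
    return values, errors
```

A long-running application would typically loop indefinitely instead of stopping on the first timeout, and call close() on shutdown.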
-
position
()¶ -
position
(partitions[, timeout=None])
Retrieve current positions (offsets) for the list of partitions.
Parameters: partitions (list(TopicPartition)) – List of topic+partitions to return current offsets for. The current offset is the offset of the last consumed message + 1. Returns: List of topic+partitions with offset and possibly error set. Return type: list(TopicPartition) Raises: KafkaException Raises: RuntimeError if called on a closed consumer -
-
resume
()¶ -
resume
(partitions)
Resume consumption for the provided list of partitions.
Parameters: partitions (list(TopicPartition)) – List of topic+partitions to resume. Return type: None Raises: KafkaException -
-
seek
()¶ -
seek
(partition)
Set consume position for partition to offset. The offset may be an absolute (>=0) or a logical offset (OFFSET_BEGINNING et al.).
seek() may only be used to update the consume offset of an actively consumed partition (i.e., after assign()). To set the starting offset of a partition that is not being consumed, pass the offset in an assign() call instead.
Parameters: partition (TopicPartition) – Topic+partition+offset to seek to.
Raises: KafkaException
-
store_offsets
()¶ -
store_offsets
([message=None][, offsets=None])
Store offsets for a message or a list of offsets.
message and offsets are mutually exclusive. The stored offsets will be committed according to ‘auto.commit.interval.ms’ or by a manual offset-less commit(). Note that ‘enable.auto.offset.store’ must be set to False when using this API.
Parameters: - message (confluent_kafka.Message) – Store message’s offset+1.
- offsets (list(TopicPartition)) – List of topic+partitions+offsets to store.
Return type: None
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
-
-
subscribe
()¶ -
subscribe
(topics[, on_assign=None][, on_revoke=None]) Set subscription to the supplied list of topics. This replaces a previous subscription.
Regexp pattern subscriptions are supported by prefixing the topic string with "^", e.g.:
consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
Parameters: - topics (list(str)) – List of topics (strings) to subscribe to.
- on_assign (callable) – callback to provide handling of customized offsets on completion of a successful partition re-assignment.
- on_revoke (callable) – callback to provide handling of offset commits to a customized store on the start of a rebalance operation.
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
-
on_assign
(consumer, partitions)¶
-
on_revoke
(consumer, partitions)¶ Parameters: - consumer (Consumer) – Consumer instance.
- partitions (list(TopicPartition)) – Absolute list of partitions being assigned or revoked.
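As an illustration of handling customized offsets in on_assign, the sketch below builds a callback that overrides the starting offset of every assigned partition before handing the list back to assign(). make_on_assign is a hypothetical helper; start_offset is assumed to be an absolute offset or one of the logical offset constants such as confluent_kafka.OFFSET_BEGINNING.

```python
def make_on_assign(start_offset):
    """Build an on_assign callback that starts every partition at `start_offset`.

    Hypothetical helper: `start_offset` is assumed to be an absolute offset
    or a logical constant such as confluent_kafka.OFFSET_BEGINNING.
    """
    def on_assign(consumer, partitions):
        # Override the offset on each newly assigned partition ...
        for tp in partitions:
            tp.offset = start_offset
        # ... and apply the modified assignment.
        consumer.assign(partitions)
    return on_assign
```

It could be registered with consumer.subscribe(["my_topic"], on_assign=make_on_assign(confluent_kafka.OFFSET_BEGINNING)), where "my_topic" is a placeholder topic name.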
-
-
unassign
()¶ Removes the current partition assignment and stops consuming.
Raises: - KafkaException –
- RuntimeError – if called on a closed consumer
-
unsubscribe
()¶ Remove current subscription.
Raises: KafkaException Raises: RuntimeError if called on a closed consumer
-
Producer¶
-
class
confluent_kafka.
Producer
¶ Asynchronous Kafka Producer
-
Producer
(config)¶ Create a new Producer instance using the provided configuration dict.
Parameters: config (dict) – Configuration properties. At a minimum, bootstrap.servers should be set.
-
len
()¶ Returns: Number of messages and Kafka protocol requests waiting to be delivered to broker. Return type: int
-
flush
()¶ -
flush
([timeout]) Wait for all messages in the Producer queue to be delivered.
Parameters: timeout (float) – Maximum time to block (requires librdkafka >= v0.9.4). (Seconds)
Returns: Number of messages still in queue.
Note: See poll() for a description of which callbacks may be triggered.
-
list_topics
()¶ -
list_topics
([topic=None][, timeout=-1])
Request Metadata from cluster. This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.
Parameters: - topic (str) – If specified, only request info about this topic, else return for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified it will be created.
- timeout (float) – Maximum response time before timing out, or -1 for infinite timeout.
Return type: ClusterMetadata
Raises: KafkaException
-
-
poll
()¶ -
poll
([timeout])
Polls the producer for events and calls the corresponding callbacks (if registered).
Callbacks:
- on_delivery callbacks from produce()
- …
Parameters: timeout (float) – Maximum time to block waiting for events. (Seconds) Returns: Number of events processed (callbacks served) Return type: int -
-
produce
()¶ -
produce
(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])
Produce message to topic. This is an asynchronous operation; an application may use the callback (alias on_delivery) argument to pass a function (or lambda) that will be called from poll() when the message has been successfully delivered or permanently fails delivery.
Currently message headers are not supported on the message returned to the callback. msg.headers() will return None even if the original message had headers set.
Parameters: - topic (str) – Topic to produce message to
- value (str|bytes) – Message payload
- key (str|bytes) – Message key
- partition (int) – Partition to produce to, else uses the configured built-in partitioner.
- on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery
- timestamp (int) – Message timestamp (CreateTime) in milliseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.
- headers (dict|list) – Message headers to set on the message. The header key must be a string while the value must be binary, unicode or None. Accepts a list of (key,value) or a dict. (Requires librdkafka >= v0.11.4 and broker version >= 0.11.0.0)
Return type: None
Raises: - BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded)
- KafkaException – for other errors, see exception code
- NotImplementedError – if timestamp is specified without underlying library support.
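Since produce() raises BufferError when the local queue is full, one possible pattern is to serve callbacks with poll() and try again. The sketch below is an assumption about how an application might do this, not a prescribed approach; produce_with_retry and delivery_report are hypothetical names, and `producer` is assumed to behave like confluent_kafka.Producer.

```python
def delivery_report(err, msg):
    """Example delivery callback: err is None on success."""
    if err is not None:
        print("delivery failed: {}".format(err))
    else:
        print("delivered to {} [{}]".format(msg.topic(), msg.partition()))

def produce_with_retry(producer, topic, value, on_delivery=None, max_attempts=3):
    """Enqueue one message, retrying while the local queue is full.

    Hypothetical helper: returns True once the message is enqueued,
    False if the queue was still full after `max_attempts` tries.
    """
    for _ in range(max_attempts):
        try:
            producer.produce(topic, value, on_delivery=on_delivery)
            return True
        except BufferError:
            # Queue full: poll() serves delivery callbacks, which frees slots.
            producer.poll(1.0)
    return False
```

A True return only means the message was queued locally; delivery itself is reported later through the callback, for example during a final flush().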
-
-
Admin¶
Kafka Admin client: create, view, alter, delete topics and resources.
-
class
confluent_kafka.admin.
AdminClient
(conf)¶ The Kafka AdminClient provides admin operations for Kafka brokers, topics, groups, and other resource types supported by the broker.
The Admin API methods are asynchronous and return a dict of concurrent.futures.Future objects keyed by the entity. The entity is a topic name for create_topics(), delete_topics() and create_partitions(), and a ConfigResource for alter_configs() and describe_configs().
All the futures for a single API call will currently finish/fail at the same time (backed by the same protocol request), but this might change in future versions of the client.
See examples/adminapi.py for example usage.
For more information see the Java Admin API documentation: https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/clients/admin/package-frame.html
Requires broker version v0.11.0.0 or later.
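Because every Admin method returns a dict of futures, the result handling looks the same for all of them. A minimal sketch, assuming only that the values are concurrent.futures.Future objects as stated above (wait_for_all is a hypothetical helper name):

```python
def wait_for_all(futures, timeout=30.0):
    """Wait on a dict of futures as returned by the Admin API methods.

    Hypothetical helper: returns {entity: None} for successes and
    {entity: exception} for failures.
    """
    outcome = {}
    for entity, future in futures.items():
        try:
            future.result(timeout=timeout)  # raises if the operation failed
            outcome[entity] = None
        except Exception as exc:
            outcome[entity] = exc
    return outcome
```

For example, wait_for_all(admin.create_topics([NewTopic("my_topic", num_partitions=3, replication_factor=1)])) would report per-topic success or failure, where `admin` is an AdminClient instance and "my_topic" is a placeholder. For describe_configs() the result value carries data, so an application would keep future.result() instead of discarding it.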
-
alter_configs
(resources, **kwargs)¶ Update configuration values for the specified resources. Updates are not transactional so they may succeed for a subset of the provided resources while the others fail. The configuration for a particular resource is updated atomically, replacing the specified values while reverting unspecified configuration entries to their default values.
The future result() value is None.
Warning: alter_configs() will replace all existing configuration for the provided resources with the new configuration given, reverting all other configuration for the resource back to their default values.
Warning: Multiple resources and resource types may be specified, but at most one resource of type RESOURCE_BROKER is allowed per call since these resource requests must be sent to the broker specified in the resource.
Parameters: - resources (list(ConfigResource)) – Resources to update configuration for.
- request_timeout (float) – Set the overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- validate_only (bool) – Tell broker to only validate the request, without altering the configuration. Default: False
Returns: a dict of futures for each resource, keyed by the ConfigResource.
Return type: dict(<ConfigResource, future>)
Raises: - KafkaException – Operation failed locally or on broker.
- TypeError – Invalid input.
- ValueError – Invalid input.
-
create_partitions
(new_partitions, **kwargs)¶ Create additional partitions for the given topics.
The future result() value is None.
Parameters: - new_partitions (list(NewPartitions)) – New partitions to be created.
- operation_timeout (float) – Set broker’s operation timeout in seconds, controlling how long the CreatePartitions request will block on the broker waiting for the partition creation to propagate in the cluster. A value of 0 returns immediately. Default: 0
- request_timeout (float) – Set the overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- validate_only (bool) – Tell broker to only validate the request, without creating the partitions. Default: False
Returns: a dict of futures for each topic, keyed by the topic name.
Return type: dict(<topic_name, future>)
Raises: - KafkaException – Operation failed locally or on broker.
- TypeError – Invalid input.
- ValueError – Invalid input.
-
create_topics
(new_topics, **kwargs)¶ Create new topics in cluster.
The future result() value is None.
Parameters: - new_topics (list(NewTopic)) – New topics to be created.
- operation_timeout (float) – Set broker’s operation timeout in seconds, controlling how long the CreateTopics request will block on the broker waiting for the topic creation to propagate in the cluster. A value of 0 returns immediately. Default: 0
- request_timeout (float) – Set the overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- validate_only (bool) – Tell broker to only validate the request, without creating the topic. Default: False
Returns: a dict of futures for each topic, keyed by the topic name.
Return type: dict(<topic_name, future>)
Raises: - KafkaException – Operation failed locally or on broker.
- TypeError – Invalid input.
- ValueError – Invalid input.
-
delete_topics
(topics, **kwargs)¶ Delete topics.
The future result() value is None.
Parameters: - topics (list(str)) – Topics to mark for deletion.
- operation_timeout (float) – Set broker’s operation timeout in seconds, controlling how long the DeleteTopics request will block on the broker waiting for the topic deletion to propagate in the cluster. A value of 0 returns immediately. Default: 0
- request_timeout (float) – Set the overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
Returns: a dict of futures for each topic, keyed by the topic name.
Return type: dict(<topic_name, future>)
Raises: - KafkaException – Operation failed locally or on broker.
- TypeError – Invalid input.
- ValueError – Invalid input.
-
describe_configs
(resources, **kwargs)¶ Get configuration for the specified resources.
The future result() value is a dict(<configname, ConfigEntry>).
Warning: Multiple resources and resource types may be requested, but at most one resource of type RESOURCE_BROKER is allowed per call since these resource requests must be sent to the broker specified in the resource.
Parameters: - resources (list(ConfigResource)) – Resources to get configuration for.
- request_timeout (float) – Set the overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- validate_only (bool) – Tell broker to only validate the request, without creating the partitions. Default: False
Returns: a dict of futures for each resource, keyed by the ConfigResource.
Return type: dict(<ConfigResource, future>)
Raises: - KafkaException – Operation failed locally or on broker.
- TypeError – Invalid input.
- ValueError – Invalid input.
-
-
class
confluent_kafka.admin.
BrokerMetadata
¶ BrokerMetadata contains information about a Kafka broker.
This class is typically not user instantiated.
Variables: - id (int) – Broker id.
- host (str) – Broker hostname.
- port (int) – Broker port.
-
class
confluent_kafka.admin.
ClusterMetadata
¶ ClusterMetadata as returned by list_topics() contains information about the Kafka cluster, brokers, and topics.
This class is typically not user instantiated.
Variables: - cluster_id (str) – Cluster id string, if supported by broker, else None.
- controller_id (int) – Current controller broker id, or -1.
- brokers (dict) – Map of brokers indexed by the int broker id. Value is BrokerMetadata object.
- topics (dict) – Map of topics indexed by the topic name. Value is TopicMetadata object.
- orig_broker_id (int) – The broker this metadata originated from.
- orig_broker_name (str) – Broker name/address this metadata originated from.
-
class
confluent_kafka.admin.
ConfigEntry
(name, value, source=<ConfigSource.UNKNOWN_CONFIG: 0>, is_read_only=False, is_default=False, is_sensitive=False, is_synonym=False, synonyms=[])¶ ConfigEntry is returned by describe_configs() for each configuration entry for the specified resource.
This class is typically not user instantiated.
Variables: - name (str) – Configuration property name.
- value (str) – Configuration value (or None if not set or is_sensitive==True).
- source (ConfigSource) – Configuration source.
- is_read_only (bool) – Indicates if configuration property is read-only.
- is_default (bool) – Indicates if configuration property is using its default value.
- is_sensitive (bool) – Indicates if configuration property value contains sensitive information (such as security settings), in which case .value is None.
- is_synonym (bool) – Indicates if configuration property is a synonym for the parent configuration entry.
- synonyms (list) – A ConfigEntry list of synonyms and alternate sources for this configuration property.
-
class
confluent_kafka.admin.
ConfigResource
(restype, name, set_config=None, described_configs=None, error=None)¶ Class representing resources that have configs.
Instantiate with a resource type and a resource name.
-
class
Type
¶ ConfigResource.Type depicts the type of a Kafka resource.
-
ANY
= 1¶ Match any resource, used for lookups.
-
BROKER
= 4¶ Broker resource. Resource name is broker id
-
GROUP
= 3¶ Group resource. Resource name is group.id
-
TOPIC
= 2¶ Topic resource. Resource name is topic name
-
UNKNOWN
= 0¶ Resource type is not known or not set.
-
-
set_config
(name, value, overwrite=True)¶ Set/Overwrite configuration entry
Any configuration properties that are not included will be reverted to their default values. As a workaround use describe_configs() to retrieve the current configuration and overwrite the settings you want to change.
Parameters: - name (str) – Configuration property name
- value (str) – Configuration value
- overwrite (bool) – If True overwrite entry if already exists (default). If False do nothing if entry already exists.
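The describe-then-overwrite workaround mentioned above can be sketched as a small merge step. merged_config is a hypothetical helper, and the choice of which existing entries to carry over (skipping read-only, default and unset or sensitive values) is an assumption made for illustration rather than documented behaviour.

```python
def merged_config(current_entries, changes):
    """Combine described configuration with the settings to change.

    Hypothetical helper: `current_entries` is assumed to be the
    dict(<configname, ConfigEntry>) from a describe_configs() future,
    `changes` a dict of str:str overrides.
    """
    merged = {}
    for name, entry in current_entries.items():
        # Assumption: only explicitly set, writable values need re-sending.
        if entry.is_read_only or entry.is_default or entry.value is None:
            continue
        merged[name] = entry.value
    merged.update(changes)  # the requested changes win over current values
    return merged
```

Each resulting pair would then be applied with resource.set_config(name, value) before passing the ConfigResource to alter_configs().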
-
class
confluent_kafka.admin.
ConfigSource
¶ Config sources returned in ConfigEntry by describe_configs().
-
DEFAULT_CONFIG
= 5¶
-
DYNAMIC_BROKER_CONFIG
= 2¶
-
DYNAMIC_DEFAULT_BROKER_CONFIG
= 3¶
-
DYNAMIC_TOPIC_CONFIG
= 1¶
-
STATIC_BROKER_CONFIG
= 4¶
-
UNKNOWN_CONFIG
= 0¶
-
-
class
confluent_kafka.admin.
PartitionMetadata
¶ PartitionMetadata contains information about a Kafka partition.
This class is typically not user instantiated.
Variables: - id (int) – Partition id.
- leader (int) – Current leader broker for this partition, or -1.
- replicas (list(int)) – List of replica broker ids for this partition.
- isrs (list(int)) – List of in-sync-replica broker ids for this partition.
- error (KafkaError) – Partition error, or None. Value is a KafkaError object.
Warning: Depending on cluster state the broker ids referenced in leader, replicas and isrs may temporarily not be reported in ClusterMetadata.brokers. Always check the availability of a broker id in the brokers dict.
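The availability check recommended in the warning might look like the following sketch. leader_address is a hypothetical helper; `cluster` is assumed to be a ClusterMetadata object as returned by list_topics().

```python
def leader_address(cluster, topic, partition_id):
    """Return "host:port" of a partition's leader, or None if it is unknown.

    Hypothetical helper: `cluster` is assumed to be a ClusterMetadata object.
    """
    topic_md = cluster.topics.get(topic)
    if topic_md is None or topic_md.error is not None:
        return None
    part = topic_md.partitions.get(partition_id)
    if part is None or part.leader == -1:
        return None  # unknown partition or no current leader
    # The leader id may temporarily be missing from the brokers dict.
    broker = cluster.brokers.get(part.leader)
    if broker is None:
        return None
    return "{}:{}".format(broker.host, broker.port)
```

Using dict.get() rather than indexing avoids a KeyError when a broker id is referenced by a partition but not yet reported in brokers.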
-
class
confluent_kafka.admin.
TopicMetadata
¶ TopicMetadata contains information about a Kafka topic.
This class is typically not user instantiated.
Variables: - topic (str) – Topic name.
- partitions (dict) – Map of partitions indexed by partition id. Value is PartitionMetadata object.
- error (KafkaError) – Topic error, or None. Value is a KafkaError object.
-
class
confluent_kafka.admin.
NewTopic
¶ NewTopic specifies per-topic settings for passing to AdminClient.create_topics().
-
NewTopic
(topic, num_partitions[, replication_factor][, replica_assignment][, config])¶ Instantiate a NewTopic object.
Parameters: - topic (string) – Topic name
- num_partitions (int) – Number of partitions to create
- replication_factor (int) – Replication factor of partitions, or -1 if replica_assignment is used.
- replica_assignment (list) – List of lists with the replication assignment for each new partition.
- config (dict) – Dict (str:str) of topic configuration. See http://kafka.apache.org/documentation.html#topicconfigs
Return type: NewTopic
-
-
class
confluent_kafka.admin.
NewPartitions
¶ NewPartitions specifies per-topic settings for passing to AdminClient.create_partitions().
-
NewPartitions
(topic, new_total_count[, replication_factor][, replica_assignment])¶ Instantiate a NewPartitions object.
Parameters: - topic (string) – Topic name
- new_total_count (int) – Increase the topic’s partition count to this value.
- replica_assignment (list) – List of lists with the replication assignment for each new partition.
Return type: NewPartitions
-
Avro¶
Message¶
-
class
confluent_kafka.
Message
¶ The Message object represents either a single consumed or produced message, or an event (error() is not None).
An application must check with error() to see if the object is a proper message (error() returns None) or an error/event.
This class is not user-instantiable.
-
len
()¶ Returns: Message value (payload) size in bytes Return type: int
-
error
()¶ The message object is also used to propagate errors and events. An application must check error() to determine if the Message is a proper message (error() returns None) or an error or event (error() returns a KafkaError object).
Return type: None or KafkaError
-
headers
()¶ Retrieve the headers set on a message. Each header is a key-value pair. Please note that header keys are ordered and can repeat.
Returns: list of two-tuples, one (key, value) pair for each header. Return type: [(str, bytes),..] or None.
-
key
()¶ Returns: message key or None if not available. Return type: str|bytes or None
-
offset
()¶ Returns: message offset or None if not available. Return type: int or None
-
partition
()¶ Returns: partition number or None if not available. Return type: int or None
-
set_headers
()¶ Set the field ‘Message.headers’ with new value.
Parameters: value (object) – Message.headers. Returns: None. Return type: None
-
set_key
()¶ Set the field ‘Message.key’ with new value.
Parameters: value (object) – Message.key. Returns: None. Return type: None
-
set_value
()¶ Set the field ‘Message.value’ with new value.
Parameters: value (object) – Message.value. Returns: None. Return type: None
-
timestamp
()¶ Retrieve timestamp type and timestamp from message. The timestamp type is one of:
- TIMESTAMP_NOT_AVAILABLE - Timestamps not supported by broker
- TIMESTAMP_CREATE_TIME - Message creation time (or source / producer time)
- TIMESTAMP_LOG_APPEND_TIME - Broker receive time
The returned timestamp should be ignored if the timestamp type is TIMESTAMP_NOT_AVAILABLE.
The timestamp is the number of milliseconds since the epoch (UTC).
Timestamps require broker version 0.10.0.0 or later and {'api.version.request': True} configured on the client.
Returns: tuple of message timestamp type, and timestamp.
Return type: (int, int)
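Since timestamp() returns a (type, milliseconds) tuple, converting it to a datetime takes a type check and a unit conversion. A minimal sketch, in which message_datetime is a hypothetical helper and not_available_type is expected to be confluent_kafka.TIMESTAMP_NOT_AVAILABLE:

```python
from datetime import datetime, timezone

def message_datetime(msg, not_available_type):
    """Convert a Message timestamp to an aware UTC datetime, or None.

    Hypothetical helper: pass confluent_kafka.TIMESTAMP_NOT_AVAILABLE
    as `not_available_type`.
    """
    ts_type, ts_ms = msg.timestamp()
    if ts_type == not_available_type:
        return None  # the timestamp value must be ignored in this case
    # The timestamp is in milliseconds since the epoch (UTC).
    return datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc)
```

Passing the constant in as an argument keeps the sketch free of assumptions about its numeric value.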
-
topic
()¶ Returns: topic name or None if not available. Return type: str or None
-
value
()¶ Returns: message value (payload) or None if not available. Return type: str|bytes or None
-
TopicPartition¶
-
class
confluent_kafka.
TopicPartition
¶ TopicPartition is a generic type to hold a single partition and various information about it.
It is typically used to provide a list of topics or partitions for various operations, such as Consumer.assign().
-
TopicPartition
(topic[, partition][, offset])¶ Instantiate a TopicPartition object.
Parameters: - topic (string) – Topic name
- partition (int) – Partition id
- offset (int) – Initial partition offset
Return type: TopicPartition
-
error
¶ Attribute error: Indicates an error (with KafkaError) unless None.
-
offset
¶ Attribute offset: Offset (long). Either an absolute offset (>=0) or a logical offset: OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED, OFFSET_INVALID
-
partition
¶ Attribute partition: Partition number (int)
-
topic
¶ Attribute topic: Topic name (string)
-
KafkaError¶
-
class
confluent_kafka.
KafkaError
¶ Kafka error and event object
The KafkaError class serves multiple purposes:
- Propagation of errors
- Propagation of events
- Exceptions
This class is not user-instantiable.
Error and event constants:
Constant – Description
_BAD_MSG – Local: Bad message format
_BAD_COMPRESSION – Local: Invalid compressed data
_DESTROY – Local: Broker handle destroyed
_FAIL – Local: Communication failure with broker
_TRANSPORT – Local: Broker transport failure
_CRIT_SYS_RESOURCE – Local: Critical system resource failure
_RESOLVE – Local: Host resolution failure
_MSG_TIMED_OUT – Local: Message timed out
_PARTITION_EOF – Broker: No more messages
_UNKNOWN_PARTITION – Local: Unknown partition
_FS – Local: File or filesystem error
_UNKNOWN_TOPIC – Local: Unknown topic
_ALL_BROKERS_DOWN – Local: All broker connections are down
_INVALID_ARG – Local: Invalid argument or configuration
_TIMED_OUT – Local: Timed out
_QUEUE_FULL – Local: Queue full
_ISR_INSUFF – Local: ISR count insufficient
_NODE_UPDATE – Local: Broker node update
_SSL – Local: SSL error
_WAIT_COORD – Local: Waiting for coordinator
_UNKNOWN_GROUP – Local: Unknown group
_IN_PROGRESS – Local: Operation in progress
_PREV_IN_PROGRESS – Local: Previous operation in progress
_EXISTING_SUBSCRIPTION – Local: Existing subscription
_ASSIGN_PARTITIONS – Local: Assign partitions
_REVOKE_PARTITIONS – Local: Revoke partitions
_CONFLICT – Local: Conflicting use
_STATE – Local: Erroneous state
_UNKNOWN_PROTOCOL – Local: Unknown protocol
_NOT_IMPLEMENTED – Local: Not implemented
_AUTHENTICATION – Local: Authentication failure
_NO_OFFSET – Local: No offset stored
_OUTDATED – Local: Outdated
_TIMED_OUT_QUEUE – Local: Timed out in queue
_UNSUPPORTED_FEATURE – Local: Required feature not supported by broker
_WAIT_CACHE – Local: Awaiting cache update
_INTR – Local: Operation interrupted
_KEY_SERIALIZATION – Local: Key serialization error
_VALUE_SERIALIZATION – Local: Value serialization error
_KEY_DESERIALIZATION – Local: Key deserialization error
_VALUE_DESERIALIZATION – Local: Value deserialization error
_PARTIAL – Local: Partial response
_READ_ONLY – Local: Read-only object
_NOENT – Local: No such entry
_UNDERFLOW – Local: Read underflow
_INVALID_TYPE – Local: Invalid type
UNKNOWN – Unknown broker error
NO_ERROR – Success
OFFSET_OUT_OF_RANGE – Broker: Offset out of range
INVALID_MSG – Broker: Invalid message
UNKNOWN_TOPIC_OR_PART – Broker: Unknown topic or partition
INVALID_MSG_SIZE – Broker: Invalid message size
LEADER_NOT_AVAILABLE – Broker: Leader not available
NOT_LEADER_FOR_PARTITION – Broker: Not leader for partition
REQUEST_TIMED_OUT – Broker: Request timed out
BROKER_NOT_AVAILABLE – Broker: Broker not available
REPLICA_NOT_AVAILABLE – Broker: Replica not available
MSG_SIZE_TOO_LARGE – Broker: Message size too large
STALE_CTRL_EPOCH – Broker: StaleControllerEpochCode
OFFSET_METADATA_TOO_LARGE – Broker: Offset metadata string too large
NETWORK_EXCEPTION – Broker: Broker disconnected before response received
GROUP_LOAD_IN_PROGRESS – Broker: Group coordinator load in progress
GROUP_COORDINATOR_NOT_AVAILABLE – Broker: Group coordinator not available
NOT_COORDINATOR_FOR_GROUP – Broker: Not coordinator for group
TOPIC_EXCEPTION – Broker: Invalid topic
RECORD_LIST_TOO_LARGE – Broker: Message batch larger than configured server segment size
NOT_ENOUGH_REPLICAS – Broker: Not enough in-sync replicas
NOT_ENOUGH_REPLICAS_AFTER_APPEND – Broker: Message(s) written to insufficient number of in-sync replicas
INVALID_REQUIRED_ACKS – Broker: Invalid required acks value
ILLEGAL_GENERATION – Broker: Specified group generation id is not valid
INCONSISTENT_GROUP_PROTOCOL – Broker: Inconsistent group protocol
INVALID_GROUP_ID – Broker: Invalid group.id
UNKNOWN_MEMBER_ID – Broker: Unknown member
INVALID_SESSION_TIMEOUT – Broker: Invalid session timeout
REBALANCE_IN_PROGRESS – Broker: Group rebalance in progress
INVALID_COMMIT_OFFSET_SIZE – Broker: Commit offset data size is not valid
TOPIC_AUTHORIZATION_FAILED – Broker: Topic authorization failed
GROUP_AUTHORIZATION_FAILED – Broker: Group authorization failed
CLUSTER_AUTHORIZATION_FAILED – Broker: Cluster authorization failed
INVALID_TIMESTAMP – Broker: Invalid timestamp
UNSUPPORTED_SASL_MECHANISM – Broker: Unsupported SASL mechanism
ILLEGAL_SASL_STATE – Broker: Request not valid in current SASL state
UNSUPPORTED_VERSION – Broker: API version not supported
TOPIC_ALREADY_EXISTS – Broker: Topic already exists
INVALID_PARTITIONS – Broker: Invalid number of partitions
INVALID_REPLICATION_FACTOR – Broker: Invalid replication factor
INVALID_REPLICA_ASSIGNMENT – Broker: Invalid replica assignment
INVALID_CONFIG – Broker: Configuration is invalid
NOT_CONTROLLER – Broker: Not controller for cluster
INVALID_REQUEST – Broker: Invalid request
UNSUPPORTED_FOR_MESSAGE_FORMAT – Broker: Message format on broker does not support request
POLICY_VIOLATION – Broker: Isolation policy volation
OUT_OF_ORDER_SEQUENCE_NUMBER – Broker: Broker received an out of order sequence number
DUPLICATE_SEQUENCE_NUMBER – Broker: Broker received a duplicate sequence number
INVALID_PRODUCER_EPOCH – Broker: Producer attempted an operation with an old epoch
INVALID_TXN_STATE – Broker: Producer attempted a transactional operation in an invalid state
INVALID_PRODUCER_ID_MAPPING – Broker: Producer attempted to use a producer id which is not currently assigned to its transactional
INVALID_TRANSACTION_TIMEOUT – Broker: Transaction timeout is larger than the maximum value allowed by the broker’s max.transaction
CONCURRENT_TRANSACTIONS – Broker: Producer attempted to update a transaction while another concurrent operation on the same tr
TRANSACTION_COORDINATOR_FENCED – Broker: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current
TRANSACTIONAL_ID_AUTHORIZATION_FAILED – Broker: Transactional Id authorization failed
SECURITY_DISABLED – Broker: Security features are disabled
OPERATION_NOT_ATTEMPTED – Broker: Operation not attempted
-
code()
Returns the error/event code for comparison to KafkaError.<ERR_CONSTANTS>.
Returns: error/event code
Return type: int
-
name()
Returns the enum name for error/event.
Returns: error/event enum name string
Return type: str
-
str()
Returns the human-readable error/event string.
Returns: error/event message string
Return type: str
KafkaException¶
-
class confluent_kafka.KafkaException
Kafka exception that wraps the KafkaError class.
Use exception.args[0] to extract the KafkaError object.
Offset¶
Logical offset constants:
OFFSET_BEGINNING - Beginning of partition (oldest offset)
OFFSET_END - End of partition (next offset)
OFFSET_STORED - Use stored/committed offset
OFFSET_INVALID - Invalid/Default offset
ThrottleEvent¶
-
class confluent_kafka.ThrottleEvent(broker_name, broker_id, throttle_time)
ThrottleEvent contains details about a throttled request. Set up a throttle callback by setting the throttle_cb configuration property to a callable that takes a ThrottleEvent object as its only argument. The callback will be triggered from poll(), consume() or flush() when a request has been throttled by the broker.
This class is typically not user instantiated.
Variables:
- broker_name (str) – The hostname of the broker which throttled the request
- broker_id (int) – The broker id
- throttle_time (float) – The amount of time (in seconds) the broker throttled (delayed) the request
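One possible throttle callback is sketched below: it accumulates the throttle time per broker id using only the three documented attributes. The names `on_throttle` and `throttle_seconds` are hypothetical, and the events passed in are `SimpleNamespace` stand-ins with made-up values so that the sketch runs without a broker; the real client would pass `ThrottleEvent` objects.

```python
from collections import defaultdict
from types import SimpleNamespace

# Total throttle time in seconds, keyed by broker id.
throttle_seconds = defaultdict(float)


def on_throttle(event):
    """Add this event's throttle time to the running total for its broker."""
    throttle_seconds[event.broker_id] += event.throttle_time


# Stand-in events (illustrative values only); the client would normally
# invoke the callback from poll(), consume() or flush().
on_throttle(SimpleNamespace(broker_name="broker-a", broker_id=1, throttle_time=0.25))
on_throttle(SimpleNamespace(broker_name="broker-a", broker_id=1, throttle_time=0.5))
on_throttle(SimpleNamespace(broker_name="broker-b", broker_id=2, throttle_time=0.125))

for broker_id, seconds in sorted(throttle_seconds.items()):
    print("broker {}: throttled for {:.3f}s".format(broker_id, seconds))
```

Such a callback would be registered by adding `'throttle_cb': on_throttle` to the configuration dict passed to the client constructor.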
Configuration¶
Configuration of producer and consumer instances is performed by providing a dict of configuration properties to the instance constructor, e.g.:
import confluent_kafka

# my_commit_callback is a user-defined function taking (err, partitions)
conf = {'bootstrap.servers': 'mybroker.com',
        'group.id': 'mygroup',
        'session.timeout.ms': 6000,
        'on_commit': my_commit_callback,
        'auto.offset.reset': 'earliest'}
consumer = confluent_kafka.Consumer(conf)
The supported configuration values are dictated by the underlying librdkafka C library. For the full range of configuration properties please consult librdkafka’s documentation: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
The Python bindings also provide some additional configuration properties:
- default.topic.config: value is a dict of client topic-level configuration properties that are applied to all used topics for the instance. DEPRECATED: topic configuration should now be specified in the global top-level configuration.
- error_cb(kafka.KafkaError): Callback for generic/global error events. This callback is served upon calling client.poll() or producer.flush().
- throttle_cb(confluent_kafka.ThrottleEvent): Callback for throttled request reporting. This callback is served upon calling client.poll() or producer.flush().
- stats_cb(json_str): Callback for statistics data. This callback is triggered by poll() or flush() every statistics.interval.ms (needs to be configured separately). Function argument json_str is a str instance of a JSON document containing statistics data. This callback is served upon calling client.poll() or producer.flush(). See https://github.com/edenhill/librdkafka/wiki/Statistics for more information.
- on_delivery(kafka.KafkaError, kafka.Message) (Producer): value is a Python function reference that is called once for each produced message to indicate the final delivery result (success or failure). This property may also be set per-message by passing callback=callable (or on_delivery=callable) to the confluent_kafka.Producer.produce() function. Currently message headers are not supported on the message returned to the callback: msg.headers() will return None even if the original message had headers set. This callback is served upon calling producer.poll() or producer.flush().
- on_commit(kafka.KafkaError, list(kafka.TopicPartition)) (Consumer): Callback used to indicate success or failure of asynchronous and automatic commit requests. This callback is served upon calling consumer.poll(). It is not triggered for synchronous commits.
- logger=logging.Handler kwarg: forward logs from the Kafka client to the provided logging.Handler instance. To avoid spontaneous calls from non-Python threads the log messages will only be forwarded when client.poll() or producer.flush() are called.

import logging
import confluent_kafka

mylogger = logging.getLogger()
mylogger.addHandler(logging.StreamHandler())
producer = confluent_kafka.Producer({'bootstrap.servers': 'mybroker.com'}, logger=mylogger)
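The callback properties above can be illustrated with a minimal sketch of an `on_delivery` and a `stats_cb` handler. The names `on_delivery_report`, `on_stats`, `delivered`, `failed` and `latest_stats` are hypothetical, and the message object and JSON document are stand-ins with made-up values so that the sketch runs without a broker. The `topic()`, `partition()` and `offset()` accessors are those of `confluent_kafka.Message`; the statistics field names shown are sample data, and the full schema is described in the librdkafka statistics documentation.

```python
import json
from types import SimpleNamespace

delivered = []     # (topic, partition, offset) for each successful delivery
failed = []        # error objects for each failed delivery
latest_stats = {}  # most recent statistics document, parsed into a dict


def on_delivery_report(err, msg):
    """Record the final delivery result; err is None on success."""
    if err is not None:
        failed.append(err)
    else:
        delivered.append((msg.topic(), msg.partition(), msg.offset()))


def on_stats(json_str):
    """Parse the JSON statistics string and keep the latest snapshot."""
    latest_stats.clear()
    latest_stats.update(json.loads(json_str))


# Configuration dict as it might be passed to confluent_kafka.Producer.
conf = {
    'bootstrap.servers': 'mybroker.com',
    'statistics.interval.ms': 5000,  # stats_cb only fires when this is set
    'on_delivery': on_delivery_report,
    'stats_cb': on_stats,
}

# Stand-in invocations (illustrative values); the client would normally
# serve these callbacks from poll() or flush().
fake_msg = SimpleNamespace(
    topic=lambda: "mytopic", partition=lambda: 0, offset=lambda: 42
)
on_delivery_report(None, fake_msg)
on_delivery_report("delivery failed", None)
on_stats('{"name": "rdkafka#producer-1", "msg_cnt": 3}')
```

Keeping the callbacks small and side-effect free (appending to a list, updating a dict) is a deliberate choice here, since they run inside `poll()` or `flush()` and delay the caller while they execute.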