confluent_kafka API¶
A reliable, performant and feature-rich Python client for Apache Kafka v0.8 and above.
- Configuration
- Client API
- AdminClient
- Consumer
- DeserializingConsumer (new API subject to change)
- AvroConsumer (legacy)
- Producer
- SerializingProducer (new API subject to change)
- AvroProducer (legacy)
- SchemaRegistry
- Serialization API
- Supporting classes
Guide to the Transactional Producer API
Kafka Clients¶
AdminClient¶
Kafka admin client: create, view, alter, and delete topics and resources.
-
class
confluent_kafka.admin.
AdminClient
(conf)[source]¶ AdminClient provides admin operations for Kafka brokers, topics, groups, and other resource types supported by the broker.
The Admin API methods are asynchronous and return a dict of concurrent.futures.Future objects keyed by the entity. The entity is a topic name for create_topics(), delete_topics(), create_partitions(), and a ConfigResource for alter_configs() and describe_configs().
All the futures for a single API call will currently finish/fail at the same time (backed by the same protocol request), but this might change in future versions of the client.
See examples/adminapi.py for example usage.
For more information see the Java Admin API documentation.
Requires broker version v0.11.0.0 or later.
-
create_topics
(new_topics, **kwargs)[source]¶ Create one or more new topics.
Parameters: - new_topics (list(NewTopic)) – A list of specifications (NewTopic) for the topics that should be created.
- operation_timeout (float) – The operation timeout in seconds, controlling how long the CreateTopics request will block on the broker waiting for the topic creation to propagate in the cluster. A value of 0 returns immediately. Default: 0
- request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- validate_only (bool) – If true, the request is only validated without creating the topic. Default: False
Returns: A dict of futures for each topic, keyed by the topic name. The future result() method returns None.
Return type: dict(<topic_name, future>)
Raises: - KafkaException – Operation failed locally or on broker.
- TypeError – Invalid input.
- ValueError – Invalid input.
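Because every Admin API call returns a dict of futures keyed by entity, the per-entity outcome has to be collected by resolving each future. The following is a minimal sketch of that pattern, not part of the client: `wait_for_all` is a hypothetical helper, and the two hand-built `Future` objects stand in for the dict that `create_topics()` would return.

```python
from concurrent.futures import Future


def wait_for_all(futures, timeout=30.0):
    """Resolve a dict of futures into ({key: result}, {key: exception})."""
    succeeded, failed = {}, {}
    for key, future in futures.items():
        try:
            # result() blocks until the operation finishes or the timeout expires.
            succeeded[key] = future.result(timeout=timeout)
        except Exception as exc:  # with a real client this is typically KafkaException
            failed[key] = exc
    return succeeded, failed


# Stand-in for the dict returned by admin.create_topics([...]) (assumption).
ok, bad = Future(), Future()
ok.set_result(None)  # create_topics() futures return None on success
bad.set_exception(RuntimeError("topic already exists"))

succeeded, failed = wait_for_all({"orders": ok, "payments": bad})
print("created:", sorted(succeeded), "failed:", sorted(failed))
```

The same helper would apply to delete_topics(), create_partitions(), describe_configs() and alter_configs(), since they share the dict-of-futures return shape.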
-
delete_topics
(topics, **kwargs)[source]¶ Delete one or more topics.
Parameters: - topics (list(str)) – A list of topics to mark for deletion.
- operation_timeout (float) – The operation timeout in seconds, controlling how long the DeleteTopics request will block on the broker waiting for the topic deletion to propagate in the cluster. A value of 0 returns immediately. Default: 0
- request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
Returns: A dict of futures for each topic, keyed by the topic name. The future result() method returns None.
Return type: dict(<topic_name, future>)
Raises: - KafkaException – Operation failed locally or on broker.
- TypeError – Invalid input.
- ValueError – Invalid input.
-
list_topics
([topic=None][, timeout=-1])[source]¶ Request metadata from the cluster. This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.
Parameters: - topic (str) – If specified, only request information about this topic, else return results for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.
- timeout (float) – The maximum response time before timing out, or -1 for infinite timeout.
Return type: ClusterMetadata
Raises: KafkaException
-
-
list_groups
([group=None][, timeout=-1])[source]¶ Request group metadata from the cluster. This method provides the same information as listGroups() and describeGroups() in the Java Admin client.
Parameters: - group (str) – If specified, only request information about this group, else return results for all groups in the cluster.
- timeout (float) – The maximum response time before timing out, or -1 for infinite timeout.
Return type: GroupMetadata
Raises: KafkaException
-
create_partitions
(new_partitions, **kwargs)[source]¶ Create additional partitions for the given topics.
Parameters: - new_partitions (list(NewPartitions)) – New partitions to be created.
- operation_timeout (float) – The operation timeout in seconds, controlling how long the CreatePartitions request will block on the broker waiting for the partition creation to propagate in the cluster. A value of 0 returns immediately. Default: 0
- request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- validate_only (bool) – If true, the request is only validated without creating the partitions. Default: False
Returns: A dict of futures for each topic, keyed by the topic name. The future result() method returns None.
Return type: dict(<topic_name, future>)
Raises: - KafkaException – Operation failed locally or on broker.
- TypeError – Invalid input.
- ValueError – Invalid input.
-
describe_configs
(resources, **kwargs)[source]¶ Get the configuration of the specified resources.
Warning: Multiple resources and resource types may be requested, but at most one resource of type RESOURCE_BROKER is allowed per call since these resource requests must be sent to the broker specified in the resource.
Parameters: - resources (list(ConfigResource)) – Resources to get the configuration for.
- request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
Returns: A dict of futures for each resource, keyed by the ConfigResource. The type of the value returned by the future result() method is dict(<configname, ConfigEntry>).
Return type: dict(<ConfigResource, future>)
Raises: - KafkaException – Operation failed locally or on broker.
- TypeError – Invalid input.
- ValueError – Invalid input.
-
alter_configs
(resources, **kwargs)[source]¶ Update configuration properties for the specified resources. Updates are not transactional so they may succeed for a subset of the provided resources while the others fail. The configuration for a particular resource is updated atomically, replacing the specified values while reverting unspecified configuration entries to their default values.
Warning: alter_configs() will replace all existing configuration for the provided resources with the new configuration given, reverting all other configuration for the resource back to their default values.
Warning: Multiple resources and resource types may be specified, but at most one resource of type RESOURCE_BROKER is allowed per call since these resource requests must be sent to the broker specified in the resource.
Parameters: - resources (list(ConfigResource)) – Resources to update configuration of.
- request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- validate_only (bool) – If true, the request is validated only, without altering the configuration. Default: False
Returns: A dict of futures for each resource, keyed by the ConfigResource. The future result() method returns None.
Return type: dict(<ConfigResource, future>)
Raises: - KafkaException – Operation failed locally or on broker.
- TypeError – Invalid input.
- ValueError – Invalid input.
-
create_acls
(acls, **kwargs)[source]¶ Create one or more ACL bindings.
Parameters: - acls (list(AclBinding)) – A list of ACL binding specifications (AclBinding) to create.
- request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
Returns: A dict of futures for each ACL binding, keyed by the AclBinding object. The future result() method returns None on success.
Return type: dict[AclBinding, future]
Raises: - KafkaException – Operation failed locally or on broker.
- TypeError – Invalid input.
- ValueError – Invalid input.
-
describe_acls
(acl_binding_filter, **kwargs)[source]¶ Match ACL bindings by filter.
Parameters: - acl_binding_filter (AclBindingFilter) – A filter with attributes that must match. String attributes match exact values, or any string if set to None. Enum attributes match exact values, or any value if ending with _ANY. If ResourcePatternType is set to ResourcePatternType.MATCH, returns all the ACL bindings with ResourcePatternType.LITERAL, ResourcePatternType.WILDCARD or ResourcePatternType.PREFIXED pattern type that match the resource name.
- request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
Returns: A future returning a list(AclBinding) as result.
Return type: future
Raises: - KafkaException – Operation failed locally or on broker.
- TypeError – Invalid input.
- ValueError – Invalid input.
-
delete_acls
(acl_binding_filters, **kwargs)[source]¶ Delete ACL bindings matching one or more ACL binding filters.
Parameters: - acl_binding_filters (list(AclBindingFilter)) – A list of ACL binding filters to match ACLs to delete. String attributes match exact values, or any string if set to None. Enum attributes match exact values, or any value if ending with _ANY. If ResourcePatternType is set to ResourcePatternType.MATCH, deletes all the ACL bindings with ResourcePatternType.LITERAL, ResourcePatternType.WILDCARD or ResourcePatternType.PREFIXED pattern type that match the resource name.
- request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
Returns: A dict of futures for each ACL binding filter, keyed by the AclBindingFilter object. The future result() method returns a list of AclBinding.
Return type: dict[AclBindingFilter, future]
Raises: - KafkaException – Operation failed locally or on broker.
- TypeError – Invalid input.
- ValueError – Invalid input.
-
-
class
confluent_kafka.admin.
ClusterMetadata
[source]¶ Provides information about the Kafka cluster, brokers, and topics. Returned by list_topics().
This class is typically not user instantiated.
-
cluster_id
= None¶ Cluster id string, if supported by the broker, else None.
-
controller_id
= None¶ Current controller broker id, or -1.
-
brokers
= None¶ Map of brokers indexed by the broker id (int). Value is a BrokerMetadata object.
-
topics
= None¶ Map of topics indexed by the topic name. Value is a TopicMetadata object.
-
orig_broker_id
= None¶ The broker this metadata originated from.
-
orig_broker_name
= None¶ The broker name/address this metadata originated from.
-
-
class
confluent_kafka.admin.
BrokerMetadata
[source]¶ Provides information about a Kafka broker.
This class is typically not user instantiated.
-
id
= None¶ Broker id
-
host
= None¶ Broker hostname
-
port
= None¶ Broker port
-
-
class
confluent_kafka.admin.
TopicMetadata
[source]¶ Provides information about a Kafka topic.
This class is typically not user instantiated.
-
topic
= None¶ Topic name
-
partitions
= None¶ Map of partitions indexed by partition id. Value is a PartitionMetadata object.
-
error
= None¶ Topic error, or None. Value is a KafkaError object.
-
-
class
confluent_kafka.admin.
PartitionMetadata
[source]¶ Provides information about a Kafka partition.
This class is typically not user instantiated.
Warning: Depending on cluster state the broker ids referenced in leader, replicas and ISRs may temporarily not be reported in ClusterMetadata.brokers. Always check the availability of a broker id in the brokers dict. -
id
= None¶ Partition id.
-
leader
= None¶ Current leader broker for this partition, or -1.
-
replicas
= None¶ List of replica broker ids for this partition.
-
isrs
= None¶ List of in-sync-replica broker ids for this partition.
-
error
= None¶ Partition error, or None. Value is a KafkaError object.
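The warning on PartitionMetadata says that leader, replica and ISR broker ids may be missing from ClusterMetadata.brokers, so lookups should be guarded. Below is a minimal sketch of such a check. `partitions_with_unknown_brokers` is a hypothetical helper, and the `SimpleNamespace` objects only imitate the attribute layout documented above so the sketch can run without a cluster.

```python
from types import SimpleNamespace


def partitions_with_unknown_brokers(metadata):
    """Return (topic, partition_id, missing_broker_ids) for every partition
    that references a broker id absent from metadata.brokers."""
    problems = []
    for topic_name, topic in metadata.topics.items():
        for partition_id, partition in topic.partitions.items():
            referenced = {partition.leader, *partition.replicas, *partition.isrs}
            referenced.discard(-1)  # -1 means "no current leader", not a broker id
            missing = sorted(b for b in referenced if b not in metadata.brokers)
            if missing:
                problems.append((topic_name, partition_id, missing))
    return problems


# Stand-in for the ClusterMetadata returned by list_topics() (assumption).
fake_metadata = SimpleNamespace(
    brokers={1: object(), 2: object()},
    topics={
        "orders": SimpleNamespace(partitions={
            0: SimpleNamespace(leader=1, replicas=[1, 2], isrs=[1, 2]),
            1: SimpleNamespace(leader=3, replicas=[3, 1], isrs=[1]),
        })
    },
)
report = partitions_with_unknown_brokers(fake_metadata)
print(report)
```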
-
-
class
confluent_kafka.admin.
GroupMember
[source]¶ Provides information about a group member.
For more information on the metadata format, refer to: A Guide To The Kafka Protocol.
This class is typically not user instantiated.
-
id
= None¶ Member id (generated by broker).
-
client_id
= None¶ Client id.
-
client_host
= None¶ Client hostname.
-
metadata
= None¶ Member metadata(binary), format depends on protocol type.
-
assignment
= None¶ Member assignment(binary), format depends on protocol type.
-
-
class
confluent_kafka.admin.
GroupMetadata
[source]¶ GroupMetadata provides information about a Kafka consumer group
This class is typically not user instantiated.
-
broker
= None¶ Originating broker metadata.
-
id
= None¶ Group name.
-
error
= None¶ Broker-originated error, or None. Value is a KafkaError object.
-
state
= None¶ Group state.
-
protocol_type
= None¶ Group protocol type.
-
protocol
= None¶ Group protocol.
-
members
= None¶ Group members.
-
NewTopic¶
-
class
confluent_kafka.admin.
NewTopic
¶ NewTopic specifies per-topic settings for passing to AdminClient.create_topics().
-
NewTopic
(topic, num_partitions[, replication_factor][, replica_assignment][, config])¶ Instantiate a NewTopic object.
Parameters: - topic (string) – Topic name
- num_partitions (int) – Number of partitions to create
- replication_factor (int) – Replication factor of partitions, or -1 if replica_assignment is used.
- replica_assignment (list) – List of lists with the replication assignment for each new partition.
- config (dict) – Dict (str:str) of topic configuration. See http://kafka.apache.org/documentation.html#topicconfigs
Return type:
-
config
¶ Optional topic configuration. See http://kafka.apache.org/documentation.html#topicconfigs.
-
num_partitions
¶ Number of partitions (int).
-
replica_assignment
¶ Replication assignment (list of lists). The outer list index represents the partition index, the inner list is the replica assignment (broker ids) for that partition. replication_factor and replica_assignment are mutually exclusive.
-
replication_factor
¶ Replication factor (int). Must be set to -1 if a replica_assignment is specified.
-
topic
¶ Topic name (string).
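The replica_assignment shape (outer index is the partition, inner list is the broker ids) is easier to see with concrete values. The sketch below builds one with a simple round-robin layout. `round_robin_assignment` is a hypothetical helper and the round-robin policy is only an illustration, not something the client or broker prescribes.

```python
def round_robin_assignment(broker_ids, num_partitions, replication_factor):
    """Build a list of lists: one inner list of broker ids per partition."""
    if replication_factor > len(broker_ids):
        raise ValueError("replication_factor exceeds the number of brokers")
    assignment = []
    for partition in range(num_partitions):
        # Start each partition one broker further along, wrapping around.
        replicas = [
            broker_ids[(partition + i) % len(broker_ids)]
            for i in range(replication_factor)
        ]
        assignment.append(replicas)
    return assignment


assignment = round_robin_assignment([1, 2, 3], num_partitions=4, replication_factor=2)
print(assignment)
```

A list like this would be passed as replica_assignment, with replication_factor set to -1 because the two settings are mutually exclusive.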
-
NewPartitions¶
-
class
confluent_kafka.admin.
NewPartitions
¶ NewPartitions specifies per-topic settings for passing to AdminClient.create_partitions().
-
NewPartitions
(topic, new_total_count[, replica_assignment])¶ Instantiate a NewPartitions object.
Parameters: - topic (string) – Topic name
- new_total_count (int) – Increase the topic’s partition count to this value.
- replica_assignment (list) – List of lists with the replication assignment for each new partition.
Return type:
-
new_total_count
¶ Total number of partitions (int).
-
replica_assignment
¶ Replication assignment (list of lists). The outer list index represents the partition index, the inner list is the replica assignment (broker ids) for that partition.
-
topic
¶ Topic name (string).
-
ConfigSource¶
-
class
confluent_kafka.admin.
ConfigSource
[source]¶ Enumerates the different sources of configuration properties. Used by ConfigEntry to specify the source of configuration properties returned by describe_configs().
-
UNKNOWN_CONFIG
= 0¶ Unknown
-
DYNAMIC_TOPIC_CONFIG
= 1¶ Dynamic Topic
-
DYNAMIC_BROKER_CONFIG
= 2¶ Dynamic Broker
-
DYNAMIC_DEFAULT_BROKER_CONFIG
= 3¶ Dynamic Default Broker
-
STATIC_BROKER_CONFIG
= 4¶ Static Broker
-
DEFAULT_CONFIG
= 5¶ Default
-
ConfigEntry¶
-
class
confluent_kafka.admin.
ConfigEntry
(name, value, source=<ConfigSource.UNKNOWN_CONFIG: 0>, is_read_only=False, is_default=False, is_sensitive=False, is_synonym=False, synonyms=[])[source]¶ Represents a configuration property. Returned by describe_configs() for each configuration entry of the specified resource.
This class is typically not user instantiated.
-
name
= None¶ Configuration property name.
-
value
= None¶ Configuration value (or None if not set or is_sensitive==True).
-
source
= None¶ Configuration source.
-
is_read_only
= None¶ Indicates whether the configuration property is read-only.
-
is_default
= None¶ Indicates whether the configuration property is using its default value.
-
is_sensitive
= None¶ Indicates whether the configuration property value contains sensitive information (such as security settings), in which case .value is None.
-
is_synonym
= None¶ Indicates whether the configuration property is a synonym for the parent configuration entry.
-
synonyms
= None¶ A list of synonyms (ConfigEntry) and alternate sources for this configuration property.
-
ConfigResource¶
-
class
confluent_kafka.admin.
ConfigResource
(restype, name, set_config=None, described_configs=None, error=None)[source]¶ Represents a resource that has configuration, and (optionally) a collection of configuration properties for that resource. Used by describe_configs() and alter_configs().
Parameters: - restype (ConfigResource.Type) – The resource type.
- name (str) – The resource name, which depends on the resource type. For RESOURCE_BROKER, the resource name is the broker id.
- set_config (dict) – The configuration to set/overwrite. Dictionary of str, str.
-
Type
¶ alias of
confluent_kafka.admin._resource.ResourceType
-
set_config
(name, value, overwrite=True)[source]¶ Set/overwrite a configuration value.
When calling alter_configs, any configuration properties that are not included in the request will be reverted to their default values. As a workaround, use describe_configs() to retrieve the current configuration and overwrite the settings you want to change.
Parameters: - name (str) – Configuration property name
- value (str) – Configuration value
- overwrite (bool) – If True, overwrite entry if it already exists (default). If False, do nothing if entry already exists.
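The describe-then-overwrite workaround mentioned for set_config() can be sketched as a merge step over the dict(<configname, ConfigEntry>) that a describe_configs() future returns. `merge_config` is a hypothetical helper. Which entries to carry over (here: skipping defaults, read-only and sensitive entries) is an assumption of this sketch, and the `SimpleNamespace` objects only imitate ConfigEntry attributes.

```python
from types import SimpleNamespace


def merge_config(described, changes):
    """Combine the currently described config with the desired changes into
    one str->str dict that can be sent in full with alter_configs()."""
    merged = {}
    for name, entry in described.items():
        # Assumption: defaults, read-only entries and sensitive entries
        # (whose value is None) are not sent back to the broker.
        if entry.is_default or entry.is_read_only or entry.value is None:
            continue
        merged[name] = entry.value
    merged.update(changes)  # the requested changes win over current values
    return merged


def entry(value, is_default=False, is_read_only=False):
    # Stand-in for a ConfigEntry (assumption).
    return SimpleNamespace(value=value, is_default=is_default, is_read_only=is_read_only)


described = {
    "retention.ms": entry("604800000", is_default=True),
    "cleanup.policy": entry("compact"),
    "some.secret": entry(None),
}
merged = merge_config(described, {"retention.ms": "3600000"})
print(merged)
```

Each resulting pair would then be applied with ConfigResource.set_config(name, value) before the resource is passed to alter_configs().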
ResourceType¶
-
class
confluent_kafka.admin.
ResourceType
[source]¶ Enumerates the different types of Kafka resources.
-
UNKNOWN
= 0¶ Resource type is not known or not set.
-
ANY
= 1¶ Match any resource, used for lookups.
-
TOPIC
= 2¶ Topic resource. Resource name is topic name.
-
GROUP
= 3¶ Group resource. Resource name is group.id.
-
BROKER
= 4¶ Broker resource. Resource name is broker id.
-
ResourcePatternType¶
-
class
confluent_kafka.admin.
ResourcePatternType
[source]¶ Enumerates the different types of Kafka resource patterns.
-
UNKNOWN
= 0¶ Resource pattern type is not known or not set.
-
ANY
= 1¶ Match any resource, used for lookups.
-
MATCH
= 2¶ Match: will perform pattern matching.
-
LITERAL
= 3¶ Literal: a literal resource name.
-
PREFIXED
= 4¶ Prefixed: a prefixed resource name.
-
AclOperation¶
-
class
confluent_kafka.admin.
AclOperation
[source]¶ Enumerates the different types of ACL operation.
-
UNKNOWN
= 0¶ Unknown
-
ANY
= 1¶ In a filter, matches any AclOperation
-
ALL
= 2¶ ALL the operations
-
READ
= 3¶ READ operation
-
WRITE
= 4¶ WRITE operation
-
CREATE
= 5¶ CREATE operation
-
DELETE
= 6¶ DELETE operation
-
ALTER
= 7¶ ALTER operation
-
DESCRIBE
= 8¶ DESCRIBE operation
-
CLUSTER_ACTION
= 9¶ CLUSTER_ACTION operation
-
DESCRIBE_CONFIGS
= 10¶ DESCRIBE_CONFIGS operation
-
ALTER_CONFIGS
= 11¶ ALTER_CONFIGS operation
-
IDEMPOTENT_WRITE
= 12¶ IDEMPOTENT_WRITE operation
-
AclPermissionType¶
AclBinding¶
-
class
confluent_kafka.admin.
AclBinding
(restype, name, resource_pattern_type, principal, host, operation, permission_type)[source]¶ Represents an ACL binding that specifies the operation and permission type for a specific principal over one or more resources of the same type. Used by AdminClient.create_acls(), returned by AdminClient.describe_acls() and AdminClient.delete_acls().
Parameters: - restype (ResourceType) – The resource type.
- name (str) – The resource name, which depends on the resource type. For ResourceType.BROKER, the resource name is the broker id.
- resource_pattern_type (ResourcePatternType) – The resource pattern, relative to the name.
- principal (str) – The principal this AclBinding refers to.
- host (str) – The host that the call is allowed to come from.
- operation (AclOperation) – The operation/s specified by this binding.
- permission_type (AclPermissionType) – The permission type for the specified operation.
AclBindingFilter¶
-
class
confluent_kafka.admin.
AclBindingFilter
(restype, name, resource_pattern_type, principal, host, operation, permission_type)[source]¶ Represents an ACL binding filter used to return a list of ACL bindings matching some or all of its attributes. Used by AdminClient.describe_acls() and AdminClient.delete_acls().
Parameters: - restype (ResourceType) – The resource type, or ResourceType.ANY to match any value.
- name (str) – The resource name to match. None matches any value.
- resource_pattern_type (ResourcePatternType) – The resource pattern, ResourcePatternType.ANY to match any value or ResourcePatternType.MATCH to perform pattern matching.
- principal (str) – The principal to match, or None to match any value.
- host (str) – The host to match, or None to match any value.
- operation (AclOperation) – The operation to match or AclOperation.ANY to match any value.
- permission_type (AclPermissionType) – The permission type to match or AclPermissionType.ANY to match any value.
Consumer¶
-
class
confluent_kafka.
Consumer
¶ A high-level Apache Kafka consumer
-
Consumer
(config)¶
Create a new Consumer instance using the provided configuration dict (including properties and callback functions). See Kafka Client Configuration for more information.
Parameters: config (dict) – Configuration properties. At a minimum, group.id must be set and bootstrap.servers should be set.
-
assign
(partitions)¶ Set the consumer partition assignment to the provided list of TopicPartition and start consuming.
Parameters: partitions (list(TopicPartition)) – List of topic+partitions and optionally initial offsets to start consuming from.
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
-
assignment
()¶ Returns the current partition assignment.
Returns: List of assigned topic+partitions.
Return type: list(TopicPartition)
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
-
close
()¶ Close down and terminate the Kafka Consumer.
Actions performed:
- Stops consuming.
- Commits offsets, unless the consumer property ‘enable.auto.commit’ is set to False.
- Leaves the consumer group.
Return type: None
-
commit
([message=None][, offsets=None][, asynchronous=True])¶ Commit a message or a list of offsets.
The message and offsets parameters are mutually exclusive. If neither is set, the current partition assignment’s offsets are used instead. Use this method to commit offsets if you have ‘enable.auto.commit’ set to False.
Parameters: - message (confluent_kafka.Message) – Commit the message’s offset+1. Note: By convention, committed offsets reflect the next message to be consumed, not the last message consumed.
- offsets (list(TopicPartition)) – List of topic+partitions+offsets to commit.
- asynchronous (bool) – If true, asynchronously commit, returning None immediately. If False, the commit() call will block until the commit succeeds or fails and the committed offsets will be returned (on success). Note that specific partitions may have failed and the .err field of each partition should be checked for success.
Return type: None|list(TopicPartition)
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
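A synchronous commit returns the committed partitions, and individual partitions may still carry an error, so the result should be inspected. The following is a minimal sketch of that check. `commit_and_check` and `FakeConsumer` are hypothetical, and the sketch assumes the per-partition error is exposed as an `error` attribute on each returned TopicPartition.

```python
from types import SimpleNamespace


def commit_and_check(consumer, message):
    """Synchronously commit `message` and return the partitions that failed."""
    committed = consumer.commit(message=message, asynchronous=False)
    # Assumption: each returned TopicPartition carries its error in `.error`.
    return [tp for tp in (committed or []) if tp.error is not None]


class FakeConsumer:
    """Stand-in that imitates the return shape of a synchronous commit()."""

    def commit(self, message=None, offsets=None, asynchronous=True):
        return [
            SimpleNamespace(topic="orders", partition=0, offset=43, error=None),
            SimpleNamespace(topic="orders", partition=1, offset=7, error="commit failed"),
        ]


failed = commit_and_check(FakeConsumer(), message=object())
print([(tp.topic, tp.partition) for tp in failed])
```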
-
-
committed
(partitions[, timeout=None])¶ Retrieve committed offsets for the specified partitions.
Parameters: - partitions (list(TopicPartition)) – List of topic+partitions to query for stored offsets.
- timeout (float) – Request timeout (seconds).
Returns: List of topic+partitions with offset and possibly error set.
Return type: list(TopicPartition)
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
-
-
consume
([num_messages=1][, timeout=-1])¶ Consumes a list of messages (possibly empty on timeout). Callbacks may be executed as a side effect of calling this method.
The application must check each returned Message object’s Message.error() method to distinguish between proper messages (error() returns None) and errors (see error().code() for specifics). If the enable.partition.eof configuration property is set to True, partition EOF events will also be exposed as Messages with error().code() set to _PARTITION_EOF.
Parameters: - num_messages (int) – The maximum number of messages to return (default: 1).
- timeout (float) – The maximum time, in seconds, to block waiting for message, event or callback (default: infinite (-1)).
Returns: A list of Message objects (possibly empty on timeout)
Return type: list(Message)
Raises: - RuntimeError – if called on a closed consumer
- KafkaError – in case of internal error
- ValueError – if num_messages > 1M
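Since consume() can return a mix of proper messages and error/event messages, a batch is usually partitioned before processing. The sketch below shows that split. `split_batch` and `FakeMessage` are hypothetical names, and the fake only imitates the `error()` and `value()` methods of Message.

```python
def split_batch(messages):
    """Separate a consume() batch into proper messages and error/event messages."""
    good, errors = [], []
    for msg in messages:
        # error() returns None for a proper message.
        (errors if msg.error() is not None else good).append(msg)
    return good, errors


class FakeMessage:
    """Stand-in for confluent_kafka.Message (assumption)."""

    def __init__(self, value=None, error=None):
        self._value, self._error = value, error

    def error(self):
        return self._error

    def value(self):
        return self._value


batch = [FakeMessage(b"a"), FakeMessage(error="partition EOF"), FakeMessage(b"b")]
good, errors = split_batch(batch)
print([m.value() for m in good], [m.error() for m in errors])
```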
-
-
consumer_group_metadata
()¶
Returns: An opaque object representing the consumer’s current group metadata for passing to the transactional producer’s send_offsets_to_transaction() API.
-
get_watermark_offsets
(partition[, timeout=None][, cached=False])¶ Retrieve low and high offsets for the specified partition.
Parameters: - partition (TopicPartition) – Topic+partition to return offsets for.
- timeout (float) – Request timeout (seconds). Ignored if cached=True.
- cached (bool) – Instead of querying the broker, use cached information. Cached values: The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each message fetched from the broker for this partition.
Returns: Tuple of (low,high) on success or None on timeout. The high offset is the offset of the last message + 1.
Return type: tuple(int,int)
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
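Because the high watermark is the offset of the last message + 1 and position() reports the last consumed offset + 1, their difference gives the number of messages not yet consumed. The sketch below computes that lag. `partition_lag` and `FakeConsumer` are hypothetical, and treating a negative position as "nothing consumed yet" is an assumption of this sketch.

```python
from types import SimpleNamespace


def partition_lag(consumer, tp, timeout=5.0):
    """Return the number of unconsumed messages for `tp`, or None on timeout."""
    watermarks = consumer.get_watermark_offsets(tp, timeout=timeout)
    if watermarks is None:
        return None
    low, high = watermarks
    position = consumer.position([tp])[0].offset
    if position < 0:
        # Assumption: a negative (logical) offset means nothing was consumed yet.
        position = low
    return max(high - position, 0)


class FakeConsumer:
    """Stand-in that imitates the two consumer calls used above."""

    def __init__(self, watermarks, position):
        self._watermarks, self._position = watermarks, position

    def get_watermark_offsets(self, partition, timeout=None, cached=False):
        return self._watermarks

    def position(self, partitions):
        return [SimpleNamespace(offset=self._position)]


lag_mid_stream = partition_lag(FakeConsumer((10, 50), 42), tp="orders[0]")
lag_no_position = partition_lag(FakeConsumer((10, 50), -1001), tp="orders[0]")
lag_timeout = partition_lag(FakeConsumer(None, 0), tp="orders[0]")
print(lag_mid_stream, lag_no_position, lag_timeout)
```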
-
-
incremental_assign
(partitions)¶ Incrementally add the provided list of TopicPartition to the current partition assignment. This list must not contain duplicate entries, or any entry corresponding to an already assigned partition. When a COOPERATIVE assignor (i.e. incremental rebalancing) is being used, this method may be used in the on_assign callback to update the current assignment and specify start offsets. The application should pass a list of partitions identical to the list passed to the callback, even if the list is empty. Note that if you do not call incremental_assign in your on_assign handler, this will be done automatically and start offsets will be the last committed offsets, or determined via the auto offset reset policy (auto.offset.reset) if there are none. This method may also be used outside the context of a rebalance callback.
Parameters: partitions (list(TopicPartition)) – List of topic+partitions and optionally initial offsets to start consuming from.
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
-
incremental_unassign
(partitions)¶ Incrementally remove the provided list of TopicPartition from the current partition assignment. This list must not contain duplicate entries and all entries specified must be part of the current assignment. When a COOPERATIVE assignor (i.e. incremental rebalancing) is being used, this method may be used in the on_revoke or on_lost callback to update the current assignment. The application should pass a list of partitions identical to the list passed to the callback. This method may also be used outside the context of a rebalance callback. The value of the TopicPartition offset field is ignored by this method.
Parameters: partitions (list(TopicPartition)) – List of topic+partitions to remove from the current assignment.
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
-
list_topics
([topic=None][, timeout=-1])¶ Request metadata from the cluster. This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.
Parameters: - topic (str) – If specified, only request information about this topic, else return results for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.
- timeout (float) – The maximum response time before timing out, or -1 for infinite timeout.
Return type: ClusterMetadata
Raises: KafkaException
-
-
offsets_for_times
(partitions[, timeout=None])¶ Look up offsets by timestamp for the specified partitions.
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned.
Parameters: - partitions (list(TopicPartition)) – Topic+partitions with timestamps in the TopicPartition.offset field.
- timeout (float) – Request timeout (seconds).
Returns: List of topic+partition with offset field set and possibly error set.
Return type: list(TopicPartition)
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
-
pause
(partitions)¶ Pause consumption for the provided list of partitions.
Parameters: partitions (list(TopicPartition)) – List of topic+partitions to pause.
Return type: None
Raises: KafkaException
-
poll
([timeout=None])¶ Consumes a single message, calls callbacks and returns events.
The application must check the returned Message object’s Message.error() method to distinguish between proper messages (error() returns None), or an event or error (see error().code() for specifics).
Parameters: timeout (float) – Maximum time, in seconds, to block waiting for message, event or callback (default: infinite (None translated into -1 in the library)).
Returns: A Message object or None on timeout
Return type: Message or None
Raises: RuntimeError if called on a closed consumer
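The three outcomes of poll() (None on timeout, a message whose error() is set, or a proper message) map onto a simple loop. The following is a minimal sketch of such a loop. `consume_until_idle` and the fake classes are hypothetical, and stopping after a number of idle polls is just one possible exit policy.

```python
def consume_until_idle(consumer, handle, timeout=1.0, max_idle_polls=3):
    """Poll until `max_idle_polls` consecutive timeouts; return messages handled."""
    idle, handled = 0, 0
    while idle < max_idle_polls:
        msg = consumer.poll(timeout)
        if msg is None:  # timed out, nothing to do
            idle += 1
            continue
        idle = 0
        if msg.error() is not None:  # event or error, not a proper message
            print("consumer event/error:", msg.error())
            continue
        handle(msg)
        handled += 1
    return handled


class FakeMessage:
    def __init__(self, value=None, error=None):
        self._value, self._error = value, error

    def error(self):
        return self._error

    def value(self):
        return self._value


class FakeConsumer:
    """Stand-in whose poll() replays a fixed queue, then times out forever."""

    def __init__(self, queue):
        self._queue = list(queue)

    def poll(self, timeout=None):
        return self._queue.pop(0) if self._queue else None


seen = []
queue = [FakeMessage(b"a"), FakeMessage(error="broker down"), None, FakeMessage(b"b")]
handled = consume_until_idle(FakeConsumer(queue), lambda m: seen.append(m.value()), timeout=0)
print(handled, seen)
```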
-
position
(partitions)¶ Retrieve current positions (offsets) for the specified partitions.
Parameters: partitions (list(TopicPartition)) – List of topic+partitions to return current offsets for. The current offset is the offset of the last consumed message + 1.
Returns: List of topic+partitions with offset and possibly error set.
Return type: list(TopicPartition)
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
-
resume
(partitions)¶ Resume consumption for the provided list of partitions.
Parameters: partitions (list(TopicPartition)) – List of topic+partitions to resume.
Return type: None
Raises: KafkaException
-
seek
(partition)¶ Set consume position for partition to offset. The offset may be an absolute (>=0) or a logical offset (OFFSET_BEGINNING et al.).
seek() may only be used to update the consume offset of an actively consumed partition (i.e., after assign()). To set the starting offset of a partition not being consumed, pass the offset in an assign() call instead.
Parameters: partition (TopicPartition) – Topic+partition+offset to seek to.
Raises: KafkaException
-
store_offsets
()¶ -
store_offsets
([message=None][, offsets=None])
Store offsets for a message or a list of offsets.
message and offsets are mutually exclusive. The stored offsets will be committed according to ‘auto.commit.interval.ms’ or by a manual offset-less commit(). Note that ‘enable.auto.offset.store’ must be set to False when using this API.
Parameters: - message (confluent_kafka.Message) – Store message’s offset+1.
- offsets (list(TopicPartition)) – List of topic+partitions+offsets to store.
Return type: None
Raises: KafkaException
Raises: RuntimeError if called on a closed consumer
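A minimal sketch of how store_offsets() is typically combined with processing, assuming the consumer was configured with 'enable.auto.offset.store': False. The helper name `handle_then_store` and the `handler` argument are hypothetical.

```python
def handle_then_store(consumer, msg, handler):
    """Run handler(msg) and store the offset only if the handler succeeds.

    Assumes 'enable.auto.offset.store' is False on the consumer, so only
    offsets of fully processed messages become eligible for commit.
    """
    handler(msg)                         # may raise; then nothing is stored
    consumer.store_offsets(message=msg)  # stores the message's offset + 1
```

With this ordering a crash inside the handler leaves the offset unstored, so the message would be delivered again after a restart (at-least-once processing).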
-
-
subscribe
()¶ -
subscribe
(topics[, on_assign=None][, on_revoke=None][, on_lost=None]) Set subscription to the supplied list of topics. This replaces a previous subscription.
Regexp pattern subscriptions are supported by prefixing the topic string with "^", e.g.:
consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
Parameters: - topics (list(str)) – List of topics (strings) to subscribe to.
- on_assign (callable) – callback to provide handling of customized offsets on completion of a successful partition re-assignment.
- on_revoke (callable) – callback to provide handling of offset commits to a customized store on the start of a rebalance operation.
- on_lost (callable) – callback to provide handling in the case the partition assignment has been lost. If not specified, lost partition events will be delivered to on_revoke, if specified. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.
Raises: RuntimeError if called on a closed consumer
-
on_assign
(consumer, partitions)¶
-
on_revoke
(consumer, partitions)¶
-
on_lost
(consumer, partitions)¶ Parameters: - consumer (Consumer) – Consumer instance.
- partitions (list(TopicPartition)) – Absolute list of partitions being assigned or revoked.
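As an illustration of the "customized offsets" use of on_assign, the sketch below builds a callback that overrides the starting offset of every assigned partition. The factory name `make_on_assign` is hypothetical; the callback signature (consumer, partitions) and assign() come from this reference.

```python
def make_on_assign(start_offset):
    """Build an on_assign callback that starts every assigned partition at start_offset.

    start_offset may be an absolute offset or a logical one such as
    confluent_kafka.OFFSET_BEGINNING.
    """
    def on_assign(consumer, partitions):
        for p in partitions:
            p.offset = start_offset
        # Hand the modified list back so the new offsets take effect.
        consumer.assign(partitions)
    return on_assign
```

A possible use is consumer.subscribe(["my_topic"], on_assign=make_on_assign(OFFSET_BEGINNING)), which would re-read the topic from the start on each assignment.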
-
-
unassign
()¶ Removes the current partition assignment and stops consuming.
Raises: - KafkaException –
- RuntimeError – if called on a closed consumer
-
unsubscribe
()¶ Remove current subscription.
Raises: KafkaException Raises: RuntimeError if called on a closed consumer
-
DeserializingConsumer¶
-
class
confluent_kafka.
DeserializingConsumer
(conf)[source]¶ A client that consumes records from a Kafka cluster, with deserialization capabilities.
Note
The DeserializingConsumer is an experimental API and subject to change.
New in version 1.4.0: The key.deserializer and value.deserializer classes instruct the DeserializingConsumer on how to convert the message payload bytes to objects.
Note
All configured callbacks are served from the application queue upon calling DeserializingConsumer.poll().
Notable DeserializingConsumer configuration properties (* indicates required field)
Property Name | Type | Description
bootstrap.servers * | str | Comma-separated list of brokers.
group.id * | str | Client group id string. All clients sharing the same group.id belong to the same group.
key.deserializer | callable | Callable(bytes, SerializationContext) -> obj. Deserializer used for message keys.
value.deserializer | callable | Callable(bytes, SerializationContext) -> obj. Deserializer used for message values.
error_cb | callable | Callable(KafkaError). Callback for generic/global error events. These errors are typically to be considered informational since the client will automatically try to recover.
logger | logging.Handler | Logging handler to forward logs.
stats_cb | callable | Callable(str). Callback for statistics. This callback is added to the application queue every statistics.interval.ms (configured separately). The function argument is a JSON formatted str containing statistics data.
throttle_cb | callable | Callable(ThrottleEvent). Callback for throttled request reporting.
See also
- CONFIGURATION.md for additional configuration property details.
- STATISTICS.md for detailed information about the statistics handled by stats_cb
Parameters: conf (dict) – DeserializingConsumer configuration. Raises: ValueError – if configuration validation fails. -
poll
(timeout=-1)[source]¶ Consumes messages and calls callbacks.
Parameters: timeout (float) – Maximum time to block waiting for a message, in seconds.
Returns: Message or None on timeout
Raises: - KeyDeserializationError – If an error occurs during key deserialization.
- ValueDeserializationError – If an error occurs during value deserialization.
- ConsumeError – If an error was encountered while polling.
-
consume
(num_messages=1, timeout=-1)[source]¶ Consumer.consume() is not implemented; use DeserializingConsumer.poll() instead.
Producer¶
-
class
confluent_kafka.
Producer
¶ Asynchronous Kafka Producer
-
Producer
(config)¶ Create a new Producer instance using the provided configuration dict.
Parameters: config (dict) – Configuration properties. At a minimum bootstrap.servers should be set.
-
__len__
(self)¶ Producer implements __len__ that can be used as len(producer) to obtain number of messages waiting. Returns: Number of messages and Kafka protocol requests waiting to be delivered to broker. Return type: int
-
abort_transaction
()¶ -
abort_transaction
([timeout])
Aborts the current transaction. This function should also be used to recover from non-fatal abortable transaction errors when KafkaError.txn_requires_abort() is True.
Any outstanding messages will be purged and fail with _PURGE_INFLIGHT or _PURGE_QUEUE.
Note: This function will block until all outstanding messages are purged and the transaction abort request has been successfully handled by the transaction coordinator, or until the timeout expires, whichever comes first. On timeout the application may call the function again.
Note: Will automatically call purge() and flush() to ensure all queued and in-flight messages are purged before attempting to abort the transaction.
Parameters: timeout (float) – The maximum amount of time to block waiting for transaction to abort in seconds. Raises: KafkaError: Use exc.args[0].retriable() to check if the operation may be retried. Treat any other error as a fatal error. -
-
begin_transaction
()¶ -
begin_transaction
()
Begin a new transaction.
init_transactions() must have been called successfully (once) before this function is called.
Any messages produced or offsets sent to a transaction, after the successful return of this function will be part of the transaction and committed or aborted atomically.
Complete the transaction by calling commit_transaction(), or abort it by calling abort_transaction().
Raises: KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, else treat the error as a fatal error. -
-
commit_transaction
()¶ -
commit_transaction
([timeout])
Commit the current transaction. Any outstanding messages will be flushed (delivered) before actually committing the transaction.
If any of the outstanding messages fail permanently the current transaction will enter the abortable error state and this function will return an abortable error, in this case the application must call abort_transaction() before attempting a new transaction with begin_transaction().
Note: This function will block until all outstanding messages are delivered and the transaction commit request has been successfully handled by the transaction coordinator, or until the timeout expires, whichever comes first. On timeout the application may call the function again.
Note: Will automatically call flush() to ensure all queued messages are delivered before attempting to commit the transaction. Delivery reports and other callbacks may thus be triggered from this method.
Parameters: timeout (float) – The amount of time to block in seconds. Raises: KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, or exc.args[0].txn_requires_abort() if the current transaction has failed and must be aborted by calling abort_transaction() and then start a new transaction with begin_transaction(). Treat any other error as a fatal error. -
-
flush
()¶ -
flush
([timeout])
Wait for all messages in the Producer queue to be delivered. This is a convenience method that calls poll() until len() is zero or the optional timeout elapses.
Parameters: timeout (float) – Maximum time to block, in seconds (requires librdkafka >= v0.9.4). Returns: Number of messages still in queue.
Note
See poll() for a description on what callbacks may be triggered. -
-
init_transactions
()¶ Initializes transactions for the producer instance.
This function ensures any transactions initiated by previous instances of the producer with the same transactional.id are completed. If the previous instance failed with a transaction in progress the previous transaction will be aborted. This function needs to be called before any other transactional or produce functions are called when the transactional.id is configured.
If the last transaction had begun completion (following transaction commit) but not yet finished, this function will await the previous transaction’s completion.
When any previous transactions have been fenced this function will acquire the internal producer id and epoch, used in all future transactional messages issued by this producer instance.
Upon successful return from this function the application has to perform at least one of the following operations within transaction.timeout.ms to avoid timing out the transaction on the broker:
- produce() (et al.)
- send_offsets_to_transaction()
- commit_transaction()
- abort_transaction()
Parameters: timeout (float) – Maximum time to block in seconds. Raises: KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, else treat the error as a fatal error.
-
list_topics
()¶ -
list_topics
([topic=None][, timeout=-1])
Request metadata from the cluster. This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.
Parameters: - topic (str) – If specified, only request information about this topic, else return results for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.
- timeout (float) – The maximum response time before timing out, or -1 for infinite timeout.
Return type: Raises: KafkaException
-
-
poll
()¶ -
poll
([timeout])
Polls the producer for events and calls the corresponding callbacks (if registered).
Callbacks:
- on_delivery callbacks from produce()
- …
Parameters: timeout (float) – Maximum time to block waiting for events. (Seconds) Returns: Number of events processed (callbacks served) Return type: int -
-
produce
()¶ -
produce
(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])
Produce message to topic. This is an asynchronous operation; an application may use the callback (alias on_delivery) argument to pass a function (or lambda) that will be called from poll() when the message has been successfully delivered or permanently fails delivery.
Currently message headers are not supported on the message returned to the callback. The msg.headers() will return None even if the original message had headers set.
Parameters: - topic (str) – Topic to produce message to
- value (str|bytes) – Message payload
- key (str|bytes) – Message key
- partition (int) – Partition to produce to, else uses the configured built-in partitioner.
- on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery
- timestamp (int) – Message timestamp (CreateTime) in milliseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.
- headers (dict|list) – Message headers to set on the message. The header key must be a string while the value must be binary, unicode or None. Accepts a list of (key,value) or a dict. (Requires librdkafka >= v0.11.4 and broker version >= 0.11.0.0)
Return type: None
Raises: - BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded)
- KafkaException – for other errors, see exception code
- NotImplementedError – if timestamp is specified without underlying library support.
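The delivery callback and the BufferError case described above might be handled as in the following sketch. The helper names `delivery_report` and `produce_with_retry` and the `retries` parameter are hypothetical; `producer` is assumed to be a configured Producer.

```python
def delivery_report(err, msg):
    """on_delivery callback: err is None on success, otherwise the delivery error."""
    if err is not None:
        print("delivery failed:", err)
    else:
        print("delivered to", msg.topic(), "partition", msg.partition())


def produce_with_retry(producer, topic, value, retries=3):
    """Produce one message; on BufferError serve callbacks with poll() and retry.

    Returns True if the message was enqueued, False if the queue stayed full.
    """
    for _ in range(retries):
        try:
            producer.produce(topic, value=value, on_delivery=delivery_report)
            return True
        except BufferError:
            # Local queue is full: poll() serves delivery callbacks and
            # gives queued messages time to be delivered.
            producer.poll(1.0)
    return False
```

Returning True only means the message was accepted into the local queue; the outcome of the delivery itself is reported later through delivery_report when poll() or flush() is called.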
-
-
purge
()¶ -
purge
([in_queue=True][, in_flight=True][, blocking=True]) Purge messages currently handled by the producer instance. The application will need to call poll() or flush() afterwards to serve the delivery report callbacks of the purged messages.
Parameters: - in_queue (bool) – Purge messages from internal queues. By default, true.
- in_flight (bool) – Purge messages in flight to or from the broker. By default, true.
- blocking (bool) – If set to False, will not wait on background thread queue purging to finish. By default, true. -
-
send_offsets_to_transaction
()¶ -
send_offsets_to_transaction
(positions, group_metadata[, timeout])
Sends a list of topic partition offsets to the consumer group coordinator for group_metadata and marks the offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully.
The offsets should be the next message your application will consume, i.e., the last processed message’s offset + 1 for each partition. Either track the offsets manually during processing or use consumer.position() (on the consumer) to get the current offsets for the partitions assigned to the consumer.
Use this method at the end of a consume-transform-produce loop prior to committing the transaction with commit_transaction().
Note: The consumer must disable auto commits (set enable.auto.commit to false on the consumer).
Note: Logical and invalid offsets (e.g., OFFSET_INVALID) in offsets will be ignored. If there are no valid offsets in offsets the function will return successfully and no action will be taken.
Parameters: - offsets (list(TopicPartition)) – current consumer/processing position(offsets) for the list of partitions.
- group_metadata (object) – consumer group metadata retrieved from the input consumer’s consumer_group_metadata().
- timeout (float) – Amount of time to block in seconds.
Raises: KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, or exc.args[0].txn_requires_abort() if the current transaction has failed and must be aborted by calling abort_transaction() and then start a new transaction with begin_transaction(). Treat any other error as a fatal error.
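The consume-transform-produce cycle described above can be sketched as follows. This is an outline under stated assumptions, not a complete implementation: the function name, `out_topic`, `transform` and `max_messages` are hypothetical, init_transactions() is assumed to have succeeded already, and consumer_group_metadata() is assumed to be the Consumer method that returns the group metadata.

```python
def process_batch_transactionally(consumer, producer, out_topic, transform,
                                  max_messages=100):
    """Run one consume-transform-produce cycle inside a single transaction.

    Assumes producer.init_transactions() has already succeeded and the
    consumer runs with 'enable.auto.commit': False.
    Returns True on commit, False if the transaction had to be aborted.
    """
    producer.begin_transaction()
    try:
        for _ in range(max_messages):
            msg = consumer.poll(1.0)
            if msg is None:
                break
            if msg.error() is not None:
                continue
            producer.produce(out_topic, value=transform(msg.value()))
        # Offsets of the next messages to consume (last processed + 1).
        positions = consumer.position(consumer.assignment())
        producer.send_offsets_to_transaction(
            positions, consumer.consumer_group_metadata())
        producer.commit_transaction()
        return True
    except Exception as exc:
        # The client raises KafkaException; a plain Exception is caught here
        # only to keep the sketch free of imports.
        err = exc.args[0] if exc.args else None
        if (err is not None and hasattr(err, "txn_requires_abort")
                and err.txn_requires_abort()):
            producer.abort_transaction()
            return False
        raise
```

After an abort the consumer's position has already moved past the aborted input, so a real application would probably need to rewind to the last committed offsets before retrying. Retriable errors (retriable() is True) are re-raised here for the caller to handle.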
-
-
SerializingProducer¶
-
class
confluent_kafka.
SerializingProducer
(conf)[source]¶ A high level Kafka Producer with serialization capabilities.
Note
The SerializingProducer is an experimental API and subject to change.
The SerializingProducer is thread safe and sharing a single instance across threads will generally be more efficient than having multiple instances.
The SerializingProducer.produce() method is asynchronous. When called it adds the message to a queue of pending messages and immediately returns. This allows the Producer to batch together individual messages for efficiency.
The Producer will automatically retry failed produce requests up to message.timeout.ms.
New in version 1.4.0: The Transactional Producer allows an application to send messages to multiple partitions (and topics) atomically.
The key.serializer and value.serializer classes instruct the SerializingProducer on how to convert the message payload to bytes.
Note
All configured callbacks are served from the application queue upon calling SerializingProducer.poll() or SerializingProducer.flush().
Notable SerializingProducer configuration properties (* indicates required field)
Property Name | Type | Description
bootstrap.servers * | str | Comma-separated list of brokers.
key.serializer | callable | Callable(obj, SerializationContext) -> bytes. Serializer used for message keys.
value.serializer | callable | Callable(obj, SerializationContext) -> bytes. Serializer used for message values.
error_cb | callable | Callable(KafkaError). Callback for generic/global error events. These errors are typically to be considered informational since the client will automatically try to recover.
logger | logging.Handler | Logging handler to forward logs.
stats_cb | callable | Callable(str). Callback for statistics. This callback is added to the application queue every statistics.interval.ms (configured separately). The function argument is a JSON formatted str containing statistics data.
throttle_cb | callable | Callable(ThrottleEvent). Callback for throttled request reporting.
See also
- CONFIGURATION.md for additional configuration property details.
- STATISTICS.md for detailed information about the statistics handled by stats_cb
Parameters: conf (dict) – SerializingProducer configuration. -
produce
(topic, key=None, value=None, partition=-1, on_delivery=None, timestamp=0, headers=None)[source]¶ Produce message to topic.
This is an asynchronous operation; an application may use the on_delivery argument to pass a function (or lambda) that will be called from SerializingProducer.poll() when the message has been successfully delivered or permanently fails delivery.
Currently message headers are not supported on the message returned to the callback. The msg.headers() will return None even if the original message had headers set.
Parameters: - topic (str) – Topic to produce message to.
- key (object, optional) – Message key.
- value (object, optional) – Message payload.
- partition (int, optional) – Partition to produce to, else uses the configured built-in partitioner.
- on_delivery (callable(KafkaError, Message), optional) – Delivery report callback to call (from SerializingProducer.poll() or SerializingProducer.flush()) on successful or failed delivery.
- timestamp (float, optional) – Message timestamp (CreateTime) in ms since epoch UTC (requires broker >= 0.10.0.0). Default value is current time.
- headers (dict, optional) – Message headers to set on the message. The header key must be a str while the value must be binary, unicode or None. (Requires broker version >= 0.11.0.0)
Raises: - BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded). If this happens the application should call SerializingProducer.poll() and try again.
- KeySerializationError – If an error occurs during key serialization.
- ValueSerializationError – If an error occurs during value serialization.
- KafkaException – For all other errors
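Since key.serializer and value.serializer are plain callables of the form Callable(obj, SerializationContext) -> bytes, a custom serializer can be a simple function. The sketch below is a hypothetical JSON serializer for illustration; it is not one of the serializers shipped with the client.

```python
import json


def json_serializer(obj, ctx):
    """Callable(obj, SerializationContext) -> bytes, usable as value.serializer.

    ctx is accepted to match the expected signature but is not used here.
    """
    if obj is None:
        return None  # pass None through unchanged
    return json.dumps(obj, sort_keys=True).encode("utf-8")
```

It could then be passed in the configuration dict, e.g. under the 'value.serializer' key next to 'bootstrap.servers'.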
SchemaRegistryClient¶
-
class
confluent_kafka.schema_registry.
SchemaRegistryClient
(conf)[source]¶ Schema Registry Client.
SchemaRegistryClient configuration properties (* indicates a required field):
Property name | Type | Description
url * | str | Schema Registry URL.
ssl.ca.location | str | Path to CA certificate file used to verify the Schema Registry’s private key.
ssl.key.location | str | Path to client’s private key (PEM) used for authentication. ssl.certificate.location must also be set.
ssl.certificate.location | str | Path to client’s public key (PEM) used for authentication. May be set without ssl.key.location if the private key is stored within the PEM as well.
basic.auth.user.info | str | Client HTTP credentials in the form of username:password. By default userinfo is extracted from the URL if present.
Parameters: conf (dict) – Schema Registry client configuration. -
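A configuration dict using the properties listed above might be assembled as in this sketch. The helper `schema_registry_conf` and its parameter names are hypothetical; only the property names come from the table.

```python
def schema_registry_conf(url, username=None, password=None, ca_location=None):
    """Build a SchemaRegistryClient configuration dict.

    Only 'url' is required; credentials and the CA path are added when given.
    """
    conf = {"url": url}
    if username is not None and password is not None:
        # Credentials are passed in the form username:password.
        conf["basic.auth.user.info"] = f"{username}:{password}"
    if ca_location is not None:
        conf["ssl.ca.location"] = ca_location
    return conf
```

The resulting dict would be passed as the conf argument, e.g. SchemaRegistryClient(schema_registry_conf("https://registry.example.com", "user", "secret")), where the URL and credentials are placeholders.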
register_schema
(subject_name, schema)[source]¶ Registers a schema under subject_name.
Parameters: - subject_name (str) – subject to register a schema under
- schema (Schema) – Schema instance to register
Returns: Schema id
Return type: int
Raises: SchemaRegistryError – if Schema violates this subject’s Compatibility policy or is otherwise invalid.
-
get_schema
(schema_id)[source]¶ Fetches the schema associated with schema_id from the Schema Registry. The result is cached so subsequent attempts will not require an additional round-trip to the Schema Registry.
Parameters: schema_id (int) – Schema id Returns: Schema instance identified by the schema_id
Return type: Schema Raises: SchemaRegistryError – If schema can’t be found.
-
lookup_schema
(subject_name, schema)[source]¶ Returns schema registration information for subject.
Parameters: - subject_name (str) – Subject name the schema is registered under
- schema (Schema) – Schema instance.
Returns: Subject registration information for this schema.
Return type: Raises: SchemaRegistryError – If schema or subject can’t be found.
-
get_subjects
()[source]¶ List all subjects registered with the Schema Registry
Returns: Registered subject names Return type: list(str) Raises: SchemaRegistryError – if subjects can’t be found.
-
delete_subject
(subject_name, permanent=False)[source]¶ Deletes the specified subject and its associated compatibility level if registered. It is recommended to use this API only when a topic needs to be recycled or in development environments.
Parameters: - subject_name (str) – subject name
- permanent (bool) – True for a hard delete, False (default) for a soft delete
Returns: Versions deleted under this subject
Return type: list(int)
Raises: SchemaRegistryError – if the request was unsuccessful.
-
get_latest_version
(subject_name)[source]¶ Retrieves latest registered version for subject
Parameters: subject_name (str) – Subject name. Returns: Registration information for this version. Return type: RegisteredSchema Raises: SchemaRegistryError – if the version can’t be found or is invalid.
-
get_version
(subject_name, version)[source]¶ Retrieves a specific schema registered under subject_name.
Parameters: - subject_name (str) – Subject name.
- version (int) – version number. Defaults to latest version.
Returns: Registration information for this version.
Return type: Raises: SchemaRegistryError – if the version can’t be found or is invalid.
-
get_versions
(subject_name)[source]¶ Get a list of all versions registered with this subject.
Parameters: subject_name (str) – Subject name. Returns: Registered versions Return type: list(int) Raises: SchemaRegistryError – If subject can’t be found.
-
delete_version
(subject_name, version)[source]¶ Deletes a specific version registered to subject_name.
Parameters: - subject_name (str) – Subject name
- version (int) – Version number
Returns: Version number which was deleted
Return type: int
Raises: SchemaRegistryError – if the subject or version cannot be found.
-
set_compatibility
(subject_name=None, level=None)[source]¶ Update global or subject level compatibility level.
Parameters: - level (str) – Compatibility level. See API reference for a list of valid values.
- subject_name (str, optional) – Subject to update. Sets the global compatibility level policy if not set.
Returns: The newly configured compatibility level.
Return type: str
Raises: SchemaRegistryError
– If the compatibility level is invalid.
-
get_compatibility
(subject_name=None)[source]¶ Get the current compatibility level.
Parameters: subject_name (str, optional) – Subject name. Returns global policy if left unset. Returns: Compatibility level for the subject if set, otherwise the global compatibility level. Return type: str Raises: SchemaRegistryError
– if the request was unsuccessful.
-
test_compatibility
(subject_name, schema, version='latest')[source]¶ Test the compatibility of a candidate schema for a given subject and version
Parameters: - subject_name (str) – Subject name the schema is registered under
- schema (Schema) – Schema instance.
- version (int or str, optional) – Version number, or the string “latest”. Defaults to “latest”.
Returns: True if the schema is compatible with the specified version
Return type: bool
Raises: SchemaRegistryError
– if the request was unsuccessful.
-
AvroProducer(Legacy)¶
-
class
confluent_kafka.avro.
AvroProducer
(config, default_key_schema=None, default_value_schema=None, schema_registry=None, **kwargs)[source]¶ Kafka Producer client which does avro schema encoding to messages. Handles schema registration, Message serialization.
Constructor takes below parameters.
Parameters: - config (dict) – Config parameters containing url for schema registry (schema.registry.url) and the standard Kafka client configuration (bootstrap.servers et al.).
- default_key_schema (str) – Optional default avro schema for key
- default_value_schema (str) – Optional default avro schema for value
-
produce
(**kwargs)[source]¶ Asynchronously sends message to Kafka by encoding with specified or default avro schema.
Parameters: - topic (str) – topic name
- value (object) – An object to serialize
- value_schema (str) – Avro schema for value
- key (object) – An object to serialize
- key_schema (str) – Avro schema for key
Plus any other parameters accepted by confluent_kafka.Producer.produce
Raises: - SerializerError – On serialization failure
- BufferError – If producer queue is full.
- KafkaException – For other produce failures.
AvroConsumer(Legacy)¶
-
class
confluent_kafka.avro.
AvroConsumer
(config, schema_registry=None, reader_key_schema=None, reader_value_schema=None, **kwargs)[source]¶ Kafka Consumer client which does avro schema decoding of messages. Handles message deserialization.
Constructor takes below parameters
Parameters: - config (dict) – Config parameters containing url for schema registry (schema.registry.url) and the standard Kafka client configuration (bootstrap.servers et al.)
- reader_key_schema (schema) – a reader schema for the message key
- reader_value_schema (schema) – a reader schema for the message value
Raises: ValueError – For invalid configurations
-
poll
(timeout=None)[source]¶ This is an overridden method from the confluent_kafka.Consumer class. It handles message deserialization using the Avro schema.
Parameters: timeout (float) – Poll timeout in seconds (default: indefinite) Returns: message object with deserialized key and value as dict objects Return type: Message
Serialization API¶
Deserializer¶
-
class
confluent_kafka.serialization.
Deserializer
[source]¶ Extensible class from which all Deserializer implementations derive. Deserializers instruct Kafka clients on how to convert bytes to objects.
See built-in implementations, listed below, for an example of how to extend this class.
Note
This class is not directly instantiable. The derived classes must be used instead.
The following implementations are provided by this module.
Note
Unless noted elsewhere all numeric types are signed and serialization is big-endian.
Name | Type | Binary Format
DoubleDeserializer | float | IEEE 754 binary64
IntegerDeserializer | int | int32
StringDeserializer | unicode | unicode(encoding)
-
__call__
(value, ctx)[source]¶ Convert bytes to object
Parameters: - value (bytes) – bytes to be deserialized
- ctx (SerializationContext) – Metadata pertaining to the serialization operation
Raises: SerializerError if an error occurs during deserialization
Returns: object if data is not None, otherwise None
-
AvroDeserializer¶
-
class
confluent_kafka.schema_registry.avro.
AvroDeserializer
(schema_registry_client, schema_str=None, from_dict=None, return_record_name=False)[source]¶ AvroDeserializer decodes bytes written in the Schema Registry Avro format to an object.
Note
Complex Types are returned as dicts. If a more specific instance type is desired, a callable, from_dict, may be registered with the AvroDeserializer which converts a dict to the desired type.
See avro_consumer.py in the examples directory for example usage.
Parameters: - schema_registry_client (SchemaRegistryClient) – Confluent Schema Registry client instance.
- schema_str (str, optional) – Avro reader schema declaration. If not provided, writer schema is used for deserialization.
- from_dict (callable, optional) – Callable(dict, SerializationContext) -> object. Converts dict to an instance of some object.
- return_record_name (bool) – If True, when reading a union of records, the result will be a tuple where the first value is the name of the record and the second value is the record itself. Defaults to False.
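A from_dict callable of the form Callable(dict, SerializationContext) -> object could look like the following sketch. The `User` class, its fields and the function name `dict_to_user` are hypothetical and stand in for whatever type the application uses.

```python
from dataclasses import dataclass


@dataclass
class User:
    """Hypothetical application type; its fields mirror an assumed Avro record."""
    name: str
    favorite_number: int


def dict_to_user(obj, ctx):
    """Callable(dict, SerializationContext) -> object for use as from_dict."""
    if obj is None:
        return None
    return User(name=obj["name"], favorite_number=obj["favorite_number"])
```

Passing from_dict=dict_to_user when constructing the AvroDeserializer would then yield User instances instead of plain dicts.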
-
__call__
(value, ctx)[source]¶ Decodes a Confluent Schema Registry formatted Avro bytes to an object.
Parameters: - value (bytes) – bytes
- ctx (SerializationContext) – Metadata pertaining to the serialization operation.
Raises: SerializerError – if an error occurs reading data. Returns: object if from_dict is set, otherwise dict. If no value is supplied None is returned. Return type: object
DoubleDeserializer¶
-
class
confluent_kafka.serialization.
DoubleDeserializer
[source]¶ Deserializes float from IEEE 754 binary64 bytes.
-
__call__
(value, ctx)[source]¶ Deserializes float from IEEE 754 binary64 bytes.
Parameters: - value (bytes) – bytes to be deserialized
- ctx (SerializationContext) – Metadata pertaining to the serialization operation
Raises: SerializerError if an error occurs during deserialization.
Returns: float if data is not None, otherwise None
-
IntegerDeserializer¶
-
class
confluent_kafka.serialization.
IntegerDeserializer
[source]¶ Deserializes int from int32 bytes.
-
__call__
(value, ctx)[source]¶ Deserializes int from int32 bytes.
Parameters: - value (bytes) – bytes to be deserialized
- ctx (SerializationContext) – Metadata pertaining to the serialization operation
Raises: SerializerError if an error occurs during deserialization.
Returns: int if data is not None, otherwise None
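The binary formats named for DoubleDeserializer and IntegerDeserializer (big-endian IEEE 754 binary64 and signed int32) can be illustrated with the standard struct module. This is a sketch of the wire format only, with hypothetical function names; it is not presented as the library's implementation.

```python
import struct


def deserialize_double(value):
    """Decode 8 big-endian IEEE 754 binary64 bytes into a float."""
    if value is None:
        return None
    return struct.unpack(">d", value)[0]


def deserialize_int32(value):
    """Decode 4 big-endian signed int32 bytes into an int."""
    if value is None:
        return None
    return struct.unpack(">i", value)[0]
```

For example, the four bytes 00 00 00 2a decode to 42, and ff ff ff ff decode to -1 because the type is signed.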
-
JSONDeserializer¶
-
class
confluent_kafka.schema_registry.json_schema.
JSONDeserializer
(schema_str, from_dict=None)[source]¶ JSONDeserializer decodes bytes written in the Schema Registry JSON format to an object.
Parameters: - schema_str (str) – JSON schema definition used for validating records.
- from_dict (callable, optional) – Callable(dict, SerializationContext) -> object. Converts dict to an instance of some object.
-
__call__
(value, ctx)[source]¶ Deserializes Schema Registry formatted JSON to JSON object literal(dict).
Parameters: - value (bytes) – Confluent Schema Registry formatted JSON bytes
- ctx (SerializationContext) – Metadata pertaining to the serialization operation.
Returns: Deserialized JSON
Return type: dict
Raises: SerializerError – If value cannot be validated by the schema configured with this JSONDeserializer instance.
ProtobufDeserializer¶
-
class
confluent_kafka.schema_registry.protobuf.
ProtobufDeserializer
(message_type, conf=None)[source]¶ ProtobufDeserializer decodes bytes written in the Schema Registry Protobuf format to an object.
Parameters: - message_type (GeneratedProtocolMessageType) – Protobuf Message type.
- conf (dict) – Configuration dictionary.
ProtobufDeserializer configuration properties:
Property Name | Type | Description
use.deprecated.format | bool | Specifies whether the Protobuf deserializer should deserialize message indexes without zig-zag encoding. This option must be explicitly configured as older and newer Protobuf producers are incompatible. If Protobuf messages in the topic to consume were produced with confluent-kafka-python <1.8 then this property must be set to True until all old messages have been processed and producers have been upgraded. Warning: This configuration property will be removed in a future version of the client.
See Also: Protobuf API reference
-
__call__
(value, ctx)[source]¶ Deserializes Schema Registry formatted Protobuf to Protobuf Message.
Parameters: - value (bytes) – Confluent Schema Registry formatted Protobuf bytes.
- ctx (SerializationContext) – Metadata pertaining to the serialization operation.
Returns: Protobuf Message instance.
Return type: Raises: SerializerError – If response payload and expected Message type differ.
StringDeserializer¶
-
class
confluent_kafka.serialization.
StringDeserializer
(codec='utf_8')[source]¶ Deserializes a str(py2:unicode) from bytes.
Parameters: codec (str, optional) – Encoding scheme. Defaults to utf_8.
-
__call__
(value, ctx)[source]¶ Deserializes bytes to unicode per the configured codec. Defaults to utf_8.
Compatibility Note:
- Python 2: str objects must be converted to unicode objects by the application prior to using the serialization classes.
- Python 3: all str objects are already unicode objects.
Parameters: - value (bytes) – bytes to be deserialized
- ctx (SerializationContext) – Metadata pertaining to the serialization operation
Raises: SerializerError if an error occurs during deserialization.
Returns: unicode if data is not None, otherwise None
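The documented contract (bytes in, unicode out, None passed through) can be mimicked with the standard library alone. The function below is a stand-in for illustration, not the library's implementation; it does not wrap decoding failures in SerializerError.

```python
def decode_string(value, codec="utf_8"):
    """Return value decoded with codec, or None when value is None."""
    if value is None:
        return None
    return value.decode(codec)
```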
-
Serializer¶
-
class
confluent_kafka.serialization.
Serializer
[source]¶ Extensible class from which all Serializer implementations derive. Serializers instruct Kafka clients on how to convert Python objects to bytes.
See built-in implementations, listed below, for an example of how to extend this class.
Note
This class is not directly instantiable. The derived classes must be used instead.
The following implementations are provided by this module.
Note
Unless noted elsewhere all numeric types are signed and serialization is big-endian.
Name | Type | Binary Format
DoubleSerializer | float | IEEE 754 binary64
IntegerSerializer | int | int32
StringSerializer | unicode | unicode(encoding)
-
__call__
(obj, ctx)[source]¶ Converts obj to bytes.
Parameters: - obj (object) – object to be serialized
- ctx (SerializationContext) – Metadata pertaining to the serialization operation
Raises: SerializerError if an error occurs during serialization
Returns: bytes if obj is not None, otherwise None
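The binary formats listed for the built-in serializers can be reproduced with the struct module. The helpers below are hypothetical stand-ins that only illustrate the byte layouts (signed, big-endian, None passed through); they are not the library's classes and do not raise SerializerError.

```python
import struct


def sketch_double(obj):
    """float -> 8 bytes, IEEE 754 binary64, big-endian."""
    return None if obj is None else struct.pack(">d", obj)


def sketch_int32(obj):
    """int -> 4 bytes, signed int32, big-endian."""
    return None if obj is None else struct.pack(">i", obj)


def sketch_string(obj, codec="utf_8"):
    """str -> bytes in the configured codec."""
    return None if obj is None else obj.encode(codec)
```

A value outside the int32 range makes struct.pack raise struct.error in this sketch, which is the kind of failure the real serializers report as SerializerError.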
-
AvroSerializer¶
-
class
confluent_kafka.schema_registry.avro.
AvroSerializer
(schema_registry_client, schema_str, to_dict=None, conf=None)[source]¶ AvroSerializer serializes objects in the Confluent Schema Registry binary format for Avro.
AvroSerializer configuration properties:
Property Name | Type | Description
auto.register.schemas | bool | Registers schemas automatically if not previously associated with a particular subject. Defaults to True.
use.latest.version | bool | Whether to use the latest subject version for serialization. WARNING: There is no check that the latest schema is backwards compatible with the object being serialized. Defaults to False.
subject.name.strategy | callable | Callable(SerializationContext, str) -> str. Instructs the AvroSerializer on how to construct Schema Registry subject names. Defaults to topic_subject_name_strategy.
Schemas are registered to namespaces known as Subjects which define how a schema may evolve over time. By default the subject name is formed by concatenating the topic name with the message field separated by a hyphen.
i.e. {topic name}-{message field}
Alternative naming strategies may be configured with the property subject.name.strategy.
Supported subject name strategies:
Subject Name Strategy | Output Format
topic_subject_name_strategy (default) | {topic name}-{message field}
topic_record_subject_name_strategy | {topic name}-{record name}
record_subject_name_strategy | {record name}
See Subject name strategy for additional details.
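The three output formats can be sketched as plain functions with the documented Callable(SerializationContext, str) -> str shape. Ctx is a hypothetical stand-in for SerializationContext reduced to the two attributes read here, and the field value "value" is an assumption about how the message field is spelled; these are not the library's own strategy functions.

```python
from collections import namedtuple

# Stand-in for SerializationContext: only topic and field are used.
Ctx = namedtuple("Ctx", ["topic", "field"])


def sketch_topic_strategy(ctx, record_name):
    """{topic name}-{message field}"""
    return f"{ctx.topic}-{ctx.field}"


def sketch_topic_record_strategy(ctx, record_name):
    """{topic name}-{record name}"""
    return f"{ctx.topic}-{record_name}"


def sketch_record_strategy(ctx, record_name):
    """{record name}"""
    return record_name
```

A custom strategy with the same signature could be supplied through the subject.name.strategy property.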
Note
Prior to serialization, all Complex Types must first be converted to a dict instance. This may be handled manually prior to calling SerializingProducer.produce() or by registering a to_dict callable with the AvroSerializer.
See avro_producer.py in the examples directory for example usage.
Note
Tuple notation can be used to determine which branch of an ambiguous union to take.
Parameters: - schema_registry_client (SchemaRegistryClient) – Schema Registry client instance.
- schema_str (str) – Avro Schema Declaration.
- to_dict (callable, optional) – Callable(object, SerializationContext) -> dict. Converts object to a dict.
- conf (dict) – AvroSerializer configuration.
-
__call__
(obj, ctx)[source]¶ Serializes an object to the Confluent Schema Registry’s Avro binary format.
Parameters: - obj (object) – object instance to serialize.
- ctx (SerializationContext) – Metadata pertaining to the serialization operation.
Note
None objects are represented as Kafka Null.
Raises: SerializerError – if any error occurs serializing obj.
Returns: Confluent Schema Registry formatted Avro bytes
Return type: bytes
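A to_dict callable of the documented shape Callable(object, SerializationContext) -> dict might look like the sketch below. The Reading class and its fields are hypothetical and would have to match the fields declared in the Avro schema in use.

```python
class Reading:
    """Hypothetical application type; not part of confluent_kafka."""

    def __init__(self, sensor_id, celsius):
        self.sensor_id = sensor_id
        self.celsius = celsius


def reading_to_dict(reading, ctx):
    """Convert a Reading into the dict form the serializer expects.

    ctx is accepted to satisfy the signature but is unused here.
    """
    return {"sensor_id": reading.sensor_id, "celsius": reading.celsius}
```

Registering such a function as to_dict avoids converting each object by hand before produce() is called.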
DoubleSerializer¶
-
class
confluent_kafka.serialization.
DoubleSerializer
[source]¶ Serializes float to IEEE 754 binary64.
See also
-
__call__
(obj, ctx)[source]¶ Parameters: - obj (object) – object to be serialized
- ctx (SerializationContext) – Metadata pertaining to the serialization operation
Note
None objects are represented as Kafka Null.
Raises: SerializerError if an error occurs during serialization.
Returns: IEEE 754 binary64 bytes if obj is not None, otherwise None
-
IntegerSerializer¶
-
class
confluent_kafka.serialization.
IntegerSerializer
[source]¶ Serializes int to int32 bytes.
See also
-
__call__
(obj, ctx)[source]¶ Serializes int as int32 bytes.
Parameters: - obj (object) – object to be serialized
- ctx (SerializationContext) – Metadata pertaining to the serialization operation
Note
None objects are represented as Kafka Null.
Raises: SerializerError if an error occurs during serialization
Returns: int32 bytes if obj is not None, else None
-
JSONSerializer¶
-
class
confluent_kafka.schema_registry.json_schema.
JSONSerializer
(schema_str, schema_registry_client, to_dict=None, conf=None)[source]¶ JsonSerializer serializes objects in the Confluent Schema Registry binary format for JSON.
JsonSerializer configuration properties:
Property Name | Type | Description
auto.register.schemas | bool | Registers schemas automatically if not previously associated with a particular subject. Defaults to True.
use.latest.version | bool | Whether to use the latest subject version for serialization. WARNING: There is no check that the latest schema is backwards compatible with the object being serialized. Defaults to False.
subject.name.strategy | callable | Callable(SerializationContext, str) -> str. Instructs the JsonSerializer on how to construct Schema Registry subject names. Defaults to topic_subject_name_strategy.
Schemas are registered to namespaces known as Subjects which define how a schema may evolve over time. By default the subject name is formed by concatenating the topic name with the message field separated by a hyphen.
i.e. {topic name}-{message field}
Alternative naming strategies may be configured with the property subject.name.strategy.
Supported subject name strategies:
Subject Name Strategy | Output Format
topic_subject_name_strategy (default) | {topic name}-{message field}
topic_record_subject_name_strategy | {topic name}-{record name}
record_subject_name_strategy | {record name}
See Subject name strategy for additional details.
Note
The title annotation, referred to as a record name elsewhere in this document, is not strictly required by the JSON Schema specification. It is, however, required by this Serializer. This annotation (record name) is used to register the Schema with the Schema Registry. See documentation below for additional details on Subjects and schema registration.
Parameters: - schema_str (str) – JSON Schema definition.
- schema_registry_client (SchemaRegistryClient) – Schema Registry client instance.
- to_dict (callable, optional) – Callable(object, SerializationContext) -> dict. Converts object to a dict.
- conf (dict) – JsonSerializer configuration.
-
__call__
(obj, ctx)[source]¶ Serializes an object to the Confluent Schema Registry’s JSON binary format.
Parameters: - obj (object) – object instance to serialize.
- ctx (SerializationContext) – Metadata pertaining to the serialization operation.
Note
None objects are represented as Kafka Null.
Raises: SerializerError if any error occurs serializing obj
Returns: Confluent Schema Registry formatted JSON bytes
Return type: bytes
ProtobufSerializer¶
-
class
confluent_kafka.schema_registry.protobuf.
ProtobufSerializer
(msg_type, schema_registry_client, conf=None)[source]¶ ProtobufSerializer serializes objects in the Confluent Schema Registry binary format for Protobuf.
ProtobufSerializer configuration properties:
Schemas are registered to namespaces known as Subjects which define how a schema may evolve over time. By default the subject name is formed by concatenating the topic name with the message field separated by a hyphen.
i.e. {topic name}-{message field}
Alternative naming strategies may be configured with the property subject.name.strategy.
Supported subject name strategies:
Subject Name Strategy | Output Format
topic_subject_name_strategy (default) | {topic name}-{message field}
topic_record_subject_name_strategy | {topic name}-{record name}
record_subject_name_strategy | {record name}
See Subject name strategy for additional details.
Parameters: - msg_type (GeneratedProtocolMessageType) – Protobuf Message type.
- schema_registry_client (SchemaRegistryClient) – Schema Registry client instance.
- conf (dict) – ProtobufSerializer configuration.
See also
-
__call__
(message_type, ctx)[source]¶ Serializes a Protobuf Message to the Confluent Schema Registry Protobuf binary format.
Parameters: - message_type (Message) – Protobuf message instance.
- ctx (SerializationContext) – Metadata pertaining to the serialization operation.
Note
None objects are represented as Kafka Null.
Raises: SerializerError if any error occurs serializing obj
Returns: Confluent Schema Registry formatted Protobuf bytes
Return type: bytes
StringSerializer¶
-
class
confluent_kafka.serialization.
StringSerializer
(codec='utf_8')[source]¶ Serializes unicode to bytes per the configured codec. Defaults to utf_8.
Note
None objects are represented as Kafka Null.
Parameters: codec (str, optional) – Encoding scheme. Defaults to utf_8.
-
__call__
(obj, ctx)[source]¶ Serializes a str(py2:unicode) to bytes.
Compatibility Note:
- Python 2: str objects must be converted to unicode objects.
- Python 3: all str objects are already unicode objects.
Parameters: - obj (object) – object to be serialized
- ctx (SerializationContext) – Metadata pertaining to the serialization operation
Raises: SerializerError if an error occurs during serialization.
Returns: serialized bytes if obj is not None, otherwise None
-
Transactional Producer API¶
The transactional producer operates on top of the idempotent producer,
and provides full exactly-once semantics (EOS) for Apache Kafka when used
with the transaction aware consumer (isolation.level=read_committed).
A producer instance is configured for transactions by setting the transactional.id to an identifier unique for the application. This id will be used to fence stale transactions from previous instances of the application, typically following an outage or crash.
After creating the transactional producer instance the transactional state
must be initialized by calling confluent_kafka.Producer.init_transactions().
This is a blocking call that will acquire a runtime producer id from the
transaction coordinator broker as well as abort any stale transactions and
fence any still running producer instances with the same transactional.id.
Once transactions are initialized the application may begin a new
transaction by calling confluent_kafka.Producer.begin_transaction().
A producer instance may only have one single on-going transaction.
Any messages produced after the transaction has been started will
belong to the ongoing transaction and will be committed or aborted
atomically.
It is not permitted to produce messages outside a transaction
boundary, e.g., before confluent_kafka.Producer.begin_transaction()
or after confluent_kafka.Producer.commit_transaction(),
confluent_kafka.Producer.abort_transaction(), or after the current
transaction has failed.
If consumed messages are used as input to the transaction, the consumer
instance must be configured with enable.auto.commit set to false.
To commit the consumed offsets along with the transaction pass the
list of consumed partitions and the last offset processed + 1 to
confluent_kafka.Producer.send_offsets_to_transaction() prior to
committing the transaction.
This allows an aborted transaction to be restarted using the previously
committed offsets.
To commit the produced messages, and any consumed offsets, to the
current transaction, call confluent_kafka.Producer.commit_transaction().
This call will block until the transaction has been fully committed or
failed (typically due to fencing by a newer producer instance).
Alternatively, if processing fails, or an abortable transaction error is
raised, the transaction needs to be aborted by calling
confluent_kafka.Producer.abort_transaction() which marks any produced
messages and offset commits as aborted.
After the current transaction has been committed or aborted a new
transaction may be started by calling
confluent_kafka.Producer.begin_transaction() again.
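The begin, produce, send offsets, commit sequence described above can be sketched as one function. This is an illustration under stated assumptions: producer is a Producer configured with transactional.id on which init_transactions() has already been called, consumer is a Consumer with enable.auto.commit set to false, every message in messages was polled from that consumer and has error() equal to None, and out_topic and transform are hypothetical names chosen for the example.

```python
def process_in_transaction(producer, consumer, messages, out_topic, transform):
    """Run one consume-transform-produce cycle inside a single transaction."""
    producer.begin_transaction()
    for msg in messages:
        producer.produce(out_topic, transform(msg.value()))
    # position() reports the next offset to consume for each assigned
    # partition, which is the last processed offset + 1 when every polled
    # message has been processed.
    offsets = consumer.position(consumer.assignment())
    producer.send_offsets_to_transaction(
        offsets, consumer.consumer_group_metadata())
    producer.commit_transaction()
```

Error handling is deliberately left out here; a failure in any of these calls should be handled according to the error categories described below.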
Retriable errors
Some error cases allow the attempted operation to be retried. This is
indicated by the error object having the retriable flag set, which can
be detected by calling confluent_kafka.KafkaError.retriable() on
the KafkaError object.
When this flag is set the application may retry the operation immediately
or preferably after a short grace period (to avoid busy-looping).
Retriable errors include timeouts, broker transport failures, etc.
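A retry loop with a short grace period might be sketched as follows. The helper name, the attempt limit, and the 0.5 second pause are assumptions made for illustration. To stay self-contained the sketch catches Exception and inspects args[0]; real code would catch confluent_kafka.KafkaException instead.

```python
import time


def call_with_retries(operation, max_attempts=5, grace_period=0.5,
                      sleep=time.sleep):
    """Call operation(), retrying while the raised error is retriable."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as exc:
            # KafkaException carries the KafkaError as its first argument.
            err = exc.args[0] if exc.args else None
            retriable = getattr(err, "retriable", None)
            if attempt == max_attempts or not (callable(retriable) and retriable()):
                raise
            # Pause briefly to avoid busy-looping.
            sleep(grace_period)
```

For example, call_with_retries(lambda: producer.commit_transaction(10.0)) would retry the commit while the error stays retriable and re-raise anything else.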
Abortable errors
An ongoing transaction may fail permanently due to various errors,
such as the transaction coordinator becoming unavailable, write failures to the
Apache Kafka log, under-replicated partitions, etc.
At this point the producer application must abort the current transaction
using confluent_kafka.Producer.abort_transaction() and optionally
start a new transaction by calling
confluent_kafka.Producer.begin_transaction().
Whether an error is abortable or not is detected by calling
confluent_kafka.KafkaError.txn_requires_abort().
Fatal errors
While the underlying idempotent producer will typically only raise
fatal errors for unrecoverable cluster errors where the idempotency
guarantees can’t be maintained, most of these are treated as abortable by
the transactional producer since transactions may be aborted and retried
in their entirety.
The transactional producer, on the other hand, introduces a set of additional
fatal errors which the application needs to handle by shutting down the
producer and terminating. There is no way for a producer instance to recover
from fatal errors.
Whether an error is fatal or not is detected by calling
confluent_kafka.KafkaError.fatal().
Handling of other errors
For errors that have none of the retriable, abortable, or fatal flags set, it is not always obvious how to handle them. While some of these errors may be indicative of bugs in the application code, such as when an invalid parameter is passed to a method, other errors might originate from the broker and be passed through as-is to the application. The general recommendation is to treat these errors, which have neither the retriable nor the abortable flag set, as fatal.
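The decision order recommended above can be condensed into a small classifier. The function name and the returned labels are hypothetical; err is any object exposing the retriable() and txn_requires_abort() methods of KafkaError.

```python
def classify_error(err):
    """Map a KafkaError-like object to the recommended action."""
    if err.retriable():
        return "retry"
    if err.txn_requires_abort():
        return "abort"
    # Fatal errors and errors with no flag set are both treated as fatal.
    return "fatal"
```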
Error handling example
from confluent_kafka import KafkaException

# producer is an initialized transactional Producer with an open transaction.
while True:
    try:
        producer.commit_transaction(10.0)
        break
    except KafkaException as e:
        if e.args[0].retriable():
            # retriable error, try again
            continue
        elif e.args[0].txn_requires_abort():
            # abort current transaction, begin a new transaction,
            # and rewind the consumer to start over.
            producer.abort_transaction()
            producer.begin_transaction()
            rewind_consumer_offsets()  # application-defined placeholder
        else:
            # treat all other errors as fatal
            raise
Kafka Client Configuration¶
Configuration of producer and consumer instances is performed by providing a dict of configuration properties to the instance constructor, e.g.
conf = {'bootstrap.servers': 'mybroker.com',
'group.id': 'mygroup',
'session.timeout.ms': 6000,
'on_commit': my_commit_callback,
'auto.offset.reset': 'earliest'}
consumer = confluent_kafka.Consumer(conf)
The supported configuration values are dictated by the underlying librdkafka C library. For the full range of configuration properties please consult librdkafka’s documentation: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
The Python bindings also provide some additional configuration properties:
- default.topic.config: value is a dict of client topic-level configuration properties that are applied to all used topics for the instance. DEPRECATED: topic configuration should now be specified in the global top-level configuration.
- error_cb(kafka.KafkaError): Callback for generic/global error events. These errors are typically to be considered informational since the client will automatically try to recover. This callback is served upon calling client.poll() or producer.flush().
- throttle_cb(confluent_kafka.ThrottleEvent): Callback for throttled request reporting. This callback is served upon calling client.poll() or producer.flush().
- stats_cb(json_str): Callback for statistics data. This callback is triggered by poll() or flush() every statistics.interval.ms (needs to be configured separately). Function argument json_str is a str instance of a JSON document containing statistics data. This callback is served upon calling client.poll() or producer.flush(). See https://github.com/edenhill/librdkafka/wiki/Statistics for more information.
- oauth_cb(config_str): Callback for retrieving OAuth Bearer token. Function argument config_str is a str from config: sasl.oauthbearer.config. Return value of this callback is expected to be a (token_str, expiry_time) tuple where expiry_time is the time in seconds since the epoch as a floating point number. This callback is useful only when sasl.mechanisms=OAUTHBEARER is set and is served to get the initial token before a successful broker connection can be made. The callback can be triggered by calling client.poll() or producer.flush().
- on_delivery(kafka.KafkaError, kafka.Message) (Producer): value is a Python function reference that is called once for each produced message to indicate the final delivery result (success or failure). This property may also be set per-message by passing callback=callable (or on_delivery=callable) to the confluent_kafka.Producer.produce() function. Currently message headers are not supported on the message returned to the callback. The msg.headers() will return None even if the original message had headers set. This callback is served upon calling producer.poll() or producer.flush().
- on_commit(kafka.KafkaError, list(kafka.TopicPartition)) (Consumer): Callback used to indicate success or failure of asynchronous and automatic commit requests. This callback is served upon calling consumer.poll(). Is not triggered for synchronous commits. Callback arguments: KafkaError is the commit error, or None on success. list(TopicPartition) is the list of partitions with their committed offsets or per-partition errors.
- logger=logging.Handler kwarg: forward logs from the Kafka client to the provided logging.Handler instance. To avoid spontaneous calls from non-Python threads the log messages will only be forwarded when client.poll() or producer.flush() are called. For example:
import logging
import confluent_kafka
mylogger = logging.getLogger()
mylogger.addHandler(logging.StreamHandler())
producer = confluent_kafka.Producer({'bootstrap.servers': 'mybroker.com'}, logger=mylogger)
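The callback properties listed above are ordinary Python callables placed in the configuration dict. The sketch below wires up error_cb and stats_cb; the logger name, the latest_stats dict, and the 5000 ms interval are assumptions chosen for illustration, and the dict is only built here, not passed to a client.

```python
import json
import logging

log = logging.getLogger("kafka-client")
latest_stats = {}


def error_cb(err):
    """Generic error events are informational; the client tries to recover."""
    log.warning("Kafka client event: %s", err)


def stats_cb(json_str):
    """json_str is a str holding a JSON document with statistics data."""
    latest_stats.clear()
    latest_stats.update(json.loads(json_str))


conf = {
    "bootstrap.servers": "mybroker.com",
    # stats_cb only fires if the interval is configured separately.
    "statistics.interval.ms": 5000,
    "error_cb": error_cb,
    "stats_cb": stats_cb,
}
```

Both callbacks are served only while the application calls poll() or flush(), so a client that never polls never sees them.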
Supporting Classes¶
Message¶
-
class
confluent_kafka.
Message
¶ The Message object represents either a single consumed or produced message, or an event (error() is not None).
An application must check with error() to see if the object is a proper message (error() returns None) or an error/event.
This class is not user-instantiable.
-
len
()¶ Returns: Message value (payload) size in bytes
Return type: int
-
error
()¶ The message object is also used to propagate errors and events. An application must check error() to determine if the Message is a proper message (error() returns None) or an error or event (error() returns a KafkaError object).
Return type: None or KafkaError
-
headers
()¶ Retrieve the headers set on a message. Each header is a key-value pair. Please note that header keys are ordered and can repeat.
Returns: list of two-tuples, one (key, value) pair for each header.
Return type: [(str, bytes),..] or None.
-
key
()¶ Returns: message key or None if not available.
Return type: str|bytes or None
-
latency
()¶ Retrieve the time it took to produce the message, from calling produce() to the time the acknowledgement was received from the broker. Must only be used with the producer for successfully produced messages.
Returns: latency as float seconds, or None if latency information is not available (such as for errored messages).
Return type: float or None
-
offset
()¶ Returns: message offset or None if not available.
Return type: int or None
-
partition
()¶ Returns: partition number or None if not available.
Return type: int or None
-
set_headers
()¶ Set the field ‘Message.headers’ with new value.
Parameters: value (object) – Message.headers.
Returns: None.
Return type: None
-
set_key
()¶ Set the field ‘Message.key’ with new value.
Parameters: value (object) – Message.key.
Returns: None.
Return type: None
-
set_value
()¶ Set the field ‘Message.value’ with new value.
Parameters: value (object) – Message.value.
Returns: None.
Return type: None
-
timestamp
()¶ Retrieve timestamp type and timestamp from message. The timestamp type is one of:
- TIMESTAMP_NOT_AVAILABLE - Timestamps not supported by broker.
- TIMESTAMP_CREATE_TIME - Message creation time (or source / producer time).
- TIMESTAMP_LOG_APPEND_TIME - Broker receive time.
The returned timestamp should be ignored if the timestamp type is TIMESTAMP_NOT_AVAILABLE.
The timestamp is the number of milliseconds since the epoch (UTC).
Timestamps require broker version 0.10.0.0 or later and {'api.version.request': True} configured on the client.
Returns: tuple of message timestamp type, and timestamp.
Return type: (int, int)
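Since the timestamp is expressed in milliseconds since the epoch (UTC), converting the tuple returned by timestamp() into a datetime takes one division. In this sketch the not_available default of 0 is an assumption about the value of confluent_kafka.TIMESTAMP_NOT_AVAILABLE; real code should import the constant rather than hard-code it.

```python
from datetime import datetime, timezone


def timestamp_to_datetime(ts_pair, not_available=0):
    """Convert (timestamp_type, timestamp_ms) to an aware UTC datetime.

    Returns None when the type says the timestamp should be ignored.
    """
    ts_type, ts_ms = ts_pair
    if ts_type == not_available:
        return None
    return datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc)
```

Typical use would be timestamp_to_datetime(msg.timestamp()) on a consumed message.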
-
topic
()¶ Returns: topic name or None if not available.
Return type: str or None
-
value
()¶ Returns: message value (payload) or None if not available.
Return type: str|bytes or None
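Because a Message can carry either a payload or an error/event, consuming code usually branches on error() first. The helper below is a hypothetical sketch that works on any objects exposing the Message accessors documented above; it skips None entries on the assumption that poll() returned None because it timed out.

```python
def split_messages(messages):
    """Separate proper messages from error/event objects."""
    ok, errors = [], []
    for msg in messages:
        if msg is None:
            continue  # nothing was returned for this poll
        if msg.error() is not None:
            errors.append(msg.error())
        else:
            ok.append((msg.topic(), msg.partition(), msg.offset(), msg.value()))
    return ok, errors
```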
-
TopicPartition¶
-
class
confluent_kafka.
TopicPartition
¶ TopicPartition is a generic type to hold a single partition and various information about it.
It is typically used to provide a list of topics or partitions for various operations, such as Consumer.assign().
-
TopicPartition
(topic[, partition][, offset])¶ Instantiate a TopicPartition object.
Parameters: - topic (string) – Topic name
- partition (int) – Partition id
- offset (int) – Initial partition offset
Return type: TopicPartition
-
error
¶ Indicates an error (with KafkaError) unless None.
Type: attribute error
-
offset
¶ Offset (long)
Either an absolute offset (>=0) or a logical offset: OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED, OFFSET_INVALID
Type: attribute offset
-
partition
¶ Partition number (int)
Type: attribute partition
-
topic
¶ Topic name (string)
Type: attribute topic
-
MessageField¶
SerializationContext¶
-
class
confluent_kafka.serialization.
SerializationContext
(topic, field, headers=None)[source]¶ SerializationContext provides additional context to the serializer/deserializer about the data it’s serializing/deserializing.
Parameters: - topic (str) – Topic data is being produced to or consumed from.
- field (MessageField) – Describes what part of the message is being serialized.
- headers (list) – List of message header tuples. Defaults to None.
Schema¶
-
class
confluent_kafka.schema_registry.
Schema
(schema_str, schema_type, references=[])[source]¶ An unregistered Schema.
Parameters: - schema_str (str) – String representation of the schema.
- references ([SchemaReference]) – SchemaReferences used in this schema.
- schema_type (str) – The schema type: AVRO, PROTOBUF or JSON.
RegisteredSchema¶
-
class
confluent_kafka.schema_registry.
RegisteredSchema
(schema_id, schema, subject, version)[source]¶ Schema registration information.
Represents a Schema registered with a subject. Use this class when you need a specific version of a subject such as forming a SchemaReference.
Parameters: - schema_id (int) – Registered Schema id
- schema (Schema) – Registered Schema
- subject (str) – Subject this schema is registered under
- version (int) – Version of this subject this schema is registered to
SchemaRegistryError¶
-
class
confluent_kafka.schema_registry.error.
SchemaRegistryError
(http_status_code, error_code, error_message)[source]¶ Represents an error returned by the Confluent Schema Registry
Parameters: - http_status_code (int) – HTTP status code
- error_code (int) – Schema Registry error code; -1 represents an unknown error.
- error_message (str) – Description of the error
See also
KafkaError¶
-
class
confluent_kafka.
KafkaError
¶ Kafka error and event object
The KafkaError class serves multiple purposes
- Propagation of errors
- Propagation of events
- Exceptions
Parameters: - error_code (KafkaError) – Error code indicating the type of error.
- reason (str) – Alternative message to describe the error.
- fatal (bool) – Set to true if a fatal error.
- retriable (bool) – Set to true if operation is retriable.
- txn_requires_abort (bool) – Set to true if this is an abortable transaction error.
Error and event constants:
Constant | Description
_BAD_MSG | Local: Bad message format
_BAD_COMPRESSION | Local: Invalid compressed data
_DESTROY | Local: Broker handle destroyed
_FAIL | Local: Communication failure with broker
_TRANSPORT | Local: Broker transport failure
_CRIT_SYS_RESOURCE | Local: Critical system resource failure
_RESOLVE | Local: Host resolution failure
_MSG_TIMED_OUT | Local: Message timed out
_PARTITION_EOF | Broker: No more messages
_UNKNOWN_PARTITION | Local: Unknown partition
_FS | Local: File or filesystem error
_UNKNOWN_TOPIC | Local: Unknown topic
_ALL_BROKERS_DOWN | Local: All broker connections are down
_INVALID_ARG | Local: Invalid argument or configuration
_TIMED_OUT | Local: Timed out
_QUEUE_FULL | Local: Queue full
_ISR_INSUFF | Local: ISR count insufficient
_NODE_UPDATE | Local: Broker node update
_SSL | Local: SSL error
_WAIT_COORD | Local: Waiting for coordinator
_UNKNOWN_GROUP | Local: Unknown group
_IN_PROGRESS | Local: Operation in progress
_PREV_IN_PROGRESS | Local: Previous operation in progress
_EXISTING_SUBSCRIPTION | Local: Existing subscription
_ASSIGN_PARTITIONS | Local: Assign partitions
_REVOKE_PARTITIONS | Local: Revoke partitions
_CONFLICT | Local: Conflicting use
_STATE | Local: Erroneous state
_UNKNOWN_PROTOCOL | Local: Unknown protocol
_NOT_IMPLEMENTED | Local: Not implemented
_AUTHENTICATION | Local: Authentication failure
_NO_OFFSET | Local: No offset stored
_OUTDATED | Local: Outdated
_TIMED_OUT_QUEUE | Local: Timed out in queue
_UNSUPPORTED_FEATURE | Local: Required feature not supported by broker
_WAIT_CACHE | Local: Awaiting cache update
_INTR | Local: Operation interrupted
_KEY_SERIALIZATION | Local: Key serialization error
_VALUE_SERIALIZATION | Local: Value serialization error
_KEY_DESERIALIZATION | Local: Key deserialization error
_VALUE_DESERIALIZATION | Local: Value deserialization error
_PARTIAL | Local: Partial response
_READ_ONLY | Local: Read-only object
_NOENT | Local: No such entry
_UNDERFLOW | Local: Read underflow
_INVALID_TYPE | Local: Invalid type
_RETRY | Local: Retry operation
_PURGE_QUEUE | Local: Purged in queue
_PURGE_INFLIGHT | Local: Purged in flight
_FATAL | Local: Fatal error
_INCONSISTENT | Local: Inconsistent state
_GAPLESS_GUARANTEE | Local: Gap-less ordering would not be guaranteed if proceeding
_MAX_POLL_EXCEEDED | Local: Maximum application poll interval (max.poll.interval.ms) exceeded
_UNKNOWN_BROKER | Local: Unknown broker
_NOT_CONFIGURED | Local: Functionality not configured
_FENCED | Local: This instance has been fenced by a newer instance
_APPLICATION | Local: Application generated error
_ASSIGNMENT_LOST | Local: Group partition assignment lost
_NOOP | Local: No operation performed
_AUTO_OFFSET_RESET | Local: No offset to automatically reset to
UNKNOWN | Unknown broker error
NO_ERROR | Success
OFFSET_OUT_OF_RANGE | Broker: Offset out of range
INVALID_MSG | Broker: Invalid message
UNKNOWN_TOPIC_OR_PART | Broker: Unknown topic or partition
INVALID_MSG_SIZE | Broker: Invalid message size
LEADER_NOT_AVAILABLE | Broker: Leader not available
NOT_LEADER_FOR_PARTITION | Broker: Not leader for partition
REQUEST_TIMED_OUT | Broker: Request timed out
BROKER_NOT_AVAILABLE | Broker: Broker not available
REPLICA_NOT_AVAILABLE | Broker: Replica not available
MSG_SIZE_TOO_LARGE | Broker: Message size too large
STALE_CTRL_EPOCH | Broker: StaleControllerEpochCode
OFFSET_METADATA_TOO_LARGE | Broker: Offset metadata string too large
NETWORK_EXCEPTION | Broker: Broker disconnected before response received
COORDINATOR_LOAD_IN_PROGRESS | Broker: Coordinator load in progress
COORDINATOR_NOT_AVAILABLE | Broker: Coordinator not available
NOT_COORDINATOR | Broker: Not coordinator
TOPIC_EXCEPTION | Broker: Invalid topic
RECORD_LIST_TOO_LARGE | Broker: Message batch larger than configured server segment size
NOT_ENOUGH_REPLICAS | Broker: Not enough in-sync replicas
NOT_ENOUGH_REPLICAS_AFTER_APPEND | Broker: Message(s) written to insufficient number of in-sync replicas
INVALID_REQUIRED_ACKS | Broker: Invalid required acks value
ILLEGAL_GENERATION | Broker: Specified group generation id is not valid
INCONSISTENT_GROUP_PROTOCOL | Broker: Inconsistent group protocol
INVALID_GROUP_ID | Broker: Invalid group.id
UNKNOWN_MEMBER_ID | Broker: Unknown member
INVALID_SESSION_TIMEOUT | Broker: Invalid session timeout
REBALANCE_IN_PROGRESS | Broker: Group rebalance in progress
INVALID_COMMIT_OFFSET_SIZE | Broker: Commit offset data size is not valid
TOPIC_AUTHORIZATION_FAILED | Broker: Topic authorization failed
GROUP_AUTHORIZATION_FAILED | Broker: Group authorization failed
CLUSTER_AUTHORIZATION_FAILED | Broker: Cluster authorization failed
INVALID_TIMESTAMP | Broker: Invalid timestamp
UNSUPPORTED_SASL_MECHANISM | Broker: Unsupported SASL mechanism
ILLEGAL_SASL_STATE | Broker: Request not valid in current SASL state
UNSUPPORTED_VERSION | Broker: API version not supported
TOPIC_ALREADY_EXISTS | Broker: Topic already exists
INVALID_PARTITIONS | Broker: Invalid number of partitions
INVALID_REPLICATION_FACTOR | Broker: Invalid replication factor
INVALID_REPLICA_ASSIGNMENT | Broker: Invalid replica assignment
INVALID_CONFIG | Broker: Configuration is invalid
NOT_CONTROLLER | Broker: Not controller for cluster
INVALID_REQUEST | Broker: Invalid request
UNSUPPORTED_FOR_MESSAGE_FORMAT | Broker: Message format on broker does not support request
POLICY_VIOLATION | Broker: Policy violation
OUT_OF_ORDER_SEQUENCE_NUMBER | Broker: Broker received an out of order sequence number
DUPLICATE_SEQUENCE_NUMBER | Broker: Broker received a duplicate sequence number
INVALID_PRODUCER_EPOCH | Broker: Producer attempted an operation with an old epoch
INVALID_TXN_STATE | Broker: Producer attempted a transactional operation in an invalid state
INVALID_PRODUCER_ID_MAPPING | Broker: Producer attempted to use a producer id which is not currently assigned to its transactional
INVALID_TRANSACTION_TIMEOUT | Broker: Transaction timeout is larger than the maximum value allowed by the broker’s max.transaction
CONCURRENT_TRANSACTIONS | Broker: Producer attempted to update a transaction while another concurrent operation on the same tr
TRANSACTION_COORDINATOR_FENCED | Broker: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current
TRANSACTIONAL_ID_AUTHORIZATION_FAILED | Broker: Transactional Id authorization failed
SECURITY_DISABLED | Broker: Security features are disabled
OPERATION_NOT_ATTEMPTED | Broker: Operation not attempted
KAFKA_STORAGE_ERROR | Broker: Disk error when trying to access log file on disk
LOG_DIR_NOT_FOUND | Broker: The user-specified log directory is not found in the broker config
SASL_AUTHENTICATION_FAILED | Broker: SASL Authentication failed
UNKNOWN_PRODUCER_ID | Broker: Unknown Producer Id
REASSIGNMENT_IN_PROGRESS | Broker: Partition reassignment is in progress
DELEGATION_TOKEN_AUTH_DISABLED | Broker: Delegation Token feature is not enabled
DELEGATION_TOKEN_NOT_FOUND | Broker: Delegation Token is not found on server
DELEGATION_TOKEN_OWNER_MISMATCH | Broker: Specified Principal is not valid Owner/Renewer
DELEGATION_TOKEN_REQUEST_NOT_ALLOWED | Broker: Delegation Token requests are not allowed on this connection
DELEGATION_TOKEN_AUTHORIZATION_FAILED | Broker: Delegation Token authorization failed
DELEGATION_TOKEN_EXPIRED | Broker: Delegation Token is expired
INVALID_PRINCIPAL_TYPE | Broker: Supplied principalType is not supported
NON_EMPTY_GROUP | Broker: The group is not empty
GROUP_ID_NOT_FOUND | Broker: The group id does not exist
FETCH_SESSION_ID_NOT_FOUND | Broker: The fetch session ID was not found
INVALID_FETCH_SESSION_EPOCH | Broker: The fetch session epoch is invalid
LISTENER_NOT_FOUND | Broker: No matching listener
TOPIC_DELETION_DISABLED | Broker: Topic deletion is disabled
FENCED_LEADER_EPOCH | Broker: Leader epoch is older than broker epoch
UNKNOWN_LEADER_EPOCH | Broker: Leader epoch is newer than broker epoch
UNSUPPORTED_COMPRESSION_TYPE | Broker: Unsupported compression type
STALE_BROKER_EPOCH | Broker: Broker epoch has changed
OFFSET_NOT_AVAILABLE | Broker: Leader high watermark is not caught up
MEMBER_ID_REQUIRED | Broker: Group member needs a valid member ID
PREFERRED_LEADER_NOT_AVAILABLE | Broker: Preferred leader was not available
GROUP_MAX_SIZE_REACHED | Broker: Consumer group has reached maximum size
FENCED_INSTANCE_ID | Broker: Static consumer fenced by other consumer with same group.instance.id
ELIGIBLE_LEADERS_NOT_AVAILABLE | Broker: Eligible partition leaders are not available
ELECTION_NOT_NEEDED | Broker: Leader election not needed for topic partition
NO_REASSIGNMENT_IN_PROGRESS | Broker: No partition reassignment is in progress
GROUP_SUBSCRIBED_TO_TOPIC | Broker: Deleting offsets of a topic while the consumer group is subscribed to it
INVALID_RECORD | Broker: Broker failed to validate record
UNSTABLE_OFFSET_COMMIT | Broker: There are unstable offsets that need to be cleared
THROTTLING_QUOTA_EXCEEDED | Broker: Throttling quota has been exceeded
PRODUCER_FENCED | Broker: There is a newer producer with the same transactionalId which fences the current one
RESOURCE_NOT_FOUND | Broker: Request illegally referred to resource that does not exist
DUPLICATE_RESOURCE | Broker: Request illegally referred to the same resource twice
UNACCEPTABLE_CREDENTIAL | Broker: Requested credential would not meet criteria for acceptability
INCONSISTENT_VOTER_SET | Broker: Indicates that the either the sender or recipient of a voter-only request is not one of the
INVALID_UPDATE_VERSION | Broker: Invalid update version
FEATURE_UPDATE_FAILED | Broker: Unable to update finalized features due to server error
PRINCIPAL_DESERIALIZATION_FAILURE | Broker: Request principal deserialization failed during forwarding
-
code
()¶ Returns the error/event code for comparison to KafkaError.<ERR_CONSTANTS>.
Returns: error/event code Return type: int
-
fatal
()¶ Returns: True if this is a fatal error, else False. Return type: bool
-
name
()¶ Returns the enum name for error/event.
Returns: error/event enum name string Return type: str
-
retriable
()¶ Returns: True if the operation that failed may be retried, else False. Return type: bool
-
str
()¶ Returns the human-readable error/event string.
Returns: error/event message string Return type: str
-
txn_requires_abort
()¶ Returns: True if the error is an abortable transaction error, in which case the application must abort the current transaction with abort_transaction() and start a new transaction with begin_transaction() if it wishes to proceed with transactional operations. This will only return True for errors from the transactional producer API. Return type: bool
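The predicate methods above can be combined into a single triage step. The following is a minimal sketch and not part of the library: `triage` and `describe` are hypothetical helpers, the action names are illustrative, and the order of the checks (fatal first, then abortable transaction errors, then retriable errors) is an assumption about what a typical application would want. The code relies only on the `fatal()`, `txn_requires_abort()`, `retriable()`, `name()`, `code()` and `str()` methods documented above, so any object exposing them can be passed in.

```python
def triage(err):
    """Map a KafkaError-like object to a coarse recovery action.

    `err` only needs the fatal(), txn_requires_abort() and retriable()
    methods documented for KafkaError. The returned action names are
    illustrative, not library constants.
    """
    if err.fatal():
        # Checked first so a fatal error takes precedence over other flags.
        return "shutdown"
    if err.txn_requires_abort():
        # The current transaction must be aborted before continuing.
        return "abort_transaction"
    if err.retriable():
        # The failed operation may be attempted again.
        return "retry"
    # Neither fatal, abortable, nor retriable: report the failure.
    return "fail"


def describe(err):
    """Format an error as 'NAME (code): message' for logging."""
    return "{} ({}): {}".format(err.name(), err.code(), err.str())
```

In a real application the object passed in would typically be the `KafkaError` obtained from a delivery callback, from `Message.error()`, or from `exception.args[0]` of a `KafkaException`.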
KafkaException¶
-
class
confluent_kafka.
KafkaException
¶ Kafka exception that wraps the KafkaError class. Use exception.args[0] to extract the KafkaError object.
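As a small illustration of the `exception.args[0]` convention, the sketch below wraps the lookup in a hypothetical helper, `unwrap_kafka_error`, which is not part of the library. Returning `None` for an exception without arguments is an assumption made here for robustness.

```python
def unwrap_kafka_error(exc):
    """Return the first argument of an exception, or None if it has none.

    For a KafkaException this first argument is the wrapped KafkaError.
    """
    return exc.args[0] if exc.args else None


# Sketch of a call site (needs a configured client, so it is left as a comment):
#
#     from confluent_kafka import KafkaException
#     try:
#         ...  # any client call that raises KafkaException
#     except KafkaException as exc:
#         err = unwrap_kafka_error(exc)
#         print(err.name(), err.str())
```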
ConsumeError¶
-
class
confluent_kafka.error.
ConsumeError
(kafka_error, exception=None, kafka_message=None)[source]¶ Wraps all errors encountered during the consumption of a message.
Note
In the event of a serialization error the original message contents may be retrieved from the
kafka_message
attribute.
Parameters: - kafka_error (KafkaError) – KafkaError instance.
- exception (Exception, optional) – The original exception.
- kafka_message (Message, optional) – The Kafka Message returned by the broker.
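The note about recovering the original message can be sketched as follows. `raw_payload` is a hypothetical helper, not a library function, and the shape of the returned dict is an assumption chosen for illustration (for example, to forward the record to a dead-letter topic). It reads the `kafka_message` attribute described above and uses the `topic()`, `partition()`, `offset()`, `key()` and `value()` accessors of the Message class.

```python
def raw_payload(consume_error):
    """Collect the raw contents of the message attached to a ConsumeError.

    Returns None when the exception carries no kafka_message.
    """
    msg = getattr(consume_error, "kafka_message", None)
    if msg is None:
        return None
    return {
        "topic": msg.topic(),
        "partition": msg.partition(),
        "offset": msg.offset(),
        "key": msg.key(),      # raw key as returned by the message
        "value": msg.value(),  # raw value as returned by the message
    }
```

A caller would typically invoke this inside an `except ConsumeError as exc:` block around `DeserializingConsumer.poll()`.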
ProduceError¶
-
class
confluent_kafka.error.
ProduceError
(kafka_error, exception=None)[source]¶ Wraps all errors encountered when producing messages.
Parameters: - kafka_error (KafkaError) – KafkaError instance.
- exception (Exception, optional) – The original exception.
SerializationError¶
KeySerializationError¶
ValueSerializationError¶
KeyDeserializationError¶
-
class
confluent_kafka.error.
KeyDeserializationError
(exception=None, kafka_message=None)[source]¶ Wraps all errors encountered during the deserialization of a Kafka Message’s key.
Parameters: - exception (Exception, optional) – The original exception.
- kafka_message (Message, optional) – The Kafka Message returned by the broker.
ValueDeserializationError¶
-
class
confluent_kafka.error.
ValueDeserializationError
(exception=None, kafka_message=None)[source]¶ Wraps all errors encountered during the deserialization of a Kafka Message’s value.
Parameters: - exception (Exception, optional) – The original exception.
- kafka_message (Message, optional) – The Kafka Message returned by the broker.
Offset¶
Logical offset constants:
- OFFSET_BEGINNING - Beginning of partition (oldest offset)
- OFFSET_END - End of partition (next offset)
- OFFSET_STORED - Use stored/committed offset
- OFFSET_INVALID - Invalid/Default offset
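One common use of these constants is to override the starting position when partitions are assigned. The sketch below assumes the usual `on_assign(consumer, partitions)` callback shape accepted by `Consumer.subscribe()`; `make_on_assign` is a hypothetical factory introduced here for illustration, and the topic name in the usage comment is a placeholder.

```python
def make_on_assign(start_offset):
    """Build an on_assign callback that starts every partition at start_offset.

    start_offset is expected to be a logical offset constant such as
    OFFSET_BEGINNING or OFFSET_END (or an absolute offset).
    """
    def on_assign(consumer, partitions):
        for partition in partitions:
            # TopicPartition.offset is a writable attribute.
            partition.offset = start_offset
        # Hand the modified list back to the consumer.
        consumer.assign(partitions)
    return on_assign


# Sketch of a call site (needs a running broker, so it is left as a comment):
#
#     from confluent_kafka import Consumer, OFFSET_BEGINNING
#     consumer = Consumer(conf)
#     consumer.subscribe(["my_topic"], on_assign=make_on_assign(OFFSET_BEGINNING))
```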
ThrottleEvent¶
-
class
confluent_kafka.
ThrottleEvent
(broker_name, broker_id, throttle_time)[source]¶ ThrottleEvent contains details about a throttled request. Set up a throttle callback by setting the
throttle_cb
configuration property to a callable that takes a ThrottleEvent object as its only argument. The callback will be triggered from poll(), consume() or flush() when a request has been throttled by the broker.
This class is typically not user instantiated.
Variables: - broker_name (str) – The hostname of the broker which throttled the request
- broker_id (int) – The broker id
- throttle_time (float) – The amount of time (in seconds) the broker throttled (delayed) the request
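A throttle callback can be sketched as below. `make_throttle_recorder` is a hypothetical helper, the `min_seconds` threshold is an illustrative choice, and the broker address is a placeholder. The callback reads only the `broker_name`, `broker_id` and `throttle_time` attributes listed above.

```python
def make_throttle_recorder(min_seconds=0.0):
    """Return (callback, events); the callback records throttle events.

    Events with throttle_time below min_seconds are ignored.
    """
    events = []

    def on_throttle(event):
        if event.throttle_time >= min_seconds:
            events.append(
                (event.broker_name, event.broker_id, event.throttle_time)
            )

    return on_throttle, events


on_throttle, throttled = make_throttle_recorder(min_seconds=0.5)

# Client configuration carrying the callback; pass it to Producer(conf)
# or Consumer(conf). The address below is an assumed placeholder.
conf = {
    "bootstrap.servers": "localhost:9092",
    "throttle_cb": on_throttle,
}
```

Because the callback is only triggered from `poll()`, `consume()` or `flush()`, the recorded list stays empty unless the application calls one of these regularly.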