confluent_kafka API¶
A reliable, performant and feature-rich Python client for Apache Kafka v0.8 and above.
- Guides
- Client API
- Serialization API
Avro serializer / deserializer
JSON Schema serializer / deserializer
Protobuf serializer / deserializer
String serializer / deserializer
Integer serializer / deserializer
Double serializer / deserializer
- Supporting classes
- Admin API
- Experimental
These classes are experimental and are likely to be removed, or subject to incompatible API changes in future versions of the library. To avoid breaking changes on upgrading, we recommend using (de)serializers directly, as per the example applications in the GitHub repo.
- Legacy
These classes are deprecated and will be removed in a future version of the library.
Kafka Clients¶
AdminClient¶
Kafka admin client: create, view, alter, and delete topics and resources.
- class confluent_kafka.admin.AdminClient(conf, **kwargs)[source]
AdminClient provides admin operations for Kafka brokers, topics, groups, and other resource types supported by the broker.
The Admin API methods are asynchronous and return a dict of concurrent.futures.Future objects keyed by the entity. The entity is a topic name for create_topics(), delete_topics(), create_partitions(), and a ConfigResource for alter_configs() and describe_configs().
All the futures for a single API call will currently finish/fail at the same time (backed by the same protocol request), but this might change in future versions of the client.
See examples/adminapi.py for example usage.
For more information see the Java Admin API documentation.
Requires broker version v0.11.0.0 or later.
- create_topics(new_topics, **kwargs)[source]
Create one or more new topics.
- Parameters
new_topics (list(NewTopic)) – A list of specifications (NewTopic) for the topics that should be created.
operation_timeout (float) – The operation timeout in seconds, controlling how long the CreateTopics request will block on the broker waiting for the topic creation to propagate in the cluster. A value of 0 returns immediately. Default: socket.timeout.ms/1000.0
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
validate_only (bool) – If true, the request is only validated without creating the topic. Default: False
- Returns
A dict of futures for each topic, keyed by the topic name. The future result() method returns None.
- Return type
dict(<topic_name, future>)
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
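A minimal sketch of creating topics and waiting on the returned futures; the broker address and topic names are placeholders, not part of the official examples.
from confluent_kafka.admin import AdminClient, NewTopic

# Placeholder broker address and topic names.
admin = AdminClient({"bootstrap.servers": "localhost:9092"})

new_topics = [NewTopic(t, num_partitions=3, replication_factor=1)
              for t in ("example-topic-a", "example-topic-b")]

# create_topics() returns a dict of futures keyed by topic name.
futures = admin.create_topics(new_topics)

for topic, future in futures.items():
    try:
        future.result()   # returns None on success
        print(f"Created topic {topic}")
    except Exception as e:
        print(f"Failed to create topic {topic}: {e}")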
- delete_topics(topics, **kwargs)[source]
Delete one or more topics.
- Parameters
topics (list(str)) – A list of topics to mark for deletion.
operation_timeout (float) – The operation timeout in seconds, controlling how long the DeleteTopics request will block on the broker waiting for the topic deletion to propagate in the cluster. A value of 0 returns immediately. Default: socket.timeout.ms/1000.0
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- Returns
A dict of futures for each topic, keyed by the topic name. The future result() method returns None.
- Return type
dict(<topic_name, future>)
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
- list_topics(*args, **kwargs)[source]
Request metadata from the cluster. This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.
- Parameters
topic (str) – If specified, only request information about this topic, else return results for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.
timeout (float) – The maximum response time before timing out, or -1 for infinite timeout.
- Return type
ClusterMetadata
- Raises
KafkaException
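As an illustrative sketch (broker address is a placeholder), the ClusterMetadata returned by list_topics() can be inspected like this:
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder address
md = admin.list_topics(timeout=10)

print(f"Cluster id: {md.cluster_id}, controller id: {md.controller_id}")
for broker_id, broker in md.brokers.items():
    print(f"  broker {broker_id}: {broker}")
for name, topic_md in md.topics.items():
    print(f"  topic {name}: {len(topic_md.partitions)} partition(s), error={topic_md.error}")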
- list_groups(*args, **kwargs)[source]
Deprecated since version 2.0.2: Use list_consumer_groups() and describe_consumer_groups() instead.
- list_groups([group=None][, timeout=-1])[source]¶
Request Group Metadata from cluster. This method provides the same information as listGroups(), describeGroups() in the Java Admin client.
- Parameters
group (str) – If specified, only request information about this group, else return for all groups in cluster.
timeout (float) – Maximum response time before timing out, or -1 for infinite timeout.
- Return type
list(GroupMetadata)
- Raises
KafkaException
- create_partitions(new_partitions, **kwargs)[source]
Create additional partitions for the given topics.
- Parameters
new_partitions (list(NewPartitions)) – New partitions to be created.
operation_timeout (float) – The operation timeout in seconds, controlling how long the CreatePartitions request will block on the broker waiting for the partition creation to propagate in the cluster. A value of 0 returns immediately. Default: socket.timeout.ms/1000.0
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
validate_only (bool) – If true, the request is only validated without creating the partitions. Default: False
- Returns
A dict of futures for each topic, keyed by the topic name. The future result() method returns None.
- Return type
dict(<topic_name, future>)
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
- describe_configs(resources, **kwargs)[source]
Get the configuration of the specified resources.
- Warning
Multiple resources and resource types may be requested, but at most one resource of type RESOURCE_BROKER is allowed per call since these resource requests must be sent to the broker specified in the resource.
- Parameters
resources (list(ConfigResource)) – Resources to get the configuration for.
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- Returns
A dict of futures for each resource, keyed by the ConfigResource. The type of the value returned by the future result() method is dict(<configname, ConfigEntry>).
- Return type
dict(<ConfigResource, future>)
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
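A minimal, hypothetical example of reading back a topic's configuration with describe_configs(); the topic name is a placeholder.
from confluent_kafka.admin import AdminClient, ConfigResource, ResourceType

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
resource = ConfigResource(ResourceType.TOPIC, "example-topic-a")

futures = admin.describe_configs([resource])
config_entries = futures[resource].result()  # dict of <configname, ConfigEntry>

for name, entry in config_entries.items():
    print(f"{name} = {entry.value} (source: {entry.source}, default: {entry.is_default})")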
- alter_configs(resources, **kwargs)[source]
Deprecated since version 2.2.0.
Update configuration properties for the specified resources. Updates are not transactional so they may succeed for a subset of the provided resources while the others fail. The configuration for a particular resource is updated atomically, replacing the specified values while reverting unspecified configuration entries to their default values.
- Warning
alter_configs() will replace all existing configuration for the provided resources with the new configuration given, reverting all other configuration for the resource back to their default values.
- Warning
Multiple resources and resource types may be specified, but at most one resource of type RESOURCE_BROKER is allowed per call since these resource requests must be sent to the broker specified in the resource.
- Parameters
resources (list(ConfigResource)) – Resources to update configuration of.
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0.
validate_only (bool) – If true, the request is validated only, without altering the configuration. Default: False
- Returns
A dict of futures for each resource, keyed by the ConfigResource. The future result() method returns None or throws a KafkaException.
- Return type
dict(<ConfigResource, future>)
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid type.
ValueError – Invalid value.
- incremental_alter_configs(resources, **kwargs)[source]
Update configuration properties for the specified resources. Updates are incremental, i.e., only the values mentioned are changed and the rest remain as is.
- Parameters
resources (list(ConfigResource)) – Resources to update configuration of.
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0.
validate_only (bool) – If true, the request is validated only, without altering the configuration. Default: False
broker (int) – Broker id to send the request to. When altering broker configurations, it’s ignored because the request needs to go to that broker only. Default: controller broker.
- Returns
A dict of futures for each resource, keyed by the ConfigResource. The future result() method returns None or throws a KafkaException.
- Return type
dict(<ConfigResource, future>)
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid type.
ValueError – Invalid value.
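A sketch of an incremental update with incremental_alter_configs(), assuming a hypothetical topic name; only the entries added via add_incremental_config() are touched.
from confluent_kafka.admin import (AdminClient, AlterConfigOpType, ConfigEntry,
                                   ConfigResource, ResourceType)

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
resource = ConfigResource(ResourceType.TOPIC, "example-topic-a")

# Set retention.ms without disturbing any other configuration of the topic.
resource.add_incremental_config(
    ConfigEntry("retention.ms", "86400000",
                incremental_operation=AlterConfigOpType.SET))

futures = admin.incremental_alter_configs([resource])
futures[resource].result()  # None on success, raises KafkaException on failure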
- create_acls(acls, **kwargs)[source]
Create one or more ACL bindings.
- Parameters
acls (list(AclBinding)) – A list of unique ACL binding specifications (AclBinding) to create.
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- Returns
A dict of futures for each ACL binding, keyed by the AclBinding object. The future result() method returns None on success.
- Return type
dict[AclBinding, future]
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
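For illustration, a sketch that grants a hypothetical principal read access to a single topic; the principal, host and topic name are placeholders.
from confluent_kafka.admin import (AclBinding, AclOperation, AclPermissionType,
                                   AdminClient, ResourcePatternType, ResourceType)

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

acl = AclBinding(ResourceType.TOPIC, "example-topic-a",
                 ResourcePatternType.LITERAL,
                 "User:alice", "*",
                 AclOperation.READ, AclPermissionType.ALLOW)

futures = admin.create_acls([acl])
futures[acl].result()  # None on success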
- describe_acls(acl_binding_filter, **kwargs)[source]
Match ACL bindings by filter.
- Parameters
acl_binding_filter (AclBindingFilter) – A filter with attributes that must match. String attributes match exact values or any string if set to None. Enum attributes match exact values or any value if equal to ANY. If ResourcePatternType is set to ResourcePatternType.MATCH, returns ACL bindings with: ResourcePatternType.LITERAL pattern type with resource name equal to the given resource name; ResourcePatternType.LITERAL pattern type with wildcard resource name that matches the given resource name; ResourcePatternType.PREFIXED pattern type with resource name that is a prefix of the given resource name.
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- Returns
A future returning a list(AclBinding) as result.
- Return type
future
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
- delete_acls(acl_binding_filters, **kwargs)[source]
Delete ACL bindings matching one or more ACL binding filters.
- Parameters
acl_binding_filters (list(AclBindingFilter)) – A list of unique ACL binding filters to match ACLs to delete. String attributes match exact values or any string if set to None. Enum attributes match exact values or any value if equal to ANY. If ResourcePatternType is set to ResourcePatternType.MATCH, deletes ACL bindings with: ResourcePatternType.LITERAL pattern type with resource name equal to the given resource name; ResourcePatternType.LITERAL pattern type with wildcard resource name that matches the given resource name; ResourcePatternType.PREFIXED pattern type with resource name that is a prefix of the given resource name.
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- Returns
A dict of futures for each ACL binding filter, keyed by the AclBindingFilter object. The future result() method returns a list of AclBinding.
- Return type
dict[AclBindingFilter, future]
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
- list_consumer_groups(**kwargs)[source]
List consumer groups.
- Parameters
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
states (set(ConsumerGroupState)) – Only list consumer groups which are currently in these states.
types (set(ConsumerGroupType)) – Only list consumer groups of these types.
- Returns
A future. The result() method of the future returns ListConsumerGroupsResult.
- Return type
future
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
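A short sketch that lists only groups currently in the STABLE state; the broker address is a placeholder.
from confluent_kafka import ConsumerGroupState
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

future = admin.list_consumer_groups(states={ConsumerGroupState.STABLE})
result = future.result()  # ListConsumerGroupsResult

for listing in result.valid:
    print(listing.group_id, listing.is_simple_consumer_group, listing.state)
for error in result.errors:
    print("listing error:", error)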
- describe_consumer_groups(group_ids, **kwargs)[source]
Describe consumer groups.
- Parameters
group_ids (list(str)) – List of group_ids which need to be described.
include_authorized_operations (bool) – If True, fetches group AclOperations. Default: False
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- Returns
A dict of futures for each group, keyed by the group_id. The future result() method returns ConsumerGroupDescription.
- Return type
dict[str, future]
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
- describe_topics(topics, **kwargs)[source]
Describe topics.
- Parameters
topics (TopicCollection) – Collection of topic names to describe.
include_authorized_operations (bool) – If True, fetches topic AclOperations. Default: False
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- Returns
A dict of futures for each topic, keyed by the topic. The future result() method returns TopicDescription.
- Return type
dict[str, future]
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
- describe_cluster(**kwargs)[source]
Describe cluster.
- Parameters
include_authorized_operations (bool) – If True, fetches cluster AclOperations. Default: False
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- Returns
A future returning the description of the cluster (DescribeClusterResult) as its result.
- Return type
future
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
- delete_consumer_groups(group_ids, **kwargs)[source]
Delete the given consumer groups.
- Parameters
group_ids (list(str)) – List of group_ids which need to be deleted.
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- Returns
A dict of futures for each group, keyed by the group_id. The future result() method returns None.
- Return type
dict[str, future]
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
- list_consumer_group_offsets(list_consumer_group_offsets_request, **kwargs)[source]
List offset information for the consumer group and (optional) topic partition provided in the request.
- Note
Currently, the API supports only a single group.
- Parameters
list_consumer_group_offsets_request (list(ConsumerGroupTopicPartitions)) – List of ConsumerGroupTopicPartitions which consist of group name and topic partition information for which offset detail is expected. If only the group name is provided, then offset information for all topics and partitions associated with that group is returned.
require_stable (bool) – If True, fetches stable offsets. Default: False
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- Returns
A dict of futures for each group, keyed by the group id. The future result() method returns ConsumerGroupTopicPartitions.
- Return type
dict[str, future]
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
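A sketch of fetching all committed offsets for a single, hypothetical group.
from confluent_kafka import ConsumerGroupTopicPartitions
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# Only the group name is given, so offsets for all of its topic partitions are returned.
request = ConsumerGroupTopicPartitions("example-group")
futures = admin.list_consumer_group_offsets([request])

group_offsets = futures["example-group"].result()  # ConsumerGroupTopicPartitions
for tp in group_offsets.topic_partitions:
    print(tp.topic, tp.partition, tp.offset)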
- alter_consumer_group_offsets(alter_consumer_group_offsets_request, **kwargs)[source]
Alter offset for the consumer group and topic partition provided in the request.
- Note
Currently, the API supports only a single group.
- Parameters
alter_consumer_group_offsets_request (list(ConsumerGroupTopicPartitions)) – List of ConsumerGroupTopicPartitions which consist of group name and topic partition, and the corresponding offset to be updated.
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- Returns
A dict of futures for each group, keyed by the group id. The future result() method returns ConsumerGroupTopicPartitions.
- Return type
dict[ConsumerGroupTopicPartitions, future]
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
- set_sasl_credentials(username, password)[source]
Sets the SASL credentials used for this client. These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate. This method will not disconnect existing broker connections that have been established with the old credentials. This method is applicable only to SASL PLAIN and SCRAM mechanisms.
- Parameters
username (str) – The username to set.
password (str) – The password to set.
- Return type
None
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
- describe_user_scram_credentials(users=None, **kwargs)[source]
Describe user SASL/SCRAM credentials.
- Parameters
users (list(str)) – List of user names to describe. Duplicate users aren’t allowed. Can be None to describe all users’ credentials.
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- Returns
If None is passed, it returns a single future. The future yields a dict[str, UserScramCredentialsDescription] or raises a KafkaException.
If a list of user names is passed, it returns a dict[str, future[UserScramCredentialsDescription]]. The futures yield a UserScramCredentialsDescription or raise a KafkaException.
- Return type
Union[future[dict[str, UserScramCredentialsDescription]], dict[str, future[UserScramCredentialsDescription]]]
- Raises
TypeError – Invalid input type.
ValueError – Invalid input value.
- alter_user_scram_credentials(alterations, **kwargs)[source]
Alter user SASL/SCRAM credentials.
- Parameters
alterations (list(UserScramCredentialAlteration)) – List of UserScramCredentialAlteration to apply. The pair (user, mechanism) must be unique among alterations.
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- Returns
A dict of futures keyed by user name. The future result() method returns None or raises KafkaException
- Return type
dict[str, future]
- Raises
TypeError – Invalid input type.
ValueError – Invalid input value.
- list_offsets(topic_partition_offsets, **kwargs)[source]
Find the beginning offset, end offset, the offset matching a timestamp, or the offset with the maximum timestamp for the given partitions.
- Parameters
topic_partition_offsets (dict([TopicPartition, OffsetSpec])) – Dictionary of TopicPartition objects associated with the corresponding OffsetSpec to query for.
isolation_level (IsolationLevel) – The isolation level to use when querying.
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
- Returns
A dict of futures keyed by TopicPartition. The future result() method returns ListOffsetsResultInfo or raises KafkaException.
- Return type
dict[TopicPartition, future]
- Raises
TypeError – Invalid input type.
ValueError – Invalid input value.
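A sketch querying the latest and earliest offsets of two hypothetical partitions; the OffsetSpec.latest()/earliest() helpers and the topic name are assumptions for illustration.
from confluent_kafka import TopicPartition
from confluent_kafka.admin import AdminClient, OffsetSpec

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

requests = {
    TopicPartition("example-topic-a", 0): OffsetSpec.latest(),
    TopicPartition("example-topic-a", 1): OffsetSpec.earliest(),
}

futures = admin.list_offsets(requests)
for tp, future in futures.items():
    info = future.result()  # ListOffsetsResultInfo
    print(tp.topic, tp.partition, "offset:", info.offset, "timestamp:", info.timestamp)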
- delete_records(topic_partition_offsets, **kwargs)[source]
Deletes all records before the specified offsets (exclusive) in the specified topics and partitions.
- Parameters
topic_partition_offsets (list(TopicPartition)) – A list of TopicPartition objects having the offset field set to the offset before which all records should be deleted. The offset can be set to OFFSET_END (-1) to delete all records in the partition.
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
operation_timeout (float) – The operation timeout in seconds, controlling how long the delete_records request will block on the broker waiting for the record deletion to propagate in the cluster. A value of 0 returns immediately. Default: socket.timeout.ms/1000.0
- Returns
A dict of futures keyed by the TopicPartition. The future result() method returns DeletedRecords or raises KafkaException.
- Return type
- Return type
dict[TopicPartition, future]
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
- elect_leaders(election_type, partitions=None, **kwargs)[source]
Perform Preferred or Unclean leader election for all the specified partitions or all partitions in the cluster.
- Parameters
election_type (ElectionType) – The type of election to perform.
partitions (List[TopicPartition]|None) – The topic partitions to perform the election on. Use None to perform it on all the topic partitions.
request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0
operation_timeout (float) – The operation timeout in seconds, controlling how long the elect_leaders request will block on the broker waiting for the election to propagate in the cluster. A value of 0 returns immediately. Default: socket.timeout.ms/1000.0
- Returns
A future. Method result() of the future returns dict[TopicPartition, KafkaException|None].
- Return type
future
- Raises
KafkaException – Operation failed locally or on broker.
TypeError – Invalid input type.
ValueError – Invalid input value.
NewTopic¶
- class confluent_kafka.admin.NewTopic¶
NewTopic specifies per-topic settings for passing to AdminClient.create_topics().
- NewTopic(topic[, num_partitions][, replication_factor][, replica_assignment][, config])¶
Instantiate a NewTopic object.
- Parameters
topic (string) – Topic name
num_partitions (int) – Number of partitions to create, or -1 if replica_assignment is used.
replication_factor (int) – Replication factor of partitions, or -1 if replica_assignment is used.
replica_assignment (list) – List of lists with the replication assignment for each new partition.
config (dict) – Dict (str:str) of topic configuration. See http://kafka.apache.org/documentation.html#topicconfigs
- Return type
NewTopic
- config¶
Optional topic configuration. See http://kafka.apache.org/documentation.html#topicconfigs.
- num_partitions¶
Number of partitions (int), or -1 if a replica_assignment is specified.
- replica_assignment¶
Replication assignment (list of lists). The outer list index represents the partition index, the inner list is the replica assignment (broker ids) for that partition. replication_factor and replica_assignment are mutually exclusive.
- replication_factor¶
Replication factor (int). Must be set to -1 if a replica_assignment is specified.
- topic¶
Topic name (string).
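For example, a NewTopic with an explicit replica assignment; the topic name and broker ids are placeholders, and num_partitions and replication_factor are left at their -1 defaults because replica_assignment is given.
from confluent_kafka.admin import NewTopic

# Partition 0 on brokers 1 and 2, partition 1 on brokers 2 and 3 (placeholder broker ids).
topic = NewTopic("example-topic-c",
                 replica_assignment=[[1, 2], [2, 3]],
                 config={"cleanup.policy": "compact"})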
NewPartitions¶
- class confluent_kafka.admin.NewPartitions¶
NewPartitions specifies per-topic settings for passing to AdminClient.create_partitions().
- NewPartitions(topic, new_total_count[, replica_assignment])¶
Instantiate a NewPartitions object.
- Parameters
topic (string) – Topic name
new_total_count (int) – Increase the topic’s partition count to this value.
replica_assignment (list) – List of lists with the replication assignment for each new partition.
- Return type
NewPartitions
- new_total_count¶
Total number of partitions (int).
- replica_assignment¶
Replication assignment (list of lists). The outer list index represents the partition index, the inner list is the replica assignment (broker ids) for that partition.
- topic¶
Topic name (string).
ConfigSource¶
- class confluent_kafka.admin.ConfigSource(value)[source]¶
Enumerates the different sources of configuration properties. Used by ConfigEntry to specify the source of configuration properties returned by describe_configs().
- UNKNOWN_CONFIG = 0¶
Unknown
- DYNAMIC_TOPIC_CONFIG = 1¶
Dynamic Topic
- DYNAMIC_BROKER_CONFIG = 2¶
Dynamic Broker
- DYNAMIC_DEFAULT_BROKER_CONFIG = 3¶
Dynamic Default Broker
- STATIC_BROKER_CONFIG = 4¶
Static Broker
- DEFAULT_CONFIG = 5¶
Default
ConfigEntry¶
- class confluent_kafka.admin.ConfigEntry(name, value, source=<ConfigSource.UNKNOWN_CONFIG: 0>, is_read_only=False, is_default=False, is_sensitive=False, is_synonym=False, synonyms=[], incremental_operation=None)[source]¶
Represents a configuration property. Returned by describe_configs() for each configuration entry of the specified resource.
This class is typically not user instantiated.
- name¶
Configuration property name.
- value¶
Configuration value (or None if not set or is_sensitive==True). Ignored when altering configurations incrementally if incremental_operation is DELETE.
- source¶
Configuration source.
- is_read_only¶
Indicates whether the configuration property is read-only.
- is_default¶
Indicates whether the configuration property is using its default value.
- is_sensitive¶
Indicates whether the configuration property value contains sensitive information (such as security settings), in which case .value is None.
- is_synonym¶
Indicates whether the configuration property is a synonym for the parent configuration entry.
- synonyms¶
A list of synonyms (ConfigEntry) and alternate sources for this configuration property.
- incremental_operation¶
The incremental operation (AlterConfigOpType) to use in incremental_alter_configs.
ConfigResource¶
- class confluent_kafka.admin.ConfigResource(restype, name, set_config=None, described_configs=None, error=None, incremental_configs=None)[source]¶
Represents a resource that has configuration, and (optionally) a collection of configuration properties for that resource. Used by describe_configs() and alter_configs().
- Parameters
restype (ConfigResource.Type) – The resource type.
name (str) – The resource name, which depends on the resource type. For RESOURCE_BROKER, the resource name is the broker id.
set_config (dict) – The configuration to set/overwrite. Dictionary of str, str.
- Type¶
alias of ResourceType
- set_config(name, value, overwrite=True)[source]¶
Set/overwrite a configuration value.
When calling alter_configs, any configuration properties that are not included in the request will be reverted to their default values. As a workaround, use describe_configs() to retrieve the current configuration and overwrite the settings you want to change.
- Parameters
name (str) – Configuration property name
value (str) – Configuration value
overwrite (bool) – If True, overwrite entry if it already exists (default). If False, do nothing if entry already exists.
- add_incremental_config(config_entry)[source]¶
Add a ConfigEntry for incremental alter configs, using the configured incremental_operation.
- Parameters
config_entry (ConfigEntry) – config entry to incrementally alter.
ResourceType¶
- class confluent_kafka.admin.ResourceType(value)[source]¶
Enumerates the different types of Kafka resources.
- UNKNOWN = 0¶
Resource type is not known or not set.
- ANY = 1¶
Match any resource, used for lookups.
- TOPIC = 2¶
Topic resource. Resource name is topic name.
- GROUP = 3¶
Group resource. Resource name is group.id.
- BROKER = 4¶
Broker resource. Resource name is broker id.
- TRANSACTIONAL_ID = 5¶
Transactional ID resource.
ResourcePatternType¶
- class confluent_kafka.admin.ResourcePatternType(value)[source]¶
Enumerates the different types of Kafka resource patterns.
- UNKNOWN = 0¶
Resource pattern type is not known or not set.
- ANY = 1¶
Match any resource, used for lookups.
- MATCH = 2¶
Match: will perform pattern matching.
- LITERAL = 3¶
Literal: a literal resource name.
- PREFIXED = 4¶
Prefixed: a prefixed resource name.
AlterConfigOpType¶
- class confluent_kafka.admin.AlterConfigOpType(value)[source]¶
Set of incremental operations that can be used with incremental alter configs.
- SET = 0¶
Set the value of the configuration entry.
- DELETE = 1¶
Revert the configuration entry to the default value (possibly null).
- APPEND = 2¶
(For list-type configuration entries only.) Add the specified values to the current list of values of the configuration entry.
- SUBTRACT = 3¶
(For list-type configuration entries only.) Remove the specified values from the current list of values of the configuration entry.
AclOperation¶
- class confluent_kafka.admin.AclOperation(value)[source]¶
Enumerates the different types of ACL operation.
- UNKNOWN = 0¶
Unknown
- ANY = 1¶
In a filter, matches any AclOperation
- ALL = 2¶
ALL the operations
- READ = 3¶
READ operation
- WRITE = 4¶
WRITE operation
- CREATE = 5¶
CREATE operation
- DELETE = 6¶
DELETE operation
- ALTER = 7¶
ALTER operation
- DESCRIBE = 8¶
DESCRIBE operation
- CLUSTER_ACTION = 9¶
CLUSTER_ACTION operation
- DESCRIBE_CONFIGS = 10¶
DESCRIBE_CONFIGS operation
- ALTER_CONFIGS = 11¶
ALTER_CONFIGS operation
- IDEMPOTENT_WRITE = 12¶
IDEMPOTENT_WRITE operation
AclPermissionType¶
AclBinding¶
- class confluent_kafka.admin.AclBinding(restype, name, resource_pattern_type, principal, host, operation, permission_type)[source]¶
Represents an ACL binding that specifies the operation and permission type for a specific principal over one or more resources of the same type. Used by AdminClient.create_acls(), returned by AdminClient.describe_acls() and AdminClient.delete_acls().
- Parameters
restype (ResourceType) – The resource type.
name (str) – The resource name, which depends on the resource type. For ResourceType.BROKER, the resource name is the broker id.
resource_pattern_type (ResourcePatternType) – The resource pattern, relative to the name.
principal (str) – The principal this AclBinding refers to.
host (str) – The host that the call is allowed to come from.
operation (AclOperation) – The operation/s specified by this binding.
permission_type (AclPermissionType) – The permission type for the specified operation.
AclBindingFilter¶
- class confluent_kafka.admin.AclBindingFilter(restype, name, resource_pattern_type, principal, host, operation, permission_type)[source]¶
Represents an ACL binding filter used to return a list of ACL bindings matching some or all of its attributes. Used by AdminClient.describe_acls() and AdminClient.delete_acls().
- Parameters
restype (ResourceType) – The resource type, or ResourceType.ANY to match any value.
name (str) – The resource name to match. None matches any value.
resource_pattern_type (ResourcePatternType) – The resource pattern, ResourcePatternType.ANY to match any value or ResourcePatternType.MATCH to perform pattern matching.
principal (str) – The principal to match, or None to match any value.
host (str) – The host to match, or None to match any value.
operation (AclOperation) – The operation to match, or AclOperation.ANY to match any value.
permission_type (AclPermissionType) – The permission type to match, or AclPermissionType.ANY to match any value.
ScramMechanism¶
ScramCredentialInfo¶
- class confluent_kafka.admin.ScramCredentialInfo(mechanism, iterations)[source]¶
Contains mechanism and iterations for a SASL/SCRAM credential associated with a user.
- Parameters
mechanism (ScramMechanism) – SASL/SCRAM mechanism.
iterations (int) – Positive number of iterations used when creating the credential.
UserScramCredentialsDescription¶
- class confluent_kafka.admin.UserScramCredentialsDescription(user, scram_credential_infos)[source]¶
Represents all SASL/SCRAM credentials associated with a user that can be retrieved.
- Parameters
user (str) – The user name.
scram_credential_infos (list(ScramCredentialInfo)) – SASL/SCRAM credential representations for the user.
UserScramCredentialAlteration¶
UserScramCredentialUpsertion¶
- class confluent_kafka.admin.UserScramCredentialUpsertion(user, scram_credential_info, password, salt=None)[source]¶
A request to update/insert a SASL/SCRAM credential for a user.
- Parameters
user (str) – The user name.
scram_credential_info (ScramCredentialInfo) – The mechanism and iterations.
password (bytes) – Password to HMAC before storage.
salt (bytes) – Salt to use. Will be generated randomly if None. (optional)
UserScramCredentialDeletion¶
- class confluent_kafka.admin.UserScramCredentialDeletion(user, mechanism)[source]¶
A request to delete a SASL/SCRAM credential for a user.
- Parameters
user (str) – The user name.
mechanism (ScramMechanism) – SASL/SCRAM mechanism.
OffsetSpec¶
ListOffsetsResultInfo¶
- class confluent_kafka.admin.ListOffsetsResultInfo(offset, timestamp, leader_epoch)[source]¶
Result of an AdminClient.list_offsets() call, associated with a partition.
- Parameters
offset (int) – The offset returned by the list_offsets call.
timestamp (int) – The timestamp in milliseconds corresponding to the offset. Not available (-1) when querying for the earliest or the latest offsets.
leader_epoch (int) – The leader epoch corresponding to the offset (optional).
TopicDescription¶
- class confluent_kafka.admin.TopicDescription(name, topic_id, is_internal, partitions, authorized_operations=None)[source]¶
Represents topic description information for a topic used in the describe topics operation. Used by AdminClient.describe_topics().
- Parameters
name (str) – The topic name.
topic_id (Uuid) – The topic id of the topic.
is_internal (bool) – Whether the topic is internal or not.
partitions (list(TopicPartitionInfo)) – Partition information.
authorized_operations (list(AclOperation)) – AclOperations allowed for the topic.
DescribeClusterResult¶
- class confluent_kafka.admin.DescribeClusterResult(controller, nodes, cluster_id=None, authorized_operations=None)[source]¶
Represents cluster description information used in the describe cluster operation. Used by AdminClient.describe_cluster().
- Parameters
controller (Node) – The current controller in the cluster.
nodes (list(Node)) – Information about each node in the cluster.
cluster_id (str) – The current cluster id in the cluster.
authorized_operations (list(AclOperation)) – AclOperations allowed for the cluster.
BrokerMetadata¶
ClusterMetadata¶
- class confluent_kafka.admin.ClusterMetadata[source]¶
Provides information about the Kafka cluster, brokers, and topics. Returned by list_topics().
This class is typically not user instantiated.
- cluster_id¶
Cluster id string, if supported by the broker, else None.
- controller_id¶
Current controller broker id, or -1.
- brokers¶
Map of brokers indexed by the broker id (int). Value is a BrokerMetadata object.
- topics¶
Map of topics indexed by the topic name. Value is a TopicMetadata object.
- orig_broker_id¶
The broker this metadata originated from.
- orig_broker_name¶
The broker name/address this metadata originated from.
GroupMember¶
- class confluent_kafka.admin.GroupMember[source]¶
Provides information about a group member.
For more information on the metadata format, refer to: A Guide To The Kafka Protocol.
This class is typically not user instantiated.
- id¶
Member id (generated by broker).
- client_id¶
Client id.
- client_host¶
Client hostname.
- metadata¶
Member metadata (binary); format depends on protocol type.
- assignment¶
Member assignment (binary); format depends on protocol type.
GroupMetadata¶
- class confluent_kafka.admin.GroupMetadata[source]¶
GroupMetadata provides information about a Kafka consumer group.
This class is typically not user instantiated.
- broker¶
Originating broker metadata.
- id¶
Group name.
- error¶
Broker-originated error, or None. Value is a KafkaError object.
- state¶
Group state.
- protocol_type¶
Group protocol type.
- protocol¶
Group protocol.
- members¶
Group members.
PartitionMetadata¶
- class confluent_kafka.admin.PartitionMetadata[source]¶
Provides information about a Kafka partition.
This class is typically not user instantiated.
- Warning
Depending on cluster state the broker ids referenced in leader, replicas and ISRs may temporarily not be reported in ClusterMetadata.brokers. Always check the availability of a broker id in the brokers dict.
- id¶
Partition id.
- leader¶
Current leader broker for this partition, or -1.
- replicas¶
List of replica broker ids for this partition.
- isrs¶
List of in-sync-replica broker ids for this partition.
- error¶
Partition error, or None. Value is a KafkaError object.
TopicMetadata¶
- class confluent_kafka.admin.TopicMetadata[source]¶
Provides information about a Kafka topic.
This class is typically not user instantiated.
- topic¶
Topic name
- partitions¶
Map of partitions indexed by partition id. Value is a PartitionMetadata object.
- error¶
Topic error, or None. Value is a KafkaError object.
ConsumerGroupListing¶
- class confluent_kafka.admin.ConsumerGroupListing(group_id, is_simple_consumer_group, state=None, type=None)[source]¶
Represents consumer group listing information for a group used in the list consumer groups operation. Used by ListConsumerGroupsResult.
- Parameters
group_id (str) – The consumer group id.
is_simple_consumer_group (bool) – Whether a consumer group is simple or not.
state (ConsumerGroupState) – Current state of the consumer group.
type (ConsumerGroupType) – Type of the consumer group.
ListConsumerGroupsResult¶
- class confluent_kafka.admin.ListConsumerGroupsResult(valid=None, errors=None)[source]¶
Represents the result of a list consumer groups operation. Used by AdminClient.list_consumer_groups().
- Parameters
valid (list(ConsumerGroupListing)) – List of successful consumer group listing responses.
errors (list(KafkaException)) – List of errors encountered during the operation, if any.
ConsumerGroupDescription¶
- class confluent_kafka.admin.ConsumerGroupDescription(group_id, is_simple_consumer_group, members, partition_assignor, state, coordinator, authorized_operations=None)[source]¶
Represents consumer group description information for a group used in the describe consumer groups operation. Used by AdminClient.describe_consumer_groups().
- Parameters
group_id (str) – The consumer group id.
is_simple_consumer_group (bool) – Whether a consumer group is simple or not.
members (list(MemberDescription)) – Description of the members of the consumer group.
partition_assignor (str) – Partition assignor.
state (ConsumerGroupState) – Current state of the consumer group.
coordinator (Node) – Consumer group coordinator.
authorized_operations (list(AclOperation)) – AclOperations allowed for the consumer group.
DeletedRecords¶
MemberAssignment¶
- class confluent_kafka.admin.MemberAssignment(topic_partitions=[])[source]¶
Represents member assignment information. Used by MemberDescription.
- Parameters
topic_partitions (list(TopicPartition)) – The topic partitions assigned to a group member.
MemberDescription¶
- class confluent_kafka.admin.MemberDescription(member_id, client_id, host, assignment, group_instance_id=None)[source]¶
Represents member information. Used by ConsumerGroupDescription.
- Parameters
member_id (str) – The consumer id of the group member.
client_id (str) – The client id of the group member.
host (str) – The host where the group member is running.
assignment (MemberAssignment) – The assignment of the group member
group_instance_id (str) – The instance id of the group member.
Consumer¶
- class confluent_kafka.Consumer
A high-level Apache Kafka consumer
- Consumer(config)¶
Create a new Consumer instance using the provided configuration dict (including properties and callback functions). See Kafka Client Configuration for more information.
- Parameters
config (dict) – Configuration properties. At a minimum, group.id must be set and bootstrap.servers should be set.
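A minimal construction sketch; the broker address and group id are placeholders.
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # placeholder
    "group.id": "example-group",            # required
    "auto.offset.reset": "earliest",        # where to start when no committed offset exists
})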
- assign()
- assign(partitions)¶
Set the consumer partition assignment to the provided list of TopicPartition and start consuming.
- Parameters
partitions (list(TopicPartition)) – List of topic+partitions and optionally initial offsets to start consuming from.
- Raises
KafkaException
- Raises
RuntimeError if called on a closed consumer
- assignment()
Returns the current partition assignment.
- Returns
List of assigned topic+partitions.
- Return type
list(TopicPartition)
- Raises
KafkaException
- Raises
RuntimeError if called on a closed consumer
- close()
Close down and terminate the Kafka Consumer.
Actions performed:
Stops consuming.
Commits offsets, unless the consumer property ‘enable.auto.commit’ is set to False.
Leaves the consumer group.
- Return type
None
- commit()
- commit([message=None][, offsets=None][, asynchronous=True])¶
Commit a message or a list of offsets.
The message and offsets parameters are mutually exclusive. If neither is set, the current partition assignment’s offsets are used instead. Use this method to commit offsets if you have ‘enable.auto.commit’ set to False.
- Parameters
message (confluent_kafka.Message) – Commit the message’s offset+1. Note: By convention, committed offsets reflect the next message to be consumed, not the last message consumed.
offsets (list(TopicPartition)) – List of topic+partitions+offsets to commit.
asynchronous (bool) – If true, asynchronously commit, returning None immediately. If False, the commit() call will block until the commit succeeds or fails and the committed offsets will be returned (on success). Note that specific partitions may have failed and the .err field of each partition should be checked for success.
- Return type
None|list(TopicPartition)
- Raises
KafkaException
- Raises
RuntimeError if called on a closed consumer
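A minimal sketch of a synchronous commit, assuming `consumer` is an existing Consumer created with 'enable.auto.commit' set to False and `msg` is a Message previously returned by poll().
committed = consumer.commit(message=msg, asynchronous=False)
# On success the committed offsets are returned; check each returned TopicPartition
# for a per-partition error as described above.
print("Committed:", committed)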
- committed()
- committed(partitions[, timeout=None])¶
Retrieve committed offsets for the specified partitions.
- Parameters
partitions (list(TopicPartition)) – List of topic+partitions to query for stored offsets.
timeout (float) – Request timeout (seconds).
- Returns
List of topic+partitions with offset and possibly error set.
- Return type
list(TopicPartition)
- Raises
KafkaException
- Raises
RuntimeError if called on a closed consumer
- consume()
- consume([num_messages=1][, timeout=-1])¶
Consumes a list of messages (possibly empty on timeout). Callbacks may be executed as a side effect of calling this method.
The application must check the returned Message object’s Message.error() method to distinguish between proper messages (error() returns None) and errors for each Message in the list (see error().code() for specifics). If the enable.partition.eof configuration property is set to True, partition EOF events will also be exposed as Messages with error().code() set to _PARTITION_EOF.
- Parameters
num_messages (int) – The maximum number of messages to return (default: 1).
timeout (float) – The maximum time to block waiting for message, event or callback (default: infinite (-1)). (Seconds)
- Returns
A list of Message objects (possibly empty on timeout)
- Return type
list(Message)
- Raises
RuntimeError – if called on a closed consumer
KafkaError – in case of internal error
ValueError – if num_messages > 1M
- consumer_group_metadata()
- consumer_group_metadata()¶
- Returns
An opaque object representing the consumer’s current group metadata for passing to the transactional producer’s send_offsets_to_transaction() API.
- get_watermark_offsets()
- get_watermark_offsets(partition[, timeout=None][, cached=False])¶
Retrieve low and high offsets for the specified partition.
- Parameters
partition (TopicPartition) – Topic+partition to return offsets for.
timeout (float) – Request timeout (seconds). Ignored if cached=True.
cached (bool) – Instead of querying the broker, use cached information. Cached values: The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each message fetched from the broker for this partition.
- Returns
Tuple of (low,high) on success or None on timeout. The high offset is the offset of the last message + 1.
- Return type
tuple(int,int)
- Raises
KafkaException
- Raises
RuntimeError if called on a closed consumer
- incremental_assign()
- incremental_assign(partitions)¶
Incrementally add the provided list of TopicPartition to the current partition assignment. This list must not contain duplicate entries, or any entry corresponding to an already assigned partition. When a COOPERATIVE assignor (i.e. incremental rebalancing) is being used, this method may be used in the on_assign callback to update the current assignment and specify start offsets. The application should pass a list of partitions identical to the list passed to the callback, even if the list is empty. Note that if you do not call incremental_assign in your on_assign handler, this will be done automatically and start offsets will be the last committed offsets, or determined via the auto offset reset policy (auto.offset.reset) if there are none. This method may also be used outside the context of a rebalance callback.
- Parameters
partitions (list(TopicPartition)) – List of topic+partitions and optionally initial offsets to start consuming from.
- Raises
KafkaException
- Raises
RuntimeError if called on a closed consumer
- incremental_unassign()
- incremental_unassign(partitions)¶
Incrementally remove the provided list of TopicPartition from the current partition assignment. This list must not contain duplicate entries and all entries specified must be part of the current assignment. When a COOPERATIVE assignor (i.e. incremental rebalancing) is being used, this method may be used in the on_revoke or on_lost callback to update the current assignment. The application should pass a list of partitions identical to the list passed to the callback. This method may also be used outside the context of a rebalance callback. The value of the TopicPartition offset field is ignored by this method.
- Parameters
partitions (list(TopicPartition)) – List of topic+partitions to remove from the current assignment.
- Raises
KafkaException
- Raises
RuntimeError if called on a closed consumer
- list_topics()
- list_topics([topic=None][, timeout=-1])¶
Request metadata from the cluster. This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.
- Parameters
topic (str) – If specified, only request information about this topic, else return results for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.
timeout (float) – The maximum response time before timing out, or -1 for infinite timeout.
- Return type
ClusterMetadata
- Raises
KafkaException
- memberid()
- memberid()¶
Return this client’s broker-assigned group member id.
The member id is assigned by the group coordinator and is propagated to the consumer during rebalance.
- Returns
Member id string or None
- Return type
string
- Raises
RuntimeError if called on a closed consumer
- offsets_for_times()
- offsets_for_times(partitions[, timeout=None])¶
Look up offsets by timestamp for the specified partitions.
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned.
- Parameters
partitions (list(TopicPartition)) – Topic+partitions with timestamps in the TopicPartition.offset field.
timeout (float) – Request timeout (seconds).
- Returns
List of topic+partitions with offset field set and possibly error set.
- Return type
list(TopicPartition)
- Raises
KafkaException
- Raises
RuntimeError if called on a closed consumer
- pause()
- pause(partitions)¶
Pause consumption for the provided list of partitions.
- Parameters
partitions (list(TopicPartition)) – List of topic+partitions to pause.
- Return type
None
- Raises
KafkaException
- poll()
- poll([timeout=None])¶
Consumes a single message, calls callbacks and returns events.
The application must check the returned Message object’s Message.error() method to distinguish between proper messages (error() returns None), or an event or error (see error().code() for specifics).
- Parameters
timeout (float) – Maximum time to block waiting for message, event or callback (default: infinite (None translated into -1 in the library)). (Seconds)
- Returns
A Message object or None on timeout
- Return type
Message or None
- Raises
RuntimeError if called on a closed consumer
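A typical, illustrative poll loop, assuming a configured `consumer` that has already subscribed to one or more topics.
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:            # no message within the timeout
            continue
        if msg.error():            # event or error
            print("Consumer error:", msg.error())
            continue
        print(f"{msg.topic()} [{msg.partition()}] @ {msg.offset()}: {msg.value()}")
finally:
    consumer.close()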
- position()
- position(partitions)¶
Retrieve current positions (offsets) for the specified partitions.
- Parameters
partitions (list(TopicPartition)) – List of topic+partitions to return current offsets for. The current offset is the offset of the last consumed message + 1.
- Returns
List of topic+partitions with offset and possibly error set.
- Return type
list(TopicPartition)
- Raises
KafkaException
- Raises
RuntimeError if called on a closed consumer
- resume()
- resume(partitions)¶
Resume consumption for the provided list of partitions.
- Parameters
partitions (list(TopicPartition)) – List of topic+partitions to resume.
- Return type
None
- Raises
KafkaException
- seek()
- seek(partition)¶
Set consume position for partition to offset. The offset may be an absolute (>=0) or a logical offset (OFFSET_BEGINNING et al.).
seek() may only be used to update the consume offset of an actively consumed partition (i.e., after assign()); to set the starting offset of a partition not being consumed, instead pass the offset in an assign() call.
- Parameters
partition (TopicPartition) – Topic+partition+offset to seek to.
- Raises
KafkaException
- set_sasl_credentials()
- set_sasl_credentials(username, password)¶
Sets the SASL credentials used for this client. These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate. This method will not disconnect existing broker connections that have been established with the old credentials. This method is applicable only to SASL PLAIN and SCRAM mechanisms.
- store_offsets()
- store_offsets([message=None][, offsets=None])¶
Store offsets for a message or a list of offsets.
message and offsets are mutually exclusive. The stored offsets will be committed according to ‘auto.commit.interval.ms’ or manual offset-less commit(). Note that ‘enable.auto.offset.store’ must be set to False when using this API.
- Parameters
message (confluent_kafka.Message) – Store message’s offset+1.
offsets (list(TopicPartition)) – List of topic+partitions+offsets to store.
- Return type
None
- Raises
KafkaException
- Raises
RuntimeError if called on a closed consumer
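A sketch of manual offset storage, assuming a `consumer` created with 'enable.auto.offset.store' set to False (and auto-commit left enabled); process() is a hypothetical application function.
msg = consumer.poll(timeout=1.0)
if msg is not None and msg.error() is None:
    process(msg)                           # hypothetical application-level processing
    consumer.store_offsets(message=msg)    # stored offsets are committed per auto.commit.interval.ms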
- subscribe()
- subscribe(topics[, on_assign=None][, on_revoke=None][, on_lost=None])¶
Set subscription to the supplied list of topics. This replaces a previous subscription.
Regexp pattern subscriptions are supported by prefixing the topic string with "^", e.g.:
consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
- Parameters
topics (list(str)) – List of topics (strings) to subscribe to.
on_assign (callable) – callback to provide handling of customized offsets on completion of a successful partition re-assignment.
on_revoke (callable) – callback to provide handling of offset commits to a customized store on the start of a rebalance operation.
on_lost (callable) – callback to provide handling in the case the partition assignment has been lost. If not specified, lost partition events will be delivered to on_revoke, if specified. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.
- Raises
KafkaException
- Raises
RuntimeError if called on a closed consumer
- on_assign(consumer, partitions)¶
- on_revoke(consumer, partitions)¶
- on_lost(consumer, partitions)¶
- Parameters
consumer (Consumer) – Consumer instance.
partitions (list(TopicPartition)) – Absolute list of partitions being assigned or revoked.
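An illustrative subscription with an on_assign callback that pins the start offset (a common pattern with the default eager assignor); the topic name is a placeholder and `consumer` is assumed to exist.
from confluent_kafka import OFFSET_BEGINNING

def on_assign(consumer, partitions):
    # Start every newly assigned partition from the beginning (illustrative choice).
    for tp in partitions:
        tp.offset = OFFSET_BEGINNING
    consumer.assign(partitions)

consumer.subscribe(["example-topic-a"], on_assign=on_assign)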
- unassign()
Removes the current partition assignment and stops consuming.
- Raises
RuntimeError – if called on a closed consumer
- unsubscribe()
Remove current subscription.
- Raises
KafkaException
- Raises
RuntimeError if called on a closed consumer
DeserializingConsumer (experimental)¶
- class confluent_kafka.DeserializingConsumer(conf)[source]¶
A high level Kafka consumer with deserialization capabilities.
This class is experimental and likely to be removed, or subject to incompatible API changes in future versions of the library. To avoid breaking changes on upgrading, we recommend using deserializers directly.
Derived from the Consumer class, overriding the Consumer.poll() method to add deserialization capabilities.
Additional configuration properties:
key.deserializer (callable) – Callable(bytes, SerializationContext) -> obj. Deserializer used for message keys.
value.deserializer (callable) – Callable(bytes, SerializationContext) -> obj. Deserializer used for message values.
Deserializers for string, integer and double (StringDeserializer, IntegerDeserializer and DoubleDeserializer) are supplied out-of-the-box in the confluent_kafka.serialization namespace.
Deserializers for Protobuf, JSON Schema and Avro (ProtobufDeserializer, JSONDeserializer and AvroDeserializer) with Confluent Schema Registry integration are supplied out-of-the-box in the confluent_kafka.schema_registry namespace.
See also
The Configuration Guide for in-depth information on how to configure the client.
CONFIGURATION.md for a comprehensive set of configuration properties.
STATISTICS.md for detailed information on the statistics provided by stats_cb.
The Consumer class for inherited methods.
- Parameters
conf (dict) – DeserializingConsumer configuration.
- Raises
ValueError – if configuration validation fails
- poll(timeout=-1)[source]¶
Consume a message and call callbacks.
- Parameters
timeout (float) – Maximum time to block waiting for a message (seconds).
- Returns
Message or None on timeout
- Raises
KeyDeserializationError – If an error occurs during key deserialization.
ValueDeserializationError – If an error occurs during value deserialization.
ConsumeError – If an error was encountered while polling.
- consume(num_messages=1, timeout=-1)[source]¶
Consumer.consume() not implemented, use DeserializingConsumer.poll() instead.
Producer¶
- class confluent_kafka.Producer
Asynchronous Kafka Producer
- Producer(config)¶
Create a new Producer instance using the provided configuration dict.
- Parameters
config (dict) – Configuration properties. At a minimum, bootstrap.servers should be set.
- __len__(self)¶
Producer implements __len__, so len(producer) can be used to obtain the number of messages waiting to be delivered.
- Returns
Number of messages and Kafka protocol requests waiting to be delivered to broker.
- Return type
int
- abort_transaction()
- abort_transaction([timeout])¶
Aborts the current transaction. This function should also be used to recover from non-fatal abortable transaction errors when KafkaError.txn_requires_abort() is True.
Any outstanding messages will be purged and fail with _PURGE_INFLIGHT or _PURGE_QUEUE.
Note: This function will block until all outstanding messages are purged and the transaction abort request has been successfully handled by the transaction coordinator, or until the timeout expires, whichever comes first. On timeout the application may call the function again.
Note: Will automatically call purge() and flush() to ensure all queued and in-flight messages are purged before attempting to abort the transaction.
- Parameters
timeout (float) – The maximum amount of time to block waiting for transaction to abort in seconds.
- Raises
KafkaError: Use exc.args[0].retriable() to check if the operation may be retried. Treat any other error as a fatal error.
- begin_transaction()
- begin_transaction()¶
Begin a new transaction.
init_transactions() must have been called successfully (once) before this function is called.
Any messages produced or offsets sent to a transaction, after the successful return of this function will be part of the transaction and committed or aborted atomically.
Complete the transaction by calling commit_transaction() or Abort the transaction by calling abort_transaction().
- Raises
KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, else treat the error as a fatal error.
- commit_transaction()
- commit_transaction([timeout])¶
Commit the current transaction. Any outstanding messages will be flushed (delivered) before actually committing the transaction.
If any of the outstanding messages fail permanently the current transaction will enter the abortable error state and this function will return an abortable error, in this case the application must call abort_transaction() before attempting a new transaction with begin_transaction().
Note: This function will block until all outstanding messages are delivered and the transaction commit request has been successfully handled by the transaction coordinator, or until the timeout expires, whichever comes first. On timeout the application may call the function again.
Note: Will automatically call flush() to ensure all queued messages are delivered before attempting to commit the transaction. Delivery reports and other callbacks may thus be triggered from this method.
- Parameters
timeout (float) – The amount of time to block in seconds.
- Raises
KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, or exc.args[0].txn_requires_abort() if the current transaction has failed and must be aborted by calling abort_transaction() and then start a new transaction with begin_transaction(). Treat any other error as a fatal error.
- flush()
- flush([timeout])¶
Wait for all messages in the Producer queue to be delivered. This is a convenience method that calls poll() until len() is zero or the optional timeout elapses.
- Parameters
timeout (float) – Maximum time to block, in seconds (requires librdkafka >= v0.9.4).
- Returns
Number of messages still in queue.
Note
See poll() for a description of what callbacks may be triggered.
- init_transactions()
Initializes transactions for the producer instance.
This function ensures any transactions initiated by previous instances of the producer with the same transactional.id are completed. If the previous instance failed with a transaction in progress the previous transaction will be aborted. This function needs to be called before any other transactional or produce functions are called when the transactional.id is configured.
If the last transaction had begun completion (following transaction commit) but not yet finished, this function will await the previous transaction’s completion.
When any previous transactions have been fenced this function will acquire the internal producer id and epoch, used in all future transactional messages issued by this producer instance.
Upon successful return from this function the application has to perform at least one of the following operations within transaction.timeout.ms to avoid timing out the transaction on the broker:
- produce() (et al.)
- send_offsets_to_transaction()
- commit_transaction()
- abort_transaction()
- Parameters
timeout (float) – Maximum time to block in seconds.
- Raises
KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, else treat the error as a fatal error.
- list_topics()
- list_topics([topic=None][, timeout=-1])¶
Request metadata from the cluster. This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.
- Parameters
topic (str) – If specified, only request information about this topic, else return results for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.
timeout (float) – The maximum response time before timing out, or -1 for infinite timeout.
- Return type
ClusterMetadata
- Raises
KafkaException
- poll()
- poll([timeout])¶
Polls the producer for events and calls the corresponding callbacks (if registered).
Callbacks:
on_delivery callbacks from produce()
…
- Parameters
timeout (float) – Maximum time to block waiting for events. (Seconds)
- Returns
Number of events processed (callbacks served)
- Return type
int
- produce()
- produce(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])¶
Produce message to topic. This is an asynchronous operation; an application may use the callback (alias on_delivery) argument to pass a function (or lambda) that will be called from poll() when the message has been successfully delivered or permanently fails delivery.
Currently message headers are not supported on the message returned to the callback. The msg.headers() will return None even if the original message had headers set.
- Parameters
topic (str) – Topic to produce message to
value (str|bytes) – Message payload
key (str|bytes) – Message key
partition (int) – Partition to produce to, else uses the configured built-in partitioner.
on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery
timestamp (int) – Message timestamp (CreateTime) in milliseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.
headers (dict|list) – Message headers to set on the message. The header key must be a string while the value must be binary, unicode or None. Accepts a list of (key,value) or a dict. (Requires librdkafka >= v0.11.4 and broker version >= 0.11.0.0)
- Return type
None
- Raises
BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded)
KafkaException – for other errors, see exception code
NotImplementedError – if timestamp is specified without underlying library support.
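A short produce/poll/flush sketch against this API; the broker address and topic name are hypothetical:

```python
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})  # hypothetical broker

def on_delivery(err, msg):
    # Served from poll() or flush() for each produced message.
    if err is not None:
        print('Delivery failed: {}'.format(err))
    else:
        print('Delivered to {} [{}] @ {}'.format(msg.topic(), msg.partition(), msg.offset()))

for i in range(10):
    producer.produce('orders', key=str(i), value='value-{}'.format(i),
                     on_delivery=on_delivery)
    producer.poll(0)   # serve earlier delivery callbacks without blocking

producer.flush()       # block until all outstanding messages are delivered
```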
- purge()
- purge([in_queue=True][, in_flight=True][, blocking=True])¶
Purge messages currently handled by the producer instance. The application will need to call poll() or flush() afterwards to serve the delivery report callbacks of the purged messages.
- Parameters
in_queue (bool) – Purge messages from internal queues. By default, true.
in_flight (bool) – Purge messages in flight to or from the broker. By default, true.
blocking (bool) – If set to False, will not wait on background thread queue purging to finish. By default, true.
- send_offsets_to_transaction()
- send_offsets_to_transaction(positions, group_metadata[, timeout])¶
Sends a list of topic partition offsets to the consumer group coordinator for group_metadata and marks the offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully.
The offsets should be the next message your application will consume, i.e., the last processed message’s offset + 1 for each partition. Either track the offsets manually during processing or use consumer.position() (on the consumer) to get the current offsets for the partitions assigned to the consumer.
Use this method at the end of a consume-transform-produce loop prior to committing the transaction with commit_transaction().
Note: The consumer must disable auto commits (set enable.auto.commit to false on the consumer).
Note: Logical and invalid offsets (e.g., OFFSET_INVALID) in offsets will be ignored. If there are no valid offsets in offsets the function will return successfully and no action will be taken.
- Parameters
positions (list(TopicPartition)) – Current consumer/processing positions (offsets) for the list of partitions.
group_metadata (object) – Consumer group metadata retrieved from the input consumer's consumer_group_metadata().
timeout (float) – Amount of time to block in seconds.
- Raises
KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, or exc.args[0].txn_requires_abort() if the current transaction has failed and must be aborted by calling abort_transaction() and then start a new transaction with begin_transaction(). Treat any other error as a fatal error.
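A condensed consume-transform-produce sketch, assuming a transaction-capable broker; the broker address, topics, group id and transactional.id are hypothetical, and the group metadata is obtained from the consumer's consumer_group_metadata():

```python
from confluent_kafka import Consumer, Producer, KafkaException

# Hypothetical broker, topics and ids.
producer = Producer({'bootstrap.servers': 'localhost:9092',
                     'transactional.id': 'example-txn-id'})
consumer = Consumer({'bootstrap.servers': 'localhost:9092',
                     'group.id': 'example-group',
                     'auto.offset.reset': 'earliest',
                     'enable.auto.commit': False})   # required for transactional offsets

producer.init_transactions()
consumer.subscribe(['input-topic'])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    producer.begin_transaction()
    try:
        producer.produce('output-topic', value=msg.value())
        # Commit the consumed offsets as part of the transaction.
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata())
        producer.commit_transaction()
    except KafkaException as e:
        if e.args[0].txn_requires_abort():
            producer.abort_transaction()   # reprocess in a new transaction
        else:
            raise
```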
- set_sasl_credentials()
- set_sasl_credentials(username, password)¶
Sets the SASL credentials used for this client. These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate. This method will not disconnect existing broker connections that have been established with the old credentials. This method is applicable only to SASL PLAIN and SCRAM mechanisms.
SerializingProducer (experimental)¶
- class confluent_kafka.SerializingProducer(conf)[source]¶
A high level Kafka producer with serialization capabilities.
This class is experimental and likely to be removed, or subject to incompatible API changes in future versions of the library. To avoid breaking changes on upgrading, we recommend using serializers directly.
Derived from the Producer class, overriding the Producer.produce() method to add serialization capabilities.
Additional configuration properties:
Property Name
Type
Description
key.serializer
callable
Callable(obj, SerializationContext) -> bytes
Serializer used for message keys.
value.serializer
callable
Callable(obj, SerializationContext) -> bytes
Serializer used for message values.
Serializers for string, integer and double (StringSerializer, IntegerSerializer and DoubleSerializer) are supplied out-of-the-box in the confluent_kafka.serialization namespace.
Serializers for Protobuf, JSON Schema and Avro (ProtobufSerializer, JSONSerializer and AvroSerializer) with Confluent Schema Registry integration are supplied out-of-the-box in the confluent_kafka.schema_registry namespace.
See also
The Configuration Guide for in depth information on how to configure the client.
CONFIGURATION.md for a comprehensive set of configuration properties.
STATISTICS.md for detailed information on the statistics provided by stats_cb
The Producer class for inherited methods.
- Parameters
conf (dict) – SerializingProducer configuration.
- Inherited-members
- produce(topic, key=None, value=None, partition=-1, on_delivery=None, timestamp=0, headers=None)[source]¶
Produce a message.
This is an asynchronous operation. An application may use the on_delivery argument to pass a function (or lambda) that will be called from SerializingProducer.poll() when the message has been successfully delivered or permanently fails delivery.
Note
Currently message headers are not supported on the message returned to the callback. The msg.headers() will return None even if the original message had headers set.
- Parameters
topic (str) – Topic to produce message to.
key (object, optional) – Message payload key.
value (object, optional) – Message payload value.
partition (int, optional) – Partition to produce to, else the configured built-in partitioner will be used.
on_delivery (callable(KafkaError, Message), optional) – Delivery report callback. Called as a side effect of SerializingProducer.poll() or SerializingProducer.flush() on successful or failed delivery.
timestamp (float, optional) – Message timestamp (CreateTime) in milliseconds since Unix epoch UTC (requires broker >= 0.10.0.0). Default value is current time.
headers (dict, optional) – Message headers. The header key must be a str while the value must be binary, unicode or None. (Requires broker version >= 0.11.0.0)
- Raises
BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded). If this happens the application should call SerializingProducer.poll() and try again.
KeySerializationError – If an error occurs during key serialization.
ValueSerializationError – If an error occurs during value serialization.
KafkaException – For all other errors
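A minimal SerializingProducer sketch; the broker address and topic name are hypothetical:

```python
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer, DoubleSerializer

# Hypothetical broker and topic.
producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': DoubleSerializer(),
})

def on_delivery(err, msg):
    print('delivery failed: {}'.format(err) if err is not None
          else 'delivered to {}'.format(msg.topic()))

producer.produce(topic='prices', key='EURUSD', value=1.0873, on_delivery=on_delivery)
producer.flush()
```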
SchemaRegistryClient¶
- class confluent_kafka.schema_registry.SchemaRegistryClient(conf: dict)[source]¶
A Confluent Schema Registry client.
Configuration properties (* indicates a required field):
Property name
type
Description
url
*str
Comma-separated list of Schema Registry URLs.
ssl.ca.location
str
Path to CA certificate file used to verify the Schema Registry’s private key.
ssl.key.location
str
Path to client’s private key (PEM) used for authentication.
ssl.certificate.location must also be set.
ssl.certificate.location
str
Path to client’s public key (PEM) used for authentication.
May be set without ssl.key.location if the private key is stored within the PEM as well.
basic.auth.user.info
str
Client HTTP credentials in the form of username:password.
By default userinfo is extracted from the URL if present.
proxy
str
Proxy such as http://localhost:8030.
timeout
int
Request timeout.
cache.capacity
int
Cache capacity. Defaults to 1000.
cache.latest.ttl.sec
int
TTL in seconds for caching the latest schema.
max.retries
int
Maximum retries for a request. Defaults to 2.
retries.wait.ms
int
Maximum time to wait for the first retry. When jitter is applied, the actual wait may be less.
Defaults to 1000.
- Parameters
conf (dict) – Schema Registry client configuration.
- register_schema(subject_name: str, schema: confluent_kafka.schema_registry.schema_registry_client.Schema, normalize_schemas: bool = False) → int[source]¶
Registers a schema under subject_name.
- Parameters
subject_name (str) – subject to register a schema under
schema (Schema) – Schema instance to register
normalize_schemas (bool) – Normalize schema before registering
- Returns
Schema id
- Return type
int
- Raises
SchemaRegistryError – if Schema violates this subject’s Compatibility policy or is otherwise invalid.
See also
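A minimal register_schema() sketch, assuming a Schema Registry at http://localhost:8081 and a hypothetical subject and Avro schema:

```python
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

client = SchemaRegistryClient({'url': 'http://localhost:8081'})  # hypothetical URL

avro_schema = Schema("""
{
  "type": "record",
  "name": "User",
  "fields": [{"name": "name", "type": "string"}]
}
""", schema_type='AVRO')

# Register under a hypothetical value subject and keep the schema id.
schema_id = client.register_schema('users-value', avro_schema)
print('registered schema id:', schema_id)
```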
- register_schema_full_response(subject_name: str, schema: confluent_kafka.schema_registry.schema_registry_client.Schema, normalize_schemas: bool = False) → confluent_kafka.schema_registry.schema_registry_client.RegisteredSchema[source]¶
Registers a schema under subject_name.
- Parameters
subject_name (str) – subject to register a schema under
schema (Schema) – Schema instance to register
normalize_schemas (bool) – Normalize schema before registering
- Returns
Registration information for the newly registered schema.
- Return type
RegisteredSchema
- Raises
SchemaRegistryError – if Schema violates this subject’s Compatibility policy or is otherwise invalid.
See also
- get_schema(schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None) → confluent_kafka.schema_registry.schema_registry_client.Schema[source]¶
Fetches the schema associated with schema_id from the Schema Registry. The result is cached so subsequent attempts will not require an additional round-trip to the Schema Registry.
- Parameters
schema_id (int) – Schema id
subject_name (str) – Subject name the schema is registered under
fmt (str) – Format of the schema
- Returns
Schema instance identified by the schema_id
- Return type
Schema
- Raises
SchemaRegistryError – If schema can’t be found.
See also
- lookup_schema(subject_name: str, schema: confluent_kafka.schema_registry.schema_registry_client.Schema, normalize_schemas: bool = False, deleted: bool = False) → confluent_kafka.schema_registry.schema_registry_client.RegisteredSchema[source]¶
Returns schema registration information for subject.
- Parameters
subject_name (str) – Subject name the schema is registered under
schema (Schema) – Schema instance.
normalize_schemas (bool) – Normalize schema before registering
deleted (bool) – Whether to include deleted schemas.
- Returns
Subject registration information for this schema.
- Return type
RegisteredSchema
- Raises
SchemaRegistryError – If schema or subject can’t be found
See also
- get_subjects() → List[str][source]¶
List all subjects registered with the Schema Registry
- Returns
Registered subject names
- Return type
list(str)
- Raises
SchemaRegistryError – if subjects can’t be found
See also
- delete_subject(subject_name: str, permanent: bool = False) → List[int][source]¶
Deletes the specified subject and its associated compatibility level if registered. It is recommended to use this API only when a topic needs to be recycled or in development environments.
- Parameters
subject_name (str) – subject name
permanent (bool) – True for a hard delete, False (default) for a soft delete
- Returns
Versions deleted under this subject
- Return type
list(int)
- Raises
SchemaRegistryError – if the request was unsuccessful.
See also
- get_latest_version(subject_name: str, fmt: Optional[str] = None) → confluent_kafka.schema_registry.schema_registry_client.RegisteredSchema[source]¶
Retrieves latest registered version for subject
- Parameters
subject_name (str) – Subject name.
fmt (str) – Format of the schema
- Returns
Registration information for this version.
- Return type
RegisteredSchema
- Raises
SchemaRegistryError – if the version can’t be found or is invalid.
See also
- get_latest_with_metadata(subject_name: str, metadata: Dict[str, str], deleted: bool = False, fmt: Optional[str] = None) → confluent_kafka.schema_registry.schema_registry_client.RegisteredSchema[source]¶
Retrieves latest registered version for subject with the given metadata
- Parameters
subject_name (str) – Subject name.
metadata (dict) – The key-value pairs for the metadata.
deleted (bool) – Whether to include deleted schemas.
fmt (str) – Format of the schema
- Returns
Registration information for this version.
- Return type
RegisteredSchema
- Raises
SchemaRegistryError – if the version can’t be found or is invalid.
- get_version(subject_name: str, version: int, deleted: bool = False, fmt: Optional[str] = None) → confluent_kafka.schema_registry.schema_registry_client.RegisteredSchema[source]¶
Retrieves a specific schema registered under subject_name.
- Parameters
subject_name (str) – Subject name.
version (int) – version number. Defaults to latest version.
deleted (bool) – Whether to include deleted schemas.
fmt (str) – Format of the schema
- Returns
Registration information for this version.
- Return type
RegisteredSchema
- Raises
SchemaRegistryError – if the version can’t be found or is invalid.
See also
- get_versions(subject_name: str) → List[int][source]¶
Get a list of all versions registered with this subject.
- Parameters
subject_name (str) – Subject name.
- Returns
Registered versions
- Return type
list(int)
- Raises
SchemaRegistryError – If subject can’t be found
See also
- delete_version(subject_name: str, version: int, permanent: bool = False) → int[source]¶
Deletes a specific version registered to subject_name.
- Parameters
subject_name (str) – Subject name.
version (int) – Version number
permanent (bool) – True for a hard delete, False (default) for a soft delete
- Returns
Version number which was deleted
- Return type
int
- Raises
SchemaRegistryError – if the subject or version cannot be found.
See also
- set_compatibility(subject_name: Optional[str] = None, level: Optional[str] = None) → str[source]¶
Update global or subject level compatibility level.
- Parameters
level (str) – Compatibility level. See API reference for a list of valid values.
subject_name (str, optional) – Subject to update. Sets global compatibility level policy if not set.
- Returns
The newly configured compatibility level.
- Return type
str
- Raises
SchemaRegistryError – If the compatibility level is invalid.
- get_compatibility(subject_name: Optional[str] = None) → str[source]¶
Get the current compatibility level.
- Parameters
subject_name (str, optional) – Subject name. Returns global policy if left unset.
- Returns
Compatibility level for the subject if set, otherwise the global compatibility level.
- Return type
str
- Raises
SchemaRegistryError – if the request was unsuccessful.
- test_compatibility(subject_name: str, schema: confluent_kafka.schema_registry.schema_registry_client.Schema, version: Union[int, str] = 'latest') → bool[source]¶
Test the compatibility of a candidate schema for a given subject and version
- Parameters
subject_name (str) – Subject name the schema is registered under
schema (Schema) – Schema instance.
version (int or str, optional) – Version number, or the string “latest”. Defaults to “latest”.
- Returns
True if the schema is compatible with the specified version
- Return type
bool
- Raises
SchemaRegistryError – if the request was unsuccessful.
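A small sketch combining set_compatibility() and test_compatibility(); the Schema Registry URL, subject and candidate schema are hypothetical:

```python
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

client = SchemaRegistryClient({'url': 'http://localhost:8081'})  # hypothetical URL

# Require backward compatibility for a hypothetical subject.
client.set_compatibility(subject_name='users-value', level='BACKWARD')

candidate = Schema("""
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
""", schema_type='AVRO')

# True if the candidate is compatible with the latest registered version.
print(client.test_compatibility('users-value', candidate))
```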
- set_config(subject_name: Optional[str] = None, config: Optional[confluent_kafka.schema_registry.schema_registry_client.ServerConfig] = None) → confluent_kafka.schema_registry.schema_registry_client.ServerConfig[source]¶
Update global or subject config.
- Parameters
config (ServerConfig) – Config. See API reference for a list of valid values.
subject_name (str, optional) – Subject to update. Sets global config if not set.
- Returns
The newly configured config.
- Return type
str
- Raises
SchemaRegistryError – If the config is invalid.
See also
- get_config(subject_name: Optional[str] = None) → confluent_kafka.schema_registry.schema_registry_client.ServerConfig[source]¶
Get the current config.
- Parameters
subject_name (str, optional) – Subject name. Returns global config if left unset.
- Returns
Config for the subject if set, otherwise the global config.
- Return type
ServerConfig
- Raises
SchemaRegistryError – if the request was unsuccessful.
See also
Serialization API¶
Deserializer¶
- class confluent_kafka.serialization.Deserializer[source]¶
Extensible class from which all Deserializer implementations derive. Deserializers instruct Kafka clients on how to convert bytes to objects.
See built-in implementations, listed below, for an example of how to extend this class.
Note
This class is not directly instantiable. The derived classes must be used instead.
The following implementations are provided by this module.
Note
Unless noted elsewhere all numeric types are signed and serialization is big-endian.
Name
Type
Binary Format
DoubleDeserializer
float
IEEE 754 binary64
IntegerDeserializer
int
int32
StringDeserializer
unicode
unicode(encoding)
- __call__(value, ctx=None)[source]¶
Convert bytes to object
- Parameters
value (bytes) – bytes to be deserialized
ctx (SerializationContext) – Metadata pertaining to the serialization operation
- Raises
SerializerError – If an error occurs during deserialization.
- Returns
object if data is not None, otherwise None
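A minimal sketch of extending Deserializer, here a hypothetical JSON-to-dict implementation without Schema Registry framing:

```python
import json

from confluent_kafka.serialization import Deserializer

class JsonDictDeserializer(Deserializer):
    """Hypothetical deserializer: UTF-8 JSON bytes -> dict (no SR framing)."""

    def __call__(self, value, ctx=None):
        if value is None:
            return None
        return json.loads(value.decode('utf_8'))

# Usable directly, or as key.deserializer/value.deserializer on a DeserializingConsumer.
print(JsonDictDeserializer()(b'{"id": 1}'))   # {'id': 1}
```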
AvroDeserializer¶
- class confluent_kafka.schema_registry.avro.AvroDeserializer(schema_registry_client: confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient, schema_str: Optional[Union[str, confluent_kafka.schema_registry.schema_registry_client.Schema]] = None, from_dict: Optional[Callable[[dict, confluent_kafka.serialization.SerializationContext], object]] = None, return_record_name: bool = False, conf: Optional[dict] = None, rule_conf: Optional[dict] = None, rule_registry: Optional[confluent_kafka.schema_registry.rule_registry.RuleRegistry] = None)[source]¶
Deserializer for Avro binary encoded data with Confluent Schema Registry framing.
Property Name
Type
Description
use.latest.version
bool
Whether to use the latest subject version for deserialization.
Defaults to False.
use.latest.with.metadata
bool
Whether to use the latest subject version with the given metadata.
Defaults to None.
subject.name.strategy
callable
Callable(SerializationContext, str) -> str
Defines how Schema Registry subject names are constructed. Standard naming strategies are defined in the confluent_kafka.schema_registry namespace.
Defaults to topic_subject_name_strategy.
Note
By default, Avro complex types are returned as dicts. This behavior can be overridden by registering a callable from_dict with the deserializer to convert the dicts to the desired type.
See avro_consumer.py in the examples directory for example usage.
- Parameters
schema_registry_client (SchemaRegistryClient) – Confluent Schema Registry client instance.
schema_str (str, Schema, optional) – Avro reader schema declaration. Accepts either a string or a Schema instance. If not provided, the writer schema will be used as the reader schema. Note that string definitions cannot reference other schemas. For referencing other schemas, use a Schema instance.
from_dict (callable, optional) – Callable(dict, SerializationContext) -> object. Converts a dict to an instance of some object.
return_record_name (bool) – If True, when reading a union of records, the result will be a tuple where the first value is the name of the record and the second value is the record itself. Defaults to False.
- __call__(data: bytes, ctx: Optional[confluent_kafka.serialization.SerializationContext] = None) → Optional[Union[dict, object]][source]¶
Deserialize Avro binary encoded data with Confluent Schema Registry framing to a dict, or object instance according to from_dict, if specified.
- Parameters
data (bytes) – bytes
ctx (SerializationContext) – Metadata relevant to the serialization operation.
- Raises
SerializerError – if an error occurs parsing data.
- Returns
If data is None, then None. Else, a dict or object instance according to from_dict, if specified.
- Return type
object
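A manual-invocation sketch of AvroDeserializer with a from_dict hook; the Schema Registry URL, topic and User class are hypothetical, and the payload would normally come from a consumed message's value():

```python
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

class User:                                     # hypothetical application type
    def __init__(self, name):
        self.name = name

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

avro_deserializer = AvroDeserializer(
    schema_registry_client,
    from_dict=lambda d, ctx: User(name=d['name']))   # dict -> User

def handle(payload: bytes, topic: str = 'users'):
    # payload would normally be msg.value() from a consumed Message.
    return avro_deserializer(payload, SerializationContext(topic, MessageField.VALUE))
```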
DoubleDeserializer¶
- class confluent_kafka.serialization.DoubleDeserializer[source]¶
Deserializes IEEE 754 binary64 bytes to float.
See also
- __call__(value, ctx=None)[source]¶
Deserializes a float from IEEE 754 binary64 bytes.
- Parameters
value (bytes) – bytes to be deserialized
ctx (SerializationContext) – Metadata pertaining to the serialization operation
- Raises
SerializerError – If an error occurs during deserialization.
- Returns
float if data is not None, otherwise None
IntegerDeserializer¶
- class confluent_kafka.serialization.IntegerDeserializer[source]¶
Deserializes int32 bytes to int.
See also
- __call__(value, ctx=None)[source]¶
Deserializes int from int32 bytes.
- Parameters
value (bytes) – bytes to be deserialized
ctx (SerializationContext) – Metadata pertaining to the serialization operation
- Raises
SerializerError – If an error occurs during deserialization.
- Returns
int if data is not None, otherwise None
JSONDeserializer¶
- class confluent_kafka.schema_registry.json_schema.JSONDeserializer(schema_str: Optional[Union[str, confluent_kafka.schema_registry.schema_registry_client.Schema]], from_dict: Optional[Callable[[dict, confluent_kafka.serialization.SerializationContext], object]] = None, schema_registry_client: Optional[confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient] = None, conf: Optional[dict] = None, rule_conf: Optional[dict] = None, rule_registry: Optional[confluent_kafka.schema_registry.rule_registry.RuleRegistry] = None)[source]¶
Deserializer for JSON encoded data with Confluent Schema Registry framing.
Configuration properties:
Property Name
Type
Description
use.latest.version
bool
Whether to use the latest subject version for deserialization.
Defaults to False.
use.latest.with.metadata
bool
Whether to use the latest subject version with the given metadata.
Defaults to None.
subject.name.strategy
callable
Callable(SerializationContext, str) -> str
Defines how Schema Registry subject names are constructed. Standard naming strategies are defined in the confluent_kafka.schema_registry namespace.
Defaults to topic_subject_name_strategy.
validate
bool
Whether to validate the payload against the given schema.
- Parameters
schema_str (str, Schema, optional) – JSON schema definition. Accepts schema as either a string or a Schema instance. Note that string definitions cannot reference other schemas. For referencing other schemas, use a Schema instance. If not provided, schemas will be retrieved from schema_registry_client based on the schema ID in the wire header of each message.
from_dict (callable, optional) – Callable(dict, SerializationContext) -> object. Converts a dict to a Python object instance.
schema_registry_client (SchemaRegistryClient, optional) – Schema Registry client instance. Needed if schema_str is a schema referencing other schemas or is not provided.
- __call__(data: bytes, ctx: Optional[confluent_kafka.serialization.SerializationContext] = None) → Optional[Union[dict, object]][source]¶
Deserialize a JSON encoded record with Confluent Schema Registry framing to a dict, or object instance according to from_dict if from_dict is specified.
- Parameters
data (bytes) – A JSON serialized record with Confluent Schema Registry framing.
ctx (SerializationContext) – Metadata relevant to the serialization operation.
- Returns
A dict, or object instance according to from_dict if from_dict is specified.
- Raises
SerializerError – If there was an error reading the Confluent framing data, or if data was not successfully validated with the configured schema.
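A manual-invocation sketch of JSONDeserializer; the schema and topic name are hypothetical, and the payload would normally come from a consumed message's value():

```python
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Hypothetical JSON Schema.
schema_str = """
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "User",
  "type": "object",
  "properties": {"name": {"type": "string"}},
  "required": ["name"]
}
"""

json_deserializer = JSONDeserializer(schema_str)

def handle(payload: bytes, topic: str = 'users'):
    # payload would normally be msg.value() from a consumed Message.
    return json_deserializer(payload, SerializationContext(topic, MessageField.VALUE))
```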
ProtobufDeserializer¶
- class confluent_kafka.schema_registry.protobuf.ProtobufDeserializer(message_type: google.protobuf.message.Message, conf: Optional[dict] = None, schema_registry_client: Optional[confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient] = None, rule_conf: Optional[dict] = None, rule_registry: Optional[confluent_kafka.schema_registry.rule_registry.RuleRegistry] = None)[source]¶
Deserializer for Protobuf serialized data with Confluent Schema Registry framing.
- Parameters
message_type (Message derived type) – Protobuf Message type.
conf (dict) – Configuration dictionary.
ProtobufDeserializer configuration properties:
Property Name
Type
Description
use.latest.version
bool
Whether to use the latest subject version for deserialization.
Defaults to False.
use.latest.with.metadata
bool
Whether to use the latest subject version with the given metadata.
Defaults to None.
subject.name.strategy
callable
Callable(SerializationContext, str) -> str
Defines how Schema Registry subject names are constructed. Standard naming strategies are defined in the confluent_kafka.schema_registry namespace.
Defaults to topic_subject_name_strategy.
use.deprecated.format
bool
Specifies whether the Protobuf deserializer should deserialize message indexes without zig-zag encoding. This option must be explicitly configured as older and newer Protobuf producers are incompatible. If Protobuf messages in the topic to consume were produced with confluent-kafka-python <1.8 then this property must be set to True until all old messages have been processed and producers have been upgraded. Warning: This configuration property will be removed in a future version of the client.
See Also: Protobuf API reference
- __call__(data: bytes, ctx: Optional[confluent_kafka.serialization.SerializationContext] = None) → Optional[google.protobuf.message.Message][source]¶
Deserialize a serialized protobuf message with Confluent Schema Registry framing.
- Parameters
data (bytes) – Serialized protobuf message with Confluent Schema Registry framing.
ctx (SerializationContext) – Metadata relevant to the serialization operation.
- Returns
Protobuf Message instance.
- Return type
Message
- Raises
SerializerError – If there was an error reading the Confluent framing data, or parsing the protobuf serialized message.
StringDeserializer¶
- class confluent_kafka.serialization.StringDeserializer(codec='utf_8')[source]¶
Deserializes a str(py2:unicode) from bytes.
- Parameters
codec (str, optional) – encoding scheme. Defaults to utf_8
- __call__(value, ctx=None)[source]¶
Deserializes bytes to unicode (str) per the configured codec. Defaults to utf_8.
- Compatibility Note:
Python 2: str objects must be converted to unicode objects by the application prior to using this deserializer.
Python 3: all str objects are already unicode objects.
- Parameters
value (bytes) – bytes to be deserialized
ctx (SerializationContext) – Metadata pertaining to the serialization operation
- Raises
SerializerError – If an error occurs during deserialization.
- Returns
unicode if data is not None, otherwise None
Serializer¶
- class confluent_kafka.serialization.Serializer[source]¶
Extensible class from which all Serializer implementations derive. Serializers instruct Kafka clients on how to convert Python objects to bytes.
See built-in implementations, listed below, for an example of how to extend this class.
Note
This class is not directly instantiable. The derived classes must be used instead.
The following implementations are provided by this module.
Note
Unless noted elsewhere all numeric types are signed and serialization is big-endian.
Name
Type
Binary Format
DoubleSerializer
float
IEEE 754 binary64
IntegerSerializer
int
int32
StringSerializer
unicode
unicode(encoding)
- __call__(obj, ctx=None)[source]¶
Converts obj to bytes.
- Parameters
obj (object) – object to be serialized
ctx (SerializationContext) – Metadata pertaining to the serialization operation
- Raises
SerializerError – If an error occurs during serialization.
- Returns
bytes if obj is not None, otherwise None
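A minimal sketch of extending Serializer, here a hypothetical dict-to-JSON implementation without Schema Registry framing:

```python
import json

from confluent_kafka.serialization import Serializer

class JsonDictSerializer(Serializer):
    """Hypothetical serializer: dict -> UTF-8 JSON bytes (no SR framing)."""

    def __call__(self, obj, ctx=None):
        if obj is None:
            return None
        return json.dumps(obj).encode('utf_8')

# Usable directly, or as key.serializer/value.serializer on a SerializingProducer.
print(JsonDictSerializer()({'id': 1}))   # b'{"id": 1}'
```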
AvroSerializer¶
- class confluent_kafka.schema_registry.avro.AvroSerializer(schema_registry_client: confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient, schema_str: Optional[Union[str, confluent_kafka.schema_registry.schema_registry_client.Schema]] = None, to_dict: Optional[Callable[[object, confluent_kafka.serialization.SerializationContext], dict]] = None, conf: Optional[dict] = None, rule_conf: Optional[dict] = None, rule_registry: Optional[confluent_kafka.schema_registry.rule_registry.RuleRegistry] = None)[source]¶
Serializer that outputs Avro binary encoded data with Confluent Schema Registry framing.
Configuration properties:
Property Name
Type
Description
auto.register.schemas
bool
If True, automatically register the configured schema with Confluent Schema Registry if it has not previously been associated with the relevant subject (determined via subject.name.strategy).
Defaults to True.
normalize.schemas
bool
Whether to normalize schemas, which will transform schemas to have a consistent format, including ordering properties and references.
use.latest.version
bool
Whether to use the latest subject version for serialization.
WARNING: There is no check that the latest schema is backwards compatible with the object being serialized.
Defaults to False.
use.latest.with.metadata
bool
Whether to use the latest subject version with the given metadata.
WARNING: There is no check that the latest schema is backwards compatible with the object being serialized.
Defaults to None.
subject.name.strategy
callable
Callable(SerializationContext, str) -> str
Defines how Schema Registry subject names are constructed. Standard naming strategies are defined in the confluent_kafka.schema_registry namespace.
Defaults to topic_subject_name_strategy.
Schemas are registered against subject names in Confluent Schema Registry that define a scope in which the schemas can be evolved. By default, the subject name is formed by concatenating the topic name with the message field (key or value) separated by a hyphen.
i.e. {topic name}-{message field}
Alternative naming strategies may be configured with the property subject.name.strategy.
Supported subject name strategies:
Subject Name Strategy
Output Format
topic_subject_name_strategy(default)
{topic name}-{message field}
topic_record_subject_name_strategy
{topic name}-{record name}
record_subject_name_strategy
{record name}
See Subject name strategy for additional details.
Note
Prior to serialization, all values must first be converted to a dict instance. This may be handled manually prior to calling Producer.produce() or by registering a to_dict callable with AvroSerializer.
See avro_producer.py in the examples directory for example usage.
Note
Tuple notation can be used to determine which branch of an ambiguous union to take.
- Parameters
schema_registry_client (SchemaRegistryClient) – Schema Registry client instance.
schema_str (str or Schema) – Avro Schema Declaration. Accepts either a string or a Schema instance. Note that string definitions cannot reference other schemas. For referencing other schemas, use a Schema instance.
to_dict (callable, optional) – Callable(object, SerializationContext) -> dict. Converts object to a dict.
conf (dict) – AvroSerializer configuration.
- __call__(obj: object, ctx: Optional[confluent_kafka.serialization.SerializationContext] = None) → Optional[bytes][source]¶
Serializes an object to Avro binary format, prepending it with Confluent Schema Registry framing.
- Parameters
obj (object) – The object instance to serialize.
ctx (SerializationContext) – Metadata pertaining to the serialization operation.
- Raises
SerializerError – If any error occurs serializing obj.
SchemaRegistryError – If there was an error registering the schema with Schema Registry, or auto.register.schemas is false and the schema was not registered.
- Returns
Confluent Schema Registry encoded Avro bytes
- Return type
bytes
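A produce-side sketch wiring AvroSerializer to a plain Producer, in the spirit of avro_producer.py; the broker, Schema Registry URL, topic, schema and User class are hypothetical:

```python
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

class User:                                     # hypothetical application type
    def __init__(self, name):
        self.name = name

schema_str = """
{
  "type": "record",
  "name": "User",
  "fields": [{"name": "name", "type": "string"}]
}
"""

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
avro_serializer = AvroSerializer(schema_registry_client, schema_str,
                                 to_dict=lambda user, ctx: {'name': user.name})

producer = Producer({'bootstrap.servers': 'localhost:9092'})  # hypothetical broker
topic = 'users'
producer.produce(topic,
                 value=avro_serializer(User('jdoe'),
                                       SerializationContext(topic, MessageField.VALUE)))
producer.flush()
```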
DoubleSerializer¶
- class confluent_kafka.serialization.DoubleSerializer[source]¶
Serializes float to IEEE 754 binary64.
See also
- __call__(obj, ctx=None)[source]¶
- Parameters
obj (object) – object to be serialized
ctx (SerializationContext) – Metadata pertaining to the serialization operation
Note
None objects are represented as Kafka Null.
- Raises
SerializerError – If an error occurs during serialization.
- Returns
IEEE 754 binary64 bytes if obj is not None, otherwise None
IntegerSerializer¶
- class confluent_kafka.serialization.IntegerSerializer[source]¶
Serializes int to int32 bytes.
See also
- __call__(obj, ctx=None)[source]¶
Serializes int as int32 bytes.
- Parameters
obj (object) – object to be serialized
ctx (SerializationContext) – Metadata pertaining to the serialization operation
Note
None objects are represented as Kafka Null.
- Raises
SerializerError – If an error occurs during serialization.
- Returns
int32 bytes if obj is not None, else None
JSONSerializer¶
- class confluent_kafka.schema_registry.json_schema.JSONSerializer(schema_str: Optional[Union[str, confluent_kafka.schema_registry.schema_registry_client.Schema]], schema_registry_client: confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient, to_dict: Optional[Callable[[object, confluent_kafka.serialization.SerializationContext], dict]] = None, conf: Optional[dict] = None, rule_conf: Optional[dict] = None, rule_registry: Optional[confluent_kafka.schema_registry.rule_registry.RuleRegistry] = None)[source]¶
Serializer that outputs JSON encoded data with Confluent Schema Registry framing.
Configuration properties:
Property Name
Type
Description
auto.register.schemas
bool
If True, automatically register the configured schema with Confluent Schema Registry if it has not previously been associated with the relevant subject (determined via subject.name.strategy).
Defaults to True.
Raises SchemaRegistryError if the schema was not registered against the subject, or could not be successfully registered.
normalize.schemas
bool
Whether to normalize schemas, which will transform schemas to have a consistent format, including ordering properties and references.
use.latest.version
bool
Whether to use the latest subject version for serialization.
WARNING: There is no check that the latest schema is backwards compatible with the object being serialized.
Defaults to False.
use.latest.with.metadata
bool
Whether to use the latest subject version with the given metadata.
WARNING: There is no check that the latest schema is backwards compatible with the object being serialized.
Defaults to None.
subject.name.strategy
callable
Callable(SerializationContext, str) -> str
Defines how Schema Registry subject names are constructed. Standard naming strategies are defined in the confluent_kafka.schema_registry namespace.
Defaults to topic_subject_name_strategy.
validate
bool
Whether to validate the payload against the given schema.
Schemas are registered against subject names in Confluent Schema Registry that define a scope in which the schemas can be evolved. By default, the subject name is formed by concatenating the topic name with the message field (key or value) separated by a hyphen.
i.e. {topic name}-{message field}
Alternative naming strategies may be configured with the property subject.name.strategy.
Supported subject name strategies:
Subject Name Strategy
Output Format
topic_subject_name_strategy(default)
{topic name}-{message field}
topic_record_subject_name_strategy
{topic name}-{record name}
record_subject_name_strategy
{record name}
See Subject name strategy for additional details.
Notes
The title annotation, referred to elsewhere as a record name, is not strictly required by the JSON Schema specification. It is, however, required by this serializer in order to register the schema with Confluent Schema Registry.
Prior to serialization, all objects must first be converted to a dict instance. This may be handled manually prior to calling Producer.produce() or by registering a to_dict callable with JSONSerializer.
- Parameters
schema_str (str, Schema) – JSON Schema definition. Accepts schema as either a string or a Schema instance. Note that string definitions cannot reference other schemas. For referencing other schemas, use a Schema instance.
schema_registry_client (SchemaRegistryClient) – Schema Registry client instance.
to_dict (callable, optional) – Callable(object, SerializationContext) -> dict. Converts object to a dict.
conf (dict) – JsonSerializer configuration.
- __call__(obj: object, ctx: Optional[confluent_kafka.serialization.SerializationContext] = None) → Optional[bytes][source]¶
Serializes an object to JSON, prepending it with Confluent Schema Registry framing.
- Parameters
obj (object) – The object instance to serialize.
ctx (SerializationContext) – Metadata relevant to the serialization operation.
- Raises
SerializerError – If any error occurs serializing obj.
- Returns
None if obj is None, else a byte array containing the JSON serialized data with Confluent Schema Registry framing.
- Return type
bytes
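A produce-side sketch wiring JSONSerializer to a plain Producer; the broker, Schema Registry URL, topic and schema are hypothetical:

```python
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Hypothetical JSON Schema; note the required `title` annotation.
schema_str = """
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "User",
  "type": "object",
  "properties": {"name": {"type": "string"}},
  "required": ["name"]
}
"""

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
json_serializer = JSONSerializer(schema_str, schema_registry_client)

producer = Producer({'bootstrap.servers': 'localhost:9092'})  # hypothetical broker
topic = 'users'
producer.produce(topic,
                 value=json_serializer({'name': 'jdoe'},
                                       SerializationContext(topic, MessageField.VALUE)))
producer.flush()
```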
ProtobufSerializer¶
- class confluent_kafka.schema_registry.protobuf.ProtobufSerializer(msg_type: google.protobuf.message.Message, schema_registry_client: confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient, conf: Optional[dict] = None, rule_conf: Optional[dict] = None, rule_registry: Optional[confluent_kafka.schema_registry.rule_registry.RuleRegistry] = None)[source]¶
Serializer for Protobuf Message derived classes. Serialization format is Protobuf, with Confluent Schema Registry framing.
Configuration properties:
Property Name
Type
Description
auto.register.schemas
bool
If True, automatically register the configured schema with Confluent Schema Registry if it has not previously been associated with the relevant subject (determined via subject.name.strategy).
Defaults to True.
Raises SchemaRegistryError if the schema was not registered against the subject, or could not be successfully registered.
normalize.schemas
bool
Whether to normalize schemas, which will transform schemas to have a consistent format, including ordering properties and references.
use.latest.version
bool
Whether to use the latest subject version for serialization.
WARNING: There is no check that the latest schema is backwards compatible with the object being serialized.
Defaults to False.
use.latest.with.metadata
bool
Whether to use the latest subject version with the given metadata.
WARNING: There is no check that the latest schema is backwards compatible with the object being serialized.
Defaults to None.
skip.known.types
bool
Whether or not to skip known types when resolving schema dependencies.
Defaults to True.
subject.name.strategy
callable
Callable(SerializationContext, str) -> str
Defines how Schema Registry subject names are constructed. Standard naming strategies are defined in the confluent_kafka.schema_registry namespace.
Defaults to topic_subject_name_strategy.
reference.subject.name.strategy
callable
Callable(SerializationContext, str) -> str
Defines how Schema Registry subject names for schema references are constructed.
Defaults to reference_subject_name_strategy
use.deprecated.format
bool
Specifies whether the Protobuf serializer should serialize message indexes without zig-zag encoding. This option must be explicitly configured as older and newer Protobuf producers are incompatible. If the consumers of the topic being produced to are using confluent-kafka-python <1.8 then this property must be set to True until all old consumers have been upgraded.
Warning: This configuration property will be removed in a future version of the client.
Schemas are registered against subject names in Confluent Schema Registry that define a scope in which the schemas can be evolved. By default, the subject name is formed by concatenating the topic name with the message field (key or value) separated by a hyphen.
i.e. {topic name}-{message field}
Alternative naming strategies may be configured with the property subject.name.strategy.
Supported subject name strategies
Subject Name Strategy
Output Format
topic_subject_name_strategy(default)
{topic name}-{message field}
topic_record_subject_name_strategy
{topic name}-{record name}
record_subject_name_strategy
{record name}
See Subject name strategy for additional details.
- Parameters
msg_type (Message) – Protobuf Message type.
schema_registry_client (SchemaRegistryClient) – Schema Registry client instance.
conf (dict) – ProtobufSerializer configuration.
See also
- __call__(message: google.protobuf.message.Message, ctx: Optional[confluent_kafka.serialization.SerializationContext] = None) → Optional[bytes][source]¶
Serializes an instance of a class derived from Protobuf Message, and prepends it with Confluent Schema Registry framing.
- Parameters
message (Message) – An instance of a class derived from Protobuf Message.
ctx (SerializationContext) – Metadata relevant to the serialization operation.
- Raises
SerializerError – If any error occurs during serialization.
- Returns
None if message is None, else a byte array containing the Protobuf serialized message with Confluent Schema Registry framing.
StringSerializer¶
- class confluent_kafka.serialization.StringSerializer(codec='utf_8')[source]¶
Serializes unicode to bytes per the configured codec. Defaults to utf_8.
Note
None objects are represented as Kafka Null.
- Parameters
codec (str, optional) – encoding scheme. Defaults to utf_8
- __call__(obj, ctx=None)[source]¶
Serializes a str(py2:unicode) to bytes.
- Compatibility Note:
Python 2 str objects must be converted to unicode objects. Python 3 all str objects are already unicode objects.
- Parameters
obj (object) – object to be serialized
ctx (SerializationContext) – Metadata pertaining to the serialization operation
- Raises
SerializerError – If an error occurs during serialization.
- Returns
serialized bytes if obj is not None, otherwise None
Supporting Classes¶
Message¶
- class confluent_kafka.Message¶
The Message object represents either a single consumed or produced message, or an event (error() is not None).
An application must check with error() to see if the object is a proper message (error() returns None) or an error/event.
This class is not user-instantiable.
- len()¶
- Returns
Message value (payload) size in bytes
- Return type
int
- error()¶
The message object is also used to propagate errors and events; an application must check error() to determine if the Message is a proper message (error() returns None) or an error or event (error() returns a KafkaError object).
- Return type
None or
KafkaError
- headers()¶
Retrieve the headers set on a message. Each header is a key value pair. Please note that header keys are ordered and can repeat.
- Returns
list of two-tuples, one (key, value) pair for each header.
- Return type
[(str, bytes),…] or None.
- key()¶
- Returns
message key or None if not available.
- Return type
str|bytes or None
- latency()¶
Retrieve the time it took to produce the message, from calling produce() to the time the acknowledgement was received from the broker. Must only be used with the producer for successfully produced messages.
- Returns
latency as float seconds, or None if latency information is not available (such as for errored messages).
- Return type
float or None
- leader_epoch()¶
- Returns
message offset leader epoch or None if not available.
- Return type
int or None
- offset()¶
- Returns
message offset or None if not available.
- Return type
int or None
- partition()¶
- Returns
partition number or None if not available.
- Return type
int or None
- set_headers()¶
Set the field ‘Message.headers’ with new value.
- Parameters
value (object) – Message.headers.
- Returns
None.
- Return type
None
- set_key()¶
Set the field ‘Message.key’ with new value.
- Parameters
value (object) – Message.key.
- Returns
None.
- Return type
None
- set_value()¶
Set the field ‘Message.value’ with new value.
- Parameters
value (object) – Message.value.
- Returns
None.
- Return type
None
- timestamp()¶
Retrieve timestamp type and timestamp from message. The timestamp type is one of:
TIMESTAMP_NOT_AVAILABLE - Timestamps not supported by broker.
TIMESTAMP_CREATE_TIME - Message creation time (or source / producer time).
TIMESTAMP_LOG_APPEND_TIME - Broker receive time.
The returned timestamp should be ignored if the timestamp type is TIMESTAMP_NOT_AVAILABLE.
The timestamp is the number of milliseconds since the epoch (UTC).
Timestamps require broker version 0.10.0.0 or later and {'api.version.request': True} configured on the client.
- Returns
tuple of message timestamp type, and timestamp.
- Return type
(int, int)
- topic()¶
- Returns
topic name or None if not available.
- Return type
str or None
- value()¶
- Returns
message value (payload) or None if not available.
- Return type
str|bytes or None
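A consume-side sketch of the typical Message checks (error(), key(), value(), timestamp()); the broker, group id and topic are hypothetical:

```python
from confluent_kafka import Consumer, TIMESTAMP_NOT_AVAILABLE

consumer = Consumer({'bootstrap.servers': 'localhost:9092',   # hypothetical
                     'group.id': 'inspector',
                     'auto.offset.reset': 'earliest'})
consumer.subscribe(['orders'])

msg = consumer.poll(10.0)
if msg is None:
    print('no message within timeout')
elif msg.error():
    print('error/event:', msg.error())          # msg is an error or event
else:
    print(msg.topic(), msg.partition(), msg.offset(), msg.key(), msg.value())
    ts_type, ts = msg.timestamp()
    if ts_type != TIMESTAMP_NOT_AVAILABLE:
        print('timestamp (ms since epoch):', ts)
consumer.close()
```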
TopicPartition¶
- class confluent_kafka.TopicPartition¶
TopicPartition is a generic type to hold a single partition and various information about it.
It is typically used to provide a list of topics or partitions for various operations, such as Consumer.assign().
- TopicPartition(topic[, partition][, offset][, metadata][, leader_epoch])¶
Instantiate a TopicPartition object.
- Parameters
topic (string) – Topic name
partition (int) – Partition id
offset (int) – Initial partition offset
metadata (string) – Offset metadata
leader_epoch (int) – Offset leader epoch
- Return type
TopicPartition
- error¶
Indicates an error (with KafkaError) unless None.
- Type
attribute error
- leader_epoch¶
Offset leader epoch (int), or None
- Type
attribute leader_epoch
- metadata¶
Optional application metadata committed with the offset (string)
- Type
attribute metadata
- offset¶
Offset (long)
Either an absolute offset (>=0) or a logical offset: OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED, OFFSET_INVALID
- Type
attribute offset
- partition¶
Partition number (int)
- Type
attribute partition
- topic¶
Topic name (string)
- Type
attribute topic
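A sketch using TopicPartition with Consumer.assign() to read one partition from a fixed logical offset; the broker, group id, topic and partition are hypothetical:

```python
from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING

consumer = Consumer({'bootstrap.servers': 'localhost:9092',   # hypothetical
                     'group.id': 'replayer'})

# Read partition 0 of 'orders' from the start; pass an absolute offset (>= 0) to seek elsewhere.
consumer.assign([TopicPartition('orders', 0, OFFSET_BEGINNING)])

msg = consumer.poll(10.0)
if msg is not None and msg.error() is None:
    print(msg.offset(), msg.value())
consumer.close()
```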
TopicCollection¶
TopicPartitionInfo¶
Node¶
- class confluent_kafka.Node(id, host, port, rack=None)[source]¶
Represents node information. Used by ConsumerGroupDescription.
- Parameters
id (int) – The node id of this node.
id_string – String representation of the node id.
host – The host name for this node.
port (int) – The port for this node.
rack (str) – The rack for this node.
ConsumerGroupTopicPartitions¶
- class confluent_kafka.ConsumerGroupTopicPartitions(group_id, topic_partitions=None)[source]¶
Represents a consumer group and its topic partition information. Used by AdminClient.list_consumer_group_offsets() and AdminClient.alter_consumer_group_offsets().
- Parameters
group_id (str) – Id of the consumer group.
topic_partitions (list(TopicPartition)) – List of topic partitions information.
ConsumerGroupState¶
- class confluent_kafka.ConsumerGroupState(value)[source]¶
Enumerates the different types of Consumer Group State.
Note that the state UNKOWN (a misspelled legacy alias) is deprecated and will be removed in a future major release. Use UNKNOWN instead.
- UNKNOWN = 0¶
State is not known or not set
- PREPARING_REBALANCING = 1¶
Preparing rebalance for the consumer group.
- COMPLETING_REBALANCING = 2¶
Consumer Group is completing rebalancing.
- STABLE = 3¶
Consumer Group is stable.
- DEAD = 4¶
Consumer Group is dead.
- EMPTY = 5¶
Consumer Group is empty.
Uuid¶
- class confluent_kafka.Uuid¶
Generic Uuid. Used in various identifiers, including topic_id.
- Uuid(most_significant_bits, least_significant_bits)¶
Instantiate a Uuid object.
- Parameters
most_significant_bits (long) – Most significant 64 bits of the 128-bit Uuid.
least_significant_bits (long) – Least significant 64 bits of the 128-bit Uuid.
- Return type
Uuid
- get_least_significant_bits()¶
- Returns
Least significant 64 bits of the 128-bit Uuid
- Return type
int
- get_most_significant_bits()¶
- Returns
Most significant 64 bits of the 128-bit Uuid
- Return type
int
ElectionType¶
MessageField¶
SerializationContext¶
- class confluent_kafka.serialization.SerializationContext(topic, field, headers=None)[source]¶
SerializationContext provides additional context to the serializer/deserializer about the data it’s serializing/deserializing.
- Parameters
topic (str) – Topic the data is being produced to or consumed from.
field (MessageField) – Describes what part of the message is being serialized.
headers (list) – List of message header tuples. Defaults to None.
Schema¶
- class confluent_kafka.schema_registry.Schema(schema_str: Optional[str], schema_type: Optional[str] = 'AVRO', references: Optional[List[confluent_kafka.schema_registry.schema_registry_client.SchemaReference]] = NOTHING, metadata: Optional[confluent_kafka.schema_registry.schema_registry_client.Metadata] = None, rule_set: Optional[confluent_kafka.schema_registry.schema_registry_client.RuleSet] = None)[source]¶
An unregistered schema.
RegisteredSchema¶
- class confluent_kafka.schema_registry.RegisteredSchema(schema_id: Optional[int], schema: Optional[confluent_kafka.schema_registry.schema_registry_client.Schema], subject: Optional[str], version: Optional[int])[source]¶
A registered schema.
SchemaRegistryError¶
- class confluent_kafka.schema_registry.error.SchemaRegistryError(http_status_code: int, error_code: int, error_message: str)[source]¶
Represents an error returned by the Confluent Schema Registry
- Parameters
http_status_code (int) – HTTP status code
error_code (int) – Schema Registry error code; -1 represents an unknown error.
error_message (str) – Description of the error
See also
KafkaError¶
- class confluent_kafka.KafkaError¶
Kafka error and event object
The KafkaError class serves multiple purposes:
Propagation of errors
Propagation of events
Exceptions
- Parameters
error (KafkaError) – Error code indicating the type of error.
reason (str) – Alternative message to describe the error.
fatal (bool) – Set to true if a fatal error.
retriable (bool) – Set to true if operation is retriable.
txn_requires_abort (bool) – Set to true if this is an abortable transaction error.
Error and event constants:
Constant
Description
_BAD_MSG
Local: Bad message format
_BAD_COMPRESSION
Local: Invalid compressed data
_DESTROY
Local: Broker handle destroyed
_FAIL
Local: Communication failure with broker
_TRANSPORT
Local: Broker transport failure
_CRIT_SYS_RESOURCE
Local: Critical system resource failure
_RESOLVE
Local: Host resolution failure
_MSG_TIMED_OUT
Local: Message timed out
_PARTITION_EOF
Broker: No more messages
_UNKNOWN_PARTITION
Local: Unknown partition
_FS
Local: File or filesystem error
_UNKNOWN_TOPIC
Local: Unknown topic
_ALL_BROKERS_DOWN
Local: All broker connections are down
_INVALID_ARG
Local: Invalid argument or configuration
_TIMED_OUT
Local: Timed out
_QUEUE_FULL
Local: Queue full
_ISR_INSUFF
Local: ISR count insufficient
_NODE_UPDATE
Local: Broker node update
_SSL
Local: SSL error
_WAIT_COORD
Local: Waiting for coordinator
_UNKNOWN_GROUP
Local: Unknown group
_IN_PROGRESS
Local: Operation in progress
_PREV_IN_PROGRESS
Local: Previous operation in progress
_EXISTING_SUBSCRIPTION
Local: Existing subscription
_ASSIGN_PARTITIONS
Local: Assign partitions
_REVOKE_PARTITIONS
Local: Revoke partitions
_CONFLICT
Local: Conflicting use
_STATE
Local: Erroneous state
_UNKNOWN_PROTOCOL
Local: Unknown protocol
_NOT_IMPLEMENTED
Local: Not implemented
_AUTHENTICATION
Local: Authentication failure
_NO_OFFSET
Local: No offset stored
_OUTDATED
Local: Outdated
_TIMED_OUT_QUEUE
Local: Timed out in queue
_UNSUPPORTED_FEATURE
Local: Required feature not supported by broker
_WAIT_CACHE
Local: Awaiting cache update
_INTR
Local: Operation interrupted
_KEY_SERIALIZATION
Local: Key serialization error
_VALUE_SERIALIZATION
Local: Value serialization error
_KEY_DESERIALIZATION
Local: Key deserialization error
_VALUE_DESERIALIZATION
Local: Value deserialization error
_PARTIAL
Local: Partial response
_READ_ONLY
Local: Read-only object
_NOENT
Local: No such entry
_UNDERFLOW
Local: Read underflow
_INVALID_TYPE
Local: Invalid type
_RETRY
Local: Retry operation
_PURGE_QUEUE
Local: Purged in queue
_PURGE_INFLIGHT
Local: Purged in flight
_FATAL
Local: Fatal error
_INCONSISTENT
Local: Inconsistent state
_GAPLESS_GUARANTEE
Local: Gap-less ordering would not be guaranteed if proceeding
_MAX_POLL_EXCEEDED
Local: Maximum application poll interval (max.poll.interval.ms) exceeded
_UNKNOWN_BROKER
Local: Unknown broker
_NOT_CONFIGURED
Local: Functionality not configured
_FENCED
Local: This instance has been fenced by a newer instance
_APPLICATION
Local: Application generated error
_ASSIGNMENT_LOST
Local: Group partition assignment lost
_NOOP
Local: No operation performed
_AUTO_OFFSET_RESET
Local: No offset to automatically reset to
_LOG_TRUNCATION
Local: Partition log truncation detected
_INVALID_DIFFERENT_RECORD
Local: An invalid record in the same batch caused the failure of this message too.
UNKNOWN
Unknown broker error
NO_ERROR
Success
OFFSET_OUT_OF_RANGE
Broker: Offset out of range
INVALID_MSG
Broker: Invalid message
UNKNOWN_TOPIC_OR_PART
Broker: Unknown topic or partition
INVALID_MSG_SIZE
Broker: Invalid message size
LEADER_NOT_AVAILABLE
Broker: Leader not available
NOT_LEADER_FOR_PARTITION
Broker: Not leader for partition
REQUEST_TIMED_OUT
Broker: Request timed out
BROKER_NOT_AVAILABLE
Broker: Broker not available
REPLICA_NOT_AVAILABLE
Broker: Replica not available
MSG_SIZE_TOO_LARGE
Broker: Message size too large
STALE_CTRL_EPOCH
Broker: StaleControllerEpochCode
OFFSET_METADATA_TOO_LARGE
Broker: Offset metadata string too large
NETWORK_EXCEPTION
Broker: Broker disconnected before response received
COORDINATOR_LOAD_IN_PROGRESS
Broker: Coordinator load in progress
COORDINATOR_NOT_AVAILABLE
Broker: Coordinator not available
NOT_COORDINATOR
Broker: Not coordinator
TOPIC_EXCEPTION
Broker: Invalid topic
RECORD_LIST_TOO_LARGE
Broker: Message batch larger than configured server segment size
NOT_ENOUGH_REPLICAS
Broker: Not enough in-sync replicas
NOT_ENOUGH_REPLICAS_AFTER_APPEND
Broker: Message(s) written to insufficient number of in-sync replicas
INVALID_REQUIRED_ACKS
Broker: Invalid required acks value
ILLEGAL_GENERATION
Broker: Specified group generation id is not valid
INCONSISTENT_GROUP_PROTOCOL
Broker: Inconsistent group protocol
INVALID_GROUP_ID
Broker: Invalid group.id
UNKNOWN_MEMBER_ID
Broker: Unknown member
INVALID_SESSION_TIMEOUT
Broker: Invalid session timeout
REBALANCE_IN_PROGRESS
Broker: Group rebalance in progress
INVALID_COMMIT_OFFSET_SIZE
Broker: Commit offset data size is not valid
TOPIC_AUTHORIZATION_FAILED
Broker: Topic authorization failed
GROUP_AUTHORIZATION_FAILED
Broker: Group authorization failed
CLUSTER_AUTHORIZATION_FAILED
Broker: Cluster authorization failed
INVALID_TIMESTAMP
Broker: Invalid timestamp
UNSUPPORTED_SASL_MECHANISM
Broker: Unsupported SASL mechanism
ILLEGAL_SASL_STATE
Broker: Request not valid in current SASL state
UNSUPPORTED_VERSION
Broker: API version not supported
TOPIC_ALREADY_EXISTS
Broker: Topic already exists
INVALID_PARTITIONS
Broker: Invalid number of partitions
INVALID_REPLICATION_FACTOR
Broker: Invalid replication factor
INVALID_REPLICA_ASSIGNMENT
Broker: Invalid replica assignment
INVALID_CONFIG
Broker: Configuration is invalid
NOT_CONTROLLER
Broker: Not controller for cluster
INVALID_REQUEST
Broker: Invalid request
UNSUPPORTED_FOR_MESSAGE_FORMAT
Broker: Message format on broker does not support request
POLICY_VIOLATION
Broker: Policy violation
OUT_OF_ORDER_SEQUENCE_NUMBER
Broker: Broker received an out of order sequence number
DUPLICATE_SEQUENCE_NUMBER
Broker: Broker received a duplicate sequence number
INVALID_PRODUCER_EPOCH
Broker: Producer attempted an operation with an old epoch
INVALID_TXN_STATE
Broker: Producer attempted a transactional operation in an invalid state
INVALID_PRODUCER_ID_MAPPING
Broker: Producer attempted to use a producer id which is not currently assigned to its transactional
INVALID_TRANSACTION_TIMEOUT
Broker: Transaction timeout is larger than the maximum value allowed by the broker’s max.transaction
CONCURRENT_TRANSACTIONS
Broker: Producer attempted to update a transaction while another concurrent operation on the same tr
TRANSACTION_COORDINATOR_FENCED
Broker: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current
TRANSACTIONAL_ID_AUTHORIZATION_FAILED
Broker: Transactional Id authorization failed
SECURITY_DISABLED
Broker: Security features are disabled
OPERATION_NOT_ATTEMPTED
Broker: Operation not attempted
KAFKA_STORAGE_ERROR
Broker: Disk error when trying to access log file on disk
LOG_DIR_NOT_FOUND
Broker: The user-specified log directory is not found in the broker config
SASL_AUTHENTICATION_FAILED
Broker: SASL Authentication failed
UNKNOWN_PRODUCER_ID
Broker: Unknown Producer Id
REASSIGNMENT_IN_PROGRESS
Broker: Partition reassignment is in progress
DELEGATION_TOKEN_AUTH_DISABLED
Broker: Delegation Token feature is not enabled
DELEGATION_TOKEN_NOT_FOUND
Broker: Delegation Token is not found on server
DELEGATION_TOKEN_OWNER_MISMATCH
Broker: Specified Principal is not valid Owner/Renewer
DELEGATION_TOKEN_REQUEST_NOT_ALLOWED
Broker: Delegation Token requests are not allowed on this connection
DELEGATION_TOKEN_AUTHORIZATION_FAILED
Broker: Delegation Token authorization failed
DELEGATION_TOKEN_EXPIRED
Broker: Delegation Token is expired
INVALID_PRINCIPAL_TYPE
Broker: Supplied principalType is not supported
NON_EMPTY_GROUP
Broker: The group is not empty
GROUP_ID_NOT_FOUND
Broker: The group id does not exist
FETCH_SESSION_ID_NOT_FOUND
Broker: The fetch session ID was not found
INVALID_FETCH_SESSION_EPOCH
Broker: The fetch session epoch is invalid
LISTENER_NOT_FOUND
Broker: No matching listener
TOPIC_DELETION_DISABLED
Broker: Topic deletion is disabled
FENCED_LEADER_EPOCH
Broker: Leader epoch is older than broker epoch
UNKNOWN_LEADER_EPOCH
Broker: Leader epoch is newer than broker epoch
UNSUPPORTED_COMPRESSION_TYPE
Broker: Unsupported compression type
STALE_BROKER_EPOCH
Broker: Broker epoch has changed
OFFSET_NOT_AVAILABLE
Broker: Leader high watermark is not caught up
MEMBER_ID_REQUIRED
Broker: Group member needs a valid member ID
PREFERRED_LEADER_NOT_AVAILABLE
Broker: Preferred leader was not available
GROUP_MAX_SIZE_REACHED
Broker: Consumer group has reached maximum size
FENCED_INSTANCE_ID
Broker: Static consumer fenced by other consumer with same group.instance.id
ELIGIBLE_LEADERS_NOT_AVAILABLE
Broker: Eligible partition leaders are not available
ELECTION_NOT_NEEDED
Broker: Leader election not needed for topic partition
NO_REASSIGNMENT_IN_PROGRESS
Broker: No partition reassignment is in progress
GROUP_SUBSCRIBED_TO_TOPIC
Broker: Deleting offsets of a topic while the consumer group is subscribed to it
INVALID_RECORD
Broker: Broker failed to validate record
UNSTABLE_OFFSET_COMMIT
Broker: There are unstable offsets that need to be cleared
THROTTLING_QUOTA_EXCEEDED
Broker: Throttling quota has been exceeded
PRODUCER_FENCED
Broker: There is a newer producer with the same transactionalId which fences the current one
RESOURCE_NOT_FOUND
Broker: Request illegally referred to resource that does not exist
DUPLICATE_RESOURCE
Broker: Request illegally referred to the same resource twice
UNACCEPTABLE_CREDENTIAL
Broker: Requested credential would not meet criteria for acceptability
INCONSISTENT_VOTER_SET
Broker: Indicates that the either the sender or recipient of a voter-only request is not one of the
INVALID_UPDATE_VERSION
Broker: Invalid update version
FEATURE_UPDATE_FAILED
Broker: Unable to update finalized features due to server error
PRINCIPAL_DESERIALIZATION_FAILURE
Broker: Request principal deserialization failed during forwarding
UNKNOWN_TOPIC_ID
Broker: Unknown topic id
FENCED_MEMBER_EPOCH
Broker: The member epoch is fenced by the group coordinator
UNRELEASED_INSTANCE_ID
Broker: The instance ID is still used by another member in the consumer group
UNSUPPORTED_ASSIGNOR
Broker: The assignor or its version range is not supported by the consumer group
STALE_MEMBER_EPOCH
Broker: The member epoch is stale
UNKNOWN_SUBSCRIPTION_ID
Broker: Client sent a push telemetry request with an invalid or outdated subscription ID
TELEMETRY_TOO_LARGE
Broker: Client sent a push telemetry request larger than the maximum size the broker will accept
- code()¶
Returns the error/event code for comparison to KafkaError.<ERR_CONSTANTS>.
- Returns
error/event code
- Return type
int
- fatal()¶
- Returns
True if this is a fatal error, else False.
- Return type
bool
- name()¶
Returns the enum name for error/event.
- Returns
error/event enum name string
- Return type
str
- retriable()¶
- Returns
True if the operation that failed may be retried, else False.
- Return type
bool
- str()¶
Returns the human-readable error/event string.
- Returns
error/event message string
- Return type
str
- txn_requires_abort()¶
- Returns
True if the error is an abortable transaction error, in which case the application must abort the current transaction with abort_transaction() and start a new transaction with begin_transaction() if it wishes to proceed with transactional operations. This will only return true for errors from the transactional producer API.
- Return type
bool
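As a hedged sketch of how these accessors can be used, the delivery callback below inspects the KafkaError it receives; the broker address, topic name, and payload are placeholder assumptions.
from confluent_kafka import Producer, KafkaError

def on_delivery(err, msg):
    if err is None:
        return
    # Compare the code against KafkaError constants and inspect the flags.
    if err.code() == KafkaError._MSG_TIMED_OUT:
        print('delivery timed out:', err.str())
    elif err.retriable():
        print('retriable error:', err.name())
    elif err.fatal():
        print('fatal error:', err.name())

producer = Producer({'bootstrap.servers': 'mybroker.com'})  # placeholder broker
producer.produce('mytopic', value=b'payload', on_delivery=on_delivery)
producer.flush()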
KafkaException¶
- class confluent_kafka.KafkaException¶
Kafka exception that wraps the KafkaError class. Use exception.args[0] to extract the KafkaError object.
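A minimal sketch of unwrapping the KafkaError from a KafkaException; the consumer configuration is a placeholder assumption, and a synchronous commit with no stored offsets is used only as one possible way to trigger the exception.
from confluent_kafka import Consumer, KafkaException

consumer = Consumer({'bootstrap.servers': 'mybroker.com',  # placeholder config
                     'group.id': 'mygroup'})
try:
    # A synchronous commit with nothing to commit is one way this can raise.
    consumer.commit(asynchronous=False)
except KafkaException as e:
    kafka_error = e.args[0]  # the wrapped KafkaError object
    print(kafka_error.code(), kafka_error.name(), kafka_error.str())
finally:
    consumer.close()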
ConsumeError¶
- class confluent_kafka.error.ConsumeError(kafka_error, exception=None, kafka_message=None)[source]¶
Wraps all errors encountered during the consumption of a message.
Note
In the event of a serialization error the original message contents may be retrieved from the kafka_message attribute.
- Parameters
kafka_error (KafkaError) – KafkaError instance.
exception (Exception, optional) – The original exception
kafka_message (Message, optional) – The Kafka Message returned by the broker.
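A sketch of handling this error in a DeserializingConsumer poll loop; the consumer configuration, topic name, and the process() handler are placeholder assumptions.
from confluent_kafka import DeserializingConsumer
from confluent_kafka.error import ConsumeError

consumer = DeserializingConsumer({'bootstrap.servers': 'mybroker.com',  # placeholder
                                  'group.id': 'mygroup',
                                  'auto.offset.reset': 'earliest'})
consumer.subscribe(['mytopic'])

while True:
    try:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        process(msg)  # hypothetical application handler
    except ConsumeError as e:
        # On a serialization error the raw Message is still available via
        # the kafka_message attribute (may be None for other errors).
        print('consume failed:', e, e.kafka_message)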
ProduceError¶
- class confluent_kafka.error.ProduceError(kafka_error, exception=None)[source]¶
Wraps all errors encountered when Producing messages.
- Parameters
kafka_error (KafkaError) – KafkaError instance.
exception (Exception, optional) – The original exception.
SerializationError¶
KeySerializationError¶
ValueSerializationError¶
KeyDeserializationError¶
- class confluent_kafka.error.KeyDeserializationError(exception=None, kafka_message=None)[source]¶
Wraps all errors encountered during the deserialization of a Kafka Message’s key.
- Parameters
exception (Exception, optional) – The original exception
kafka_message (Message, optional) – The Kafka Message returned by the broker.
ValueDeserializationError¶
- class confluent_kafka.error.ValueDeserializationError(exception=None, kafka_message=None)[source]¶
Wraps all errors encountered during the deserialization of a Kafka Message’s value.
- Parameters
exception (Exception, optional) – The original exception
kafka_message (Message, optional) – The Kafka Message returned by the broker.
Offset¶
Logical offset constants:
OFFSET_BEGINNING
- Beginning of partition (oldest offset)
OFFSET_END
- End of partition (next offset)
OFFSET_STORED
- Use stored/committed offset
OFFSET_INVALID
- Invalid/Default offset
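For illustration, a sketch that uses one of these logical offsets when assigning a partition; the broker address, topic, and partition are placeholder assumptions.
from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING

consumer = Consumer({'bootstrap.servers': 'mybroker.com',  # placeholder config
                     'group.id': 'mygroup'})
# Start reading 'mytopic' partition 0 from the oldest available offset.
consumer.assign([TopicPartition('mytopic', 0, OFFSET_BEGINNING)])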
ThrottleEvent¶
- class confluent_kafka.ThrottleEvent(broker_name, broker_id, throttle_time)[source]¶
ThrottleEvent contains details about a throttled request. Set up a throttle callback by setting the throttle_cb configuration property to a callable that takes a ThrottleEvent object as its only argument. The callback will be triggered from poll(), consume() or flush() when a request has been throttled by the broker.
This class is typically not user instantiated.
- Variables
broker_name (str) – The hostname of the broker which throttled the request
broker_id (int) – The broker id
throttle_time (float) – The amount of time (in seconds) the broker throttled (delayed) the request
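A minimal sketch of wiring up a throttle callback; the broker address is a placeholder assumption.
from confluent_kafka import Producer

def on_throttle(throttle_event):
    # throttle_event is a ThrottleEvent instance.
    print('request throttled by {} ({}) for {:.3f}s'.format(
        throttle_event.broker_name,
        throttle_event.broker_id,
        throttle_event.throttle_time))

producer = Producer({'bootstrap.servers': 'mybroker.com',  # placeholder
                     'throttle_cb': on_throttle})
producer.poll(0)  # callbacks are served from poll()/flush()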
IsolationLevel¶
AvroProducer (Legacy)¶
- class confluent_kafka.avro.AvroProducer(config, default_key_schema=None, default_value_schema=None, schema_registry=None, **kwargs)[source]¶
Deprecated since version 2.0.2.
This class will be removed in a future version of the library.
Kafka Producer client which does avro schema encoding of messages. Handles schema registration and message serialization.
Constructor arguments:
- Parameters
config (dict) – Config parameters containing url for schema registry (schema.registry.url) and the standard Kafka client configuration (bootstrap.servers et al.).
default_key_schema (str) – Optional default avro schema for key
default_value_schema (str) – Optional default avro schema for value
- produce(**kwargs)[source]¶
Asynchronously sends a message to Kafka, encoding it with the specified or default avro schema.
- Parameters
topic (str) – topic name
value (object) – An object to serialize
value_schema (str) – Avro schema for value
key (object) – An object to serialize
key_schema (str) – Avro schema for key
Plus any other parameters accepted by confluent_kafka.Producer.produce
- Raises
SerializerError – On serialization failure
BufferError – If producer queue is full.
KafkaException – For other produce failures.
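Although this class is deprecated, a sketch of its typical usage may still be helpful for maintaining older code; the schema strings, topic, broker address, and registry URL below are placeholder assumptions.
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Placeholder schemas, topic and endpoints.
value_schema = avro.loads('{"type": "string"}')
key_schema = avro.loads('{"type": "string"}')

producer = AvroProducer({'bootstrap.servers': 'mybroker.com',
                         'schema.registry.url': 'http://localhost:8081'},
                        default_key_schema=key_schema,
                        default_value_schema=value_schema)
producer.produce(topic='mytopic', key='user-1', value='hello avro')
producer.flush()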
AvroConsumer (Legacy)¶
- class confluent_kafka.avro.AvroConsumer(config, schema_registry=None, reader_key_schema=None, reader_value_schema=None, **kwargs)[source]¶
Deprecated since version 2.0.2.
This class will be removed in a future version of the library.
Kafka Consumer client which does avro schema decoding of messages. Handles message deserialization.
Constructor arguments:
- Parameters
config (dict) – Config parameters containing url for schema registry (schema.registry.url) and the standard Kafka client configuration (bootstrap.servers et al.).
reader_key_schema (schema) – a reader schema for the message key
reader_value_schema (schema) – a reader schema for the message value
- Raises
ValueError – For invalid configurations
- poll(timeout=None)[source]¶
This is an overridden method from the confluent_kafka.Consumer class. It handles message deserialization using the avro schema.
- Parameters
timeout (float) – Poll timeout in seconds (default: indefinite)
- Returns
message object with deserialized key and value as dict objects
- Return type
Message
Transactional API¶
The transactional producer operates on top of the idempotent producer, and provides full exactly-once semantics (EOS) for Apache Kafka when used with the transaction aware consumer (isolation.level=read_committed).
A producer instance is configured for transactions by setting the transactional.id to an identifier unique for the application. This id will be used to fence stale transactions from previous instances of the application, typically following an outage or crash.
After creating the transactional producer instance the transactional state must be initialized by calling confluent_kafka.Producer.init_transactions(). This is a blocking call that will acquire a runtime producer id from the transaction coordinator broker as well as abort any stale transactions and fence any still running producer instances with the same transactional.id.
Once transactions are initialized the application may begin a new transaction by calling confluent_kafka.Producer.begin_transaction(). A producer instance may only have a single ongoing transaction. Any messages produced after the transaction has been started will belong to the ongoing transaction and will be committed or aborted atomically. It is not permitted to produce messages outside a transaction boundary, e.g., before confluent_kafka.Producer.begin_transaction(), after confluent_kafka.Producer.commit_transaction() or confluent_kafka.Producer.abort_transaction(), or after the current transaction has failed.
If consumed messages are used as input to the transaction, the consumer instance must be configured with enable.auto.commit set to false. To commit the consumed offsets along with the transaction, pass the list of consumed partitions and the last offset processed + 1 to confluent_kafka.Producer.send_offsets_to_transaction() prior to committing the transaction. This allows an aborted transaction to be restarted using the previously committed offsets.
To commit the produced messages, and any consumed offsets, to the current transaction, call confluent_kafka.Producer.commit_transaction(). This call will block until the transaction has been fully committed or failed (typically due to fencing by a newer producer instance).
Alternatively, if processing fails, or an abortable transaction error is raised, the transaction needs to be aborted by calling confluent_kafka.Producer.abort_transaction(), which marks any produced messages and offset commits as aborted. After the current transaction has been committed or aborted a new transaction may be started by calling confluent_kafka.Producer.begin_transaction() again.
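Putting these calls together, the following is a minimal consume-transform-produce sketch; the broker address, topic names, transactional.id, and the one-message-per-transaction batching are illustrative assumptions rather than a prescribed pattern (error handling is covered below).
from confluent_kafka import Consumer, Producer

# Placeholder configuration values.
consumer = Consumer({'bootstrap.servers': 'mybroker.com',
                     'group.id': 'mygroup',
                     'enable.auto.commit': False,
                     'isolation.level': 'read_committed'})
producer = Producer({'bootstrap.servers': 'mybroker.com',
                     'transactional.id': 'my-transactional-id'})

consumer.subscribe(['input-topic'])
producer.init_transactions()

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    producer.begin_transaction()
    # Illustrative transform: forward the record unchanged.
    producer.produce('output-topic', key=msg.key(), value=msg.value())
    # Commit the consumed positions (last processed offset + 1) with the transaction.
    producer.send_offsets_to_transaction(
        consumer.position(consumer.assignment()),
        consumer.consumer_group_metadata())
    producer.commit_transaction()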
Retriable errors
Some error cases allow the attempted operation to be retried. This is indicated by the error object having the retriable flag set, which can be detected by calling confluent_kafka.KafkaError.retriable() on the KafkaError object. When this flag is set the application may retry the operation immediately, or preferably after a short grace period (to avoid busy-looping). Retriable errors include timeouts, broker transport failures, etc.
Abortable errors
An ongoing transaction may fail permanently due to various errors, such as the transaction coordinator becoming unavailable, write failures to the Apache Kafka log, under-replicated partitions, etc. At this point the producer application must abort the current transaction using confluent_kafka.Producer.abort_transaction() and optionally start a new transaction by calling confluent_kafka.Producer.begin_transaction(). Whether an error is abortable or not is detected by calling confluent_kafka.KafkaError.txn_requires_abort().
Fatal errors
While the underlying idempotent producer will typically only raise fatal errors for unrecoverable cluster errors where the idempotency guarantees can’t be maintained, most of these are treated as abortable by the transactional producer since transactions may be aborted and retried in their entirety. The transactional producer, on the other hand, introduces a set of additional fatal errors which the application needs to handle by shutting down the producer and terminating. There is no way for a producer instance to recover from fatal errors. Whether an error is fatal or not is detected by calling confluent_kafka.KafkaError.fatal().
Handling of other errors
For errors that have neither the retriable, abortable, nor fatal flag set it is not always obvious how to handle them. While some of these errors may be indicative of bugs in the application code, such as when an invalid parameter is passed to a method, other errors might originate from the broker and be passed through as-is to the application. The general recommendation is to treat these errors as fatal.
Error handling example
while True:
    try:
        producer.commit_transaction(10.0)
        break
    except KafkaException as e:
        if e.args[0].retriable():
            # retriable error, try again
            continue
        elif e.args[0].txn_requires_abort():
            # abort current transaction, begin a new transaction,
            # and rewind the consumer to start over.
            producer.abort_transaction()
            producer.begin_transaction()
            rewind_consumer_offsets...()
        else:
            # treat all other errors as fatal
            raise
Kafka Client Configuration¶
Configuration of producer and consumer instances is performed by providing a dict of configuration properties to the instance constructor, e.g.
conf = {'bootstrap.servers': 'mybroker.com',
        'group.id': 'mygroup',
        'session.timeout.ms': 6000,
        'on_commit': my_commit_callback,
        'auto.offset.reset': 'earliest'}

consumer = confluent_kafka.Consumer(conf)
The Python client provides the following configuration properties in addition to the properties dictated by the underlying librdkafka C library:
- default.topic.config: value is a dict of client topic-level configuration properties that are applied to all used topics for the instance. DEPRECATED: topic configuration should now be specified in the global top-level configuration.
- error_cb(kafka.KafkaError): Callback for generic/global error events. These errors are typically to be considered informational since the client will automatically try to recover. This callback is served upon calling client.poll() or producer.flush().
- throttle_cb(confluent_kafka.ThrottleEvent): Callback for throttled request reporting. This callback is served upon calling client.poll() or producer.flush().
- stats_cb(json_str): Callback for statistics data. This callback is triggered by poll() or flush() every statistics.interval.ms (needs to be configured separately). Function argument json_str is a str instance of a JSON document containing statistics data. This callback is served upon calling client.poll() or producer.flush(). See https://github.com/edenhill/librdkafka/wiki/Statistics for more information.
- oauth_cb(config_str): Callback for retrieving an OAuth Bearer token. Function argument config_str is a str from the sasl.oauthbearer.config configuration property. The return value of this callback is expected to be a (token_str, expiry_time) tuple where expiry_time is the time in seconds since the epoch as a floating point number. This callback is useful only when sasl.mechanisms=OAUTHBEARER is set and is served to get the initial token before a successful broker connection can be made. The callback can be triggered by calling client.poll() or producer.flush().
- on_delivery(kafka.KafkaError, kafka.Message) (Producer): value is a Python function reference that is called once for each produced message to indicate the final delivery result (success or failure). This property may also be set per-message by passing callback=callable (or on_delivery=callable) to the confluent_kafka.Producer.produce() function. Currently message headers are not supported on the message returned to the callback; msg.headers() will return None even if the original message had headers set. This callback is served upon calling producer.poll() or producer.flush().
- on_commit(kafka.KafkaError, list(kafka.TopicPartition)) (Consumer): Callback used to indicate success or failure of asynchronous and automatic commit requests. This callback is served upon calling consumer.poll() and is not triggered for synchronous commits. Callback arguments: KafkaError is the commit error, or None on success; list(TopicPartition) is the list of partitions with their committed offsets or per-partition errors.
- logger=logging.Handler kwarg: forward logs from the Kafka client to the provided logging.Handler instance. To avoid spontaneous calls from non-Python threads the log messages will only be forwarded when client.poll() or producer.flush() are called. For example:
mylogger = logging.getLogger()
mylogger.addHandler(logging.StreamHandler())
producer = confluent_kafka.Producer({'bootstrap.servers': 'mybroker.com'}, logger=mylogger)
Note
In the Python client, the logger configuration property is used for the log handler, not log_cb.
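To round out the callback properties above, here is a hedged sketch that wires error_cb and stats_cb into a consumer; the broker address, statistics interval, and the field read from the statistics JSON are placeholder assumptions.
import json

from confluent_kafka import Consumer

def on_error(err):
    # Generic client errors are usually informational; the client retries internally.
    print('client error:', err.name(), err.str())

def on_stats(json_str):
    stats = json.loads(json_str)
    # 'brokers' is a per-broker statistics dict in the librdkafka stats JSON.
    print('known brokers:', len(stats.get('brokers', {})))

consumer = Consumer({'bootstrap.servers': 'mybroker.com',   # placeholder
                     'group.id': 'mygroup',
                     'statistics.interval.ms': 10000,
                     'error_cb': on_error,
                     'stats_cb': on_stats})
consumer.poll(1.0)  # callbacks are served from poll()/flush()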
For the full range of configuration properties, please consult librdkafka’s documentation: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md