confluent_kafka API

A reliable, performant and feature-rich Python client for Apache Kafka v0.8 and above.

Guides
Client API
Serialization API
Supporting classes
Experimental

These classes are experimental and are likely to be removed or to undergo incompatible API changes in future versions of the library. To avoid breaking changes when upgrading, we recommend using (de)serializers directly, as shown in the example applications in the GitHub repo.

Legacy

These classes are deprecated and will be removed in a future version of the library.

Kafka Clients

AdminClient

Kafka admin client: create, view, alter, and delete topics and resources.

class confluent_kafka.admin.AdminClient(conf, **kwargs)[source]

AdminClient provides admin operations for Kafka brokers, topics, groups, and other resource types supported by the broker.

The Admin API methods are asynchronous and return a dict of concurrent.futures.Future objects keyed by the entity. The entity is a topic name for create_topics(), delete_topics(), create_partitions(), and a ConfigResource for alter_configs() and describe_configs().

All the futures for a single API call will currently finish/fail at the same time (backed by the same protocol request), but this might change in future versions of the client.

See examples/adminapi.py for example usage.

For more information see the Java Admin API documentation.

Requires broker version v0.11.0.0 or later.
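Because every Admin API call returns futures rather than results, a small helper that waits on each future and separates successes from failures can be convenient. The following is a minimal sketch: `wait_for_all` is a hypothetical helper, not part of confluent_kafka, and it relies only on the standard concurrent.futures.Future interface. The stand-in futures at the end emulate what a call such as create_topics() returns, so the sketch runs without a broker.

```python
from concurrent.futures import Future


def wait_for_all(futures, timeout=30.0):
    """Wait for each future in an entity-keyed dict.

    Returns (results, errors): two dicts keyed by the same entities
    (topic names, ConfigResource objects, ...) as the input.
    """
    results, errors = {}, {}
    for entity, future in futures.items():
        try:
            # result() blocks until the operation finishes and re-raises
            # any exception (for example KafkaException) set on the future.
            results[entity] = future.result(timeout=timeout)
        except Exception as exc:
            errors[entity] = exc
    return results, errors


# Stand-in futures (hypothetical data, no broker involved).
done = Future()
done.set_result(None)
failed = Future()
failed.set_exception(RuntimeError("stand-in for KafkaException"))

results, errors = wait_for_all({"orders": done, "payments": failed})
print("succeeded:", list(results))
print("failed:", list(errors))
```

With a real client, the dict returned by any of the dict-returning methods below could be passed to the helper unchanged.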

create_topics(new_topics, **kwargs)[source]

Create one or more new topics.

Parameters
  • new_topics (list(NewTopic)) – A list of specifications (NewTopic) for the topics that should be created.

  • operation_timeout (float) – The operation timeout in seconds, controlling how long the CreateTopics request will block on the broker waiting for the topic creation to propagate in the cluster. A value of 0 returns immediately. Default: socket.timeout.ms/1000.0

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

  • validate_only (bool) – If true, the request is only validated without creating the topic. Default: False

Returns

A dict of futures for each topic, keyed by the topic name. The future result() method returns None.

Return type

dict(<topic_name, future>)

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input.

  • ValueError – Invalid input.
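The validate_only flag allows a dry run before anything is created. The sketch below is one possible way to combine the two calls, not the library's own workflow: `create_topics_checked` and `FakeAdmin` are hypothetical names, and `FakeAdmin` only mimics the create_topics() return shape so the example runs without a broker.

```python
from concurrent.futures import Future
from types import SimpleNamespace


def create_topics_checked(admin, new_topics, timeout=30.0):
    """Validate topic specifications first, then create only the valid ones.

    `admin` is expected to behave like AdminClient and `new_topics` like a
    list of NewTopic objects (each exposing a `.topic` name).
    """
    # Dry run: the request is validated without creating any topic.
    dry_run = admin.create_topics(new_topics, validate_only=True)
    rejected = {}
    for name, future in dry_run.items():
        try:
            future.result(timeout=timeout)
        except Exception as exc:  # KafkaException with a real client
            rejected[name] = exc

    created = []
    valid = [spec for spec in new_topics if spec.topic not in rejected]
    if valid:
        futures = admin.create_topics(valid, operation_timeout=timeout)
        for name, future in futures.items():
            try:
                future.result(timeout=timeout)
                created.append(name)
            except Exception as exc:
                rejected[name] = exc
    return created, rejected


class FakeAdmin:
    """Stand-in client: rejects any topic whose name contains a space."""

    def create_topics(self, new_topics, **kwargs):
        futures = {}
        for spec in new_topics:
            future = Future()
            if " " in spec.topic:
                future.set_exception(ValueError("invalid topic name"))
            else:
                future.set_result(None)
            futures[spec.topic] = future
        return futures


specs = [SimpleNamespace(topic="orders"), SimpleNamespace(topic="bad name")]
created, rejected = create_topics_checked(FakeAdmin(), specs)
print("created:", created)
print("rejected:", list(rejected))
```

With a real client, `admin` would be an AdminClient instance and `specs` a list of NewTopic objects.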

delete_topics(topics, **kwargs)[source]

Delete one or more topics.

Parameters
  • topics (list(str)) – A list of topics to mark for deletion.

  • operation_timeout (float) – The operation timeout in seconds, controlling how long the DeleteTopics request will block on the broker waiting for the topic deletion to propagate in the cluster. A value of 0 returns immediately. Default: socket.timeout.ms/1000.0

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

Returns

A dict of futures for each topic, keyed by the topic name. The future result() method returns None.

Return type

dict(<topic_name, future>)

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input.

  • ValueError – Invalid input.

list_topics([topic=None][, timeout=-1])[source]

Request metadata from the cluster. This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.

Parameters
  • topic (str) – If specified, only request information about this topic, else return results for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.

  • timeout (float) – The maximum response time before timing out, or -1 for infinite timeout.

Return type

ClusterMetadata

Raises

KafkaException
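As a rough illustration of consuming the returned metadata, the sketch below counts partitions per topic. It assumes the ClusterMetadata layout from the supporting classes (a `.topics` dict of topic name to TopicMetadata, each with a `.partitions` dict and an `.error` attribute); `partition_counts` is a hypothetical helper, and the stand-in object replaces a real list_topics() result.

```python
from types import SimpleNamespace


def partition_counts(cluster_metadata):
    """Map each topic name to its partition count, skipping topics in error."""
    return {
        name: len(topic.partitions)
        for name, topic in cluster_metadata.topics.items()
        if topic.error is None
    }


# Stand-in for the object returned by admin.list_topics(timeout=10).
metadata = SimpleNamespace(topics={
    "orders": SimpleNamespace(partitions={0: None, 1: None, 2: None}, error=None),
    "broken": SimpleNamespace(partitions={}, error="stand-in error"),
})

counts = partition_counts(metadata)
print(counts)
```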

list_groups([group=None][, timeout=-1])[source]

Deprecated since version 2.0.2: Use list_consumer_groups() and describe_consumer_groups() instead.

Request Group Metadata from cluster. This method provides the same information as listGroups(), describeGroups() in the Java Admin client.

Parameters
  • group (str) – If specified, only request information about this group; otherwise return results for all groups in the cluster.

  • timeout (float) – The maximum response time before timing out, or -1 for infinite timeout.

Return type

GroupMetadata

Raises

KafkaException

create_partitions(new_partitions, **kwargs)[source]

Create additional partitions for the given topics.

Parameters
  • new_partitions (list(NewPartitions)) – New partitions to be created.

  • operation_timeout (float) – The operation timeout in seconds, controlling how long the CreatePartitions request will block on the broker waiting for the partition creation to propagate in the cluster. A value of 0 returns immediately. Default: socket.timeout.ms/1000.0

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

  • validate_only (bool) – If true, the request is only validated without creating the partitions. Default: False

Returns

A dict of futures for each topic, keyed by the topic name. The future result() method returns None.

Return type

dict(<topic_name, future>)

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input.

  • ValueError – Invalid input.

describe_configs(resources, **kwargs)[source]

Get the configuration of the specified resources.

Warning

Multiple resources and resource types may be requested, but at most one resource of type RESOURCE_BROKER is allowed per call since these resource requests must be sent to the broker specified in the resource.

Parameters
  • resources (list(ConfigResource)) – Resources to get the configuration for.

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

Returns

A dict of futures for each resource, keyed by the ConfigResource. The type of the value returned by the future result() method is dict(<configname, ConfigEntry>).

Return type

dict(<ConfigResource, future>)

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input.

  • ValueError – Invalid input.
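The result of each future is a dict of ConfigEntry objects, so filtering on the ConfigEntry attributes is straightforward. The sketch below keeps only entries that are not using their default value. `non_default_configs` is a hypothetical helper, and the stand-in future and entries replace a real describe_configs() call.

```python
from concurrent.futures import Future
from types import SimpleNamespace


def non_default_configs(futures, timeout=10.0):
    """Return {resource: {name: value}} for entries not using their default."""
    overrides = {}
    for resource, future in futures.items():
        # Each result is a dict(<configname, ConfigEntry>).
        entries = future.result(timeout=timeout)
        overrides[resource] = {
            name: entry.value
            for name, entry in entries.items()
            if not entry.is_default
        }
    return overrides


# Stand-in for admin.describe_configs([...]); the key would normally be
# a ConfigResource rather than a string.
future = Future()
future.set_result({
    "retention.ms": SimpleNamespace(value="3600000", is_default=False),
    "cleanup.policy": SimpleNamespace(value="delete", is_default=True),
})

overrides = non_default_configs({"topic:orders": future})
print(overrides)
```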

alter_configs(resources, **kwargs)[source]

Deprecated since version 2.2.0.

Update configuration properties for the specified resources. Updates are not transactional so they may succeed for a subset of the provided resources while the others fail. The configuration for a particular resource is updated atomically, replacing the specified values while reverting unspecified configuration entries to their default values.

Warning

alter_configs() will replace all existing configuration for the provided resources with the new configuration given, reverting all other configuration for the resource back to their default values.

Warning

Multiple resources and resource types may be specified, but at most one resource of type RESOURCE_BROKER is allowed per call since these resource requests must be sent to the broker specified in the resource.

Parameters
  • resources (list(ConfigResource)) – Resources to update configuration of.

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0.

  • validate_only (bool) – If true, the request is validated only, without altering the configuration. Default: False

Returns

A dict of futures for each resource, keyed by the ConfigResource. The future result() method returns None or throws a KafkaException.

Return type

dict(<ConfigResource, future>)

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid type.

  • ValueError – Invalid value.

incremental_alter_configs(resources, **kwargs)[source]

Update configuration properties for the specified resources. Updates are incremental: only the specified values are changed, and all other values remain as they are.

Parameters
  • resources (list(ConfigResource)) – Resources to update configuration of.

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0.

  • validate_only (bool) – If true, the request is validated only, without altering the configuration. Default: False

  • broker (int) – Broker id to send the request to. When altering broker configurations, it’s ignored because the request needs to go to that broker only. Default: controller broker.

Returns

A dict of futures for each resource, keyed by the ConfigResource. The future result() method returns None or throws a KafkaException.

Return type

dict(<ConfigResource, future>)

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid type.

  • ValueError – Invalid value.

create_acls(acls, **kwargs)[source]

Create one or more ACL bindings.

Parameters
  • acls (list(AclBinding)) – A list of unique ACL binding specifications (AclBinding) to create.

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

Returns

A dict of futures for each ACL binding, keyed by the AclBinding object. The future result() method returns None on success.

Return type

dict[AclBinding, future]

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input.

  • ValueError – Invalid input.

describe_acls(acl_binding_filter, **kwargs)[source]

Match ACL bindings by filter.

Parameters
  • acl_binding_filter (AclBindingFilter) – A filter with attributes that must match. String attributes match exact values, or any string if set to None. Enum attributes match exact values, or any value if set to ANY. If ResourcePatternType is set to ResourcePatternType.MATCH, returns ACL bindings with: a ResourcePatternType.LITERAL pattern type and a resource name equal to the given resource name; a ResourcePatternType.LITERAL pattern type and a wildcard resource name that matches the given resource name; or a ResourcePatternType.PREFIXED pattern type and a resource name that is a prefix of the given resource name.

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

Returns

A future returning a list(AclBinding) as result

Return type

future

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input.

  • ValueError – Invalid input.
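The three MATCH cases described for acl_binding_filter can be modelled in a few lines. This is an illustrative model of the rule only, not the broker's or the library's implementation: `PatternType` is a local stand-in mirroring the LITERAL and PREFIXED values of ResourcePatternType, `binding_matches` is a hypothetical function, and "*" is taken to be the wildcard resource name.

```python
from enum import Enum


class PatternType(Enum):
    """Local stand-in mirroring two ResourcePatternType values."""
    LITERAL = 3
    PREFIXED = 4


WILDCARD = "*"  # assumed wildcard resource name


def binding_matches(pattern_type, binding_name, resource_name):
    """Return True if a binding is selected by a MATCH filter on resource_name."""
    if pattern_type is PatternType.LITERAL:
        # Exact name, or the wildcard literal that covers every resource.
        return binding_name in (resource_name, WILDCARD)
    if pattern_type is PatternType.PREFIXED:
        # The binding's name must be a prefix of the requested name.
        return resource_name.startswith(binding_name)
    return False


for pattern_type, name in [
    (PatternType.LITERAL, "orders"),
    (PatternType.LITERAL, "*"),
    (PatternType.PREFIXED, "ord"),
    (PatternType.PREFIXED, "orders-eu"),
]:
    print(pattern_type.name, repr(name), binding_matches(pattern_type, name, "orders"))
```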

delete_acls(acl_binding_filters, **kwargs)[source]

Delete ACL bindings matching one or more ACL binding filters.

Parameters
  • acl_binding_filters (list(AclBindingFilter)) – A list of unique ACL binding filters used to match the ACLs to delete. String attributes match exact values, or any string if set to None. Enum attributes match exact values, or any value if set to ANY. If ResourcePatternType is set to ResourcePatternType.MATCH, deletes ACL bindings with: a ResourcePatternType.LITERAL pattern type and a resource name equal to the given resource name; a ResourcePatternType.LITERAL pattern type and a wildcard resource name that matches the given resource name; or a ResourcePatternType.PREFIXED pattern type and a resource name that is a prefix of the given resource name.

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

Returns

A dict of futures for each ACL binding filter, keyed by the AclBindingFilter object. The future result() method returns a list of AclBinding.

Return type

dict[AclBindingFilter, future]

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input.

  • ValueError – Invalid input.

list_consumer_groups(**kwargs)[source]

List consumer groups.

Parameters
  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

  • states (set(ConsumerGroupState)) – only list consumer groups which are currently in these states.

  • types (set(ConsumerGroupType)) – only list consumer groups of these types.

Returns

A future. The future result() method returns ListConsumerGroupsResult.

Return type

future

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input.

  • ValueError – Invalid input.

describe_consumer_groups(group_ids, **kwargs)[source]

Describe consumer groups.

Parameters
  • group_ids (list(str)) – List of group_ids which need to be described.

  • include_authorized_operations (bool) – If True, fetches group AclOperations. Default: False

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

Returns

A dict of futures for each group, keyed by the group_id. The future result() method returns ConsumerGroupDescription.

Return type

dict[str, future]

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input.

  • ValueError – Invalid input.
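list_consumer_groups() and describe_consumer_groups() are often chained: list first, then describe what was found. The sketch below shows one way to do that. It assumes ListConsumerGroupsResult exposes `.valid`, a list of listings that each carry a `.group_id`. `describe_all_groups` and `FakeAdmin` are hypothetical names, with `FakeAdmin` standing in for a real client so the example runs without a broker.

```python
from concurrent.futures import Future
from types import SimpleNamespace


def describe_all_groups(admin, timeout=10.0):
    """List every consumer group, then describe each one by group id."""
    listing = admin.list_consumer_groups(request_timeout=timeout).result(timeout=timeout)
    group_ids = [group.group_id for group in listing.valid]
    if not group_ids:
        return {}
    futures = admin.describe_consumer_groups(group_ids, request_timeout=timeout)
    return {gid: future.result(timeout=timeout) for gid, future in futures.items()}


def _done(value):
    """Build an already-completed future (stand-in helper)."""
    future = Future()
    future.set_result(value)
    return future


class FakeAdmin:
    """Stand-in client returning canned data in the documented shapes."""

    def list_consumer_groups(self, **kwargs):
        groups = [SimpleNamespace(group_id="billing"), SimpleNamespace(group_id="audit")]
        return _done(SimpleNamespace(valid=groups))

    def describe_consumer_groups(self, group_ids, **kwargs):
        return {gid: _done(SimpleNamespace(group_id=gid)) for gid in group_ids}


descriptions = describe_all_groups(FakeAdmin())
print(sorted(descriptions))
```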

describe_topics(topics, **kwargs)[source]

Describe topics.

Parameters
  • topics (TopicCollection) – Collection of topic names to describe.

  • include_authorized_operations (bool) – If True, fetches topic AclOperations. Default: False

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

Returns

A dict of futures for each topic, keyed by the topic. The future result() method returns TopicDescription.

Return type

dict[str, future]

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input type.

  • ValueError – Invalid input value.

describe_cluster(**kwargs)[source]

Describe cluster.

Parameters
  • include_authorized_operations (bool) – If True, fetches cluster AclOperations. Default: False

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

Returns

A future. The future result() method returns the description of the cluster.

Return type

future

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input type.

  • ValueError – Invalid input value.

delete_consumer_groups(group_ids, **kwargs)[source]

Delete the given consumer groups.

Parameters
  • group_ids (list(str)) – List of group_ids which need to be deleted.

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

Returns

A dict of futures for each group, keyed by the group_id. The future result() method returns None.

Return type

dict[str, future]

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input type.

  • ValueError – Invalid input value.

list_consumer_group_offsets(list_consumer_group_offsets_request, **kwargs)[source]

List offset information for the consumer group and (optional) topic partition provided in the request.

Note

Currently, the API supports only a single group.

Parameters
  • list_consumer_group_offsets_request (list(ConsumerGroupTopicPartitions)) – List of ConsumerGroupTopicPartitions, each consisting of a group name and the topic partitions for which offset details are requested. If only the group name is provided, offset information for all topics and partitions associated with that group is returned.

  • require_stable (bool) – If True, fetches stable offsets. Default: False

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

Returns

A dict of futures for each group, keyed by the group id. The future result() method returns ConsumerGroupTopicPartitions.

Return type

dict[str, future]

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input.

  • ValueError – Invalid input.
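The per-group results can be flattened into plain dictionaries for reporting. The sketch below assumes each ConsumerGroupTopicPartitions result exposes `.topic_partitions`, a list of objects with `.topic`, `.partition`, `.offset` and `.error` attributes. `committed_offsets` is a hypothetical helper, and the stand-in future replaces a real list_consumer_group_offsets() call.

```python
from concurrent.futures import Future
from types import SimpleNamespace


def committed_offsets(futures, timeout=10.0):
    """Flatten the futures into {group_id: {(topic, partition): offset}}.

    Partitions that carry an error are skipped.
    """
    offsets = {}
    for group_id, future in futures.items():
        result = future.result(timeout=timeout)
        offsets[group_id] = {
            (tp.topic, tp.partition): tp.offset
            for tp in result.topic_partitions
            if tp.error is None
        }
    return offsets


# Stand-in for admin.list_consumer_group_offsets([...]).
future = Future()
future.set_result(SimpleNamespace(topic_partitions=[
    SimpleNamespace(topic="orders", partition=0, offset=42, error=None),
    SimpleNamespace(topic="orders", partition=1, offset=-1, error="stand-in error"),
]))

offsets = committed_offsets({"billing": future})
print(offsets)
```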

alter_consumer_group_offsets(alter_consumer_group_offsets_request, **kwargs)[source]

Alter offset for the consumer group and topic partition provided in the request.

Note

Currently, the API supports only a single group.

Parameters
  • alter_consumer_group_offsets_request (list(ConsumerGroupTopicPartitions)) – List of ConsumerGroupTopicPartitions which consist of group name and topic partition; and corresponding offset to be updated.

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

Returns

A dict of futures for each group, keyed by the group id. The future result() method returns ConsumerGroupTopicPartitions.

Return type

dict[str, future]

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input.

  • ValueError – Invalid input.

set_sasl_credentials(username, password)[source]

Sets the SASL credentials used for this client. These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate. This method will not disconnect existing broker connections that have been established with the old credentials. This method is applicable only to SASL PLAIN and SCRAM mechanisms.

Parameters
  • username (str) – The username to set.

  • password (str) – The password to set.

Return type

None

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input.

describe_user_scram_credentials(users=None, **kwargs)[source]

Describe user SASL/SCRAM credentials.

Parameters
  • users (list(str)) – List of user names to describe. Duplicate users aren’t allowed. Can be None to describe all users’ credentials.

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

Returns

If None is passed, returns a single future. The future yields a dict[str, UserScramCredentialsDescription] or raises a KafkaException.

If a list of user names is passed, returns a dict[str, future[UserScramCredentialsDescription]]. Each future yields a UserScramCredentialsDescription or raises a KafkaException.

Return type

Union[future[dict[str, UserScramCredentialsDescription]], dict[str, future[UserScramCredentialsDescription]]]

Raises
  • TypeError – Invalid input type.

  • ValueError – Invalid input value.
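Since the return value has two possible shapes, calling code may want to normalize them. The sketch below is one hedged way to do so. `scram_descriptions` is a hypothetical helper, and the stand-in futures and string values replace real results so the example runs without a broker.

```python
from concurrent.futures import Future


def scram_descriptions(returned, timeout=10.0):
    """Normalize both return shapes of describe_user_scram_credentials().

    Returns (descriptions, errors), each a dict keyed by user name.
    """
    if not isinstance(returned, dict):
        # users=None: a single future yielding descriptions for all users.
        return dict(returned.result(timeout=timeout)), {}
    descriptions, errors = {}, {}
    for user, future in returned.items():
        try:
            descriptions[user] = future.result(timeout=timeout)
        except Exception as exc:  # KafkaException with a real client
            errors[user] = exc
    return descriptions, errors


# Stand-in for the users=None case: one future for all users.
single = Future()
single.set_result({"alice": "description-of-alice"})
all_users = scram_descriptions(single)

# Stand-in for the list-of-users case: one future per user.
found = Future()
found.set_result("description-of-alice")
missing = Future()
missing.set_exception(RuntimeError("stand-in for KafkaException"))
some_users = scram_descriptions({"alice": found, "bob": missing})

print(all_users[0])
print(some_users[0], list(some_users[1]))
```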

alter_user_scram_credentials(alterations, **kwargs)[source]

Alter user SASL/SCRAM credentials.

Parameters
  • alterations (list(UserScramCredentialAlteration)) – List of UserScramCredentialAlteration to apply. The pair (user, mechanism) must be unique among alterations.

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

Returns

A dict of futures keyed by user name. The future result() method returns None or raises KafkaException

Return type

dict[str, future]

Raises
  • TypeError – Invalid input type.

  • ValueError – Invalid input value.

list_offsets(topic_partition_offsets, **kwargs)[source]

Find the beginning offset, the end offset, the offset matching a timestamp, or the offset with the maximum timestamp in the given partitions.

Parameters
  • topic_partition_offsets (dict([TopicPartition, OffsetSpec])) – Dictionary of TopicPartition objects associated with the corresponding OffsetSpec to query for.

  • isolation_level (IsolationLevel) – The isolation level to use when querying.

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

Returns

A dict of futures keyed by TopicPartition. The future result() method returns ListOffsetsResultInfo or raises KafkaException.

Return type

dict[TopicPartition, future]

Raises
  • TypeError – Invalid input type.

  • ValueError – Invalid input value.
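One common use of end offsets is estimating consumer lag. The sketch below is a rough illustration under stated assumptions: each ListOffsetsResultInfo exposes an `.offset` attribute, the futures were requested for the latest offset, and a negative committed offset means that no offset is stored. `consumer_lag` and the `TP` tuple are hypothetical stand-ins, with `TP` replacing TopicPartition as the dict key.

```python
from collections import namedtuple
from concurrent.futures import Future
from types import SimpleNamespace

# Hashable stand-in for TopicPartition keys.
TP = namedtuple("TP", ["topic", "partition"])


def consumer_lag(end_offset_futures, committed, timeout=10.0):
    """Return {(topic, partition): lag} for partitions with a stored offset.

    `end_offset_futures` has the shape returned by list_offsets();
    `committed` maps (topic, partition) to the group's committed offset.
    """
    lag = {}
    for tp, future in end_offset_futures.items():
        key = (tp.topic, tp.partition)
        end_offset = future.result(timeout=timeout).offset
        stored = committed.get(key, -1)
        if stored >= 0:
            lag[key] = max(end_offset - stored, 0)
    return lag


def _done(value):
    """Build an already-completed future (stand-in helper)."""
    future = Future()
    future.set_result(value)
    return future


end_futures = {
    TP("orders", 0): _done(SimpleNamespace(offset=100)),
    TP("orders", 1): _done(SimpleNamespace(offset=50)),
}
committed = {("orders", 0): 40, ("orders", 1): -1001}

lag = consumer_lag(end_futures, committed)
print(lag)
```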

delete_records(topic_partition_offsets, **kwargs)[source]

Deletes all records before the specified offsets (exclusive) in the specified topics and partitions.

Parameters
  • topic_partition_offsets (list(TopicPartition)) – A list of TopicPartition objects having offset field set to the offset before which all the records should be deleted. offset can be set to OFFSET_END (-1) to delete all records in the partition.

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

  • operation_timeout (float) – The operation timeout in seconds, controlling how long the delete_records request will block on the broker waiting for the record deletion to propagate in the cluster. A value of 0 returns immediately. Default: socket.timeout.ms/1000.0

Returns

A dict of futures keyed by the TopicPartition. The future result() method returns DeletedRecords or raises KafkaException

Return type

dict[TopicPartition, future]

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input type.

  • ValueError – Invalid input value.

elect_leaders(election_type, partitions=None, **kwargs)[source]

Perform Preferred or Unclean leader election for all the specified partitions or all partitions in the cluster.

Parameters
  • election_type (ElectionType) – The type of election to perform.

  • partitions (List[TopicPartition]|None) – The topic partitions to perform the election on. Use None to perform on all the topic partitions.

  • request_timeout (float) – The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: socket.timeout.ms/1000.0

  • operation_timeout (float) – The operation timeout in seconds, controlling how long the ‘elect_leaders’ request will block on the broker waiting for the election to propagate in the cluster. A value of 0 returns immediately. Default: socket.timeout.ms/1000.0

Returns

A future. Method result() of the future returns dict[TopicPartition, KafkaException|None].

Return type

future

Raises
  • KafkaException – Operation failed locally or on broker.

  • TypeError – Invalid input type.

  • ValueError – Invalid input value.
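The future's result maps each partition to either None or an exception, so callers typically split it into successes and failures. A minimal sketch follows. `split_election_result` is a hypothetical helper, `TP` is a hashable stand-in for TopicPartition, and the RuntimeError stands in for a KafkaException.

```python
from collections import namedtuple

# Hashable stand-in for TopicPartition keys.
TP = namedtuple("TP", ["topic", "partition"])


def split_election_result(result):
    """Split dict[TopicPartition, KafkaException|None] into (elected, failed)."""
    elected = [tp for tp, error in result.items() if error is None]
    failed = {tp: error for tp, error in result.items() if error is not None}
    return elected, failed


# Stand-in for admin.elect_leaders(...).result().
result = {
    TP("orders", 0): None,
    TP("orders", 1): RuntimeError("stand-in for KafkaException"),
}

elected, failed = split_election_result(result)
print("elected:", elected)
print("failed:", list(failed))
```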

NewTopic

class confluent_kafka.admin.NewTopic

NewTopic specifies per-topic settings for passing to AdminClient.create_topics().

NewTopic(topic[, num_partitions][, replication_factor][, replica_assignment][, config])

Instantiate a NewTopic object.

Parameters
  • topic (string) – Topic name

  • num_partitions (int) – Number of partitions to create, or -1 if replica_assignment is used.

  • replication_factor (int) – Replication factor of partitions, or -1 if replica_assignment is used.

  • replica_assignment (list) – List of lists with the replication assignment for each new partition.

  • config (dict) – Dict (str:str) of topic configuration. See http://kafka.apache.org/documentation.html#topicconfigs

Return type

NewTopic

config

Optional topic configuration (dict). See http://kafka.apache.org/documentation.html#topicconfigs.

num_partitions

Number of partitions (int), or -1 if a replica_assignment is specified.

replica_assignment

Replication assignment (list of lists). The outer list index represents the partition index, the inner list is the replica assignment (broker ids) for that partition. replication_factor and replica_assignment are mutually exclusive.

replication_factor

Replication factor (int). Must be set to -1 if a replica_assignment is specified.

topic

Topic name (string).

NewPartitions

class confluent_kafka.admin.NewPartitions

NewPartitions specifies per-topic settings for passing to AdminClient.create_partitions().

NewPartitions(topic, new_total_count[, replica_assignment])

Instantiate a NewPartitions object.

Parameters
  • topic (string) – Topic name

  • new_total_count (int) – Increase the topic’s partition count to this value.

  • replica_assignment (list) – List of lists with the replication assignment for each new partition.

Return type

NewPartitions

new_total_count

Total number of partitions (int).

replica_assignment

Replication assignment (list of lists). The outer list index represents the partition index, the inner list is the replica assignment (broker ids) for that partition.

topic

Topic name (string).

ConfigSource

class confluent_kafka.admin.ConfigSource(value)[source]

Enumerates the different sources of configuration properties. Used by ConfigEntry to specify the source of configuration properties returned by describe_configs().

UNKNOWN_CONFIG = 0

Unknown

DYNAMIC_TOPIC_CONFIG = 1

Dynamic Topic

DYNAMIC_BROKER_CONFIG = 2

Dynamic Broker

DYNAMIC_DEFAULT_BROKER_CONFIG = 3

Dynamic Default Broker

STATIC_BROKER_CONFIG = 4

Static Broker

DEFAULT_CONFIG = 5

Default

ConfigEntry

class confluent_kafka.admin.ConfigEntry(name, value, source=<ConfigSource.UNKNOWN_CONFIG: 0>, is_read_only=False, is_default=False, is_sensitive=False, is_synonym=False, synonyms=[], incremental_operation=None)[source]

Represents a configuration property. Returned by describe_configs() for each configuration entry of the specified resource.

This class is typically not user instantiated.

name

Configuration property name.

value

Configuration value, or None if not set or if is_sensitive is True. Ignored when altering configurations incrementally if incremental_operation is DELETE.

source

Configuration source.

is_read_only

Indicates whether the configuration property is read-only.

is_default

Indicates whether the configuration property is using its default value.

is_sensitive

Indicates whether the configuration property value contains sensitive information (such as security settings), in which case .value is None.

is_synonym

Indicates whether the configuration property is a synonym for the parent configuration entry.

synonyms

A list of synonyms (ConfigEntry) and alternate sources for this configuration property.

incremental_operation

The incremental operation (AlterConfigOpType) to use in incremental_alter_configs.

ConfigResource

class confluent_kafka.admin.ConfigResource(restype, name, set_config=None, described_configs=None, error=None, incremental_configs=None)[source]

Represents a resource that has configuration, and (optionally) a collection of configuration properties for that resource. Used by describe_configs() and alter_configs().

Parameters
  • restype (ConfigResource.Type) – The resource type.

  • name (str) – The resource name, which depends on the resource type. For RESOURCE_BROKER, the resource name is the broker id.

  • set_config (dict) – The configuration to set/overwrite. Dictionary of str, str.

Type

alias of confluent_kafka.admin._resource.ResourceType

set_config(name, value, overwrite=True)[source]

Set/overwrite a configuration value.

When calling alter_configs, any configuration properties that are not included in the request will be reverted to their default values. As a workaround, use describe_configs() to retrieve the current configuration and overwrite the settings you want to change.

Parameters
  • name (str) – Configuration property name

  • value (str) – Configuration value

  • overwrite (bool) – If True, overwrite entry if it already exists (default). If False, do nothing if entry already exists.
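The describe-then-overwrite workaround mentioned above amounts to merging the current non-default values with the intended changes. The sketch below shows that merge step only. `merged_config` is a hypothetical helper, and the choice of which entries to carry over (explicitly set, writable, with a visible value) is an assumption rather than documented library behavior. The stand-in entries replace a real describe_configs() result.

```python
from types import SimpleNamespace


def merged_config(described, changes):
    """Build a full {name: value} dict to pass as set_config.

    `described` is the dict(<configname, ConfigEntry>) yielded by a
    describe_configs() future; `changes` holds the values to overwrite.
    """
    merged = {
        name: entry.value
        for name, entry in described.items()
        # Assumption: defaults need not be resent, read-only entries cannot
        # be altered, and hidden (None) values cannot be preserved.
        if not entry.is_default and not entry.is_read_only and entry.value is not None
    }
    merged.update(changes)
    return merged


# Stand-in entries with only the ConfigEntry attributes used above.
described = {
    "retention.ms": SimpleNamespace(value="3600000", is_default=False, is_read_only=False),
    "cleanup.policy": SimpleNamespace(value="delete", is_default=True, is_read_only=False),
    "hidden.setting": SimpleNamespace(value=None, is_default=False, is_read_only=False),
}

new_config = merged_config(described, {"max.message.bytes": "2097152"})
print(new_config)
```

Sensitive entries are returned with a value of None, so they cannot be preserved this way.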

add_incremental_config(config_entry)[source]

Add a ConfigEntry for incremental alter configs, using the configured incremental_operation.

Parameters

config_entry (ConfigEntry) – config entry to incrementally alter.

ResourceType

class confluent_kafka.admin.ResourceType(value)[source]

Enumerates the different types of Kafka resources.

UNKNOWN = 0

Resource type is not known or not set.

ANY = 1

Match any resource, used for lookups.

TOPIC = 2

Topic resource. Resource name is topic name.

GROUP = 3

Group resource. Resource name is group.id.

BROKER = 4

Broker resource. Resource name is broker id.

TRANSACTIONAL_ID = 5

Transactional ID resource.

ResourcePatternType

class confluent_kafka.admin.ResourcePatternType(value)[source]

Enumerates the different types of Kafka resource patterns.

UNKNOWN = 0

Resource pattern type is not known or not set.

ANY = 1

Match any resource, used for lookups.

MATCH = 2

Match: performs pattern matching.

LITERAL = 3

Literal: a literal resource name.

PREFIXED = 4

Prefixed: a prefixed resource name.

AlterConfigOpType

class confluent_kafka.admin.AlterConfigOpType(value)[source]

Set of incremental operations that can be used with incremental alter configs.

SET = 0

Set the value of the configuration entry.

DELETE = 1

Revert the configuration entry to the default value (possibly null).

APPEND = 2

(For list-type configuration entries only.) Add the specified values to the current list of values of the configuration entry.

SUBTRACT = 3

(For list-type configuration entries only.) Removes the specified values from the current list of values of the configuration entry.
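The four operations can be pictured with a toy model acting on a plain dictionary. This is a simplified illustration of the semantics described above, not how the broker applies them: `Op` is a local mirror of the AlterConfigOpType values, `apply_op` is a hypothetical function, and list-type values are modelled as comma-separated strings.

```python
from enum import Enum


class Op(Enum):
    """Local mirror of the AlterConfigOpType values."""
    SET = 0
    DELETE = 1
    APPEND = 2
    SUBTRACT = 3


def apply_op(config, defaults, name, op, value=None):
    """Return a copy of `config` after applying one incremental operation."""
    updated = dict(config)
    current = [item for item in (updated.get(name) or "").split(",") if item]
    if op is Op.SET:
        updated[name] = value
    elif op is Op.DELETE:
        # Revert to the default value, which may be absent (None).
        updated[name] = defaults.get(name)
    elif op is Op.APPEND:
        updated[name] = ",".join(current + value.split(","))
    elif op is Op.SUBTRACT:
        removed = set(value.split(","))
        updated[name] = ",".join(item for item in current if item not in removed)
    return updated


config = {"cleanup.policy": "delete", "retention.ms": "1000"}
defaults = {"cleanup.policy": "delete", "retention.ms": "604800000"}

after_set = apply_op(config, defaults, "retention.ms", Op.SET, "5000")
after_delete = apply_op(after_set, defaults, "retention.ms", Op.DELETE)
after_append = apply_op(config, defaults, "cleanup.policy", Op.APPEND, "compact")
after_subtract = apply_op(after_append, defaults, "cleanup.policy", Op.SUBTRACT, "delete")

print(after_set["retention.ms"], after_delete["retention.ms"])
print(after_append["cleanup.policy"], after_subtract["cleanup.policy"])
```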

AclOperation

class confluent_kafka.admin.AclOperation(value)[source]

Enumerates the different types of ACL operation.

UNKNOWN = 0

Unknown

ANY = 1

In a filter, matches any AclOperation

ALL = 2

ALL the operations

READ = 3

READ operation

WRITE = 4

WRITE operation

CREATE = 5

CREATE operation

DELETE = 6

DELETE operation

ALTER = 7

ALTER operation

DESCRIBE = 8

DESCRIBE operation

CLUSTER_ACTION = 9

CLUSTER_ACTION operation

DESCRIBE_CONFIGS = 10

DESCRIBE_CONFIGS operation

ALTER_CONFIGS = 11

ALTER_CONFIGS operation

IDEMPOTENT_WRITE = 12

IDEMPOTENT_WRITE operation

AclPermissionType

class confluent_kafka.admin.AclPermissionType(value)[source]

Enumerates the different types of ACL permission types.

UNKNOWN = 0

Unknown

ANY = 1

In a filter, matches any AclPermissionType

DENY = 2

Disallows access

ALLOW = 3

Grants access

AclBinding

class confluent_kafka.admin.AclBinding(restype, name, resource_pattern_type, principal, host, operation, permission_type)[source]

Represents an ACL binding that specifies the operation and permission type for a specific principal over one or more resources of the same type. Used by AdminClient.create_acls(), returned by AdminClient.describe_acls() and AdminClient.delete_acls().

Parameters
  • restype (ResourceType) – The resource type.

  • name (str) – The resource name, which depends on the resource type. For ResourceType.BROKER, the resource name is the broker id.

  • resource_pattern_type (ResourcePatternType) – The resource pattern, relative to the name.

  • principal (str) – The principal this AclBinding refers to.

  • host (str) – The host that the call is allowed to come from.

  • operation (AclOperation) – The operation/s specified by this binding.

  • permission_type (AclPermissionType) – The permission type for the specified operation.

AclBindingFilter

class confluent_kafka.admin.AclBindingFilter(restype, name, resource_pattern_type, principal, host, operation, permission_type)[source]

Represents an ACL binding filter used to return a list of ACL bindings matching some or all of its attributes. Used by AdminClient.describe_acls() and AdminClient.delete_acls().

Parameters

ScramMechanism

class confluent_kafka.admin.ScramMechanism(value)[source]

Enumerates SASL/SCRAM mechanisms.

UNKNOWN = 0

Unknown SASL/SCRAM mechanism

SCRAM_SHA_256 = 1

SCRAM-SHA-256 mechanism

SCRAM_SHA_512 = 2

SCRAM-SHA-512 mechanism

ScramCredentialInfo

class confluent_kafka.admin.ScramCredentialInfo(mechanism, iterations)[source]

Contains mechanism and iterations for a SASL/SCRAM credential associated with a user.

Parameters
  • mechanism (ScramMechanism) – SASL/SCRAM mechanism.

  • iterations (int) – Positive number of iterations used when creating the credential.

UserScramCredentialsDescription

class confluent_kafka.admin.UserScramCredentialsDescription(user, scram_credential_infos)[source]

Represents all SASL/SCRAM credentials associated with a user that can be retrieved.

Parameters
  • user (str) – The user name.

  • scram_credential_infos (list(ScramCredentialInfo)) – SASL/SCRAM credential representations for the user.

UserScramCredentialAlteration

class confluent_kafka.admin.UserScramCredentialAlteration(user: str)[source]

Base class for SCRAM credential alterations.

Parameters

user (str) – The user name.

UserScramCredentialUpsertion

class confluent_kafka.admin.UserScramCredentialUpsertion(user, scram_credential_info, password, salt=None)[source]

A request to update/insert a SASL/SCRAM credential for a user.

Parameters
  • user (str) – The user name.

  • scram_credential_info (ScramCredentialInfo) – The mechanism and iterations.

  • password (bytes) – Password to HMAC before storage.

  • salt (bytes) – Salt to use. Will be generated randomly if None. (optional)

UserScramCredentialDeletion

class confluent_kafka.admin.UserScramCredentialDeletion(user, mechanism)[source]

A request to delete a SASL/SCRAM credential for a user.

Parameters
  • user (str) – The user name.

  • mechanism (ScramMechanism) – SASL/SCRAM mechanism.

OffsetSpec

class confluent_kafka.admin.OffsetSpec(index)[source]

Used in AdminClient.list_offsets to specify the desired offsets of the partition being queried.

ListOffsetsResultInfo

class confluent_kafka.admin.ListOffsetsResultInfo(offset, timestamp, leader_epoch)[source]

Result of an AdminClient.list_offsets call associated with a partition.

Parameters
  • offset (int) – The offset returned by the list_offsets call.

  • timestamp (int) – The timestamp in milliseconds corresponding to the offset. Not available (-1) when querying for the earliest or the latest offsets.

  • leader_epoch (int) – The leader epoch corresponding to the offset (optional).

TopicDescription

class confluent_kafka.admin.TopicDescription(name, topic_id, is_internal, partitions, authorized_operations=None)[source]

Represents topic description information for a topic used in describe topic operation. Used by AdminClient.describe_topics().

Parameters
  • name (str) – The topic name.

  • topic_id (Uuid) – The topic id.

  • is_internal (bool) – Whether the topic is internal.

  • partitions (list(TopicPartitionInfo)) – Partition information.

  • authorized_operations (list(AclOperation)) – AclOperations allowed for the topic.

DescribeClusterResult

class confluent_kafka.admin.DescribeClusterResult(controller, nodes, cluster_id=None, authorized_operations=None)[source]

Represents cluster description information used in describe cluster operation. Used by AdminClient.describe_cluster().

Parameters
  • controller (Node) – The current controller in the cluster.

  • nodes (list(Node)) – Information about each node in the cluster.

  • cluster_id (str) – The current cluster id in the cluster.

  • authorized_operations (list(AclOperation)) – AclOperations allowed for the cluster.

BrokerMetadata

class confluent_kafka.admin.BrokerMetadata[source]

Provides information about a Kafka broker.

This class is typically not user instantiated.

id

Broker id

host

Broker hostname

port

Broker port

ClusterMetadata

class confluent_kafka.admin.ClusterMetadata[source]

Provides information about the Kafka cluster, brokers, and topics. Returned by list_topics().

This class is typically not user instantiated.

cluster_id

Cluster id string, if supported by the broker, else None.

controller_id

Current controller broker id, or -1.

brokers

Map of brokers indexed by the broker id (int). Value is a BrokerMetadata object.

topics

Map of topics indexed by the topic name. Value is a TopicMetadata object.

orig_broker_id

The broker this metadata originated from.

orig_broker_name

The broker name/address this metadata originated from.

GroupMember

class confluent_kafka.admin.GroupMember[source]

Provides information about a group member.

For more information on the metadata format, refer to: A Guide To The Kafka Protocol.

This class is typically not user instantiated.

id

Member id (generated by broker).

client_id

Client id.

client_host

Client hostname.

metadata

Member metadata (binary), format depends on protocol type.

assignment

Member assignment (binary), format depends on protocol type.

GroupMetadata

class confluent_kafka.admin.GroupMetadata[source]

Provides information about a Kafka consumer group.

This class is typically not user instantiated.

broker

Originating broker metadata.

id

Group name.

error

Broker-originated error, or None. Value is a KafkaError object.

state

Group state.

protocol_type

Group protocol type.

protocol

Group protocol.

members

Group members.

PartitionMetadata

class confluent_kafka.admin.PartitionMetadata[source]

Provides information about a Kafka partition.

This class is typically not user instantiated.

Warning

Depending on cluster state the broker ids referenced in leader, replicas and ISRs may temporarily not be reported in ClusterMetadata.brokers. Always check the availability of a broker id in the brokers dict.

id

Partition id.

leader

Current leader broker for this partition, or -1.

replicas

List of replica broker ids for this partition.

isrs

List of in-sync-replica broker ids for this partition.

error

Partition error, or None. Value is a KafkaError object.

TopicMetadata

class confluent_kafka.admin.TopicMetadata[source]

Provides information about a Kafka topic.

This class is typically not user instantiated.

topic

Topic name

partitions

Map of partitions indexed by partition id. Value is a PartitionMetadata object.

error

Topic error, or None. Value is a KafkaError object.
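The warning under PartitionMetadata suggests looking broker ids up defensively. A minimal sketch of that check follows; leader_address is a hypothetical helper, and it only relies on the attributes documented above (topics, partitions, brokers, leader, error, host, port), so any object with the same shape works.

```python
def leader_address(cluster_metadata, topic, partition_id):
    """Return "host:port" of the partition leader, or None if unavailable."""
    topic_md = cluster_metadata.topics.get(topic)
    if topic_md is None or topic_md.error is not None:
        return None
    partition_md = topic_md.partitions.get(partition_id)
    if partition_md is None or partition_md.error is not None:
        return None
    # leader is -1 when there is no current leader, and a valid id may be
    # temporarily missing from the brokers dict, so never index it directly.
    broker = cluster_metadata.brokers.get(partition_md.leader)
    if broker is None:
        return None
    return f"{broker.host}:{broker.port}"
```

With a real client this would be called as leader_address(client.list_topics(timeout=10), "my_topic", 0), where the topic name and timeout are placeholders.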

ConsumerGroupListing

class confluent_kafka.admin.ConsumerGroupListing(group_id, is_simple_consumer_group, state=None, type=None)[source]

Represents consumer group listing information for a group used in list consumer group operation. Used by ListConsumerGroupsResult.

Parameters
  • group_id (str) – The consumer group id.

  • is_simple_consumer_group (bool) – Whether a consumer group is simple or not.

  • state (ConsumerGroupState) – Current state of the consumer group.

  • type (ConsumerGroupType) – Type of the consumer group.

ListConsumerGroupsResult

class confluent_kafka.admin.ListConsumerGroupsResult(valid=None, errors=None)[source]

Represents result of List Consumer Group operation. Used by AdminClient.list_consumer_groups().

Parameters
  • valid (list(ConsumerGroupListing)) – List of successful consumer group listing responses.

  • errors (list(KafkaException)) – List of errors encountered during the operation, if any.

ConsumerGroupDescription

class confluent_kafka.admin.ConsumerGroupDescription(group_id, is_simple_consumer_group, members, partition_assignor, state, coordinator, authorized_operations=None)[source]

Represents consumer group description information for a group used in describe consumer group operation. Used by AdminClient.describe_consumer_groups().

Parameters
  • group_id (str) – The consumer group id.

  • is_simple_consumer_group (bool) – Whether a consumer group is simple or not.

  • members (list(MemberDescription)) – Description of the members of the consumer group.

  • partition_assignor (str) – Partition assignor.

  • state (ConsumerGroupState) – Current state of the consumer group.

  • coordinator (Node) – Consumer group coordinator.

  • authorized_operations (list(AclOperation)) – AclOperations allowed for the consumer group.

DeletedRecords

class confluent_kafka.admin.DeletedRecords(low_watermark)[source]

Represents information about deleted records.

Parameters

low_watermark (int) – The “low watermark” for the topic partition on which the deletion was executed.

MemberAssignment

class confluent_kafka.admin.MemberAssignment(topic_partitions=[])[source]

Represents member assignment information. Used by MemberDescription.

Parameters

topic_partitions (list(TopicPartition)) – The topic partitions assigned to a group member.

MemberDescription

class confluent_kafka.admin.MemberDescription(member_id, client_id, host, assignment, group_instance_id=None)[source]

Represents member information. Used by ConsumerGroupDescription.

Parameters
  • member_id (str) – The consumer id of the group member.

  • client_id (str) – The client id of the group member.

  • host (str) – The host where the group member is running.

  • assignment (MemberAssignment) – The assignment of the group member

  • group_instance_id (str) – The instance id of the group member.

Consumer

class confluent_kafka.Consumer

A high-level Apache Kafka consumer

Consumer(config)

Create a new Consumer instance using the provided configuration dict (including properties and callback functions). See Kafka Client Configuration for more information.

Parameters

config (dict) – Configuration properties. At a minimum, group.id must be set and bootstrap.servers should be set.

assign()
assign(partitions)

Set the consumer partition assignment to the provided list of TopicPartition and start consuming.

Parameters

partitions (list(TopicPartition)) – List of topic+partitions and optionally initial offsets to start consuming from.

Raises
  • KafkaException

  • RuntimeError – if called on a closed consumer

assignment()

Returns the current partition assignment.

Returns

List of assigned topic+partitions.

Return type

list(TopicPartition)

Raises
  • KafkaException

  • RuntimeError – if called on a closed consumer

close()

Close down and terminate the Kafka Consumer.

Actions performed:

  • Stops consuming.

  • Commits offsets, unless the consumer property ‘enable.auto.commit’ is set to False.

  • Leaves the consumer group.

Return type

None

commit()
commit([message=None][, offsets=None][, asynchronous=True])

Commit a message or a list of offsets.

The message and offsets parameters are mutually exclusive. If neither is set, the current partition assignment’s offsets are used instead. Use this method to commit offsets if you have ‘enable.auto.commit’ set to False.

Parameters
  • message (confluent_kafka.Message) – Commit the message’s offset+1. Note: By convention, committed offsets reflect the next message to be consumed, not the last message consumed.

  • offsets (list(TopicPartition)) – List of topic+partitions+offsets to commit.

  • asynchronous (bool) – If true, asynchronously commit, returning None immediately. If False, the commit() call will block until the commit succeeds or fails and the committed offsets will be returned (on success). Note that specific partitions may have failed and the .err field of each partition should be checked for success.

Return type

None|list(TopicPartition)

Raises
  • KafkaException

  • RuntimeError – if called on a closed consumer
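As an illustration of manual commits with ‘enable.auto.commit’ set to False, the sketch below commits each message synchronously after it has been handled, which gives at-least-once processing. consume_at_least_once and handle are hypothetical names, and the consumer argument is assumed to behave like the Consumer documented here.

```python
def consume_at_least_once(consumer, handle, max_messages, poll_timeout=1.0):
    """Handle up to max_messages, committing each one only after handle() returns."""
    processed = 0
    while processed < max_messages:
        msg = consumer.poll(poll_timeout)
        if msg is None:
            break  # nothing arrived within poll_timeout
        if msg.error() is not None:
            continue  # event or error; a real application would inspect it
        handle(msg)
        # Blocks until the commit of this message's offset + 1 succeeds or fails.
        consumer.commit(message=msg, asynchronous=False)
        processed += 1
    return processed
```

Committing per message is the simplest form but costs one round trip each time; committing every N messages or on a timer is a common trade-off.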

committed()
committed(partitions[, timeout=None])

Retrieve committed offsets for the specified partitions.

Parameters
  • partitions (list(TopicPartition)) – List of topic+partitions to query for stored offsets.

  • timeout (float) – Request timeout (seconds).

Returns

List of topic+partitions with offset and possibly error set.

Return type

list(TopicPartition)

Raises
  • KafkaException

  • RuntimeError – if called on a closed consumer

consume()
consume([num_messages=1][, timeout=-1])

Consumes a list of messages (possibly empty on timeout). Callbacks may be executed as a side effect of calling this method.

The application must check the returned Message object’s Message.error() method to distinguish between proper messages (error() returns None) and errors for each Message in the list (see error().code() for specifics). If the enable.partition.eof configuration property is set to True, partition EOF events will also be exposed as Messages with error().code() set to _PARTITION_EOF.

Parameters
  • num_messages (int) – The maximum number of messages to return (default: 1).

  • timeout (float) – The maximum time to block waiting for message, event or callback (default: infinite (-1)). (Seconds)

Returns

A list of Message objects (possibly empty on timeout)

Return type

list(Message)

Raises
  • RuntimeError – if called on a closed consumer

  • KafkaError – in case of internal error

  • ValueError – if num_messages > 1M
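Because every Message in the returned list has to be checked individually, a batch is usually split into payloads and errors. A minimal sketch, with consume_batch as a hypothetical helper name:

```python
def consume_batch(consumer, num_messages=100, timeout=1.0):
    """Run one consume() call and return (values, errors)."""
    values, errors = [], []
    for msg in consumer.consume(num_messages=num_messages, timeout=timeout):
        err = msg.error()
        if err is None:
            values.append(msg.value())
        else:
            # Also receives _PARTITION_EOF events when enable.partition.eof is set.
            errors.append(err)
    return values, errors
```

An empty pair of lists simply means the call timed out without receiving anything.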

consumer_group_metadata()
consumer_group_metadata()
Returns

An opaque object representing the consumer’s current group metadata for passing to the transactional producer’s send_offsets_to_transaction() API.

get_watermark_offsets()
get_watermark_offsets(partition[, timeout=None][, cached=False])

Retrieve low and high offsets for the specified partition.

Parameters
  • partition (TopicPartition) – Topic+partition to return offsets for.

  • timeout (float) – Request timeout (seconds). Ignored if cached=True.

  • cached (bool) – Instead of querying the broker, use cached information. Cached values: The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each message fetched from the broker for this partition.

Returns

Tuple of (low,high) on success or None on timeout. The high offset is the offset of the last message + 1.

Return type

tuple(int,int)

Raises
  • KafkaException

  • RuntimeError – if called on a closed consumer
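One common use of the watermark offsets is estimating consumer lag for a partition. The sketch below combines get_watermark_offsets() with position(); partition_lag is a hypothetical helper, and it assumes that a negative position means the consumer has no valid position for the partition yet.

```python
def partition_lag(consumer, partition, timeout=5.0):
    """Return high watermark minus current position, or None if unknown."""
    watermarks = consumer.get_watermark_offsets(partition, timeout=timeout)
    if watermarks is None:
        return None  # the request timed out
    _low, high = watermarks
    offset = consumer.position([partition])[0].offset
    if offset < 0:
        return None  # logical or invalid offset: no position yet
    # Both values are "next offset" style, so the difference counts
    # messages that have not been consumed yet.
    return high - offset
```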

incremental_assign()
incremental_assign(partitions)

Incrementally add the provided list of TopicPartition to the current partition assignment. This list must not contain duplicate entries, or any entry corresponding to an already assigned partition. When a COOPERATIVE assignor (i.e. incremental rebalancing) is being used, this method may be used in the on_assign callback to update the current assignment and specify start offsets. The application should pass a list of partitions identical to the list passed to the callback, even if the list is empty. Note that if you do not call incremental_assign in your on_assign handler, this will be done automatically and start offsets will be the last committed offsets, or determined via the auto offset reset policy (auto.offset.reset) if there are none. This method may also be used outside the context of a rebalance callback.

Parameters

partitions (list(TopicPartition)) – List of topic+partitions and optionally initial offsets to start consuming from.

Raises
  • KafkaException

  • RuntimeError – if called on a closed consumer

incremental_unassign()
incremental_unassign(partitions)

Incrementally remove the provided list of TopicPartition from the current partition assignment. This list must not contain duplicate entries and all entries specified must be part of the current assignment. When a COOPERATIVE assignor (i.e. incremental rebalancing) is being used, this method may be used in the on_revoke or on_lost callback to update the current assignment. The application should pass a list of partitions identical to the list passed to the callback. This method may also be used outside the context of a rebalance callback. The value of the TopicPartition offset field is ignored by this method.

Parameters

partitions (list(TopicPartition)) – List of topic+partitions to remove from the current assignment.

Raises
  • KafkaException

  • RuntimeError – if called on a closed consumer

list_topics()
list_topics([topic=None][, timeout=-1])

Request metadata from the cluster. This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.

Parameters
  • topic (str) – If specified, only request information about this topic, else return results for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.

  • timeout (float) – The maximum response time before timing out, or -1 for infinite timeout.

Return type

ClusterMetadata

Raises

KafkaException

memberid()
memberid()

Return this client’s broker-assigned group member id.

The member id is assigned by the group coordinator and is propagated to the consumer during rebalance.

Returns

Member id string or None

Return type

string

Raises

RuntimeError if called on a closed consumer

offsets_for_times()
offsets_for_times(partitions[, timeout=None])

Look up offsets by timestamp for the specified partitions.

The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned.

Parameters
  • partitions (list(TopicPartition)) – Topic+partitions with timestamps in the TopicPartition.offset field.

  • timeout (float) – Request timeout (seconds).

Returns

List of topic+partition with offset field set and possibly error set

Return type

list(TopicPartition)

Raises
  • KafkaException

  • RuntimeError – if called on a closed consumer
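The lookup result needs a little interpretation, since a partition can come back with an error or with -1 when the timestamp is past the last message. A minimal sketch, where offsets_at_timestamps is a hypothetical helper and the returned entries are assumed to expose topic, partition, offset and error attributes:

```python
def offsets_at_timestamps(consumer, partitions, timeout=10.0):
    """Map (topic, partition) to the resolved offset, or None if there is none.

    Each entry in partitions carries its lookup timestamp in the offset field.
    """
    resolved = {}
    for tp in consumer.offsets_for_times(partitions, timeout=timeout):
        if tp.error is not None or tp.offset < 0:
            # Lookup failed, or the timestamp is newer than the last message.
            resolved[(tp.topic, tp.partition)] = None
        else:
            resolved[(tp.topic, tp.partition)] = tp.offset
    return resolved
```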

pause()
pause(partitions)

Pause consumption for the provided list of partitions.

Parameters

partitions (list(TopicPartition)) – List of topic+partitions to pause.

Return type

None

Raises

KafkaException

poll()
poll([timeout=None])

Consumes a single message, calls callbacks and returns events.

The application must check the returned Message object’s Message.error() method to distinguish between proper messages (error() returns None), or an event or error (see error().code() for specifics).

Parameters

timeout (float) – Maximum time to block waiting for message, event or callback (default: infinite (None translated into -1 in the library)). (Seconds)

Returns

A Message object or None on timeout

Return type

Message or None

Raises

RuntimeError if called on a closed consumer

position()
position(partitions)

Retrieve current positions (offsets) for the specified partitions.

Parameters

partitions (list(TopicPartition)) – List of topic+partitions to return current offsets for. The current offset is the offset of the last consumed message + 1.

Returns

List of topic+partitions with offset and possibly error set.

Return type

list(TopicPartition)

Raises
  • KafkaException

  • RuntimeError – if called on a closed consumer

resume()
resume(partitions)

Resume consumption for the provided list of partitions.

Parameters

partitions (list(TopicPartition)) – List of topic+partitions to resume.

Return type

None

Raises

KafkaException
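pause() and resume() are typically paired to apply backpressure when downstream work falls behind. The sketch below is one possible shape for that, not a prescribed pattern: next_pause_state is a hypothetical helper and the two thresholds are arbitrary example values.

```python
def next_pause_state(consumer, backlog_size, paused, high_water=1000, low_water=100):
    """Pause or resume the whole assignment based on backlog size.

    Returns the new paused flag so the caller can carry it between iterations.
    """
    if not paused and backlog_size >= high_water:
        consumer.pause(consumer.assignment())
        return True
    if paused and backlog_size <= low_water:
        consumer.resume(consumer.assignment())
        return False
    return paused
```

Using two thresholds instead of one avoids flapping between paused and resumed when the backlog hovers around a single limit.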

seek()
seek(partition)

Set consume position for partition to offset. The offset may be an absolute (>=0) or a logical offset (OFFSET_BEGINNING et al.).

seek() may only be used to update the consume offset of an actively consumed partition (i.e., after assign()). To set the starting offset of a partition that is not being consumed, pass the offset in an assign() call instead.

Parameters

partition (TopicPartition) – Topic+partition+offset to seek to.

Raises

KafkaException

set_sasl_credentials()
set_sasl_credentials(username, password)

Sets the SASL credentials used for this client. These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate. This method will not disconnect existing broker connections that have been established with the old credentials. This method is applicable only to SASL PLAIN and SCRAM mechanisms.

store_offsets()
store_offsets([message=None][, offsets=None])

Store offsets for a message or a list of offsets.

message and offsets are mutually exclusive. The stored offsets will be committed according to ‘auto.commit.interval.ms’ or manual offset-less commit(). Note that ‘enable.auto.offset.store’ must be set to False when using this API.

Parameters
Return type

None

Raises
  • KafkaException

  • RuntimeError – if called on a closed consumer

subscribe()
subscribe(topics[, on_assign=None][, on_revoke=None][, on_lost=None])

Set subscription to supplied list of topics. This replaces a previous subscription.

Regexp pattern subscriptions are supported by prefixing the topic string with "^", e.g.:

consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
Parameters
  • topics (list(str)) – List of topics (strings) to subscribe to.

  • on_assign (callable) – callback to provide handling of customized offsets on completion of a successful partition re-assignment.

  • on_revoke (callable) – callback to provide handling of offset commits to a customized store on the start of a rebalance operation.

  • on_lost (callable) – callback to provide handling in the case the partition assignment has been lost. If not specified, lost partition events will be delivered to on_revoke, if specified. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.

Raises
  • KafkaException

  • RuntimeError – if called on a closed consumer

on_assign(consumer, partitions)
on_revoke(consumer, partitions)
on_lost(consumer, partitions)
Parameters
  • consumer (Consumer) – Consumer instance.

  • partitions (list(TopicPartition)) – Absolute list of partitions being assigned or revoked.
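An on_assign callback can override the start offsets before consumption begins. The sketch below restarts every assigned partition from its first message. It assumes a non-cooperative (eager) assignor, where assign() is the appropriate call, and it defines OFFSET_BEGINNING locally so the snippet stands alone; real code would import the constant from confluent_kafka. reset_to_beginning is a hypothetical name.

```python
OFFSET_BEGINNING = -2  # assumed to match confluent_kafka.OFFSET_BEGINNING


def reset_to_beginning(consumer, partitions):
    """on_assign callback: start each assigned partition from its first offset."""
    for tp in partitions:
        tp.offset = OFFSET_BEGINNING
    # Hand the modified list back so the new start offsets take effect.
    consumer.assign(partitions)
```

It would be registered as consumer.subscribe(["my_topic"], on_assign=reset_to_beginning), with the topic name as a placeholder. With a COOPERATIVE assignor the callback would call incremental_assign() instead.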

unassign()

Removes the current partition assignment and stops consuming.

Raises
unsubscribe()

Remove current subscription.

Raises
  • KafkaException

  • RuntimeError – if called on a closed consumer

DeserializingConsumer (experimental)

class confluent_kafka.DeserializingConsumer(conf)[source]

A high level Kafka consumer with deserialization capabilities.

This class is experimental and likely to be removed, or subject to incompatible API changes in future versions of the library. To avoid breaking changes on upgrading, we recommend using deserializers directly.

Derived from the Consumer class, overriding the Consumer.poll() method to add deserialization capabilities.

Additional configuration properties:

  • key.deserializer (callable) – Callable(bytes, SerializationContext) -> obj. Deserializer used for message keys.

  • value.deserializer (callable) – Callable(bytes, SerializationContext) -> obj. Deserializer used for message values.

Deserializers for string, integer and double (StringDeserializer, IntegerDeserializer and DoubleDeserializer) are supplied out-of-the-box in the confluent_kafka.serialization namespace.

Deserializers for Protobuf, JSON Schema and Avro (ProtobufDeserializer, JSONDeserializer and AvroDeserializer) with Confluent Schema Registry integration are supplied out-of-the-box in the confluent_kafka.schema_registry namespace.
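Any callable with the Callable(bytes, SerializationContext) -> obj shape can be used as a deserializer. As a sketch, a plain JSON deserializer without Schema Registry integration might look like this; json_deserializer is a hypothetical name, and passing None through unchanged is an assumption about how missing keys or values should be treated.

```python
import json


def json_deserializer(data, ctx):
    """Decode UTF-8 encoded JSON bytes into a Python object.

    ctx is the serialization context; this sketch does not use it.
    """
    if data is None:
        return None  # e.g. a message without a key or value
    return json.loads(data.decode("utf-8"))
```

It would be supplied in the configuration dict under "value.deserializer" (or "key.deserializer").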

See also

  • The Configuration Guide for in depth information on how to configure the client.

  • CONFIGURATION.md for a comprehensive set of configuration properties.

  • STATISTICS.md for detailed information on the statistics provided by stats_cb

  • The Consumer class for inherited methods.

Parameters

conf (dict) – DeserializingConsumer configuration.

Raises

ValueError – if configuration validation fails

Inherited-members

poll(timeout=-1)[source]

Consumes messages and calls callbacks.

Parameters

timeout (float) – Maximum time to block waiting for message (seconds).

Returns

Message or None on timeout

Raises
consume(num_messages=1, timeout=-1)[source]

Consumer.consume() not implemented, use DeserializingConsumer.poll() instead

Producer

class confluent_kafka.Producer

Asynchronous Kafka Producer

Producer(config)
Parameters

config (dict) – Configuration properties. At a minimum bootstrap.servers should be set

Create a new Producer instance using the provided configuration dict.

__len__(self)

Producer implements __len__ that can be used as len(producer) to obtain number of messages waiting.

Returns

Number of messages and Kafka protocol requests waiting to be delivered to broker.

Return type

int

abort_transaction()
abort_transaction([timeout])

Aborts the current transaction. This function should also be used to recover from non-fatal abortable transaction errors when KafkaError.txn_requires_abort() is True.

Any outstanding messages will be purged and fail with _PURGE_INFLIGHT or _PURGE_QUEUE.

Note: This function will block until all outstanding messages are purged and the transaction abort request has been successfully handled by the transaction coordinator, or until the timeout expires, whichever comes first. On timeout the application may call the function again.

Note: Will automatically call purge() and flush() to ensure all queued and in-flight messages are purged before attempting to abort the transaction.

Parameters

timeout (float) – The maximum amount of time to block waiting for transaction to abort in seconds.

Raises

KafkaError: Use exc.args[0].retriable() to check if the operation may be retried. Treat any other error as a fatal error.

begin_transaction()
begin_transaction()

Begin a new transaction.

init_transactions() must have been called successfully (once) before this function is called.

Any messages produced or offsets sent to a transaction, after the successful return of this function will be part of the transaction and committed or aborted atomically.

Complete the transaction by calling commit_transaction() or abort the transaction by calling abort_transaction().

Raises

KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, else treat the error as a fatal error.

commit_transaction()
commit_transaction([timeout])

Commit the current transaction. Any outstanding messages will be flushed (delivered) before actually committing the transaction.

If any of the outstanding messages fail permanently, the current transaction will enter the abortable error state and this function will return an abortable error. In this case the application must call abort_transaction() before attempting a new transaction with begin_transaction().

Note: This function will block until all outstanding messages are delivered and the transaction commit request has been successfully handled by the transaction coordinator, or until the timeout expires, whichever comes first. On timeout the application may call the function again.

Note: Will automatically call flush() to ensure all queued messages are delivered before attempting to commit the transaction. Delivery reports and other callbacks may thus be triggered from this method.

Parameters

timeout (float) – The amount of time to block in seconds.

Raises

KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, or exc.args[0].txn_requires_abort() if the current transaction has failed and must be aborted by calling abort_transaction() and then start a new transaction with begin_transaction(). Treat any other error as a fatal error.

flush()
flush([timeout])

Wait for all messages in the Producer queue to be delivered. This is a convenience method that calls poll() until len() is zero or the optional timeout elapses.

Parameters

timeout (float) – Maximum time to block (requires librdkafka >= v0.9.4). (Seconds)

Returns

Number of messages still in queue.

Note

See poll() for a description on what callbacks may be triggered.

init_transactions()

Initializes transactions for the producer instance.

This function ensures any transactions initiated by previous instances of the producer with the same transactional.id are completed. If the previous instance failed with a transaction in progress the previous transaction will be aborted. This function needs to be called before any other transactional or produce functions are called when the transactional.id is configured.

If the last transaction had begun completion (following transaction commit) but not yet finished, this function will await the previous transaction’s completion.

When any previous transactions have been fenced this function will acquire the internal producer id and epoch, used in all future transactional messages issued by this producer instance.

Upon successful return from this function the application has to perform at least one of the following operations within transaction.timeout.ms to avoid timing out the transaction on the broker:

  • produce() (et al.)

  • send_offsets_to_transaction()

  • commit_transaction()

  • abort_transaction()

Parameters

timeout (float) – Maximum time to block in seconds.

Raises

KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, else treat the error as a fatal error.

list_topics()
list_topics([topic=None][, timeout=-1])

Request metadata from the cluster. This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.

Parameters
  • topic (str) – If specified, only request information about this topic, else return results for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.

  • timeout (float) – The maximum response time before timing out, or -1 for infinite timeout.

Return type

ClusterMetadata

Raises

KafkaException

poll()
poll([timeout])

Polls the producer for events and calls the corresponding callbacks (if registered).

Callbacks:

Parameters

timeout (float) – Maximum time to block waiting for events. (Seconds)

Returns

Number of events processed (callbacks served)

Return type

int

produce()
produce(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])

Produce message to topic. This is an asynchronous operation: an application may use the callback (alias on_delivery) argument to pass a function (or lambda) that will be called from poll() when the message has been successfully delivered or permanently fails delivery.

Currently message headers are not supported on the message returned to the callback. The msg.headers() will return None even if the original message had headers set.

Parameters
  • topic (str) – Topic to produce message to

  • value (str|bytes) – Message payload

  • key (str|bytes) – Message key

  • partition (int) – Partition to produce to, else uses the configured built-in partitioner.

  • on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery

  • timestamp (int) – Message timestamp (CreateTime) in milliseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.

  • headers (dict|list) – Message headers to set on the message. The header key must be a string while the value must be binary, unicode or None. Accepts a list of (key,value) or a dict. (Requires librdkafka >= v0.11.4 and broker version >= 0.11.0.0)

Return type

None

Raises
  • BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded)

  • KafkaException – for other errors, see exception code

  • NotImplementedError – if timestamp is specified without underlying library support.
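Since produce() raises BufferError when the local queue is full, callers often serve delivery callbacks with poll() and try again. The following is a minimal sketch of that idea; produce_with_backoff, max_attempts and the 0.5 second poll interval are illustrative choices, not part of the library.

```python
def produce_with_backoff(producer, topic, value, key=None, on_delivery=None, max_attempts=3):
    """Try to enqueue one message, draining the queue when it is full.

    Returns True once the message is enqueued, False if every attempt failed.
    """
    kwargs = {"value": value}
    if key is not None:
        kwargs["key"] = key
    if on_delivery is not None:
        kwargs["on_delivery"] = on_delivery
    for _ in range(max_attempts):
        try:
            producer.produce(topic, **kwargs)
            return True
        except BufferError:
            # Queue is full: serve delivery reports to free space, then retry.
            producer.poll(0.5)
    return False
```

A True result only means the message was queued locally; the delivery outcome still arrives through the on_delivery callback during a later poll() or flush().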

purge()
purge([in_queue=True][, in_flight=True][, blocking=True])

Purge messages currently handled by the producer instance. The application will need to call poll() or flush() afterwards to serve the delivery report callbacks of the purged messages.

Parameters
  • in_queue (bool) – Purge messages from internal queues. Defaults to True.

  • in_flight (bool) – Purge messages in flight to or from the broker. Defaults to True.

  • blocking (bool) – If set to False, will not wait on background thread queue purging to finish. Defaults to True.

send_offsets_to_transaction()
send_offsets_to_transaction(positions, group_metadata[, timeout])

Sends a list of topic partition offsets to the consumer group coordinator for group_metadata and marks the offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully.

The offsets should be the next message your application will consume, i.e., the last processed message’s offset + 1 for each partition. Either track the offsets manually during processing or use consumer.position() (on the consumer) to get the current offsets for the partitions assigned to the consumer.

Use this method at the end of a consume-transform-produce loop prior to committing the transaction with commit_transaction().

Note: The consumer must disable auto commits (set enable.auto.commit to false on the consumer).

Note: Logical and invalid offsets (e.g., OFFSET_INVALID) in offsets will be ignored. If there are no valid offsets in offsets the function will return successfully and no action will be taken.

Parameters
  • offsets (list(TopicPartition)) – current consumer/processing positions (offsets) for the list of partitions.

  • group_metadata (object) – consumer group metadata retrieved from the input consumer’s get_consumer_group_metadata().

  • timeout (float) – Amount of time to block in seconds.

Raises

KafkaError: Use exc.args[0].retriable() to check if the operation may be retried, or exc.args[0].txn_requires_abort() to check if the current transaction has failed and must be aborted by calling abort_transaction() before starting a new transaction with begin_transaction(). Treat any other error as a fatal error.
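The "last processed offset + 1" rule can be tracked manually during processing. The sketch below shows one way to do that, under stated assumptions: Msg is a hypothetical stand-in for a consumed message (a real Message exposes topic(), partition() and offset() as methods), and next_offsets is an illustrative helper, not a library function.

```python
from collections import namedtuple

# Hypothetical stand-in for a consumed message; the real confluent_kafka
# Message exposes topic(), partition() and offset() as methods instead.
Msg = namedtuple("Msg", ["topic", "partition", "offset"])


def next_offsets(processed):
    """Map (topic, partition) -> offset of the next message to consume."""
    positions = {}
    for m in processed:
        key = (m.topic, m.partition)
        # Last processed offset + 1, keeping the highest value per partition.
        positions[key] = max(positions.get(key, 0), m.offset + 1)
    return positions


batch = [Msg("orders", 0, 41), Msg("orders", 1, 7), Msg("orders", 0, 42)]
positions = next_offsets(batch)
# With the real client, each entry would become a
# TopicPartition(topic, partition, offset) before being passed to
# send_offsets_to_transaction().
```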

set_sasl_credentials()
set_sasl_credentials(username, password)

Sets the SASL credentials used for this client. These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate. This method will not disconnect existing broker connections that have been established with the old credentials. This method is applicable only to SASL PLAIN and SCRAM mechanisms.

SerializingProducer (experimental)

class confluent_kafka.SerializingProducer(conf)[source]

A high level Kafka producer with serialization capabilities.

This class is experimental and likely to be removed, or subject to incompatible API changes in future versions of the library. To avoid breaking changes on upgrading, we recommend using serializers directly.

Derived from the Producer class, overriding the Producer.produce() method to add serialization capabilities.

Additional configuration properties:

  • key.serializer (callable) – Callable(obj, SerializationContext) -> bytes. Serializer used for message keys.

  • value.serializer (callable) – Callable(obj, SerializationContext) -> bytes. Serializer used for message values.

Serializers for string, integer and double (StringSerializer, IntegerSerializer and DoubleSerializer) are supplied out-of-the-box in the confluent_kafka.serialization namespace.

Serializers for Protobuf, JSON Schema and Avro (ProtobufSerializer, JSONSerializer and AvroSerializer) with Confluent Schema Registry integration are supplied out-of-the-box in the confluent_kafka.schema_registry namespace.
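Any callable with the shape Callable(obj, SerializationContext) -> bytes can act as a serializer. As a minimal sketch, the function below (json_utf8_serializer, a hypothetical name) encodes an object as UTF-8 JSON; returning None for a None input is an assumption made here to mirror the built-in serializers.

```python
import json


def json_utf8_serializer(obj, ctx):
    """Serializer-shaped callable: (obj, SerializationContext) -> bytes."""
    if obj is None:
        return None  # assumption: mirror the built-in serializers' None handling
    # sort_keys gives a stable byte representation for equal dicts.
    return json.dumps(obj, sort_keys=True).encode("utf_8")


# ctx is unused by this sketch, so None is passed in place of a real context.
payload = json_utf8_serializer({"id": 7, "name": "widget"}, None)
```

Such a callable would be supplied through the value.serializer or key.serializer property of the configuration.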

See also

  • The Configuration Guide for in depth information on how to configure the client.

  • CONFIGURATION.md for a comprehensive set of configuration properties.

  • STATISTICS.md for detailed information on the statistics provided by stats_cb

  • The Producer class for inherited methods.

Parameters

conf (dict) – SerializingProducer configuration.

produce(topic, key=None, value=None, partition=-1, on_delivery=None, timestamp=0, headers=None)[source]

Produce a message.

This is an asynchronous operation. An application may use the on_delivery argument to pass a function (or lambda) that will be called from SerializingProducer.poll() when the message has been successfully delivered or permanently fails delivery.

Note

Currently message headers are not supported on the message returned to the callback. The msg.headers() will return None even if the original message had headers set.

Parameters
  • topic (str) – Topic to produce message to.

  • key (object, optional) – Message payload key.

  • value (object, optional) – Message payload value.

  • partition (int, optional) – Partition to produce to, else the configured built-in partitioner will be used.

  • on_delivery (callable(KafkaError, Message), optional) – Delivery report callback. Called as a side effect of SerializingProducer.poll() or SerializingProducer.flush() on successful or failed delivery.

  • timestamp (float, optional) – Message timestamp (CreateTime) in milliseconds since Unix epoch UTC (requires broker >= 0.10.0.0). Default value is current time.

  • headers (dict, optional) – Message headers. The header key must be a str while the value must be binary, unicode or None. (Requires broker version >= 0.11.0.0)

Raises
  • BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded). If this happens the application should call SerializingProducer.poll() and try again.

  • KeySerializationError – If an error occurs during key serialization.

  • ValueSerializationError – If an error occurs during value serialization.

  • KafkaException – For all other errors

SchemaRegistryClient

class confluent_kafka.schema_registry.SchemaRegistryClient(conf)[source]

A Confluent Schema Registry client.

Configuration properties (* indicates a required field):

  • url * (str) – Schema Registry URL.

  • ssl.ca.location (str) – Path to CA certificate file used to verify the Schema Registry’s private key.

  • ssl.key.location (str) – Path to client’s private key (PEM) used for authentication. ssl.certificate.location must also be set.

  • ssl.certificate.location (str) – Path to client’s public key (PEM) used for authentication. May be set without ssl.key.location if the private key is stored within the PEM as well.

  • basic.auth.user.info (str) – Client HTTP credentials in the form of username:password. By default userinfo is extracted from the URL if present.

Parameters

conf (dict) – Schema Registry client configuration.
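As an illustration of the properties above, a configuration dict might look like the following. Every value is a placeholder chosen for this sketch (hostname, file paths and credentials are not real), and only url is required.

```python
# All values below are placeholders, not real endpoints or credentials.
schema_registry_conf = {
    "url": "https://schema-registry.example.com:8081",  # required
    "ssl.ca.location": "/path/to/ca.pem",
    "ssl.certificate.location": "/path/to/client.pem",
    "ssl.key.location": "/path/to/client.key",  # needs ssl.certificate.location
    "basic.auth.user.info": "username:password",
}

# With the library installed, the dict would be passed to the constructor:
# SchemaRegistryClient(schema_registry_conf)
```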

register_schema(subject_name, schema, normalize_schemas=False)[source]

Registers a schema under subject_name.

Parameters
  • subject_name (str) – subject to register a schema under

  • schema (Schema) – Schema instance to register

Returns

Schema id

Return type

int

Raises

SchemaRegistryError – if Schema violates this subject’s Compatibility policy or is otherwise invalid.

get_schema(schema_id)[source]

Fetches the schema associated with schema_id from the Schema Registry. The result is cached so subsequent attempts will not require an additional round-trip to the Schema Registry.

Parameters

schema_id (int) – Schema id

Returns

Schema instance identified by the schema_id

Return type

Schema

Raises

SchemaRegistryError – If schema can’t be found.

lookup_schema(subject_name, schema, normalize_schemas=False)[source]

Returns schema registration information for subject.

Parameters
  • subject_name (str) – Subject name the schema is registered under

  • schema (Schema) – Schema instance.

Returns

Subject registration information for this schema.

Return type

RegisteredSchema

Raises

SchemaRegistryError – If schema or subject can’t be found

get_subjects()[source]

List all subjects registered with the Schema Registry

Returns

Registered subject names

Return type

list(str)

Raises

SchemaRegistryError – if subjects can’t be found

delete_subject(subject_name, permanent=False)[source]

Deletes the specified subject and its associated compatibility level if registered. It is recommended to use this API only when a topic needs to be recycled or in development environments.

Parameters
  • subject_name (str) – subject name

  • permanent (bool) – True for a hard delete, False (default) for a soft delete

Returns

Versions deleted under this subject

Return type

list(int)

Raises

SchemaRegistryError – if the request was unsuccessful.

get_latest_version(subject_name)[source]

Retrieves latest registered version for subject

Parameters

subject_name (str) – Subject name.

Returns

Registration information for this version.

Return type

RegisteredSchema

Raises

SchemaRegistryError – if the version can’t be found or is invalid.

get_version(subject_name, version)[source]

Retrieves a specific schema registered under subject_name.

Parameters
  • subject_name (str) – Subject name.

  • version (int) – version number. Defaults to latest version.

Returns

Registration information for this version.

Return type

RegisteredSchema

Raises

SchemaRegistryError – if the version can’t be found or is invalid.

get_versions(subject_name)[source]

Get a list of all versions registered with this subject.

Parameters

subject_name (str) – Subject name.

Returns

Registered versions

Return type

list(int)

Raises

SchemaRegistryError – If subject can’t be found

delete_version(subject_name, version)[source]

Deletes a specific version registered to subject_name.

Parameters
  • subject_name (str) – Subject name.

  • version (int) – Version number

Returns

Version number which was deleted

Return type

int

Raises

SchemaRegistryError – if the subject or version cannot be found.

set_compatibility(subject_name=None, level=None)[source]

Update global or subject level compatibility level.

Parameters
  • level (str) – Compatibility level. See API reference for a list of valid values.

  • subject_name (str, optional) – Subject to update. Sets the global compatibility level if not set.

Returns

The newly configured compatibility level.

Return type

str

Raises

SchemaRegistryError – If the compatibility level is invalid.

get_compatibility(subject_name=None)[source]

Get the current compatibility level.

Parameters

subject_name (str, optional) – Subject name. Returns global policy if left unset.

Returns

Compatibility level for the subject if set, otherwise the global compatibility level.

Return type

str

Raises

SchemaRegistryError – if the request was unsuccessful.

test_compatibility(subject_name, schema, version='latest')[source]

Test the compatibility of a candidate schema for a given subject and version

Parameters
  • subject_name (str) – Subject name the schema is registered under

  • schema (Schema) – Schema instance.

  • version (int or str, optional) – Version number, or the string “latest”. Defaults to “latest”.

Returns

True if the schema is compatible with the specified version

Return type

bool

Raises

SchemaRegistryError – if the request was unsuccessful.

Serialization API

Deserializer

class confluent_kafka.serialization.Deserializer[source]

Extensible class from which all Deserializer implementations derive. Deserializers instruct Kafka clients on how to convert bytes to objects.

See built-in implementations, listed below, for an example of how to extend this class.

Note

This class is not directly instantiable. The derived classes must be used instead.

The following implementations are provided by this module.

Note

Unless noted elsewhere all numeric types are signed and serialization is big-endian.

  • DoubleDeserializer (float) – Binary format: IEEE 754 binary64

  • IntegerDeserializer (int) – Binary format: int32

  • StringDeserializer (unicode) – Binary format: unicode(encoding)

__call__(value, ctx=None)[source]

Convert bytes to object

Parameters
  • value (bytes) – bytes to be deserialized

  • ctx (SerializationContext) – Metadata pertaining to the serialization operation

Raises

SerializerError if an error occurs during deserialization

Returns

object if data is not None, otherwise None
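The binary formats listed for the built-in numeric implementations (signed, big-endian) can be reproduced with the standard struct module. This is a sketch of the formats only: the format strings ">i" and ">d" are chosen here to match the description and are not taken from the library source.

```python
import struct

# ">i": big-endian signed 32-bit integer; ">d": big-endian IEEE 754 binary64.
int_bytes = struct.pack(">i", 42)
double_bytes = struct.pack(">d", 1.5)

# Decoding reverses the operation; unpack() always returns a tuple.
decoded_int = struct.unpack(">i", int_bytes)[0]
decoded_double = struct.unpack(">d", double_bytes)[0]
```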

AvroDeserializer

class confluent_kafka.schema_registry.avro.AvroDeserializer(schema_registry_client, schema_str=None, from_dict=None, return_record_name=False)[source]

Deserializer for Avro binary encoded data with Confluent Schema Registry framing.

Note

By default, Avro complex types are returned as dicts. This behavior can be overridden by registering a callable from_dict with the deserializer to convert the dicts to the desired type.

See avro_consumer.py in the examples directory for example usage.

Parameters
  • schema_registry_client (SchemaRegistryClient) – Confluent Schema Registry client instance.

  • schema_str (str, Schema, optional) – Avro reader schema declaration. Accepts either a string or a Schema instance. If not provided, the writer schema will be used as the reader schema. Note that string definitions cannot reference other schemas. For referencing other schemas, use a Schema instance.

  • from_dict (callable, optional) – Callable(dict, SerializationContext) -> object. Converts a dict to an instance of some object.

  • return_record_name (bool) – If True, when reading a union of records, the result will be a tuple where the first value is the name of the record and the second value is the record itself. Defaults to False.
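A from_dict callable has the shape Callable(dict, SerializationContext) -> object. The sketch below assumes a hypothetical application type, User, with fields invented for illustration; it does not come from the library.

```python
from dataclasses import dataclass


@dataclass
class User:  # hypothetical application type
    name: str
    favorite_number: int


def user_from_dict(d, ctx):
    """from_dict-shaped callable: (dict, SerializationContext) -> object."""
    if d is None:
        return None
    return User(name=d["name"], favorite_number=d["favorite_number"])


# ctx is unused by this sketch, so None is passed in place of a real context.
user = user_from_dict({"name": "ada", "favorite_number": 7}, None)
```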

__call__(data, ctx)[source]

Deserialize Avro binary encoded data with Confluent Schema Registry framing to a dict, or object instance according to from_dict, if specified.

Parameters
  • data (bytes) – bytes

  • ctx (SerializationContext) – Metadata relevant to the serialization operation.

Raises

SerializerError – if an error occurs parsing data.

Returns

If data is None, then None. Else, a dict, or object instance according to from_dict, if specified.

Return type

object
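For orientation, "Confluent Schema Registry framing" refers to a short header in front of the serialized payload. The sketch below assumes the commonly documented wire format (one magic byte of 0 followed by a 4-byte big-endian schema ID); split_framing is a hypothetical helper, and the deserializer performs this step internally.

```python
import struct

MAGIC_BYTE = 0  # assumption: the standard Confluent wire-format marker


def split_framing(data):
    """Return (schema_id, payload) from a framed message value."""
    if len(data) < 5:
        raise ValueError("message too short to contain framing")
    # ">bI": one signed byte (magic) plus an unsigned big-endian 32-bit ID.
    magic, schema_id = struct.unpack(">bI", data[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: %d" % magic)
    return schema_id, data[5:]


# Build a framed value by hand; the trailing bytes are an arbitrary payload.
framed = struct.pack(">bI", MAGIC_BYTE, 100042) + b"\x06ada"
schema_id, payload = split_framing(framed)
```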

DoubleDeserializer

class confluent_kafka.serialization.DoubleDeserializer[source]

Deserializes float from IEEE 754 binary64 bytes.

__call__(value, ctx=None)[source]

Deserializes float from IEEE 754 binary64 bytes.

Parameters
  • value (bytes) – bytes to be deserialized

  • ctx (SerializationContext) – Metadata pertaining to the serialization operation

Raises

SerializerError if an error occurs during deserialization.

Returns

float if data is not None, otherwise None

IntegerDeserializer

class confluent_kafka.serialization.IntegerDeserializer[source]

Deserializes int from int32 bytes.

__call__(value, ctx=None)[source]

Deserializes int from int32 bytes.

Parameters
  • value (bytes) – bytes to be deserialized

  • ctx (SerializationContext) – Metadata pertaining to the serialization operation

Raises

SerializerError if an error occurs during deserialization.

Returns

int if data is not None, otherwise None

JSONDeserializer

class confluent_kafka.schema_registry.json_schema.JSONDeserializer(schema_str, from_dict=None, schema_registry_client=None)[source]

Deserializer for JSON encoded data with Confluent Schema Registry framing.

Parameters
  • schema_str (str, Schema, optional) – JSON schema definition. Accepts schema as either a string or a Schema instance. Note that string definitions cannot reference other schemas. For referencing other schemas, use a Schema instance. If not provided, schemas will be retrieved from schema_registry_client based on the schema ID in the wire header of each message.

  • from_dict (callable, optional) – Callable(dict, SerializationContext) -> object. Converts a dict to a Python object instance.

  • schema_registry_client (SchemaRegistryClient, optional) – Schema Registry client instance. Needed if schema_str is a schema referencing other schemas or is not provided.

__call__(data, ctx)[source]

Deserialize a JSON encoded record with Confluent Schema Registry framing to a dict, or object instance according to from_dict if from_dict is specified.

Parameters
  • data (bytes) – A JSON serialized record with Confluent Schema Registry framing.

  • ctx (SerializationContext) – Metadata relevant to the serialization operation.

Returns

A dict, or object instance according to from_dict if from_dict is specified.

Raises

SerializerError – If there was an error reading the Confluent framing data, or if data was not successfully validated with the configured schema.

ProtobufDeserializer

class confluent_kafka.schema_registry.protobuf.ProtobufDeserializer(message_type, conf=None)[source]

Deserializer for Protobuf serialized data with Confluent Schema Registry framing.

Parameters
  • message_type (Message derived type) – Protobuf Message type.

  • conf (dict) – Configuration dictionary.

ProtobufDeserializer configuration properties:

  • use.deprecated.format (bool) – Specifies whether the Protobuf deserializer should deserialize message indexes without zig-zag encoding. This option must be explicitly configured as older and newer Protobuf producers are incompatible. If Protobuf messages in the topic to consume were produced with confluent-kafka-python <1.8 then this property must be set to True until all old messages have been processed and producers have been upgraded. Warning: This configuration property will be removed in a future version of the client.

See Also: Protobuf API reference

__call__(data, ctx)[source]

Deserialize a serialized protobuf message with Confluent Schema Registry framing.

Parameters
  • data (bytes) – Serialized protobuf message with Confluent Schema Registry framing.

  • ctx (SerializationContext) – Metadata relevant to the serialization operation.

Returns

Protobuf Message instance.

Return type

Message

Raises

SerializerError – If there was an error reading the Confluent framing data, or parsing the protobuf serialized message.

StringDeserializer

class confluent_kafka.serialization.StringDeserializer(codec='utf_8')[source]

Deserializes a str(py2:unicode) from bytes.

Parameters

codec (str, optional) – encoding scheme. Defaults to utf_8

__call__(value, ctx=None)[source]

Deserializes bytes to unicode per the configured codec. Defaults to utf_8.

Compatibility Note:

Python 2 str objects must be converted to unicode objects by the application prior to using this serializer.

Python 3 all str objects are already unicode objects.

Parameters
  • value (bytes) – bytes to be deserialized

  • ctx (SerializationContext) – Metadata pertaining to the serialization operation

Raises

SerializerError if an error occurs during deserialization.

Returns

unicode if data is not None, otherwise None
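The effect of the codec parameter can be seen with plain bytes.decode(), which accepts the same codec names. This sketch only illustrates the encoding behavior and does not call the library; the sample strings are arbitrary.

```python
# The same text has different byte representations under different codecs.
raw_utf8 = "café".encode("utf_8")
raw_latin1 = "café".encode("latin_1")

# Decoding must use the codec the bytes were written with.
text_from_utf8 = raw_utf8.decode("utf_8")
text_from_latin1 = raw_latin1.decode("latin_1")
```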

Serializer

class confluent_kafka.serialization.Serializer[source]

Extensible class from which all Serializer implementations derive. Serializers instruct Kafka clients on how to convert Python objects to bytes.

See built-in implementations, listed below, for an example of how to extend this class.

Note

This class is not directly instantiable. The derived classes must be used instead.

The following implementations are provided by this module.

Note

Unless noted elsewhere all numeric types are signed and serialization is big-endian.

  • DoubleSerializer (float) – Binary format: IEEE 754 binary64

  • IntegerSerializer (int) – Binary format: int32

  • StringSerializer (unicode) – Binary format: unicode(encoding)

__call__(obj, ctx=None)[source]

Converts obj to bytes.

Parameters
  • obj (object) – object to be serialized

  • ctx (SerializationContext) – Metadata pertaining to the serialization operation

Raises

SerializerError if an error occurs during serialization

Returns

bytes if obj is not None, otherwise None

AvroSerializer

class confluent_kafka.schema_registry.avro.AvroSerializer(schema_registry_client, schema_str, to_dict=None, conf=None)[source]

Serializer that outputs Avro binary encoded data with Confluent Schema Registry framing.

Configuration properties:

  • auto.register.schemas (bool) – If True, automatically register the configured schema with Confluent Schema Registry if it has not previously been associated with the relevant subject (determined via subject.name.strategy). Defaults to True.

  • normalize.schemas (bool) – Whether to normalize schemas, which will transform schemas to have a consistent format, including ordering properties and references.

  • use.latest.version (bool) – Whether to use the latest subject version for serialization. WARNING: There is no check that the latest schema is backwards compatible with the object being serialized. Defaults to False.

  • subject.name.strategy (callable) – Callable(SerializationContext, str) -> str. Defines how Schema Registry subject names are constructed. Standard naming strategies are defined in the confluent_kafka.schema_registry namespace. Defaults to topic_subject_name_strategy.

Schemas are registered against subject names in Confluent Schema Registry that define a scope in which the schemas can be evolved. By default, the subject name is formed by concatenating the topic name with the message field (key or value) separated by a hyphen.

i.e. {topic name}-{message field}

Alternative naming strategies may be configured with the property subject.name.strategy.

Supported subject name strategies:

  • topic_subject_name_strategy (default) – {topic name}-{message field}

  • topic_record_subject_name_strategy – {topic name}-{record name}

  • record_subject_name_strategy – {record name}

See Subject name strategy for additional details.
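The three output formats can be expressed as small callables with the shape Callable(SerializationContext, str) -> str. The functions below are illustrative re-implementations written for this sketch, not the library's own; Ctx is a hypothetical stand-in for SerializationContext, assumed to carry the topic name and the message field.

```python
from collections import namedtuple

# Hypothetical stand-in for SerializationContext, assumed to carry the topic
# name and the message field ("key" or "value").
Ctx = namedtuple("Ctx", ["topic", "field"])


def topic_strategy(ctx, record_name):
    """{topic name}-{message field}"""
    return "%s-%s" % (ctx.topic, ctx.field)


def topic_record_strategy(ctx, record_name):
    """{topic name}-{record name}"""
    return "%s-%s" % (ctx.topic, record_name)


def record_strategy(ctx, record_name):
    """{record name}"""
    return record_name


ctx = Ctx(topic="orders", field="value")
strategies = (topic_strategy, topic_record_strategy, record_strategy)
subjects = [strategy(ctx, "com.example.Order") for strategy in strategies]
```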

Note

Prior to serialization, all values must first be converted to a dict instance. This may be handled manually prior to calling Producer.produce() or by registering a to_dict callable with AvroSerializer.

See avro_producer.py in the examples directory for example usage.

Note

Tuple notation can be used to determine which branch of an ambiguous union to take.

See fastavro notation

Parameters
  • schema_registry_client (SchemaRegistryClient) – Schema Registry client instance.

  • schema_str (str or Schema) – Avro Schema Declaration. Accepts either a string or a Schema instance. Note that string definitions cannot reference other schemas. For referencing other schemas, use a Schema instance.

  • to_dict (callable, optional) – Callable(object, SerializationContext) -> dict. Converts object to a dict.

  • conf (dict) – AvroSerializer configuration.
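A to_dict callable has the shape Callable(object, SerializationContext) -> dict. The sketch below assumes a hypothetical dataclass, Order, whose fields are invented for illustration; dataclasses.asdict is one convenient way to produce the dict.

```python
from dataclasses import asdict, dataclass


@dataclass
class Order:  # hypothetical application type
    order_id: int
    item: str


def order_to_dict(order, ctx):
    """to_dict-shaped callable: (object, SerializationContext) -> dict."""
    return asdict(order)


# ctx is unused by this sketch, so None is passed in place of a real context.
record = order_to_dict(Order(order_id=1, item="widget"), None)
```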

__call__(obj, ctx)[source]

Serializes an object to Avro binary format, prepending it with Confluent Schema Registry framing.

Parameters
  • obj (object) – The object instance to serialize.

  • ctx (SerializationContext) – Metadata pertaining to the serialization operation.

Raises
  • SerializerError – If any error occurs serializing obj.

  • SchemaRegistryError – If there was an error registering the schema with Schema Registry, or auto.register.schemas is false and the schema was not registered.

Returns

Confluent Schema Registry encoded Avro bytes

Return type

bytes

DoubleSerializer

class confluent_kafka.serialization.DoubleSerializer[source]

Serializes float to IEEE 754 binary64.

__call__(obj, ctx=None)[source]
Parameters
  • obj (object) – object to be serialized

  • ctx (SerializationContext) – Metadata pertaining to the serialization operation

Note

None objects are represented as Kafka Null.

Raises

SerializerError if an error occurs during serialization.

Returns

IEEE 754 binary64 bytes if obj is not None, otherwise None

IntegerSerializer

class confluent_kafka.serialization.IntegerSerializer[source]

Serializes int to int32 bytes.

__call__(obj, ctx=None)[source]

Serializes int as int32 bytes.

Parameters
  • obj (object) – object to be serialized

  • ctx (SerializationContext) – Metadata pertaining to the serialization operation

Note

None objects are represented as Kafka Null.

Raises

SerializerError if an error occurs during serialization

Returns

int32 bytes if obj is not None, else None

JSONSerializer

class confluent_kafka.schema_registry.json_schema.JSONSerializer(schema_str, schema_registry_client, to_dict=None, conf=None)[source]

Serializer that outputs JSON encoded data with Confluent Schema Registry framing.

Configuration properties:

  • auto.register.schemas (bool) – If True, automatically register the configured schema with Confluent Schema Registry if it has not previously been associated with the relevant subject (determined via subject.name.strategy). Defaults to True. Raises SchemaRegistryError if the schema was not registered against the subject, or could not be successfully registered.

  • normalize.schemas (bool) – Whether to normalize schemas, which will transform schemas to have a consistent format, including ordering properties and references.

  • use.latest.version (bool) – Whether to use the latest subject version for serialization. WARNING: There is no check that the latest schema is backwards compatible with the object being serialized. Defaults to False.

  • subject.name.strategy (callable) – Callable(SerializationContext, str) -> str. Defines how Schema Registry subject names are constructed. Standard naming strategies are defined in the confluent_kafka.schema_registry namespace. Defaults to topic_subject_name_strategy.

Schemas are registered against subject names in Confluent Schema Registry that define a scope in which the schemas can be evolved. By default, the subject name is formed by concatenating the topic name with the message field (key or value) separated by a hyphen.

i.e. {topic name}-{message field}

Alternative naming strategies may be configured with the property subject.name.strategy.

Supported subject name strategies:

  • topic_subject_name_strategy (default) – {topic name}-{message field}

  • topic_record_subject_name_strategy – {topic name}-{record name}

  • record_subject_name_strategy – {record name}

See Subject name strategy for additional details.

Notes

The title annotation, referred to elsewhere as a record name is not strictly required by the JSON Schema specification. It is however required by this serializer in order to register the schema with Confluent Schema Registry.

Prior to serialization, all objects must first be converted to a dict instance. This may be handled manually prior to calling Producer.produce() or by registering a to_dict callable with JSONSerializer.
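Because the serializer requires the title annotation, a schema string passed as schema_str should include it. The schema below is a made-up example for illustration (the record name User and its properties are assumptions), built with the standard json module.

```python
import json

# Hypothetical schema; "title" supplies the record name the serializer needs.
schema_str = json.dumps({
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "User",
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "favorite_number": {"type": "integer"},
    },
    "required": ["name"],
})

# A quick sanity check an application could run before building a serializer.
has_title = "title" in json.loads(schema_str)
```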

Parameters
  • schema_str (str, Schema) – JSON Schema definition. Accepts schema as either a string or a Schema instance. Note that string definitions cannot reference other schemas. For referencing other schemas, use a Schema instance.

  • schema_registry_client (SchemaRegistryClient) – Schema Registry client instance.

  • to_dict (callable, optional) – Callable(object, SerializationContext) -> dict. Converts object to a dict.

  • conf (dict) – JSONSerializer configuration.

__call__(obj, ctx)[source]

Serializes an object to JSON, prepending it with Confluent Schema Registry framing.

Parameters
  • obj (object) – The object instance to serialize.

  • ctx (SerializationContext) – Metadata relevant to the serialization operation.

Raises

SerializerError if any error occurs serializing obj.

Returns

None if obj is None, else a byte array containing the JSON serialized data with Confluent Schema Registry framing.

Return type

bytes

ProtobufSerializer

class confluent_kafka.schema_registry.protobuf.ProtobufSerializer(msg_type, schema_registry_client, conf=None)[source]

Serializer for Protobuf Message derived classes. Serialization format is Protobuf, with Confluent Schema Registry framing.

Configuration properties:

  • auto.register.schemas (bool) – If True, automatically register the configured schema with Confluent Schema Registry if it has not previously been associated with the relevant subject (determined via subject.name.strategy). Defaults to True. Raises SchemaRegistryError if the schema was not registered against the subject, or could not be successfully registered.

  • normalize.schemas (bool) – Whether to normalize schemas, which will transform schemas to have a consistent format, including ordering properties and references.

  • use.latest.version (bool) – Whether to use the latest subject version for serialization. WARNING: There is no check that the latest schema is backwards compatible with the object being serialized. Defaults to False.

  • skip.known.types (bool) – Whether or not to skip known types when resolving schema dependencies. Defaults to False.

  • subject.name.strategy (callable) – Callable(SerializationContext, str) -> str. Defines how Schema Registry subject names are constructed. Standard naming strategies are defined in the confluent_kafka.schema_registry namespace. Defaults to topic_subject_name_strategy.

  • reference.subject.name.strategy (callable) – Callable(SerializationContext, str) -> str. Defines how Schema Registry subject names for schema references are constructed. Defaults to reference_subject_name_strategy.

  • use.deprecated.format (bool) – Specifies whether the Protobuf serializer should serialize message indexes without zig-zag encoding. This option must be explicitly configured as older and newer Protobuf producers are incompatible. If the consumers of the topic being produced to are using confluent-kafka-python <1.8 then this property must be set to True until all old consumers have been upgraded. Warning: This configuration property will be removed in a future version of the client.
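As background for use.deprecated.format, zig-zag encoding maps signed integers to unsigned ones so that values of small magnitude stay small when written as variable-length integers. The sketch below shows the general 32-bit technique only; the function names are hypothetical and this is not the library's implementation of message-index encoding.

```python
def zigzag_encode(n):
    """Map a signed 32-bit integer to an unsigned one (0, -1, 1, -2 -> 0, 1, 2, 3)."""
    # The arithmetic shift spreads the sign bit; the mask keeps 32 bits.
    return ((n << 1) ^ (n >> 31)) & 0xFFFFFFFF


def zigzag_decode(z):
    """Inverse of zigzag_encode."""
    return (z >> 1) ^ -(z & 1)


values = [0, -1, 1, -2, 2]
encoded = [zigzag_encode(v) for v in values]
decoded = [zigzag_decode(z) for z in encoded]
```

Because the two representations differ for every non-zero value, a reader and a writer must agree on which one is in use, which is why the property has to be set consistently.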

Schemas are registered against subject names in Confluent Schema Registry that define a scope in which the schemas can be evolved. By default, the subject name is formed by concatenating the topic name with the message field (key or value) separated by a hyphen.

i.e. {topic name}-{message field}

Alternative naming strategies may be configured with the property subject.name.strategy.

Supported subject name strategies

  • topic_subject_name_strategy (default) – {topic name}-{message field}

  • topic_record_subject_name_strategy – {topic name}-{record name}

  • record_subject_name_strategy – {record name}

See Subject name strategy for additional details.

Parameters
  • msg_type (GeneratedProtocolMessageType) – Protobuf Message type.

  • schema_registry_client (SchemaRegistryClient) – Schema Registry client instance.

  • conf (dict) – ProtobufSerializer configuration.

__call__(message, ctx)[source]

Serializes an instance of a class derived from Protobuf Message, and prepends it with Confluent Schema Registry framing.

Parameters
  • message (Message) – An instance of a class derived from Protobuf Message.

  • ctx (SerializationContext) – Metadata relevant to the serialization operation.

Raises

SerializerError if any error occurs during serialization.

Returns

None if message is None, else a byte array containing the Protobuf serialized message with Confluent Schema Registry framing.

StringSerializer

class confluent_kafka.serialization.StringSerializer(codec='utf_8')[source]

Serializes unicode to bytes per the configured codec. Defaults to utf_8.

Note

None objects are represented as Kafka Null.

Parameters

codec (str, optional) – encoding scheme. Defaults to utf_8

__call__(obj, ctx=None)[source]

Serializes a str(py2:unicode) to bytes.

Compatibility Note:

In Python 2, str objects must be converted to unicode objects. In Python 3, all str objects are already unicode objects.

Parameters
  • obj (object) – object to be serialized

  • ctx (SerializationContext) – Metadata pertaining to the serialization operation

Raises

SerializerError if an error occurs during serialization.

Returns

serialized bytes if obj is not None, otherwise None

Supporting Classes

Message

class confluent_kafka.Message

The Message object represents either a single consumed or produced message, or an event (error() is not None).

An application must check with error() to see if the object is a proper message (error() returns None) or an error/event.

This class is not user-instantiable.

len()
Returns

Message value (payload) size in bytes

Return type

int

error()

The message object is also used to propagate errors and events. An application must check error() to determine if the Message is a proper message (error() returns None) or an error or event (error() returns a KafkaError object).

Return type

None or KafkaError
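A minimal sketch of the check described above: inspect error() before touching the payload. StubMessage is a hypothetical stand-in that exposes only error() and value(), so the example runs without a broker; payload_or_raise is likewise an illustrative name.

```python
class StubMessage:
    """Hypothetical stand-in exposing only error() and value()."""

    def __init__(self, value=None, error=None):
        self._value = value
        self._error = error

    def error(self):
        return self._error

    def value(self):
        return self._value


def payload_or_raise(msg):
    """Return the payload of a proper message, or raise for an error/event."""
    err = msg.error()
    if err is not None:
        raise RuntimeError(f"Kafka error or event: {err}")
    return msg.value()


print(payload_or_raise(StubMessage(value=b"hello")))
```

How an error or event should be handled (log, skip, or stop) is application-specific; raising is only one option.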

headers()

Retrieve the headers set on a message. Each header is a key-value pair. Please note that header keys are ordered and can repeat.

Returns

list of two-tuples, one (key, value) pair for each header.

Return type

[(str, bytes),…] or None.
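Because header keys can repeat, converting the returned list straight to a dict would silently drop values. The sketch below groups values per key while preserving their order; group_headers is a hypothetical helper, and the sample headers are made up.

```python
from collections import defaultdict


def group_headers(headers):
    """Group [(key, value), ...] into {key: [values...]}; None yields {}."""
    grouped = defaultdict(list)
    for key, value in headers or []:
        grouped[key].append(value)
    return dict(grouped)


sample = [("trace", b"1"), ("source", b"web"), ("trace", b"2")]
print(group_headers(sample))
```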

key()
Returns

message key or None if not available.

Return type

str|bytes or None

latency()

Retrieve the time it took to produce the message, from calling produce() to the time the acknowledgement was received from the broker. Must only be used with the producer for successfully produced messages.

Returns

latency as float seconds, or None if latency information is not available (such as for errored messages).

Return type

float or None

leader_epoch()
Returns

message offset leader epoch or None if not available.

Return type

int or None

offset()
Returns

message offset or None if not available.

Return type

int or None

partition()
Returns

partition number or None if not available.

Return type

int or None

set_headers()

Set the field ‘Message.headers’ with new value.

Parameters

value (object) – Message.headers.

Returns

None.

Return type

None

set_key()

Set the field ‘Message.key’ with new value.

Parameters

value (object) – Message.key.

Returns

None.

Return type

None

set_value()

Set the field ‘Message.value’ with new value.

Parameters

value (object) – Message.value.

Returns

None.

Return type

None

timestamp()

Retrieve timestamp type and timestamp from message. The timestamp type is one of:

  • TIMESTAMP_NOT_AVAILABLE - Timestamps not supported by broker.

  • TIMESTAMP_CREATE_TIME - Message creation time (or source / producer time).

  • TIMESTAMP_LOG_APPEND_TIME - Broker receive time.

The returned timestamp should be ignored if the timestamp type is TIMESTAMP_NOT_AVAILABLE.

The timestamp is the number of milliseconds since the epoch (UTC).

Timestamps require broker version 0.10.0.0 or later and {'api.version.request': True} configured on the client.

Returns

tuple of message timestamp type, and timestamp.

Return type

(int, int)
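As a sketch of how the returned tuple might be consumed, the helper below converts the millisecond timestamp to a timezone-aware datetime and ignores it when the type is TIMESTAMP_NOT_AVAILABLE. The constant is defined locally with an assumed value of 0 so the example is self-contained; real code would import it from confluent_kafka. timestamp_to_datetime is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone

# Assumed value for illustration; import the constant from confluent_kafka in real code.
TIMESTAMP_NOT_AVAILABLE = 0

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def timestamp_to_datetime(ts):
    """Convert a (timestamp_type, milliseconds) tuple to a UTC datetime, or None."""
    ts_type, ts_ms = ts
    if ts_type == TIMESTAMP_NOT_AVAILABLE:
        return None
    # timedelta keeps millisecond precision exact (no float rounding).
    return EPOCH + timedelta(milliseconds=ts_ms)


print(timestamp_to_datetime((1, 1500)))
```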

topic()
Returns

topic name or None if not available.

Return type

str or None

value()
Returns

message value (payload) or None if not available.

Return type

str|bytes or None

TopicPartition

class confluent_kafka.TopicPartition

TopicPartition is a generic type to hold a single partition and various information about it.

It is typically used to provide a list of topics or partitions for various operations, such as Consumer.assign().

TopicPartition(topic[, partition][, offset][, metadata][, leader_epoch])

Instantiate a TopicPartition object.

Parameters
  • topic (string) – Topic name

  • partition (int) – Partition id

  • offset (int) – Initial partition offset

  • metadata (string) – Offset metadata

  • leader_epoch (int) – Offset leader epoch

Return type

TopicPartition

error

Indicates an error (with KafkaError) unless None.

Type

attribute error

leader_epoch

Offset leader epoch (int), or None

Type

attribute leader_epoch

metadata

Optional application metadata committed with the offset (string)

Type

attribute metadata

offset

Offset (long)

Either an absolute offset (>=0) or a logical offset: OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED, OFFSET_INVALID

Type

attribute offset

partition

Partition number (int)

Type

attribute partition

topic

Topic name (string)

Type

attribute topic

TopicCollection

class confluent_kafka.TopicCollection(topic_names)[source]

Represents a collection of topics in the form of different identifiers for the topic.

Parameters

topic_names (list(str)) – List of topic names.

TopicPartitionInfo

class confluent_kafka.TopicPartitionInfo(id, leader, replicas, isr)[source]

Represents partition information. Used by TopicDescription.

Parameters
  • id (int) – Id of the partition.

  • leader (Node) – Leader broker for the partition.

  • replicas (list(Node)) – Replica brokers for the partition.

  • isr (list(Node)) – In-Sync-Replica brokers for the partition.

Node

class confluent_kafka.Node(id, host, port, rack=None)[source]

Represents node information. Used by ConsumerGroupDescription.

Parameters
  • id (int) – The node id of this node.

  • id_string – String representation of the node id.

  • host – The host name for this node.

  • port (int) – The port for this node.

  • rack (str) – The rack for this node.

ConsumerGroupTopicPartitions

class confluent_kafka.ConsumerGroupTopicPartitions(group_id, topic_partitions=None)[source]

Represents consumer group and its topic partition information. Used by AdminClient.list_consumer_group_offsets() and AdminClient.alter_consumer_group_offsets().

Parameters
  • group_id (str) – Id of the consumer group.

  • topic_partitions (list(TopicPartition)) – List of topic partitions information.

ConsumerGroupState

class confluent_kafka.ConsumerGroupState(value)[source]

Enumerates the different types of Consumer Group State.

Note that the state UNKOWN (a misspelling of UNKNOWN) is deprecated and will be removed in a future major release. Use UNKNOWN instead.

UNKNOWN = 0

State is not known or not set

UNKOWN = 0

Deprecated since version 2.3.0.

Use UNKNOWN instead.

PREPARING_REBALANCING = 1

Preparing rebalance for the consumer group.

COMPLETING_REBALANCING = 2

Consumer Group is completing rebalancing.

STABLE = 3

Consumer Group is stable.

DEAD = 4

Consumer Group is dead.

EMPTY = 5

Consumer Group is empty.
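UNKNOWN and UNKOWN share the value 0. Assuming the class behaves like a standard Python enum.Enum, the second name is an alias of the first, as the stand-in below illustrates. GroupStateSketch is a hypothetical class that only mirrors the values listed above.

```python
from enum import Enum


class GroupStateSketch(Enum):
    """Hypothetical mirror of the listed values, for illustration only."""
    UNKNOWN = 0
    UNKOWN = 0  # same value as UNKNOWN, so Enum treats it as an alias
    PREPARING_REBALANCING = 1
    COMPLETING_REBALANCING = 2
    STABLE = 3
    DEAD = 4
    EMPTY = 5


# The alias resolves to the canonical member and is skipped when iterating.
print(GroupStateSketch.UNKOWN is GroupStateSketch.UNKNOWN)
print([state.name for state in GroupStateSketch])
```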

Uuid

class confluent_kafka.Uuid

Generic Uuid, used in various identifiers including topic_id.

Uuid(most_significant_bits, least_significant_bits)

Instantiate a Uuid object.

Parameters
  • most_significant_bits (long) – Most significant 64 bits of the 128 bits Uuid.

  • least_significant_bits (long) – Least significant 64 bits of the 128 bits Uuid.

Return type

Uuid

get_least_significant_bits()
Returns

Least significant 64 bits of the 128 bits Uuid

Return type

int

get_most_significant_bits()
Returns

Most significant 64 bits of the 128 bits Uuid

Return type

int
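To illustrate how a 128-bit identifier splits into the two 64-bit halves described above, the sketch below uses the standard library's uuid module. It assumes the halves are signed 64-bit integers (as in Java's UUID); whether this class expects signed or unsigned values is an assumption to verify. split_uuid and join_uuid are hypothetical helpers.

```python
import uuid

MASK64 = (1 << 64) - 1


def to_signed64(n: int) -> int:
    """Reinterpret an unsigned 64-bit value as a signed two's-complement integer."""
    return n - (1 << 64) if n >= (1 << 63) else n


def split_uuid(u: uuid.UUID):
    """Return (most_significant_bits, least_significant_bits) as signed ints."""
    msb = (u.int >> 64) & MASK64
    lsb = u.int & MASK64
    return to_signed64(msb), to_signed64(lsb)


def join_uuid(msb: int, lsb: int) -> uuid.UUID:
    """Rebuild the UUID from its two signed 64-bit halves."""
    return uuid.UUID(int=((msb & MASK64) << 64) | (lsb & MASK64))


example = uuid.UUID("12345678-1234-5678-9abc-def012345678")
msb, lsb = split_uuid(example)
print(msb, lsb, join_uuid(msb, lsb) == example)
```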

ElectionType

class confluent_kafka.ElectionType(value)[source]

Enumerates the different types of leader elections.

PREFERRED = 0

Preferred election

UNCLEAN = 1

Unclean election

MessageField

class confluent_kafka.serialization.MessageField[source]

Enum like object for identifying Message fields.

KEY

Message key

Type

str

VALUE

Message value

Type

str

SerializationContext

class confluent_kafka.serialization.SerializationContext(topic, field, headers=None)[source]

SerializationContext provides additional context to the serializer/deserializer about the data it’s serializing/deserializing.

Parameters
  • topic (str) – Topic the data is being produced to or consumed from.

  • field (MessageField) – Describes what part of the message is being serialized.

  • headers (list) – List of message header tuples. Defaults to None.

Schema

class confluent_kafka.schema_registry.Schema(schema_str, schema_type, references=[])[source]

An unregistered Schema.

Parameters
  • schema_str (str) – String representation of the schema.

  • schema_type (str) – The schema type: AVRO, PROTOBUF or JSON.

  • references ([SchemaReference]) – SchemaReferences used in this schema.

RegisteredSchema

class confluent_kafka.schema_registry.RegisteredSchema(schema_id, schema, subject, version)[source]

Schema registration information.

Represents a Schema registered with a subject. Use this class when you need a specific version of a subject such as forming a SchemaReference.

Parameters
  • schema_id (int) – Registered Schema id

  • schema (Schema) – Registered Schema

  • subject (str) – Subject this schema is registered under

  • version (int) – Version of this subject this schema is registered to

SchemaRegistryError

class confluent_kafka.schema_registry.error.SchemaRegistryError(http_status_code, error_code, error_message)[source]

Represents an error returned by the Confluent Schema Registry

Parameters
  • http_status_code (int) – HTTP status code

  • error_code (int) – Schema Registry error code; -1 represents an unknown error.

  • error_message (str) – Description of the error

KafkaError

class confluent_kafka.KafkaError

Kafka error and event object

The KafkaError class serves multiple purposes

  • Propagation of errors

  • Propagation of events

  • Exceptions

Parameters
  • error (KafkaError) – Error code indicating the type of error.

  • reason (str) – Alternative message to describe the error.

  • fatal (bool) – Set to true if a fatal error.

  • retriable (bool) – Set to true if operation is retriable.

  • txn_requires_abort (bool) – Set to true if this is an abortable transaction error.

Error and event constants:

Constant

Description

_BAD_MSG

Local: Bad message format

_BAD_COMPRESSION

Local: Invalid compressed data

_DESTROY

Local: Broker handle destroyed

_FAIL

Local: Communication failure with broker

_TRANSPORT

Local: Broker transport failure

_CRIT_SYS_RESOURCE

Local: Critical system resource failure

_RESOLVE

Local: Host resolution failure

_MSG_TIMED_OUT

Local: Message timed out

_PARTITION_EOF

Broker: No more messages

_UNKNOWN_PARTITION

Local: Unknown partition

_FS

Local: File or filesystem error

_UNKNOWN_TOPIC

Local: Unknown topic

_ALL_BROKERS_DOWN

Local: All broker connections are down

_INVALID_ARG

Local: Invalid argument or configuration

_TIMED_OUT

Local: Timed out

_QUEUE_FULL

Local: Queue full

_ISR_INSUFF

Local: ISR count insufficient

_NODE_UPDATE

Local: Broker node update

_SSL

Local: SSL error

_WAIT_COORD

Local: Waiting for coordinator

_UNKNOWN_GROUP

Local: Unknown group

_IN_PROGRESS

Local: Operation in progress

_PREV_IN_PROGRESS

Local: Previous operation in progress

_EXISTING_SUBSCRIPTION

Local: Existing subscription

_ASSIGN_PARTITIONS

Local: Assign partitions

_REVOKE_PARTITIONS

Local: Revoke partitions

_CONFLICT

Local: Conflicting use

_STATE

Local: Erroneous state

_UNKNOWN_PROTOCOL

Local: Unknown protocol

_NOT_IMPLEMENTED

Local: Not implemented

_AUTHENTICATION

Local: Authentication failure

_NO_OFFSET

Local: No offset stored

_OUTDATED

Local: Outdated

_TIMED_OUT_QUEUE

Local: Timed out in queue

_UNSUPPORTED_FEATURE

Local: Required feature not supported by broker

_WAIT_CACHE

Local: Awaiting cache update

_INTR

Local: Operation interrupted

_KEY_SERIALIZATION

Local: Key serialization error

_VALUE_SERIALIZATION

Local: Value serialization error

_KEY_DESERIALIZATION

Local: Key deserialization error

_VALUE_DESERIALIZATION

Local: Value deserialization error

_PARTIAL

Local: Partial response

_READ_ONLY

Local: Read-only object

_NOENT

Local: No such entry

_UNDERFLOW

Local: Read underflow

_INVALID_TYPE

Local: Invalid type

_RETRY

Local: Retry operation

_PURGE_QUEUE

Local: Purged in queue

_PURGE_INFLIGHT

Local: Purged in flight

_FATAL

Local: Fatal error

_INCONSISTENT

Local: Inconsistent state

_GAPLESS_GUARANTEE

Local: Gap-less ordering would not be guaranteed if proceeding

_MAX_POLL_EXCEEDED

Local: Maximum application poll interval (max.poll.interval.ms) exceeded

_UNKNOWN_BROKER

Local: Unknown broker

_NOT_CONFIGURED

Local: Functionality not configured

_FENCED

Local: This instance has been fenced by a newer instance

_APPLICATION

Local: Application generated error

_ASSIGNMENT_LOST

Local: Group partition assignment lost

_NOOP

Local: No operation performed

_AUTO_OFFSET_RESET

Local: No offset to automatically reset to

_LOG_TRUNCATION

Local: Partition log truncation detected

_INVALID_DIFFERENT_RECORD

Local: an invalid record in the same batch caused the failure of this message too.

UNKNOWN

Unknown broker error

NO_ERROR

Success

OFFSET_OUT_OF_RANGE

Broker: Offset out of range

INVALID_MSG

Broker: Invalid message

UNKNOWN_TOPIC_OR_PART

Broker: Unknown topic or partition

INVALID_MSG_SIZE

Broker: Invalid message size

LEADER_NOT_AVAILABLE

Broker: Leader not available

NOT_LEADER_FOR_PARTITION

Broker: Not leader for partition

REQUEST_TIMED_OUT

Broker: Request timed out

BROKER_NOT_AVAILABLE

Broker: Broker not available

REPLICA_NOT_AVAILABLE

Broker: Replica not available

MSG_SIZE_TOO_LARGE

Broker: Message size too large

STALE_CTRL_EPOCH

Broker: StaleControllerEpochCode

OFFSET_METADATA_TOO_LARGE

Broker: Offset metadata string too large

NETWORK_EXCEPTION

Broker: Broker disconnected before response received

COORDINATOR_LOAD_IN_PROGRESS

Broker: Coordinator load in progress

COORDINATOR_NOT_AVAILABLE

Broker: Coordinator not available

NOT_COORDINATOR

Broker: Not coordinator

TOPIC_EXCEPTION

Broker: Invalid topic

RECORD_LIST_TOO_LARGE

Broker: Message batch larger than configured server segment size

NOT_ENOUGH_REPLICAS

Broker: Not enough in-sync replicas

NOT_ENOUGH_REPLICAS_AFTER_APPEND

Broker: Message(s) written to insufficient number of in-sync replicas

INVALID_REQUIRED_ACKS

Broker: Invalid required acks value

ILLEGAL_GENERATION

Broker: Specified group generation id is not valid

INCONSISTENT_GROUP_PROTOCOL

Broker: Inconsistent group protocol

INVALID_GROUP_ID

Broker: Invalid group.id

UNKNOWN_MEMBER_ID

Broker: Unknown member

INVALID_SESSION_TIMEOUT

Broker: Invalid session timeout

REBALANCE_IN_PROGRESS

Broker: Group rebalance in progress

INVALID_COMMIT_OFFSET_SIZE

Broker: Commit offset data size is not valid

TOPIC_AUTHORIZATION_FAILED

Broker: Topic authorization failed

GROUP_AUTHORIZATION_FAILED

Broker: Group authorization failed

CLUSTER_AUTHORIZATION_FAILED

Broker: Cluster authorization failed

INVALID_TIMESTAMP

Broker: Invalid timestamp

UNSUPPORTED_SASL_MECHANISM

Broker: Unsupported SASL mechanism

ILLEGAL_SASL_STATE

Broker: Request not valid in current SASL state

UNSUPPORTED_VERSION

Broker: API version not supported

TOPIC_ALREADY_EXISTS

Broker: Topic already exists

INVALID_PARTITIONS

Broker: Invalid number of partitions

INVALID_REPLICATION_FACTOR

Broker: Invalid replication factor

INVALID_REPLICA_ASSIGNMENT

Broker: Invalid replica assignment

INVALID_CONFIG

Broker: Configuration is invalid

NOT_CONTROLLER

Broker: Not controller for cluster

INVALID_REQUEST

Broker: Invalid request

UNSUPPORTED_FOR_MESSAGE_FORMAT

Broker: Message format on broker does not support request

POLICY_VIOLATION

Broker: Policy violation

OUT_OF_ORDER_SEQUENCE_NUMBER

Broker: Broker received an out of order sequence number

DUPLICATE_SEQUENCE_NUMBER

Broker: Broker received a duplicate sequence number

INVALID_PRODUCER_EPOCH

Broker: Producer attempted an operation with an old epoch

INVALID_TXN_STATE

Broker: Producer attempted a transactional operation in an invalid state

INVALID_PRODUCER_ID_MAPPING

Broker: Producer attempted to use a producer id which is not currently assigned to its transactional

INVALID_TRANSACTION_TIMEOUT

Broker: Transaction timeout is larger than the maximum value allowed by the broker’s max.transaction

CONCURRENT_TRANSACTIONS

Broker: Producer attempted to update a transaction while another concurrent operation on the same tr

TRANSACTION_COORDINATOR_FENCED

Broker: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current

TRANSACTIONAL_ID_AUTHORIZATION_FAILED

Broker: Transactional Id authorization failed

SECURITY_DISABLED

Broker: Security features are disabled

OPERATION_NOT_ATTEMPTED

Broker: Operation not attempted

KAFKA_STORAGE_ERROR

Broker: Disk error when trying to access log file on disk

LOG_DIR_NOT_FOUND

Broker: The user-specified log directory is not found in the broker config

SASL_AUTHENTICATION_FAILED

Broker: SASL Authentication failed

UNKNOWN_PRODUCER_ID

Broker: Unknown Producer Id

REASSIGNMENT_IN_PROGRESS

Broker: Partition reassignment is in progress

DELEGATION_TOKEN_AUTH_DISABLED

Broker: Delegation Token feature is not enabled

DELEGATION_TOKEN_NOT_FOUND

Broker: Delegation Token is not found on server

DELEGATION_TOKEN_OWNER_MISMATCH

Broker: Specified Principal is not valid Owner/Renewer

DELEGATION_TOKEN_REQUEST_NOT_ALLOWED

Broker: Delegation Token requests are not allowed on this connection

DELEGATION_TOKEN_AUTHORIZATION_FAILED

Broker: Delegation Token authorization failed

DELEGATION_TOKEN_EXPIRED

Broker: Delegation Token is expired

INVALID_PRINCIPAL_TYPE

Broker: Supplied principalType is not supported

NON_EMPTY_GROUP

Broker: The group is not empty

GROUP_ID_NOT_FOUND

Broker: The group id does not exist

FETCH_SESSION_ID_NOT_FOUND

Broker: The fetch session ID was not found

INVALID_FETCH_SESSION_EPOCH

Broker: The fetch session epoch is invalid

LISTENER_NOT_FOUND

Broker: No matching listener

TOPIC_DELETION_DISABLED

Broker: Topic deletion is disabled

FENCED_LEADER_EPOCH

Broker: Leader epoch is older than broker epoch

UNKNOWN_LEADER_EPOCH

Broker: Leader epoch is newer than broker epoch

UNSUPPORTED_COMPRESSION_TYPE

Broker: Unsupported compression type

STALE_BROKER_EPOCH

Broker: Broker epoch has changed

OFFSET_NOT_AVAILABLE

Broker: Leader high watermark is not caught up

MEMBER_ID_REQUIRED

Broker: Group member needs a valid member ID

PREFERRED_LEADER_NOT_AVAILABLE

Broker: Preferred leader was not available

GROUP_MAX_SIZE_REACHED

Broker: Consumer group has reached maximum size

FENCED_INSTANCE_ID

Broker: Static consumer fenced by other consumer with same group.instance.id

ELIGIBLE_LEADERS_NOT_AVAILABLE

Broker: Eligible partition leaders are not available

ELECTION_NOT_NEEDED

Broker: Leader election not needed for topic partition

NO_REASSIGNMENT_IN_PROGRESS

Broker: No partition reassignment is in progress

GROUP_SUBSCRIBED_TO_TOPIC

Broker: Deleting offsets of a topic while the consumer group is subscribed to it

INVALID_RECORD

Broker: Broker failed to validate record

UNSTABLE_OFFSET_COMMIT

Broker: There are unstable offsets that need to be cleared

THROTTLING_QUOTA_EXCEEDED

Broker: Throttling quota has been exceeded

PRODUCER_FENCED

Broker: There is a newer producer with the same transactionalId which fences the current one

RESOURCE_NOT_FOUND

Broker: Request illegally referred to resource that does not exist

DUPLICATE_RESOURCE

Broker: Request illegally referred to the same resource twice

UNACCEPTABLE_CREDENTIAL

Broker: Requested credential would not meet criteria for acceptability

INCONSISTENT_VOTER_SET

Broker: Indicates that the either the sender or recipient of a voter-only request is not one of the

INVALID_UPDATE_VERSION

Broker: Invalid update version

FEATURE_UPDATE_FAILED

Broker: Unable to update finalized features due to server error

PRINCIPAL_DESERIALIZATION_FAILURE

Broker: Request principal deserialization failed during forwarding

UNKNOWN_TOPIC_ID

Broker: Unknown topic id

FENCED_MEMBER_EPOCH

Broker: The member epoch is fenced by the group coordinator

UNRELEASED_INSTANCE_ID

Broker: The instance ID is still used by another member in the consumer group

UNSUPPORTED_ASSIGNOR

Broker: The assignor or its version range is not supported by the consumer group

STALE_MEMBER_EPOCH

Broker: The member epoch is stale

UNKNOWN_SUBSCRIPTION_ID

Broker: Client sent a push telemetry request with an invalid or outdated subscription ID

TELEMETRY_TOO_LARGE

Broker: Client sent a push telemetry request larger than the maximum size the broker will accept

code()

Returns the error/event code for comparison to KafkaError.<ERR_CONSTANTS>.

Returns

error/event code

Return type

int

fatal()
Returns

True if this is a fatal error, else False.

Return type

bool

name()

Returns the enum name for error/event.

Returns

error/event enum name string

Return type

str

retriable()
Returns

True if the operation that failed may be retried, else False.

Return type

bool

str()

Returns the human-readable error/event string.

Returns

error/event message string

Return type

str

txn_requires_abort()
Returns

True if the error is an abortable transaction error, in which case the application must abort the current transaction with abort_transaction() and start a new transaction with begin_transaction() if it wishes to proceed with transactional operations. This will only return true for errors from the transactional producer API.

Return type

bool

KafkaException

class confluent_kafka.KafkaException

Kafka exception that wraps the KafkaError class.

Use exception.args[0] to extract the KafkaError object

ConsumeError

class confluent_kafka.error.ConsumeError(kafka_error, exception=None, kafka_message=None)[source]

Wraps all errors encountered during the consumption of a message.

Note

In the event of a serialization error the original message contents may be retrieved from the kafka_message attribute.

Parameters
  • kafka_error (KafkaError) – KafkaError instance.

  • exception (Exception, optional) – The original exception

  • kafka_message (Message, optional) – The Kafka Message returned by the broker.

ProduceError

class confluent_kafka.error.ProduceError(kafka_error, exception=None)[source]

Wraps all errors encountered when Producing messages.

Parameters
  • kafka_error (KafkaError) – KafkaError instance.

  • exception (Exception, optional) – The original exception.

SerializationError

class confluent_kafka.error.SerializationError[source]

Generic error from serializer package

KeySerializationError

class confluent_kafka.error.KeySerializationError(exception=None)[source]

Wraps all errors encountered during the serialization of a Message key.

Parameters

exception (Exception) – The exception that occurred during serialization.

ValueSerializationError

class confluent_kafka.error.ValueSerializationError(exception=None)[source]

Wraps all errors encountered during the serialization of a Message value.

Parameters

exception (Exception) – The exception that occurred during serialization.

KeyDeserializationError

class confluent_kafka.error.KeyDeserializationError(exception=None, kafka_message=None)[source]

Wraps all errors encountered during the deserialization of a Kafka Message’s key.

Parameters
  • exception (Exception, optional) – The original exception

  • kafka_message (Message, optional) – The Kafka Message returned by the broker.

ValueDeserializationError

class confluent_kafka.error.ValueDeserializationError(exception=None, kafka_message=None)[source]

Wraps all errors encountered during the deserialization of a Kafka Message’s value.

Parameters
  • exception (Exception, optional) – The original exception

  • kafka_message (Message, optional) – The Kafka Message returned by the broker.

Offset

Logical offset constants:

  • OFFSET_BEGINNING - Beginning of partition (oldest offset)

  • OFFSET_END - End of partition (next offset)

  • OFFSET_STORED - Use stored/committed offset

  • OFFSET_INVALID - Invalid/Default offset

ThrottleEvent

class confluent_kafka.ThrottleEvent(broker_name, broker_id, throttle_time)[source]

ThrottleEvent contains details about a throttled request. Set up a throttle callback by setting the throttle_cb configuration property to a callable that takes a ThrottleEvent object as its only argument. The callback will be triggered from poll(), consume() or flush() when a request has been throttled by the broker.

This class is typically not user instantiated.

Variables
  • broker_name (str) – The hostname of the broker which throttled the request

  • broker_id (int) – The broker id

  • throttle_time (float) – The amount of time (in seconds) the broker throttled (delayed) the request

IsolationLevel

class confluent_kafka.IsolationLevel(value)[source]

Enum for Kafka isolation levels.

READ_UNCOMMITTED = 0

Receive all the offsets.

READ_COMMITTED = 1

Skip offsets belonging to an aborted transaction.

AvroProducer (Legacy)

class confluent_kafka.avro.AvroProducer(config, default_key_schema=None, default_value_schema=None, schema_registry=None, **kwargs)[source]

Deprecated since version 2.0.2.

This class will be removed in a future version of the library.

Kafka Producer client which does avro schema encoding to messages. Handles schema registration, Message serialization.

Constructor arguments:

Parameters
  • config (dict) – Config parameters containing url for schema registry (schema.registry.url) and the standard Kafka client configuration (bootstrap.servers et al.).

  • default_key_schema (str) – Optional default avro schema for key

  • default_value_schema (str) – Optional default avro schema for value

produce(**kwargs)[source]

Asynchronously sends message to Kafka by encoding with specified or default avro schema.

Parameters
  • topic (str) – topic name

  • value (object) – An object to serialize

  • value_schema (str) – Avro schema for value

  • key (object) – An object to serialize

  • key_schema (str) – Avro schema for key

Plus any other parameters accepted by confluent_kafka.Producer.produce

Raises
  • SerializerError – On serialization failure

  • BufferError – If producer queue is full.

  • KafkaException – For other produce failures.

AvroConsumer (Legacy)

class confluent_kafka.avro.AvroConsumer(config, schema_registry=None, reader_key_schema=None, reader_value_schema=None, **kwargs)[source]

Deprecated since version 2.0.2.

This class will be removed in a future version of the library.

Kafka Consumer client which does avro schema decoding of messages. Handles message deserialization.

Constructor arguments:

Parameters
  • config (dict) – Config parameters containing url for schema registry (schema.registry.url) and the standard Kafka client configuration (bootstrap.servers et al.).

  • reader_key_schema (schema) – a reader schema for the message key

  • reader_value_schema (schema) – a reader schema for the message value

Raises

ValueError – For invalid configurations

poll(timeout=None)[source]

This is an overridden method from the confluent_kafka.Consumer class. It handles message deserialization using the avro schema.

Parameters

timeout (float) – Poll timeout in seconds (default: indefinite)

Returns

message object with deserialized key and value as dict objects

Return type

Message

Transactional API

The transactional producer operates on top of the idempotent producer, and provides full exactly-once semantics (EOS) for Apache Kafka when used with the transaction aware consumer (isolation.level=read_committed).

A producer instance is configured for transactions by setting the transactional.id to an identifier unique for the application. This id will be used to fence stale transactions from previous instances of the application, typically following an outage or crash.

After creating the transactional producer instance the transactional state must be initialized by calling confluent_kafka.Producer.init_transactions(). This is a blocking call that will acquire a runtime producer id from the transaction coordinator broker as well as abort any stale transactions and fence any still running producer instances with the same transactional.id.

Once transactions are initialized the application may begin a new transaction by calling confluent_kafka.Producer.begin_transaction(). A producer instance may have only one ongoing transaction at a time.

Any messages produced after the transaction has been started will belong to the ongoing transaction and will be committed or aborted atomically. It is not permitted to produce messages outside a transaction boundary, e.g., before confluent_kafka.Producer.begin_transaction(), after confluent_kafka.Producer.commit_transaction() or confluent_kafka.Producer.abort_transaction(), or after the current transaction has failed.

If consumed messages are used as input to the transaction, the consumer instance must be configured with enable.auto.commit set to false. To commit the consumed offsets along with the transaction pass the list of consumed partitions and the last offset processed + 1 to confluent_kafka.Producer.send_offsets_to_transaction() prior to committing the transaction. This allows an aborted transaction to be restarted using the previously committed offsets.
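The "last offset processed + 1" rule can be sketched as below. The helper works on plain (topic, partition, offset) tuples so it runs standalone; in real code each resulting entry would typically become a TopicPartition passed to send_offsets_to_transaction(). next_offsets and the sample data are hypothetical.

```python
def next_offsets(consumed):
    """Map (topic, partition) to the offset to commit: last processed offset + 1.

    consumed: iterable of (topic, partition, offset) for processed messages.
    """
    latest = {}
    for topic, partition, offset in consumed:
        key = (topic, partition)
        if offset > latest.get(key, -1):
            latest[key] = offset
    return {key: offset + 1 for key, offset in latest.items()}


processed = [("input", 0, 5), ("input", 0, 7), ("input", 1, 2)]
print(next_offsets(processed))
```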

To commit the produced messages, and any consumed offsets, to the current transaction, call confluent_kafka.Producer.commit_transaction(). This call will block until the transaction has been fully committed or failed (typically due to fencing by a newer producer instance).

Alternatively, if processing fails, or an abortable transaction error is raised, the transaction needs to be aborted by calling confluent_kafka.Producer.abort_transaction() which marks any produced messages and offset commits as aborted.

After the current transaction has been committed or aborted a new transaction may be started by calling confluent_kafka.Producer.begin_transaction() again.
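The begin, produce, commit/abort cycle described above can be sketched as follows. RecordingProducer is a hypothetical stand-in that records calls instead of talking to Kafka; a real producer would also need transactional.id configured and init_transactions() called once beforehand. The sketch only aborts when producing fails and deliberately leaves commit errors to the error handling discussed in the following sections.

```python
class RecordingProducer:
    """Hypothetical stand-in that records calls instead of contacting a broker."""

    def __init__(self, fail_on=None):
        self.calls = []
        self.fail_on = fail_on

    def begin_transaction(self):
        self.calls.append("begin")

    def produce(self, topic, value=None):
        if value == self.fail_on:
            raise ValueError("processing failed")
        self.calls.append(("produce", topic, value))

    def commit_transaction(self):
        self.calls.append("commit")

    def abort_transaction(self):
        self.calls.append("abort")


def produce_in_transaction(producer, topic, values):
    """Produce all values atomically: commit on success, abort if producing fails."""
    producer.begin_transaction()
    try:
        for value in values:
            producer.produce(topic, value=value)
    except Exception:
        # Processing failed: mark everything produced so far as aborted.
        producer.abort_transaction()
        raise
    # Blocks until the transaction has been committed or has failed.
    producer.commit_transaction()


ok = RecordingProducer()
produce_in_transaction(ok, "output", [b"a", b"b"])
print(ok.calls)
```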

Retriable errors

Some error cases allow the attempted operation to be retried. This is indicated by the error object having the retriable flag set, which can be detected by calling confluent_kafka.KafkaError.retriable() on the KafkaError object. When this flag is set, the application may retry the operation immediately or, preferably, after a short grace period (to avoid busy-looping). Retriable errors include timeouts, broker transport failures, etc.
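One way to add a grace period between retries is sketched below. call_with_retries is a hypothetical helper, and the attempt limit and delay are arbitrary choices. The is_retriable predicate is supplied by the caller; with this client it could, for example, check e.args[0].retriable() on a KafkaException. The demo uses a plain TimeoutError so it runs without a broker.

```python
import time


def call_with_retries(operation, is_retriable, max_attempts=5, delay=0.1,
                      sleep=time.sleep):
    """Call operation(), retrying after a pause while errors are retriable."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as exc:
            if attempt == max_attempts or not is_retriable(exc):
                raise
            sleep(delay)  # grace period to avoid busy-looping


attempts = []


def flaky():
    """Demo operation: fails twice with a transient error, then succeeds."""
    attempts.append(1)
    if len(attempts) < 3:
        raise TimeoutError("transient")
    return "committed"


result = call_with_retries(flaky, lambda e: isinstance(e, TimeoutError),
                           sleep=lambda seconds: None)
print(result, len(attempts))
```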

Abortable errors

An ongoing transaction may fail permanently due to various errors, such as transaction coordinator becoming unavailable, write failures to the Apache Kafka log, under-replicated partitions, etc. At this point the producer application must abort the current transaction using confluent_kafka.Producer.abort_transaction() and optionally start a new transaction by calling confluent_kafka.Producer.begin_transaction(). Whether an error is abortable or not is detected by calling confluent_kafka.KafkaError.txn_requires_abort().

Fatal errors

While the underlying idempotent producer will typically only raise fatal errors for unrecoverable cluster errors where the idempotency guarantees can’t be maintained, most of these are treated as abortable by the transactional producer, since transactions may be aborted and retried in their entirety. The transactional producer, on the other hand, introduces a set of additional fatal errors which the application needs to handle by shutting down the producer and terminating. There is no way for a producer instance to recover from fatal errors. Whether an error is fatal or not is detected by calling confluent_kafka.KafkaError.fatal().

Handling of other errors

For errors that have none of the retriable, abortable, or fatal flags set, it is not always obvious how to handle them. While some of these errors may be indicative of bugs in the application code, such as when an invalid parameter is passed to a method, other errors might originate from the broker and be passed through as-is to the application. The general recommendation is to treat errors that have neither the retriable nor the abortable flag set as fatal.

Error handling example

while True:
    try:
        producer.commit_transaction(10.0)
        break
    except KafkaException as e:
        if e.args[0].retriable():
            # retriable error, try again
            continue
        elif e.args[0].txn_requires_abort():
            # abort current transaction, begin a new transaction,
            # and rewind the consumer to start over.
            producer.abort_transaction()
            producer.begin_transaction()
            rewind_consumer_offsets...()
        else:
            # treat all other errors as fatal
            raise

Kafka Client Configuration

Configuration of producer and consumer instances is performed by providing a dict of configuration properties to the instance constructor, e.g.

conf = {'bootstrap.servers': 'mybroker.com',
        'group.id': 'mygroup',
        'session.timeout.ms': 6000,
        'on_commit': my_commit_callback,
        'auto.offset.reset': 'earliest'}
consumer = confluent_kafka.Consumer(conf)

The Python client provides the following configuration properties in addition to the properties dictated by the underlying librdkafka C library:

  • default.topic.config: value is a dict of client topic-level configuration properties that are applied to all used topics for the instance. DEPRECATED: topic configuration should now be specified in the global top-level configuration.

  • error_cb(kafka.KafkaError): Callback for generic/global error events. These errors should typically be considered informational, since the client will automatically try to recover. This callback is served upon calling client.poll() or producer.flush().

  • throttle_cb(confluent_kafka.ThrottleEvent): Callback for throttled request reporting. This callback is served upon calling client.poll() or producer.flush().

  • stats_cb(json_str): Callback for statistics data. This callback is triggered by poll() or flush() every statistics.interval.ms (which needs to be configured separately). The function argument json_str is a str instance of a JSON document containing statistics data. This callback is served upon calling client.poll() or producer.flush(). See https://github.com/edenhill/librdkafka/wiki/Statistics for more information.

  • oauth_cb(config_str): Callback for retrieving an OAuth Bearer token. The function argument config_str is a str taken from the sasl.oauthbearer.config configuration property. The callback is expected to return a (token_str, expiry_time) tuple, where expiry_time is the time in seconds since the epoch as a floating point number. This callback is useful only when sasl.mechanisms=OAUTHBEARER is set and is served to get the initial token before a successful broker connection can be made. The callback can be triggered by calling client.poll() or producer.flush().

  • on_delivery(kafka.KafkaError, kafka.Message) (Producer): value is a Python function reference that is called once for each produced message to indicate the final delivery result (success or failure). This property may also be set per-message by passing callback=callable (or on_delivery=callable) to the confluent_kafka.Producer.produce() function. Message headers are currently not supported on the message passed to the callback: msg.headers() returns None even if the original message had headers set. This callback is served upon calling producer.poll() or producer.flush().

  • on_commit(kafka.KafkaError, list(kafka.TopicPartition)) (Consumer): Callback used to indicate success or failure of asynchronous and automatic commit requests. This callback is served upon calling consumer.poll(). It is not triggered for synchronous commits. Callback arguments: KafkaError is the commit error, or None on success. list(TopicPartition) is the list of partitions with their committed offsets or per-partition errors.

  • logger=logging.Handler kwarg: forward logs from the Kafka client to the provided logging.Handler instance. To avoid spontaneous calls from non-Python threads, the log messages are only forwarded when client.poll() or producer.flush() is called. For example:

mylogger = logging.getLogger()
mylogger.addHandler(logging.StreamHandler())
producer = confluent_kafka.Producer({'bootstrap.servers': 'mybroker.com'}, logger=mylogger)

Note

In the Python client, the log handler is set with the logger configuration property, not log_cb.
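
The on_delivery and on_commit callbacks described above might look like the following minimal sketch. The helper names format_delivery and summarize_commit are hypothetical and exist only to keep the formatting logic easy to exercise. The code assumes that the message passed to on_delivery exposes topic(), partition() and offset(), and that each TopicPartition exposes the topic, partition, offset and error attributes. The SimpleNamespace objects are stand-ins so the sketch runs without a broker.

```python
from types import SimpleNamespace


def format_delivery(err, msg):
    """Describe the final delivery result of one produced message."""
    if err is not None:
        return "delivery failed: {}".format(err)
    return "delivered to {} [{}] at offset {}".format(
        msg.topic(), msg.partition(), msg.offset())


def on_delivery(err, msg):
    # Served from producer.poll() or producer.flush().
    print(format_delivery(err, msg))


def summarize_commit(err, partitions):
    """Describe the result of an asynchronous or automatic commit."""
    if err is not None:
        return ["commit failed: {}".format(err)]
    lines = []
    for tp in partitions:
        if tp.error is not None:
            lines.append("{} [{}]: error {}".format(
                tp.topic, tp.partition, tp.error))
        else:
            lines.append("{} [{}]: committed offset {}".format(
                tp.topic, tp.partition, tp.offset))
    return lines


def on_commit(err, partitions):
    # Served from consumer.poll(); not triggered for synchronous commits.
    for line in summarize_commit(err, partitions):
        print(line)


# Stand-ins for a delivered Message and a committed TopicPartition list.
fake_msg = SimpleNamespace(topic=lambda: "mytopic",
                           partition=lambda: 0,
                           offset=lambda: 42)
fake_tps = [SimpleNamespace(topic="mytopic", partition=0,
                            offset=43, error=None)]

on_delivery(None, fake_msg)
on_commit(None, fake_tps)
```

The callbacks would then be registered through the configuration dict, for instance as the 'on_delivery' entry of a Producer configuration and the 'on_commit' entry of a Consumer configuration.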

For the full range of configuration properties, please consult librdkafka’s documentation: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
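
As a rough illustration of the stats_cb and oauth_cb signatures listed above, the sketch below parses the statistics JSON document and returns a (token_str, expiry_time) tuple. Several details are assumptions: the "name" and "msg_cnt" field names should be checked against the librdkafka statistics documentation, _fetch_token is a hypothetical placeholder for whatever token source the application uses, and the one-hour lifetime is arbitrary.

```python
import json
import time


def stats_cb(json_str):
    """Parse the statistics document and pick out a few fields."""
    stats = json.loads(json_str)
    # Field names are assumptions; .get() yields None if one is absent.
    summary = {"name": stats.get("name"),
               "msg_cnt": stats.get("msg_cnt")}
    print(summary)
    # The return value is only used by this sketch's own callers.
    return summary


def _fetch_token(config_str):
    """Hypothetical placeholder: a real application would contact its
    identity provider here, possibly using values from config_str."""
    return "placeholder-token"


def oauth_cb(config_str):
    """Return (token_str, expiry_time), with expiry_time expressed as
    seconds since the epoch as a float."""
    token = _fetch_token(config_str)
    expiry_time = time.time() + 3600.0  # assumed one-hour lifetime
    return token, expiry_time


stats_cb('{"name": "example-client", "msg_cnt": 3}')
token, expiry = oauth_cb("scope=example")
print(token, expiry > time.time())
```

Both functions would be passed in the configuration dict under the 'stats_cb' and 'oauth_cb' keys. statistics.interval.ms must also be set for stats_cb to be triggered.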