"""
Kafka admin client: create, view, alter, and delete topics and resources.
"""
from ..cimpl import (KafkaException, # noqa
_AdminClientImpl,
NewTopic,
NewPartitions,
CONFIG_SOURCE_UNKNOWN_CONFIG,
CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG,
CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG,
CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG,
CONFIG_SOURCE_STATIC_BROKER_CONFIG,
CONFIG_SOURCE_DEFAULT_CONFIG,
RESOURCE_UNKNOWN,
RESOURCE_ANY,
RESOURCE_TOPIC,
RESOURCE_GROUP,
RESOURCE_BROKER)
import concurrent.futures
import functools
from enum import Enum
[docs]class ConfigSource(Enum):
"""
Enumerates the different sources of configuration properties.
Used by ConfigEntry to specify the
source of configuration properties returned by `describe_configs()`.
"""
UNKNOWN_CONFIG = CONFIG_SOURCE_UNKNOWN_CONFIG #: Unknown
DYNAMIC_TOPIC_CONFIG = CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG #: Dynamic Topic
DYNAMIC_BROKER_CONFIG = CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG #: Dynamic Broker
DYNAMIC_DEFAULT_BROKER_CONFIG = CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG #: Dynamic Default Broker
STATIC_BROKER_CONFIG = CONFIG_SOURCE_STATIC_BROKER_CONFIG #: Static Broker
DEFAULT_CONFIG = CONFIG_SOURCE_DEFAULT_CONFIG #: Default
[docs]class ConfigEntry(object):
"""
Represents a configuration property. Returned by describe_configs() for each configuration
entry of the specified resource.
This class is typically not user instantiated.
"""
def __init__(self, name, value,
source=ConfigSource.UNKNOWN_CONFIG,
is_read_only=False,
is_default=False,
is_sensitive=False,
is_synonym=False,
synonyms=[]):
"""
This class is typically not user instantiated.
"""
super(ConfigEntry, self).__init__()
self.name = name
"""Configuration property name."""
self.value = value
"""Configuration value (or None if not set or is_sensitive==True)."""
self.source = source
"""Configuration source."""
self.is_read_only = bool(is_read_only)
"""Indicates whether the configuration property is read-only."""
self.is_default = bool(is_default)
"""Indicates whether the configuration property is using its default value."""
self.is_sensitive = bool(is_sensitive)
"""
Indicates whether the configuration property value contains
sensitive information (such as security settings), in which
case .value is None."""
self.is_synonym = bool(is_synonym)
"""Indicates whether the configuration property is a synonym for the parent configuration entry."""
self.synonyms = synonyms
"""A list of synonyms (ConfigEntry) and alternate sources for this configuration property."""
def __repr__(self):
return "ConfigEntry(%s=\"%s\")" % (self.name, self.value)
def __str__(self):
return "%s=\"%s\"" % (self.name, self.value)
[docs]@functools.total_ordering
class ConfigResource(object):
"""
Represents a resource that has configuration, and (optionally)
a collection of configuration properties for that resource. Used by
describe_configs() and alter_configs().
Parameters
----------
restype : `ConfigResource.Type`
The resource type.
name : `str`
The resource name, which depends on the resource type. For RESOURCE_BROKER, the resource name is the broker id.
set_config : `dict`
The configuration to set/overwrite. Dictionary of str, str.
"""
[docs] class Type(Enum):
"""
Enumerates the different types of Kafka resources.
"""
UNKNOWN = RESOURCE_UNKNOWN #: Resource type is not known or not set.
ANY = RESOURCE_ANY #: Match any resource, used for lookups.
TOPIC = RESOURCE_TOPIC #: Topic resource. Resource name is topic name.
GROUP = RESOURCE_GROUP #: Group resource. Resource name is group.id.
BROKER = RESOURCE_BROKER #: Broker resource. Resource name is broker id.
def __init__(self, restype, name,
set_config=None, described_configs=None, error=None):
"""
:param ConfigResource.Type restype: Resource type.
:param str name: The resource name, which depends on restype.
For RESOURCE_BROKER, the resource name is the broker id.
:param dict set_config: The configuration to set/overwrite. Dictionary of str, str.
:param dict described_configs: For internal use only.
:param KafkaError error: For internal use only.
"""
super(ConfigResource, self).__init__()
if name is None:
raise ValueError("Expected resource name to be a string")
if type(restype) == str:
# Allow resource type to be specified as case-insensitive string, for convenience.
try:
restype = ConfigResource.Type[restype.upper()]
except KeyError:
raise ValueError("Unknown resource type \"%s\": should be a ConfigResource.Type" % restype)
elif type(restype) == int:
# The C-code passes restype as an int, convert to Type.
restype = ConfigResource.Type(restype)
self.restype = restype
self.restype_int = int(self.restype.value) # for the C code
self.name = name
if set_config is not None:
self.set_config_dict = set_config.copy()
else:
self.set_config_dict = dict()
self.configs = described_configs
self.error = error
def __repr__(self):
if self.error is not None:
return "ConfigResource(%s,%s,%r)" % (self.restype, self.name, self.error)
else:
return "ConfigResource(%s,%s)" % (self.restype, self.name)
def __hash__(self):
return hash((self.restype, self.name))
def __lt__(self, other):
if self.restype < other.restype:
return True
return self.name.__lt__(other.name)
def __eq__(self, other):
return self.restype == other.restype and self.name == other.name
def __len__(self):
"""
:rtype: int
:returns: number of configuration entries/operations
"""
return len(self.set_config_dict)
[docs] def set_config(self, name, value, overwrite=True):
"""
Set/overwrite a configuration value.
When calling alter_configs, any configuration properties that are not included
in the request will be reverted to their default values. As a workaround, use
describe_configs() to retrieve the current configuration and overwrite the
settings you want to change.
:param str name: Configuration property name
:param str value: Configuration value
:param bool overwrite: If True, overwrite entry if it already exists (default).
If False, do nothing if entry already exists.
"""
if not overwrite and name in self.set_config_dict:
return
self.set_config_dict[name] = value
[docs]class AdminClient (_AdminClientImpl):
"""
AdminClient provides admin operations for Kafka brokers, topics, groups,
and other resource types supported by the broker.
The Admin API methods are asynchronous and return a dict of
concurrent.futures.Future objects keyed by the entity.
The entity is a topic name for create_topics(), delete_topics(), create_partitions(),
and a ConfigResource for alter_configs() and describe_configs().
All the futures for a single API call will currently finish/fail at
the same time (backed by the same protocol request), but this might
change in future versions of the client.
See examples/adminapi.py for example usage.
For more information see the `Java Admin API documentation
<https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/clients/admin/package-frame.html>`_.
Requires broker version v0.11.0.0 or later.
"""
def __init__(self, conf):
"""
Create a new AdminClient using the provided configuration dictionary.
The AdminClient is a standard Kafka protocol client, supporting
the standard librdkafka configuration properties as specified at
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
At least 'bootstrap.servers' should be configured.
"""
super(AdminClient, self).__init__(conf)
@staticmethod
def _make_topics_result(f, futmap):
"""
Map per-topic results to per-topic futures in futmap.
The result value of each (successful) future is None.
"""
try:
result = f.result()
for topic, error in result.items():
fut = futmap.get(topic, None)
if fut is None:
raise RuntimeError("Topic {} not found in future-map: {}".format(topic, futmap))
if error is not None:
# Topic-level exception
fut.set_exception(KafkaException(error))
else:
# Topic-level success
fut.set_result(None)
except Exception as e:
# Request-level exception, raise the same for all topics
for topic, fut in futmap.items():
fut.set_exception(e)
@staticmethod
def _make_resource_result(f, futmap):
"""
Map per-resource results to per-resource futures in futmap.
The result value of each (successful) future is a ConfigResource.
"""
try:
result = f.result()
for resource, configs in result.items():
fut = futmap.get(resource, None)
if fut is None:
raise RuntimeError("Resource {} not found in future-map: {}".format(resource, futmap))
if resource.error is not None:
# Resource-level exception
fut.set_exception(KafkaException(resource.error))
else:
# Resource-level success
# configs will be a dict for describe_configs()
# and None for alter_configs()
fut.set_result(configs)
except Exception as e:
# Request-level exception, raise the same for all resources
for resource, fut in futmap.items():
fut.set_exception(e)
@staticmethod
def _make_futures(futmap_keys, class_check, make_result_fn):
"""
Create futures and a futuremap for the keys in futmap_keys,
and create a request-level future to be bassed to the C API.
"""
futmap = {}
for key in futmap_keys:
if class_check is not None and not isinstance(key, class_check):
raise ValueError("Expected list of {}".format(type(class_check)))
futmap[key] = concurrent.futures.Future()
if not futmap[key].set_running_or_notify_cancel():
raise RuntimeError("Future was cancelled prematurely")
# Create an internal future for the entire request,
# this future will trigger _make_..._result() and set result/exception
# per topic,future in futmap.
f = concurrent.futures.Future()
f.add_done_callback(lambda f: make_result_fn(f, futmap))
if not f.set_running_or_notify_cancel():
raise RuntimeError("Future was cancelled prematurely")
return f, futmap
[docs] def create_topics(self, new_topics, **kwargs):
"""
Create one or more new topics.
:param list(NewTopic) new_topics: A list of specifictions (NewTopic) for
the topics that should be created.
:param float operation_timeout: The operation timeout in seconds,
controlling how long the CreateTopics request will block
on the broker waiting for the topic creation to propagate
in the cluster. A value of 0 returns immediately. Default: 0
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`
:param bool validate_only: If true, the request is only validated
without creating the topic. Default: False
:returns: A dict of futures for each topic, keyed by the topic name.
The future result() method returns None.
:rtype: dict(<topic_name, future>)
:raises KafkaException: Operation failed locally or on broker.
:raises TypeException: Invalid input.
:raises ValueException: Invalid input.
"""
f, futmap = AdminClient._make_futures([x.topic for x in new_topics],
None,
AdminClient._make_topics_result)
super(AdminClient, self).create_topics(new_topics, f, **kwargs)
return futmap
[docs] def delete_topics(self, topics, **kwargs):
"""
Delete one or more topics.
:param list(str) topics: A list of topics to mark for deletion.
:param float operation_timeout: The operation timeout in seconds,
controlling how long the DeleteTopics request will block
on the broker waiting for the topic deletion to propagate
in the cluster. A value of 0 returns immediately. Default: 0
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`
:returns: A dict of futures for each topic, keyed by the topic name.
The future result() method returns None.
:rtype: dict(<topic_name, future>)
:raises KafkaException: Operation failed locally or on broker.
:raises TypeException: Invalid input.
:raises ValueException: Invalid input.
"""
f, futmap = AdminClient._make_futures(topics, None,
AdminClient._make_topics_result)
super(AdminClient, self).delete_topics(topics, f, **kwargs)
return futmap
[docs] def list_topics(self, **kwargs):
return super(AdminClient, self).list_topics(**kwargs)
[docs] def list_groups(self, **kwargs):
return super(AdminClient, self).list_groups(**kwargs)
[docs] def create_partitions(self, new_partitions, **kwargs):
"""
Create additional partitions for the given topics.
:param list(NewPartitions) new_partitions: New partitions to be created.
:param float operation_timeout: The operation timeout in seconds,
controlling how long the CreatePartitions request will block
on the broker waiting for the partition creation to propagate
in the cluster. A value of 0 returns immediately. Default: 0
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`
:param bool validate_only: If true, the request is only validated
without creating the partitions. Default: False
:returns: A dict of futures for each topic, keyed by the topic name.
The future result() method returns None.
:rtype: dict(<topic_name, future>)
:raises KafkaException: Operation failed locally or on broker.
:raises TypeException: Invalid input.
:raises ValueException: Invalid input.
"""
f, futmap = AdminClient._make_futures([x.topic for x in new_partitions],
None,
AdminClient._make_topics_result)
super(AdminClient, self).create_partitions(new_partitions, f, **kwargs)
return futmap
[docs] def describe_configs(self, resources, **kwargs):
"""
Get the configuration of the specified resources.
:warning: Multiple resources and resource types may be requested,
but at most one resource of type RESOURCE_BROKER is allowed
per call since these resource requests must be sent to the
broker specified in the resource.
:param list(ConfigResource) resources: Resources to get the configuration for.
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`
:returns: A dict of futures for each resource, keyed by the ConfigResource.
The type of the value returned by the future result() method is
dict(<configname, ConfigEntry>).
:rtype: dict(<ConfigResource, future>)
:raises KafkaException: Operation failed locally or on broker.
:raises TypeException: Invalid input.
:raises ValueException: Invalid input.
"""
f, futmap = AdminClient._make_futures(resources, ConfigResource,
AdminClient._make_resource_result)
super(AdminClient, self).describe_configs(resources, f, **kwargs)
return futmap
[docs] def alter_configs(self, resources, **kwargs):
"""
Update configuration properties for the specified resources.
Updates are not transactional so they may succeed for a subset
of the provided resources while the others fail.
The configuration for a particular resource is updated atomically,
replacing the specified values while reverting unspecified configuration
entries to their default values.
:warning: alter_configs() will replace all existing configuration for
the provided resources with the new configuration given,
reverting all other configuration for the resource back
to their default values.
:warning: Multiple resources and resource types may be specified,
but at most one resource of type RESOURCE_BROKER is allowed
per call since these resource requests must be sent to the
broker specified in the resource.
:param list(ConfigResource) resources: Resources to update configuration of.
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`.
:param bool validate_only: If true, the request is validated only,
without altering the configuration. Default: False
:returns: A dict of futures for each resource, keyed by the ConfigResource.
The future result() method returns None.
:rtype: dict(<ConfigResource, future>)
:raises KafkaException: Operation failed locally or on broker.
:raises TypeException: Invalid input.
:raises ValueException: Invalid input.
"""
f, futmap = AdminClient._make_futures(resources, ConfigResource,
AdminClient._make_resource_result)
super(AdminClient, self).alter_configs(resources, f, **kwargs)
return futmap
[docs]class GroupMember(object):
"""Provides information about a group member.
For more information on the metadata format, refer to:
`A Guide To The Kafka Protocol <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-GroupMembershipAPI>`_.
This class is typically not user instantiated.
""" # noqa: E501
def __init__(self,):
self.id = None
"""Member id (generated by broker)."""
self.client_id = None
"""Client id."""
self.client_host = None
"""Client hostname."""
self.metadata = None
"""Member metadata(binary), format depends on protocol type."""
self.assignment = None
"""Member assignment(binary), format depends on protocol type."""