Source code for confluent_kafka.admin._config

# Copyright 2022 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum
import functools
from .. import cimpl as _cimpl
from ._resource import ResourceType


[docs]class AlterConfigOpType(Enum): """ Set of incremental operations that can be used with incremental alter configs. """ #: Set the value of the configuration entry. SET = _cimpl.ALTER_CONFIG_OP_TYPE_SET #: Revert the configuration entry #: to the default value (possibly null). DELETE = _cimpl.ALTER_CONFIG_OP_TYPE_DELETE #: (For list-type configuration entries only.) #: Add the specified values #: to the current list of values #: of the configuration entry. APPEND = _cimpl.ALTER_CONFIG_OP_TYPE_APPEND #: (For list-type configuration entries only.) #: Removes the specified values #: from the current list of values #: of the configuration entry. SUBTRACT = _cimpl.ALTER_CONFIG_OP_TYPE_SUBTRACT
[docs]class ConfigSource(Enum): """ Enumerates the different sources of configuration properties. Used by ConfigEntry to specify the source of configuration properties returned by `describe_configs()`. """ UNKNOWN_CONFIG = _cimpl.CONFIG_SOURCE_UNKNOWN_CONFIG #: Unknown DYNAMIC_TOPIC_CONFIG = _cimpl.CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG #: Dynamic Topic DYNAMIC_BROKER_CONFIG = _cimpl.CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG #: Dynamic Broker DYNAMIC_DEFAULT_BROKER_CONFIG = _cimpl.CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG #: Dynamic Default Broker STATIC_BROKER_CONFIG = _cimpl.CONFIG_SOURCE_STATIC_BROKER_CONFIG #: Static Broker DEFAULT_CONFIG = _cimpl.CONFIG_SOURCE_DEFAULT_CONFIG #: Default
[docs]class ConfigEntry(object): """ Represents a configuration property. Returned by describe_configs() for each configuration entry of the specified resource. This class is typically not user instantiated. """ def __init__(self, name, value, source=ConfigSource.UNKNOWN_CONFIG, is_read_only=False, is_default=False, is_sensitive=False, is_synonym=False, synonyms=[], incremental_operation=None): """ This class is typically not user instantiated. """ super(ConfigEntry, self).__init__() self.name = name """Configuration property name.""" self.value = value """Configuration value (or None if not set or is_sensitive==True. Ignored when altering configurations incrementally if incremental_operation is DELETE).""" self.source = source """Configuration source.""" self.is_read_only = bool(is_read_only) """Indicates whether the configuration property is read-only.""" self.is_default = bool(is_default) """Indicates whether the configuration property is using its default value.""" self.is_sensitive = bool(is_sensitive) """ Indicates whether the configuration property value contains sensitive information (such as security settings), in which case .value is None.""" self.is_synonym = bool(is_synonym) """Indicates whether the configuration property is a synonym for the parent configuration entry.""" self.synonyms = synonyms """A list of synonyms (ConfigEntry) and alternate sources for this configuration property.""" self.incremental_operation = incremental_operation """The incremental operation (AlterConfigOpType) to use in incremental_alter_configs.""" def __repr__(self): return "ConfigEntry(%s=\"%s\")" % (self.name, self.value) def __str__(self): return "%s=\"%s\"" % (self.name, self.value)
[docs]@functools.total_ordering class ConfigResource(object): """ Represents a resource that has configuration, and (optionally) a collection of configuration properties for that resource. Used by describe_configs() and alter_configs(). Parameters ---------- restype : `ConfigResource.Type` The resource type. name : `str` The resource name, which depends on the resource type. For RESOURCE_BROKER, the resource name is the broker id. set_config : `dict` The configuration to set/overwrite. Dictionary of str, str. """ Type = ResourceType def __init__(self, restype, name, set_config=None, described_configs=None, error=None, incremental_configs=None): """ :param ConfigResource.Type restype: Resource type. :param str name: The resource name, which depends on restype. For RESOURCE_BROKER, the resource name is the broker id. :param dict set_config: The configuration to set/overwrite. Dictionary of str, str. :param list(ConfigEntry) incremental_configs: The configuration entries to alter incrementally. :param dict described_configs: For internal use only. :param KafkaError error: For internal use only. """ super(ConfigResource, self).__init__() if name is None: raise ValueError("Expected resource name to be a string") if isinstance(restype, str): # Allow resource type to be specified as case-insensitive string, for convenience. try: restype = ConfigResource.Type[restype.upper()] except KeyError: raise ValueError("Unknown resource type \"%s\": should be a ConfigResource.Type" % restype) elif isinstance(restype, int): # The C-code passes restype as an int, convert to Type. restype = ConfigResource.Type(restype) self.restype = restype self.restype_int = int(self.restype.value) # for the C code self.name = name if set_config is not None: self.set_config_dict = set_config.copy() else: self.set_config_dict = dict() self.incremental_configs = list(incremental_configs or []) self.configs = described_configs self.error = error def __repr__(self): if self.error is not None: return "ConfigResource(%s,%s,%r)" % (self.restype, self.name, self.error) else: return "ConfigResource(%s,%s)" % (self.restype, self.name) def __hash__(self): return hash((self.restype, self.name)) def __lt__(self, other): if self.restype < other.restype: return True return self.name.__lt__(other.name) def __eq__(self, other): return self.restype == other.restype and self.name == other.name def __len__(self): """ :rtype: int :returns: number of configuration entries/operations """ return len(self.set_config_dict)
[docs] def set_config(self, name, value, overwrite=True): """ Set/overwrite a configuration value. When calling alter_configs, any configuration properties that are not included in the request will be reverted to their default values. As a workaround, use describe_configs() to retrieve the current configuration and overwrite the settings you want to change. :param str name: Configuration property name :param str value: Configuration value :param bool overwrite: If True, overwrite entry if it already exists (default). If False, do nothing if entry already exists. """ if not overwrite and name in self.set_config_dict: return self.set_config_dict[name] = value
[docs] def add_incremental_config(self, config_entry): """ Add a ConfigEntry for incremental alter configs, using the configured incremental_operation. :param ConfigEntry config_entry: config entry to incrementally alter. """ self.incremental_configs.append(config_entry)