# Copyright 2022 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Optional, Union
from confluent_kafka.cimpl import TopicPartition
from .._model import ConsumerGroupState, ConsumerGroupType, Node
from .._util import ConversionUtil
from ._acl import AclOperation
[docs]
class ConsumerGroupListing:
    """
    Represents consumer group listing information for a group used in list consumer group operation.
    Used by :class:`ListConsumerGroupsResult`.

    Parameters
    ----------
    group_id : str
        The consumer group id.
    is_simple_consumer_group : bool
        Whether a consumer group is simple or not.
    state : ConsumerGroupState
        Current state of the consumer group. May be given as an enum member or
        as a str/int, which is converted with ``ConversionUtil.convert_to_enum``.
        When omitted (``None``), the ``state`` attribute is ``None``.
    type : ConsumerGroupType
        Type of the consumer group. May be given as an enum member or as a
        str/int, which is converted with ``ConversionUtil.convert_to_enum``.
        When omitted (``None``), the ``type`` attribute is ``None``.
    """

    def __init__(
        self,
        group_id: str,
        is_simple_consumer_group: bool,
        state: Optional[Union[ConsumerGroupState, str, int]] = None,
        type: Optional[Union[ConsumerGroupType, str, int]] = None,
    ) -> None:
        self.group_id = group_id
        self.is_simple_consumer_group = is_simple_consumer_group

        # Always define both attributes: previously they were only created
        # when a value was supplied, so reading ``listing.state`` on an
        # instance built with the defaults raised AttributeError.
        self.state = None
        self.type = None

        if state is not None:
            self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
        if type is not None:
            self.type = ConversionUtil.convert_to_enum(type, ConsumerGroupType)
[docs]
class ListConsumerGroupsResult:
    """
    Outcome of a List Consumer Groups operation.
    Used by :meth:`AdminClient.list_consumer_groups`.

    Both arguments are stored exactly as passed; an omitted argument is
    stored as ``None``.

    Parameters
    ----------
    valid : list(ConsumerGroupListing)
        Consumer group listings that were retrieved successfully.
    errors : list(KafkaException)
        Errors encountered during the operation, if any.
    """

    def __init__(
        self,
        valid: Optional[List[ConsumerGroupListing]] = None,
        errors: Optional[List[Exception]] = None,
    ) -> None:
        self.valid, self.errors = valid, errors
[docs]
class MemberAssignment:
    """
    Holds the assignment of a single group member.
    Used by :class:`MemberDescription`.

    Parameters
    ----------
    topic_partitions : list(TopicPartition)
        The topic partitions assigned to a group member. ``None`` (or any
        other falsy value) is stored as a new empty list.
    """

    def __init__(self, topic_partitions: Optional[List[TopicPartition]]) -> None:
        # A truthy argument is kept as-is (same object); anything falsy
        # is replaced by a fresh empty list.
        self.topic_partitions = topic_partitions if topic_partitions else []
[docs]
class MemberDescription:
    """
    Describes one member of a consumer group.
    Used by :class:`ConsumerGroupDescription`.

    Every argument is stored unchanged on the attribute of the same name.

    Parameters
    ----------
    member_id : str
        The consumer id of the group member.
    client_id : str
        The client id of the group member.
    host : str
        The host where the group member is running.
    assignment : MemberAssignment
        The assignment of the group member.
    group_instance_id : str
        The instance id of the group member. Defaults to ``None``.
    target_assignment : MemberAssignment
        The target assignment of the group member. Defaults to ``None``.
    """

    def __init__(
        self,
        member_id: str,
        client_id: str,
        host: str,
        assignment: MemberAssignment,
        group_instance_id: Optional[str] = None,
        target_assignment: Optional[MemberAssignment] = None,
    ) -> None:
        # Identity of the member.
        self.member_id, self.client_id, self.host = member_id, client_id, host
        # Current and target partition assignments.
        self.assignment, self.target_assignment = assignment, target_assignment
        self.group_instance_id = group_instance_id
[docs]
class ConsumerGroupDescription:
    """
    Represents consumer group description information for a group used in describe consumer group operation.
    Used by :meth:`AdminClient.describe_consumer_groups`.

    Parameters
    ----------
    group_id : str
        The consumer group id.
    is_simple_consumer_group : bool
        Whether a consumer group is simple or not.
    members : list(MemberDescription)
        Description of the members of the consumer group.
    partition_assignor : str
        Partition assignor.
    state : ConsumerGroupState
        Current state of the consumer group. May be given as an enum member or
        as a str/int, which is converted with ``ConversionUtil.convert_to_enum``.
        When ``None``, the ``state`` attribute is ``None``.
    coordinator : Node
        Consumer group coordinator.
    authorized_operations : list(AclOperation)
        AclOperations allowed for the consumer group. Each element is converted
        with ``ConversionUtil.convert_to_enum``. ``None`` or an empty list is
        stored as ``None``.
    type : ConsumerGroupType
        Type of the consumer group; defaults to ``ConsumerGroupType.UNKNOWN``.
        May be given as an enum member or as a str/int, which is converted with
        ``ConversionUtil.convert_to_enum``. When ``None`` is passed explicitly,
        the ``type`` attribute is ``None``.
    """

    def __init__(
        self,
        group_id: str,
        is_simple_consumer_group: bool,
        members: List[MemberDescription],
        partition_assignor: str,
        state: Optional[Union[ConsumerGroupState, str, int]],
        coordinator: Node,
        authorized_operations: Optional[List[Union[AclOperation, str, int]]] = None,
        type: Union[ConsumerGroupType, str, int] = ConsumerGroupType.UNKNOWN,
    ) -> None:
        self.group_id = group_id
        self.is_simple_consumer_group = is_simple_consumer_group
        self.members = members

        # The truthiness test is kept deliberately: an empty list is stored as
        # None, exactly like a missing argument, so existing callers see no change.
        self.authorized_operations = None
        if authorized_operations:
            self.authorized_operations = [
                ConversionUtil.convert_to_enum(op, AclOperation) for op in authorized_operations
            ]

        self.partition_assignor = partition_assignor

        # Always define both attributes: previously they were only created
        # when a non-None value was supplied, so reading ``description.state``
        # after passing ``state=None`` raised AttributeError.
        self.state = None
        self.type = None
        if state is not None:
            self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState)
        if type is not None:
            self.type = ConversionUtil.convert_to_enum(type, ConsumerGroupType)

        self.coordinator = coordinator