# Source code for confluent_kafka.admin._listoffsets

# Copyright 2023 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from .. import cimpl


class OffsetSpec(ABC):
    """
    Used in `AdminClient.list_offsets` to specify the desired offsets
    of the partition being queried.

    Calling ``OffsetSpec(index)`` does not build a plain ``OffsetSpec``:
    a negative `index` is looked up among the special specs stored in
    ``_values``, while any other `index` produces a timestamp based spec
    through `for_timestamp`.
    """

    # Maps the cimpl OFFSET_SPEC_* constants to the spec instances created
    # by `_fill_values`. The dict is shared through the class attribute.
    _values = {}

    @property
    @abstractmethod
    def _value(self):
        """Raw value of the spec; `__lt__` compares specs through it."""

    @classmethod
    def _fill_values(cls):
        """
        Create one instance of each special spec, keep them as class
        attributes and register them in ``_values`` under the matching
        cimpl constant.
        """
        cls._max_timestamp = MaxTimestampSpec()
        cls._earliest = EarliestSpec()
        cls._latest = LatestSpec()

        # The whole tuple is evaluated before the registry is touched,
        # so all constants are resolved ahead of the first insertion.
        registrations = (
            (cimpl.OFFSET_SPEC_MAX_TIMESTAMP, cls._max_timestamp),
            (cimpl.OFFSET_SPEC_EARLIEST, cls._earliest),
            (cimpl.OFFSET_SPEC_LATEST, cls._latest),
        )
        for constant, spec in registrations:
            cls._values[constant] = spec

    @classmethod
    def earliest(cls):
        """Return the earliest-offset spec stored by `_fill_values`."""
        return cls._earliest

    @classmethod
    def latest(cls):
        """Return the latest-offset spec stored by `_fill_values`."""
        return cls._latest

    @classmethod
    def max_timestamp(cls):
        """Return the max-timestamp spec stored by `_fill_values`."""
        return cls._max_timestamp

    @classmethod
    def for_timestamp(cls, timestamp):
        """Return a new `TimestampSpec` wrapping `timestamp`."""
        return TimestampSpec(timestamp)

    def __new__(cls, index):
        """
        Return the spec matching `index` instead of a new ``OffsetSpec``.

        Subclasses can be instantiated but aren't accessible externally,
        so this is the way to obtain one of them from a raw value.

        Raises
        ------
        KeyError
            If `index` is negative and absent from ``_values``.
        """
        if index < 0:
            # Negative values select one of the registered special specs.
            return cls._values[index]
        return cls.for_timestamp(index)

    def __lt__(self, other):
        """Order two specs by their raw `_value`."""
        if isinstance(other, OffsetSpec):
            return self._value < other._value
        return NotImplemented
class TimestampSpec(OffsetSpec):
    """
    Used in a `AdminClient.list_offsets` call to retrieve the earliest
    offset whose timestamp is greater than or equal to the given
    timestamp in the corresponding partition.

    Parameters
    ----------
    timestamp: int
        timestamp in milliseconds.
    """

    def __new__(cls, _):
        # Allocate directly through object so that OffsetSpec.__new__,
        # which dispatches on its argument, is not involved.
        return object.__new__(cls)

    def __init__(self, timestamp):
        self.timestamp = timestamp

    @property
    def _value(self):
        """The wrapped timestamp."""
        return self.timestamp


class MaxTimestampSpec(OffsetSpec):
    """
    Used in a `AdminClient.list_offsets` call to retrieve the offset with
    the largest timestamp. It may not correspond to the latest offset,
    as timestamps can be specified client-side.
    """

    @property
    def _value(self):
        """The cimpl constant identifying this spec."""
        return cimpl.OFFSET_SPEC_MAX_TIMESTAMP

    def __new__(cls):
        # Takes no argument, unlike OffsetSpec.__new__.
        return object.__new__(cls)


class LatestSpec(OffsetSpec):
    """
    Used in a `AdminClient.list_offsets` call to retrieve the latest
    offset of the queried partition.
    """

    @property
    def _value(self):
        """The cimpl constant identifying this spec."""
        return cimpl.OFFSET_SPEC_LATEST

    def __new__(cls):
        # Takes no argument, unlike OffsetSpec.__new__.
        return object.__new__(cls)


class EarliestSpec(OffsetSpec):
    """
    Used in a `AdminClient.list_offsets` call to retrieve the earliest
    offset of the queried partition.
    """

    @property
    def _value(self):
        """The cimpl constant identifying this spec."""
        return cimpl.OFFSET_SPEC_EARLIEST

    def __new__(cls):
        # Takes no argument, unlike OffsetSpec.__new__.
        return object.__new__(cls)


# All spec classes exist at this point, so the instances served by
# OffsetSpec can now be created and registered.
OffsetSpec._fill_values()
class ListOffsetsResultInfo:
    """
    ListOffsetsResultInfo

    Result of a `AdminClient.list_offsets` call associated to a partition.

    Parameters
    ----------
    offset: int
        The offset returned by the list_offsets call.
    timestamp: int
        The timestamp in milliseconds corresponding to the offset.
        Not available (-1) when querying for the earliest or the latest
        offsets.
    leader_epoch: int
        The leader epoch corresponding to the offset (optional).
        A negative value is stored as None; None is accepted as is.
    """

    def __init__(self, offset, timestamp, leader_epoch):
        self.offset = offset
        self.timestamp = timestamp
        # Negative epochs are replaced by None. The explicit None check
        # is needed because comparing None with an int raises TypeError.
        if leader_epoch is not None and leader_epoch < 0:
            leader_epoch = None
        self.leader_epoch = leader_epoch