#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from .deserializing_consumer import DeserializingConsumer
from .serializing_producer import SerializingProducer
from .error import KafkaException, KafkaError
from ._model import (
Node, # noqa: F401
ConsumerGroupTopicPartitions,
ConsumerGroupState,
ConsumerGroupType,
TopicCollection,
TopicPartitionInfo,
IsolationLevel,
)
import os
from .cimpl import (
Producer,
Consumer,
Message,
TopicPartition,
Uuid,
libversion,
version,
TIMESTAMP_NOT_AVAILABLE,
TIMESTAMP_CREATE_TIME,
TIMESTAMP_LOG_APPEND_TIME,
OFFSET_BEGINNING,
OFFSET_END,
OFFSET_STORED,
OFFSET_INVALID,
)
__all__ = [
"admin",
"Consumer",
"experimental",
"KafkaError",
"KafkaException",
"kafkatest",
"libversion",
"Message",
"OFFSET_BEGINNING",
"OFFSET_END",
"OFFSET_INVALID",
"OFFSET_STORED",
"Producer",
"DeserializingConsumer",
"SerializingProducer",
"TIMESTAMP_CREATE_TIME",
"TIMESTAMP_LOG_APPEND_TIME",
"TIMESTAMP_NOT_AVAILABLE",
"TopicPartition",
"Node",
"ConsumerGroupTopicPartitions",
"ConsumerGroupState",
"ConsumerGroupType",
"Uuid",
"IsolationLevel",
"TopicCollection",
"TopicPartitionInfo",
]
__version__ = version()
[docs]class ThrottleEvent(object):
"""
ThrottleEvent contains details about a throttled request.
Set up a throttle callback by setting the ``throttle_cb`` configuration
property to a callable that takes a ThrottleEvent object as its only argument.
The callback will be triggered from poll(), consume() or flush() when a request
has been throttled by the broker.
This class is typically not user instantiated.
:ivar str broker_name: The hostname of the broker which throttled the request
:ivar int broker_id: The broker id
:ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request
"""
def __init__(self, broker_name, broker_id, throttle_time):
self.broker_name = broker_name
self.broker_id = broker_id
self.throttle_time = throttle_time
def __str__(self):
return "{}/{} throttled for {} ms".format(
self.broker_name, self.broker_id, int(self.throttle_time * 1000)
)
def _resolve_plugins(plugins):
"""Resolve embedded plugins from the wheel's library directory.
For internal module use only.
:param str plugins: The plugin.library.paths value
"""
from sys import platform
# Location of __init__.py and the embedded library directory
basedir = os.path.dirname(__file__)
if platform in ("win32", "cygwin"):
paths_sep = ";"
ext = ".dll"
libdir = basedir
elif platform in ("linux", "linux2"):
paths_sep = ":"
ext = ".so"
libdir = os.path.join(basedir, ".libs")
elif platform == "darwin":
paths_sep = ":"
ext = ".dylib"
libdir = os.path.join(basedir, ".dylibs")
else:
# Unknown platform, there are probably no embedded plugins.
return plugins
if not os.path.isdir(libdir):
# No embedded library directory, probably not a wheel installation.
return plugins
resolved = []
for plugin in plugins.split(paths_sep):
if "/" in plugin or "\\" in plugin:
# Path specified, leave unchanged
resolved.append(plugin)
continue
# See if the plugin can be found in the wheel's
# embedded library directory.
# The user might not have supplied a file extension, so try both.
good = None
for file in [plugin, plugin + ext]:
fpath = os.path.join(libdir, file)
if os.path.isfile(fpath):
good = fpath
break
if good is not None:
resolved.append(good)
else:
resolved.append(plugin)
return paths_sep.join(resolved)