librdkafka
The Apache Kafka C/C++ client library
Simple Consumer (legacy)
#include <rdkafkacpp.h>
Public Member Functions
virtual ErrorCode | start (Topic *topic, int32_t partition, int64_t offset)=0
Start consuming messages for topic and partition at the given offset, which may be either a proper offset (0..N) or one of the special offsets OFFSET_BEGINNING or OFFSET_END.
virtual ErrorCode | start (Topic *topic, int32_t partition, int64_t offset, Queue *queue)=0
Start consuming messages for topic and partition on the given queue.
virtual ErrorCode | stop (Topic *topic, int32_t partition)=0
Stop consuming messages for topic and partition, purging all messages currently in the local queue.
virtual ErrorCode | seek (Topic *topic, int32_t partition, int64_t offset, int timeout_ms)=0
Seek consumer for topic+partition to offset, which is either an absolute or a logical offset.
virtual Message * | consume (Topic *topic, int32_t partition, int timeout_ms)=0
Consume a single message from topic and partition.
virtual Message * | consume (Queue *queue, int timeout_ms)=0
Consume a single message from the specified queue.
virtual int | consume_callback (Topic *topic, int32_t partition, int timeout_ms, ConsumeCb *consume_cb, void *opaque)=0
Consumes messages from topic and partition, calling the provided callback for each consumed message.
virtual int | consume_callback (Queue *queue, int timeout_ms, RdKafka::ConsumeCb *consume_cb, void *opaque)=0
Consumes messages from queue, calling the provided callback for each consumed message.
Public Member Functions inherited from RdKafka::Handle
virtual std::string | name () const =0
virtual std::string | memberid () const =0
Returns the client's broker-assigned group member id.
virtual int | poll (int timeout_ms)=0
Polls the provided kafka handle for events.
virtual int | outq_len ()=0
Returns the current out queue length.
virtual ErrorCode | metadata (bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms)=0
Request Metadata from broker.
virtual ErrorCode | pause (std::vector< TopicPartition * > &partitions)=0
Pause producing or consumption for the provided list of partitions.
virtual ErrorCode | resume (std::vector< TopicPartition * > &partitions)=0
Resume producing or consumption for the provided list of partitions.
virtual ErrorCode | query_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)=0
Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.
virtual ErrorCode | get_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high)=0
Get last known low (oldest/beginning) and high (newest/end) offsets for partition.
virtual ErrorCode | offsetsForTimes (std::vector< TopicPartition * > &offsets, int timeout_ms)=0
Look up the offsets for the given partitions by timestamp.
virtual Queue * | get_partition_queue (const TopicPartition *partition)=0
Retrieve queue for a given partition.
virtual ErrorCode | set_log_queue (Queue *queue)=0
Forward librdkafka logs (and debug) to the specified queue for serving with one of the ..poll() calls.
virtual void | yield ()=0
Cancels the current callback dispatcher (Handle::poll(), KafkaConsumer::consume(), etc).
virtual std::string | clusterid (int timeout_ms)=0
Returns the ClusterId as reported in broker metadata.
virtual struct rd_kafka_s * | c_ptr ()=0
Returns the underlying librdkafka C rd_kafka_t handle.
virtual int32_t | controllerid (int timeout_ms)=0
Returns the current ControllerId (controller broker id) as reported in broker metadata.
virtual ErrorCode | fatal_error (std::string &errstr) const =0
Returns the first fatal error set on this client instance, or ERR_NO_ERROR if no fatal error has occurred.
virtual ErrorCode | oauthbearer_set_token (const std::string &token_value, int64_t md_lifetime_ms, const std::string &md_principal_name, const std::list< std::string > &extensions, std::string &errstr)=0
Set SASL/OAUTHBEARER token and metadata.
virtual ErrorCode | oauthbearer_set_token_failure (const std::string &errstr)=0
SASL/OAUTHBEARER token refresh failure indicator.
virtual Error * | sasl_background_callbacks_enable ()=0
Enable SASL OAUTHBEARER refresh callbacks on the librdkafka background thread.
virtual Queue * | get_sasl_queue ()=0
virtual Queue * | get_background_queue ()=0
virtual void * | mem_malloc (size_t size)=0
Allocate memory using the same allocator librdkafka uses.
virtual void | mem_free (void *ptr)=0
Free pointer returned by librdkafka.
virtual Error * | sasl_set_credentials (const std::string &username, const std::string &password)=0
Sets SASL credentials used for SASL PLAIN and SCRAM mechanisms by this Kafka client.
Static Public Member Functions
static Consumer * | create (const Conf *conf, std::string &errstr)
Creates a new Kafka consumer handle.
static int64_t | OffsetTail (int64_t offset)
Converts an offset into the logical offset from the tail of a topic.
Simple Consumer (legacy)
A simple non-balanced, non-group-aware, consumer.
Creates a new Kafka consumer handle.
conf is an optional object that will be used instead of the default configuration. The conf object is reusable after this call.
Returns the new handle on success, or NULL on error, in which case errstr is set to a human-readable error message.
|
pure virtual |
Start consuming messages for topic and partition at the given offset, which may be either a proper offset (0..N) or one of the special offsets OFFSET_BEGINNING or OFFSET_END.
rdkafka will attempt to keep queued.min.messages (config property) messages in the local queue by repeatedly fetching batches of messages from the broker until the threshold is reached.
The application shall use one of the ..->consume*() functions to consume messages from the local queue, each Kafka message being represented as a RdKafka::Message * object.
..->start() must not be called multiple times for the same topic and partition without stopping consumption first with ..->stop().
|
pure virtual |
Start consuming messages for topic and partition on the given queue.
Stop consuming messages for topic and partition, purging all messages currently in the local queue.
The application needs to stop all consumers before destroying the Consumer handle.
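The pairing rules above (no repeated ..->start() for the same topic and partition without an intervening ..->stop(), and all consumers stopped before the handle is destroyed) can be tracked on the application side. The following is a minimal sketch of such bookkeeping. The StartedPartitions class and its method names are hypothetical and not part of librdkafka, and identifying the topic by name is an assumption made purely for illustration.

```cpp
#include <cstdint>
#include <set>
#include <string>
#include <utility>

// Hypothetical application-side bookkeeping; not part of librdkafka.
class StartedPartitions {
 public:
  // Records topic+partition as started. Returns false if it was already
  // started, in which case start() should not be called again.
  bool try_mark_started(const std::string &topic, int32_t partition) {
    return started_.insert(std::make_pair(topic, partition)).second;
  }

  // Records topic+partition as stopped. Returns false if it was not started.
  bool mark_stopped(const std::string &topic, int32_t partition) {
    return started_.erase(std::make_pair(topic, partition)) > 0;
  }

  // True once every started topic+partition has been stopped again.
  bool empty() const { return started_.empty(); }

 private:
  std::set<std::pair<std::string, int32_t>> started_;
};
```

An application might call try_mark_started() before ..->start() and skip the call when it returns false, then check empty() before destroying the Consumer handle.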
|
pure virtual |
Seek consumer for topic+partition to offset, which is either an absolute or a logical offset.
If timeout_ms is not 0, the call will wait this long for the seek to be performed. If the timeout is reached, the internal state will be unknown and this function returns ERR__TIMED_OUT. If timeout_ms is 0, it will initiate the seek but return immediately without any error reporting (i.e., asynchronously).
This call triggers a fetch queue barrier flush.
|
pure virtual |
Consume a single message from topic and partition.
timeout_ms is the maximum amount of time to wait for a message to be received. The consumer must have been previously started with ..->start().
Returns a Message object; the application must check whether it is an error or a proper message by calling RdKafka::Message::err() and checking for ERR_NO_ERROR.
The message object must be destroyed when the application is done with it.
Errors (in RdKafka::Message::err()):
ERR__TIMED_OUT - timeout_ms was reached with no new messages fetched.
Consume a single message from the specified queue.
timeout_ms is the maximum amount of time to wait for a message to be received. The consumer must have been previously started on the queue with ..->start().
Returns a Message object; the application must check whether it is an error or a proper message by calling Message->err() and checking for ERR_NO_ERROR.
The message object must be destroyed when the application is done with it.
Errors (in RdKafka::Message::err()):
ERR__TIMED_OUT - timeout_ms was reached with no new messages fetched.
Note that Message->topic() may be nullptr after certain kinds of errors, so applications should check that it isn't null before dereferencing it.
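The handling rules above (check err(), destroy the message when done, and test topic() for null before dereferencing) might be combined as in the following sketch. So that it compiles without librdkafka installed, it uses hypothetical stand-ins (FakeTopic, FakeMessage, FakeQueue, FakeConsumer) that mimic only the members used here. The numeric error values are assumptions, and drain() is an illustrative helper rather than a library function. With the real library the corresponding types would be RdKafka::Topic, RdKafka::Message, RdKafka::Queue and RdKafka::Consumer.

```cpp
#include <deque>
#include <iostream>
#include <memory>
#include <string>

// Hypothetical stand-ins so the sketch compiles without librdkafka. They
// mimic only the members used below; the numeric values are assumptions.
enum ErrorCode { ERR_NO_ERROR = 0, ERR_UNKNOWN = -1, ERR__TIMED_OUT = -185 };

struct FakeTopic {
  std::string topic_name;
  std::string name() const { return topic_name; }
};

struct FakeMessage {
  ErrorCode code;
  FakeTopic *topic_ptr;  // may be nullptr after certain kinds of errors
  std::string body;
  ErrorCode err() const { return code; }
  FakeTopic *topic() const { return topic_ptr; }
};

struct FakeQueue {
  std::deque<FakeMessage> pending;
};

struct FakeConsumer {
  // Same shape as consume(Queue *queue, int timeout_ms): the caller owns the
  // returned object. An empty queue is reported as a timeout.
  FakeMessage *consume(FakeQueue *queue, int /*timeout_ms*/) {
    if (queue->pending.empty())
      return new FakeMessage{ERR__TIMED_OUT, nullptr, ""};
    FakeMessage *msg = new FakeMessage(queue->pending.front());
    queue->pending.pop_front();
    return msg;
  }
};

// Consumes until a timeout is reported and returns the number of proper
// (non-error) messages seen.
int drain(FakeConsumer &consumer, FakeQueue *queue, int timeout_ms) {
  int delivered = 0;
  for (;;) {
    // unique_ptr destroys the message once this iteration is done with it.
    std::unique_ptr<FakeMessage> msg(consumer.consume(queue, timeout_ms));
    if (msg->err() == ERR__TIMED_OUT) break;  // nothing new was fetched
    if (msg->err() != ERR_NO_ERROR) {
      // topic() may be null on errors, so check before dereferencing.
      const std::string where =
          msg->topic() ? msg->topic()->name() : "(unknown topic)";
      std::cerr << "consume error " << msg->err() << " on " << where << "\n";
      continue;
    }
    ++delivered;
    std::cout << msg->body << "\n";
  }
  return delivered;
}
```

Stopping at the first timeout is a choice made to keep the sketch finite; a long-running application would more likely keep polling.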
|
pure virtual |
Consumes messages from topic and partition, calling the provided callback for each consumed message.
consume_callback() provides higher throughput performance than consume().
timeout_ms is the maximum amount of time to wait for one or more messages to arrive.
The provided consume_cb instance has its consume_cb function called for every message received.
The opaque argument is passed to the consume_cb as opaque.
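To illustrate how the opaque argument travels from the caller to the callback, here is a small sketch. It does not use librdkafka itself: FakeMessage, FakeConsumeCb and fake_consume_callback() are hypothetical stand-ins that only imitate the shape of RdKafka::Message, RdKafka::ConsumeCb and consume_callback(), and Tally and TallyCb are made-up application types.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for RdKafka::Message, reduced to what is needed here.
struct FakeMessage {
  bool ok;  // stands in for err() == ERR_NO_ERROR
  std::string body;
};

// Hypothetical stand-in with the same callback shape as RdKafka::ConsumeCb.
class FakeConsumeCb {
 public:
  virtual ~FakeConsumeCb() = default;
  virtual void consume_cb(FakeMessage &message, void *opaque) = 0;
};

// Per-call state that the application hands through the opaque pointer.
struct Tally {
  std::size_t messages = 0;
  std::size_t bytes = 0;
};

// Application callback: counts proper messages and their payload bytes.
class TallyCb : public FakeConsumeCb {
 public:
  void consume_cb(FakeMessage &message, void *opaque) override {
    Tally *tally = static_cast<Tally *>(opaque);
    if (!message.ok) return;  // skip error events
    ++tally->messages;
    tally->bytes += message.body.size();
  }
};

// Stand-in for consume_callback(): calls the callback once per message,
// forwarding opaque unchanged, and returns how many messages were dispatched.
int fake_consume_callback(std::vector<FakeMessage> &batch, FakeConsumeCb *cb,
                          void *opaque) {
  for (FakeMessage &message : batch) cb->consume_cb(message, opaque);
  return static_cast<int>(batch.size());
}
```

Passing state through opaque keeps the callback object itself stateless, so one instance could be shared across calls; storing the tally as a member of the callback would be an equally reasonable design.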
|
pure virtual |
Consumes messages from queue, calling the provided callback for each consumed message.
|
static |
Converts an offset into the logical offset from the tail of a topic.
offset is the (positive) number of items from the end.
Returns the logical offset for offset messages from the tail; this value may be passed to Consumer::start, et al.
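As a rough illustration of what a logical offset looks like, the sketch below mirrors the RD_KAFKA_OFFSET_TAIL macro from the C header rdkafka.h, on the assumption that Consumer::OffsetTail maps onto it. The constant names and the helpers offset_tail(), is_special_offset() and is_logical_offset() are hypothetical, and the numeric values are assumed from the C header, where the tail base is marked as internal. Real code should call Consumer::OffsetTail rather than rely on these numbers.

```cpp
#include <cassert>
#include <cstdint>

// Values assumed to match the RD_KAFKA_OFFSET_* definitions in rdkafka.h.
constexpr int64_t kOffsetBeginning = -2;    // OFFSET_BEGINNING
constexpr int64_t kOffsetEnd = -1;          // OFFSET_END
constexpr int64_t kOffsetTailBase = -2000;  // internal tail base (assumed)

// Hypothetical stand-in for Consumer::OffsetTail(): encodes "count messages
// before the end" as a negative logical offset at or below the tail base.
inline int64_t offset_tail(int64_t count) {
  assert(count >= 0 && "count is the (positive) number of items from the end");
  return kOffsetTailBase - count;
}

// True for the two special offsets named in this reference.
inline bool is_special_offset(int64_t offset) {
  return offset == kOffsetBeginning || offset == kOffsetEnd;
}

// Proper offsets are 0..N, so any negative value is a logical offset.
inline bool is_logical_offset(int64_t offset) { return offset < 0; }
```

Under these assumptions offset_tail(100) evaluates to -2100, a value that cannot collide with a proper offset and that start() would interpret as "begin 100 messages before the end of the partition".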