librdkafka
The Apache Kafka C/C++ client library
RdKafka::Consumer Class Reference (abstract)

Simple Consumer (legacy)

#include <rdkafkacpp.h>

Public Member Functions

virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset)=0
 Start consuming messages for topic and partition at offset, which may be either a proper offset (0..N) or one of the special offsets OFFSET_BEGINNING or OFFSET_END.

virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset, Queue *queue)=0
 Start consuming messages for topic and partition on queue queue.

virtual ErrorCode stop (Topic *topic, int32_t partition)=0
 Stop consuming messages for topic and partition, purging all messages currently in the local queue.

virtual ErrorCode seek (Topic *topic, int32_t partition, int64_t offset, int timeout_ms)=0
 Seek consumer for topic+partition to offset which is either an absolute or logical offset.

virtual Message * consume (Topic *topic, int32_t partition, int timeout_ms)=0
 Consume a single message from topic and partition.

virtual Message * consume (Queue *queue, int timeout_ms)=0
 Consume a single message from the specified queue.

virtual int consume_callback (Topic *topic, int32_t partition, int timeout_ms, ConsumeCb *consume_cb, void *opaque)=0
 Consumes messages from topic and partition, calling the provided callback for each consumed message.

virtual int consume_callback (Queue *queue, int timeout_ms, RdKafka::ConsumeCb *consume_cb, void *opaque)=0
 Consumes messages from queue, calling the provided callback for each consumed message.
 
Public Member Functions inherited from RdKafka::Handle
virtual const std::string name () const =0

virtual const std::string memberid () const =0
 Returns the client's broker-assigned group member id.

virtual int poll (int timeout_ms)=0
 Polls the provided kafka handle for events.

virtual int outq_len ()=0
 Returns the current out queue length.

virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms)=0
 Request Metadata from broker.

virtual ErrorCode pause (std::vector< TopicPartition * > &partitions)=0
 Pause producing or consumption for the provided list of partitions.

virtual ErrorCode resume (std::vector< TopicPartition * > &partitions)=0
 Resume producing or consumption for the provided list of partitions.

virtual ErrorCode query_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)=0
 Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.

virtual ErrorCode get_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high)=0
 Get last known low (oldest/beginning) and high (newest/end) offsets for partition.

virtual ErrorCode offsetsForTimes (std::vector< TopicPartition * > &offsets, int timeout_ms)=0
 Look up the offsets for the given partitions by timestamp.

virtual Queue * get_partition_queue (const TopicPartition *partition)=0
 Retrieve queue for a given partition.

virtual ErrorCode set_log_queue (Queue *queue)=0
 Forward librdkafka logs (and debug) to the specified queue for serving with one of the ..poll() calls.

virtual void yield ()=0
 Cancels the current callback dispatcher (Handle::poll(), KafkaConsumer::consume(), etc.).

virtual const std::string clusterid (int timeout_ms)=0
 Returns the ClusterId as reported in broker metadata.

virtual struct rd_kafka_s * c_ptr ()=0
 Returns the underlying librdkafka C rd_kafka_t handle.

virtual int32_t controllerid (int timeout_ms)=0
 Returns the current ControllerId (controller broker id) as reported in broker metadata.

virtual ErrorCode fatal_error (std::string &errstr) const =0
 Returns the first fatal error set on this client instance, or ERR_NO_ERROR if no fatal error has occurred.

virtual ErrorCode oauthbearer_set_token (const std::string &token_value, int64_t md_lifetime_ms, const std::string &md_principal_name, const std::list< std::string > &extensions, std::string &errstr)=0
 Set SASL/OAUTHBEARER token and metadata.

virtual ErrorCode oauthbearer_set_token_failure (const std::string &errstr)=0
 SASL/OAUTHBEARER token refresh failure indicator.

virtual void * mem_malloc (size_t size)=0
 Allocate memory using the same allocator librdkafka uses.

virtual void mem_free (void *ptr)=0
 Free pointer returned by librdkafka.
 

Static Public Member Functions

static Consumer * create (const Conf *conf, std::string &errstr)
 Creates a new Kafka consumer handle.

static int64_t OffsetTail (int64_t offset)
 Converts an offset into the logical offset from the tail of a topic.
 

Detailed Description

Simple Consumer (legacy)

A simple non-balanced, non-group-aware, consumer.

Member Function Documentation

◆ create()

static Consumer * RdKafka::Consumer::create (const Conf *conf, std::string &errstr)
static

Creates a new Kafka consumer handle.

conf is an optional object that will be used instead of the default configuration. The conf object is reusable after this call.

Returns
the new handle on success, or NULL on error, in which case errstr is set to a human-readable error message.
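
The NULL-plus-errstr convention described above fits a small ownership wrapper. The following is a minimal sketch, not part of librdkafka: create_or_throw is a hypothetical helper that accepts any factory callable following that convention. It is written as a template so that it compiles without the library headers.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of librdkafka). `factory` is any callable
// that takes std::string &errstr and returns a raw T* handle, or a null
// pointer with errstr filled in, as create() is documented to do.
template <class T, class Factory>
std::unique_ptr<T> create_or_throw(Factory factory) {
  std::string errstr;
  T *handle = factory(errstr);
  if (handle == nullptr) {
    // errstr carries the human-readable reason for the failure.
    throw std::runtime_error("handle creation failed: " + errstr);
  }
  return std::unique_ptr<T>(handle);  // the caller now owns the handle
}
```

With the real library this would presumably be invoked as create_or_throw<RdKafka::Consumer>([&](std::string &e) { return RdKafka::Consumer::create(conf, e); }), where conf is a configuration object the application has already built. Every started partition still has to be stopped before the owning pointer goes out of scope.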

◆ start() [1/2]

virtual ErrorCode RdKafka::Consumer::start (Topic *topic, int32_t partition, int64_t offset)
pure virtual

Start consuming messages for topic and partition at offset, which may be either a proper offset (0..N) or one of the special offsets OFFSET_BEGINNING or OFFSET_END.

rdkafka will attempt to keep queued.min.messages (config property) messages in the local queue by repeatedly fetching batches of messages from the broker until the threshold is reached.

The application shall use one of the ..->consume*() functions to consume messages from the local queue, each Kafka message being represented as an RdKafka::Message * object.

..->start() must not be called multiple times for the same topic and partition without stopping consumption first with ..->stop().

Returns
an ErrorCode to indicate success or failure.
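
The rule that ..->start() must not be repeated for the same topic and partition without an intervening ..->stop() can be enforced on the application side. A minimal sketch follows, assuming a hypothetical StartGuard helper (not part of librdkafka) that records which (topic, partition) pairs are active. The calls into the real consumer appear only as comments.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>
#include <string>
#include <utility>

// Hypothetical application-side bookkeeping, not part of librdkafka.
class StartGuard {
 public:
  // Returns true if the pair was not active yet, i.e. the caller may now
  // call consumer->start(topic, partition, offset).
  bool try_start(const std::string &topic, int32_t partition) {
    return active_.insert(std::make_pair(topic, partition)).second;
  }

  // Returns true if the pair was active, i.e. the caller should now call
  // consumer->stop(topic, partition).
  bool try_stop(const std::string &topic, int32_t partition) {
    return active_.erase(std::make_pair(topic, partition)) == 1;
  }

  // Number of pairs that still need stop() before the handle is destroyed.
  std::size_t active_count() const { return active_.size(); }

 private:
  std::set<std::pair<std::string, int32_t> > active_;
};
```

A zero active_count() is a convenient precondition to check before destroying the Consumer handle.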

◆ start() [2/2]

virtual ErrorCode RdKafka::Consumer::start (Topic *topic, int32_t partition, int64_t offset, Queue *queue)
)
pure virtual

Start consuming messages for topic and partition on queue queue.

See also
RdKafka::Consumer::start()

◆ stop()

virtual ErrorCode RdKafka::Consumer::stop (Topic *topic, int32_t partition)
)
pure virtual

Stop consuming messages for topic and partition, purging all messages currently in the local queue.

The application needs to stop all consumers before destroying the Consumer handle.

Returns
an ErrorCode to indicate success or failure.

◆ seek()

virtual ErrorCode RdKafka::Consumer::seek (Topic *topic, int32_t partition, int64_t offset, int timeout_ms)
)
pure virtual

Seek consumer for topic+partition to offset which is either an absolute or logical offset.

If timeout_ms is not 0, the call will wait this long for the seek to be performed. If the timeout is reached, the internal state will be unknown and this function returns ERR__TIMED_OUT. If timeout_ms is 0, it will initiate the seek but return immediately without any error reporting (i.e., asynchronously).

This call triggers a fetch queue barrier flush.

Returns
an ErrorCode to indicate success or failure.
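
The timeout rules above leave the caller with four distinguishable situations. The sketch below spells them out as a decision function. Err and SeekState are hypothetical stand-in enums introduced here so that the block compiles without librdkafka; with the real library the comparison would be against the ErrorCode returned by seek().

```cpp
#include <cassert>

// Stand-in for the RdKafka::ErrorCode values relevant here (assumption:
// everything other than success and ERR__TIMED_OUT is lumped into Other).
enum class Err { NoError, TimedOut, Other };

// What the application can conclude after one seek() call.
enum class SeekState {
  Completed,  // waited, and the seek was performed
  Requested,  // timeout_ms == 0: seek initiated, outcome not reported
  Unknown,    // timed out: internal state is unknown
  Failed      // any other error code
};

SeekState interpret_seek(int timeout_ms, Err result) {
  if (result == Err::TimedOut) return SeekState::Unknown;
  if (result != Err::NoError) return SeekState::Failed;
  // Success code: only a non-zero timeout means the seek was confirmed.
  return timeout_ms == 0 ? SeekState::Requested : SeekState::Completed;
}
```

Treating Requested and Unknown separately from Completed keeps the application from assuming a position that the consumer has not confirmed.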

◆ consume() [1/2]

virtual Message * RdKafka::Consumer::consume (Topic *topic, int32_t partition, int timeout_ms)
)
pure virtual

Consume a single message from topic and partition.

timeout_ms is the maximum amount of time to wait for a message to be received. The consumer must have been previously started with ..->start().

Returns
a Message object. The application needs to check whether the message is an error or a proper message by calling RdKafka::Message::err() and checking for ERR_NO_ERROR.

The message object must be destroyed when the application is done with it.

Errors (in RdKafka::Message::err()):

  • ERR__TIMED_OUT - timeout_ms was reached with no new messages fetched.
  • ERR__PARTITION_EOF - End of partition reached, not an error.
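
The error list above suggests a three-way split for each consume() result, plus a catch-all for genuine errors. The following is a minimal sketch of that dispatch. Err and Action are hypothetical stand-in enums so that the block compiles without librdkafka; real code would switch on message->err() and compare against ERR_NO_ERROR, ERR__TIMED_OUT and ERR__PARTITION_EOF.

```cpp
#include <cassert>

// Stand-in for the RdKafka::ErrorCode values named above (assumption: all
// remaining codes are lumped into Other).
enum class Err { NoError, TimedOut, PartitionEof, Other };

// Hypothetical application-level reactions.
enum class Action { Process, PollAgain, Idle, Fail };

// Decide what to do with the result of one consume() call. Whatever the
// outcome, the Message object itself must still be destroyed afterwards.
Action classify(Err err) {
  switch (err) {
    case Err::NoError:      return Action::Process;    // proper message
    case Err::TimedOut:     return Action::PollAgain;  // nothing arrived in timeout_ms
    case Err::PartitionEof: return Action::Idle;       // caught up, not an error
    default:                return Action::Fail;       // genuine error
  }
}
```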

◆ consume() [2/2]

virtual Message * RdKafka::Consumer::consume (Queue *queue, int timeout_ms)
)
pure virtual

Consume a single message from the specified queue.

timeout_ms is the maximum amount of time to wait for a message to be received. The consumer must have been previously started on the queue with ..->start().

Returns
a Message object. The application needs to check whether the message is an error or a proper message by calling Message->err() and checking for ERR_NO_ERROR.

The message object must be destroyed when the application is done with it.

Errors (in RdKafka::Message::err()):

  • ERR__TIMED_OUT - timeout_ms was reached with no new messages fetched.

Note that Message->topic() may be nullptr after certain kinds of errors, so applications should check that it isn't null before dereferencing it.

◆ consume_callback() [1/2]

virtual int RdKafka::Consumer::consume_callback (Topic *topic, int32_t partition, int timeout_ms, ConsumeCb *consume_cb, void *opaque)
)
pure virtual

Consumes messages from topic and partition, calling the provided callback for each consumed message.

consume_callback() provides higher throughput than consume().

timeout_ms is the maximum amount of time to wait for one or more messages to arrive.

The provided consume_cb instance has its consume_cb function called for every message received.

The opaque argument is passed to the consume_cb as opaque.

Returns
the number of messages processed or -1 on error.
See also
RdKafka::Consumer::consume()
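
To illustrate how the callback object and the opaque pointer work together, here is a minimal sketch. Message and ConsumeCb below are simplified stand-ins for the real RdKafka types so that the block compiles without librdkafka (the real Message exposes its payload through accessor methods, not a public field). Stats, CountingCb and deliver_all are hypothetical names, and deliver_all only imitates the per-message dispatch and the returned count of consume_callback().

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Simplified stand-ins for RdKafka::Message and RdKafka::ConsumeCb.
struct Message {
  std::string payload;
};

class ConsumeCb {
 public:
  virtual ~ConsumeCb() {}
  virtual void consume_cb(Message &message, void *opaque) = 0;
};

// Application state handed through the opaque pointer.
struct Stats {
  std::size_t messages = 0;
  std::size_t bytes = 0;
};

// Callback object: invoked once per message, updates the Stats behind opaque.
class CountingCb : public ConsumeCb {
 public:
  void consume_cb(Message &message, void *opaque) override {
    assert(opaque != nullptr);
    Stats *stats = static_cast<Stats *>(opaque);
    stats->messages += 1;
    stats->bytes += message.payload.size();
  }
};

// Imitates consume_callback(): calls the callback for every queued message
// and returns the number of messages processed.
int deliver_all(std::vector<Message> &queue, ConsumeCb *cb, void *opaque) {
  int processed = 0;
  for (Message &m : queue) {
    cb->consume_cb(m, opaque);
    ++processed;
  }
  return processed;
}
```

Passing state through opaque keeps the callback object itself stateless, so one instance can serve several partitions or queues.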

◆ consume_callback() [2/2]

virtual int RdKafka::Consumer::consume_callback (Queue *queue, int timeout_ms, RdKafka::ConsumeCb *consume_cb, void *opaque)
)
pure virtual

Consumes messages from queue, calling the provided callback for each consumed message.

See also
RdKafka::Consumer::consume_callback()

◆ OffsetTail()

static int64_t RdKafka::Consumer::OffsetTail ( int64_t  offset)
static

Converts an offset into the logical offset from the tail of a topic.

offset is the (positive) number of items from the end.

Returns
the logical offset for the message offset positions from the tail. This value may be passed to Consumer::start() et al.
Remarks
The returned logical offset is specific to librdkafka.
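
As a rough illustration of what a logical tail offset looks like, the sketch below reproduces the arithmetic outside the library. The base value of -2000 is an assumption taken from the RD_KAFKA_OFFSET_TAIL_BASE constant in librdkafka's C header and may differ between versions. offset_tail and is_logical are hypothetical names, and real code should call Consumer::OffsetTail() rather than hard-coding the constant.

```cpp
#include <cassert>
#include <cstdint>

// Assumed to mirror RD_KAFKA_OFFSET_TAIL_BASE from rdkafka.h.
const int64_t kOffsetTailBase = -2000;

// Logical offset meaning "count messages before the end of the partition".
inline int64_t offset_tail(int64_t count) {
  assert(count >= 0);  // the documentation asks for a non-negative count
  return kOffsetTailBase - count;
}

// Proper offsets are 0..N, so any negative value is a logical offset.
inline bool is_logical(int64_t offset) { return offset < 0; }
```

Because the encoding is specific to librdkafka, such a value is only meaningful when handed back to the library, for example as the offset argument of start().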

The documentation for this class was generated from the following file: