librdkafka
The Apache Kafka C/C++ client library
RdKafka::KafkaConsumer Class Reference (abstract)

High-level KafkaConsumer (for brokers 0.9 and later)

#include <rdkafkacpp.h>


Public Member Functions

virtual ErrorCode assignment (std::vector< RdKafka::TopicPartition * > &partitions)=0
 Returns the current partition assignment as set by RdKafka::KafkaConsumer::assign()
 
virtual ErrorCode subscription (std::vector< std::string > &topics)=0
 Returns the current subscription as set by RdKafka::KafkaConsumer::subscribe()
 
virtual ErrorCode subscribe (const std::vector< std::string > &topics)=0
 Update the subscription set to topics.
 
virtual ErrorCode unsubscribe ()=0
 Unsubscribe from the current subscription set.
 
virtual ErrorCode assign (const std::vector< TopicPartition * > &partitions)=0
 Update the assignment set to partitions.
 
virtual ErrorCode unassign ()=0
 Stop consumption and remove the current assignment.
 
virtual Message * consume (int timeout_ms)=0
 Consume message or get error event, triggers callbacks.
 
virtual ErrorCode commitSync ()=0
 Commit offsets for the current assignment.
 
virtual ErrorCode commitAsync ()=0
 Asynchronous version of RdKafka::KafkaConsumer::commitSync()
 
virtual ErrorCode commitSync (Message *message)=0
 Commit offset for a single topic+partition based on message.
 
virtual ErrorCode commitAsync (Message *message)=0
 Commit offset for a single topic+partition based on message.
 
virtual ErrorCode close ()=0
 Close and shut down the consumer.
 
- Public Member Functions inherited from RdKafka::Handle
virtual const std::string name () const =0
 
virtual const std::string memberid () const =0
 Returns the client's broker-assigned group member id.
 
virtual int poll (int timeout_ms)=0
 Polls the provided kafka handle for events.
 
virtual int outq_len ()=0
 Returns the current out queue length.
 
virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms)=0
 Request Metadata from broker.
 

Static Public Member Functions

static KafkaConsumer * create (Conf *conf, std::string &errstr)
 Creates a KafkaConsumer.
 

Detailed Description

High-level KafkaConsumer (for brokers 0.9 and later)

Remarks
Requires Apache Kafka >= 0.9.0 brokers

Currently supports the range and roundrobin partition assignment strategies (see partition.assignment.strategy)

Member Function Documentation

static KafkaConsumer* RdKafka::KafkaConsumer::create ( Conf *  conf,
std::string &  errstr 
)
static

Creates a KafkaConsumer.

The conf object must have group.id set to the consumer group to join.

Use RdKafka::KafkaConsumer::close() to shut down the consumer.

See also
RdKafka::RebalanceCb
CONFIGURATION.md for group.id, session.timeout.ms, partition.assignment.strategy, etc.
virtual ErrorCode RdKafka::KafkaConsumer::subscribe ( const std::vector< std::string > &  topics)
pure virtual

Update the subscription set to topics.

Any previous subscription will be unassigned and unsubscribed first.

The subscription set denotes the desired topics to consume. The subscription sets of all clients are provided to the partition assignor (one of the elected group members), which then uses the configured partition.assignment.strategy to assign the partitions of the subscribed topics to the consumers, depending on their subscriptions.

The result of such an assignment is a rebalance, which is either handled automatically in librdkafka or can be overridden by the application by providing a RdKafka::RebalanceCb.

The rebalancing passes the assigned partition set to RdKafka::KafkaConsumer::assign() to update what partitions are actually being fetched by the KafkaConsumer.

Regex pattern matching is automatically performed for topics prefixed with "^" (e.g. "^myPfx[0-9]_.*").

virtual ErrorCode RdKafka::KafkaConsumer::assign ( const std::vector< TopicPartition * > &  partitions)
pure virtual

Update the assignment set to partitions.

The assignment set is the set of partitions actually being consumed by the KafkaConsumer.

virtual Message* RdKafka::KafkaConsumer::consume ( int  timeout_ms)
pure virtual

Consume message or get error event, triggers callbacks.

Will automatically call registered callbacks for any such queued events, including RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb, etc.

Remarks
Use delete to free the message.
Returns
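One of:

  • a proper message (RdKafka::Message::err() is ERR_NO_ERROR)
  • an error event (RdKafka::Message::err() is not ERR_NO_ERROR)
  • a timeout because no message or event arrived within timeout_ms (RdKafka::Message::err() is ERR__TIMED_OUT)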
virtual ErrorCode RdKafka::KafkaConsumer::commitSync ( )
pure virtual

Commit offsets for the current assignment.

Remarks
This is the synchronous variant that blocks until offsets are committed or the commit fails (see return value).
If a RdKafka::OffsetCommitCb callback is registered, it will be called with commit details on a future call to RdKafka::KafkaConsumer::consume().
Returns
ERR_NO_ERROR or error code.
virtual ErrorCode RdKafka::KafkaConsumer::commitAsync ( )
pure virtual

Asynchronous version of RdKafka::KafkaConsumer::commitSync()

See also
RdKafka::KafkaConsumer::commitSync()
virtual ErrorCode RdKafka::KafkaConsumer::commitSync ( Message *  message)
pure virtual

Commit offset for a single topic+partition based on message.

Remarks
This is the synchronous variant.
See also
RdKafka::KafkaConsumer::commitSync()
virtual ErrorCode RdKafka::KafkaConsumer::commitAsync ( Message *  message)
pure virtual

Commit offset for a single topic+partition based on message.

Remarks
This is the asynchronous variant.
See also
RdKafka::KafkaConsumer::commitSync()
virtual ErrorCode RdKafka::KafkaConsumer::close ( )
pure virtual

Close and shut down the consumer.

This call will block until the following operations are finished:

  • Trigger a local rebalance to void the current assignment
  • Stop consumption for current assignment
  • Commit offsets
  • Leave group
Remarks
Callbacks, such as RdKafka::RebalanceCb and RdKafka::OffsetCommitCb, etc, may be called.
The consumer object must later be freed with delete.

The documentation for this class was generated from the following file: