The Apache Kafka C/C++ client library
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
RdKafka::RebalanceCb Class Referenceabstract

KafkaConsunmer: Rebalance callback class More...

#include <rdkafkacpp.h>

Public Member Functions

virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition * > &partitions)=0
 Group rebalance callback for use with RdKafka::KafkaConsunmer. More...

Detailed Description

KafkaConsunmer: Rebalance callback class

Member Function Documentation

virtual void RdKafka::RebalanceCb::rebalance_cb ( RdKafka::KafkaConsumer consumer,
RdKafka::ErrorCode  err,
std::vector< TopicPartition * > &  partitions 
pure virtual

Group rebalance callback for use with RdKafka::KafkaConsunmer.

Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that responsibility to the application's rebalance_cb.

The rebalance callback is responsible for updating librdkafka's assignment set based on the two events: RdKafka::ERR__ASSIGN_PARTITIONS and RdKafka::ERR__REVOKE_PARTITIONS.

Without a rebalance callback this is done automatically by librdkafka but registering a rebalance callback gives the application flexibility in performing other operations along with the assinging/revocation, such as fetching offsets from an alternate location (on assign) or manually committing offsets (on revoke).

The following example show's the application's responsibilities:

class MyRebalanceCb : public RdKafka::RebalanceCb {
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*> &partitions) {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
// application may load offets from arbitrary external
// storage here and update \p partitions
} else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
// Application may commit offsets manually here
// if auto.commit.enable=false
} else {
std::cerr << "Rebalancing error: <<
RdKafka::err2str(err) << std::endl;

The documentation for this class was generated from the following file: