librdkafka
The Apache Kafka C/C++ client library
RdKafka::RebalanceCb Class Reference (abstract)

KafkaConsumer: Rebalance callback class

#include <rdkafkacpp.h>

Public Member Functions

virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition * > &partitions)=0
 Group rebalance callback for use with RdKafka::KafkaConsumer.
 

Detailed Description

KafkaConsumer: Rebalance callback class

Member Function Documentation

virtual void RdKafka::RebalanceCb::rebalance_cb ( RdKafka::KafkaConsumer * consumer,
RdKafka::ErrorCode  err,
std::vector< TopicPartition * > &  partitions 
)
pure virtual

Group rebalance callback for use with RdKafka::KafkaConsumer.

Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that responsibility to the application's rebalance_cb.

The rebalance callback is responsible for updating librdkafka's assignment set in response to two events, RdKafka::ERR__ASSIGN_PARTITIONS and RdKafka::ERR__REVOKE_PARTITIONS, but it should also handle arbitrary rebalancing failures where err is neither of those.

Remarks
In this latter case (arbitrary error), the application must call unassign() to synchronize state.

For eager/non-cooperative partition.assignment.strategy assignors, such as range and roundrobin, the application must use assign() to set and unassign() to clear the entire assignment. For the cooperative assignors, such as cooperative-sticky, the application must use incremental_assign() for ERR__ASSIGN_PARTITIONS and incremental_unassign() for ERR__REVOKE_PARTITIONS.

Without a rebalance callback, librdkafka performs the assignment and revocation automatically. Registering a rebalance callback gives the application the flexibility to perform other operations along with the assignment/revocation, such as fetching offsets from an alternate location (on assign) or manually committing offsets (on revoke).
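
The mapping described in these remarks can be summarized as a small decision function. The sketch below is illustrative only: RebalanceEvent, Action and choose_action are hypothetical names that are not part of librdkafka, and the protocol string is assumed to be the value returned by RdKafka::KafkaConsumer::rebalance_protocol().

```cpp
#include <string>

// Hypothetical stand-in for the three kinds of `err` the callback can see:
// ERR__ASSIGN_PARTITIONS, ERR__REVOKE_PARTITIONS, or any other error.
enum class RebalanceEvent { Assign, Revoke, OtherError };

// Hypothetical label for the KafkaConsumer call the callback should make.
enum class Action { Assign, IncrementalAssign, Unassign, IncrementalUnassign };

// Pick the call to make for a given event and rebalance protocol.
// Any protocol other than "COOPERATIVE" is treated as eager here.
Action choose_action(RebalanceEvent event, const std::string &protocol) {
  const bool cooperative = (protocol == "COOPERATIVE");
  switch (event) {
    case RebalanceEvent::Assign:
      return cooperative ? Action::IncrementalAssign : Action::Assign;
    case RebalanceEvent::Revoke:
      return cooperative ? Action::IncrementalUnassign : Action::Unassign;
    case RebalanceEvent::OtherError:
      break;
  }
  // Arbitrary rebalancing failure: clear the assignment to synchronize state.
  return Action::Unassign;
}
```

A real callback would make the corresponding KafkaConsumer call rather than return a label; keeping the decision separate from the call is only a convenience for reading and testing the logic.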

See also
RdKafka::KafkaConsumer::assign()
RdKafka::KafkaConsumer::incremental_assign()
RdKafka::KafkaConsumer::incremental_unassign()
RdKafka::KafkaConsumer::assignment_lost()
RdKafka::KafkaConsumer::rebalance_protocol()

The following example shows the application's responsibilities:

#include <iostream>
#include <vector>
#include <rdkafkacpp.h>

class MyRebalanceCb : public RdKafka::RebalanceCb {
 public:
  void rebalance_cb (RdKafka::KafkaConsumer *consumer,
                     RdKafka::ErrorCode err,
                     std::vector<RdKafka::TopicPartition*> &partitions) {
    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
      // Application may load offsets from arbitrary external
      // storage here and update partitions
      if (consumer->rebalance_protocol() == "COOPERATIVE")
        consumer->incremental_assign(partitions);
      else
        consumer->assign(partitions);
    } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
      // Application may commit offsets manually here
      // if enable.auto.commit=false
      if (consumer->rebalance_protocol() == "COOPERATIVE")
        consumer->incremental_unassign(partitions);
      else
        consumer->unassign();
    } else {
      // Arbitrary rebalancing failure: clear the assignment
      // to synchronize state
      std::cerr << "Rebalancing error: " <<
                   RdKafka::err2str(err) << std::endl;
      consumer->unassign();
    }
  }
};
Remarks
The above example lacks error handling for the assign calls; see the examples/ directory.

The documentation for this class was generated from the following file: