librdkafka
The Apache Kafka C/C++ client library
RdKafka::RebalanceCb Class Reference (abstract)

KafkaConsumer: Rebalance callback class

#include <rdkafkacpp.h>

Public Member Functions

virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition * > &partitions)=0
 Group rebalance callback for use with RdKafka::KafkaConsumer.

Detailed Description

KafkaConsumer: Rebalance callback class

Member Function Documentation

rebalance_cb()

virtual void RdKafka::RebalanceCb::rebalance_cb ( RdKafka::KafkaConsumer * consumer,
RdKafka::ErrorCode err,
std::vector< TopicPartition * > & partitions
)
pure virtual

Group rebalance callback for use with RdKafka::KafkaConsumer.

Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that responsibility to the application's rebalance_cb.

The rebalance callback is responsible for updating librdkafka's assignment set in response to two events, RdKafka::ERR__ASSIGN_PARTITIONS and RdKafka::ERR__REVOKE_PARTITIONS, but it should also be able to handle arbitrary rebalancing failures where err is neither of those.

Remarks
In this latter case (arbitrary error), the application must call unassign() to synchronize state.

For eager/non-cooperative partition.assignment.strategy assignors, such as range and roundrobin, the application must use assign() to set and unassign() to clear the entire assignment. For the cooperative assignors, such as cooperative-sticky, the application must use incremental_assign() for ERR__ASSIGN_PARTITIONS and incremental_unassign() for ERR__REVOKE_PARTITIONS.

Without a rebalance callback, librdkafka does this automatically. Registering a rebalance callback gives the application the flexibility to perform other operations along with the assignment/revocation, such as fetching offsets from an alternate location (on assign) or manually committing offsets (on revoke).
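As a hedged illustration of the "fetching offsets from an alternate location" case, the sketch below seeds each newly assigned partition with a start offset before the assignment is handed to librdkafka. The helper name apply_stored_offsets, the OffsetStore map type and the FakePartition stand-in are assumptions made for this sketch; only topic(), partition() and set_offset() are meant to mirror RdKafka::TopicPartition.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical external offset store: (topic, partition) -> offset to resume from.
typedef std::map<std::pair<std::string, int>, int64_t> OffsetStore;

// Sets the start offset on every partition that has an entry in the store.
// Partitions without an entry are left untouched.
// Returns the number of partitions that were updated.
template <typename Partition>
std::size_t apply_stored_offsets(const std::vector<Partition *> &partitions,
                                 const OffsetStore &store) {
  std::size_t updated = 0;
  for (Partition *p : partitions) {
    auto it = store.find(std::make_pair(p->topic(), p->partition()));
    if (it != store.end()) {
      p->set_offset(it->second);
      updated++;
    }
  }
  return updated;
}

// Stand-in for RdKafka::TopicPartition so the sketch compiles without
// librdkafka; not part of the library.
struct FakePartition {
  std::string topic_;
  int partition_;
  int64_t offset_;
  const std::string &topic() const { return topic_; }
  int partition() const { return partition_; }
  int64_t offset() const { return offset_; }
  void set_offset(int64_t o) { offset_ = o; }
};
```

Inside rebalance_cb, such a helper would run on partitions in the ERR__ASSIGN_PARTITIONS branch, before the call to assign() or incremental_assign().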

See also
RdKafka::KafkaConsumer::assign()
RdKafka::KafkaConsumer::incremental_assign()
RdKafka::KafkaConsumer::incremental_unassign()
RdKafka::KafkaConsumer::assignment_lost()
RdKafka::KafkaConsumer::rebalance_protocol()

The following example shows the application's responsibilities:

#include <iostream>
#include <string>
#include <vector>
#include <rdkafkacpp.h>

class MyRebalanceCb : public RdKafka::RebalanceCb {
 public:
  void rebalance_cb (RdKafka::KafkaConsumer *consumer,
                     RdKafka::ErrorCode err,
                     std::vector<RdKafka::TopicPartition*> &partitions) {
    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
      // Application may load offsets from arbitrary external
      // storage here and update partitions
      if (consumer->rebalance_protocol() == "COOPERATIVE")
        consumer->incremental_assign(partitions);
      else
        consumer->assign(partitions);
    } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
      // Application may commit offsets manually here
      // if enable.auto.commit=false
      if (consumer->rebalance_protocol() == "COOPERATIVE")
        consumer->incremental_unassign(partitions);
      else
        consumer->unassign();
    } else {
      // Any other error: report it and clear the assignment
      // to synchronize state
      std::cerr << "Rebalancing error: " <<
                   RdKafka::err2str(err) << std::endl;
      consumer->unassign();
    }
  }
};
Remarks
The above example lacks error handling for the assign calls; see the examples/ directory for complete versions.
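One possible shape for that error handling is sketched below. It relies on the return types listed in this reference: assign() returns an ErrorCode, while incremental_assign() returns an Error * that is NULL on success. The sketch assumes the caller owns and deletes a returned Error object. The helper name assign_checked, its err2str parameter and the Fake* stand-in types are hypothetical and exist only so the sketch compiles without librdkafka.

```cpp
#include <string>
#include <vector>

// Applies a new assignment using the call that matches the rebalance protocol.
// Returns an empty string on success, otherwise a description of the failure.
// Consumer is expected to look like RdKafka::KafkaConsumer; err2str is any
// callable that maps the assign() return code to text.
template <typename Consumer, typename Partitions, typename ErrToStr>
std::string assign_checked(Consumer *consumer, const Partitions &partitions,
                           ErrToStr err2str) {
  if (consumer->rebalance_protocol() == "COOPERATIVE") {
    // incremental_assign() reports failure through an Error object.
    auto *error = consumer->incremental_assign(partitions);
    if (!error)
      return std::string();
    std::string msg = "incremental assign failed: " + error->str();
    delete error;  // assumption: the caller owns the returned Error
    return msg;
  }
  // assign() reports failure through a plain error code; 0 means no error.
  auto code = consumer->assign(partitions);
  if (static_cast<int>(code) == 0)
    return std::string();
  return "assign failed: " + err2str(code);
}

// Stand-ins for RdKafka::Error and RdKafka::KafkaConsumer, for illustration
// only; they are not part of librdkafka.
struct FakeError {
  std::string str() const { return "fake failure"; }
};

struct FakeConsumer {
  std::string protocol;  // "EAGER" or "COOPERATIVE"
  bool fail;             // when true, the assign calls report an error
  std::string rebalance_protocol() { return protocol; }
  int assign(const std::vector<int> &) { return fail ? -1 : 0; }
  FakeError *incremental_assign(const std::vector<int> &) {
    return fail ? new FakeError() : nullptr;
  }
};

// Hypothetical replacement for RdKafka::err2str in the stand-in setup.
inline std::string fake_err2str(int code) {
  return "error " + std::to_string(code);
}
```

With librdkafka itself, the call inside the ERR__ASSIGN_PARTITIONS branch would presumably look like assign_checked(consumer, partitions, RdKafka::err2str), with the returned message logged when it is not empty.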

The documentation for this class was generated from the following file:
rdkafkacpp.h