librdkafka
The Apache Kafka C/C++ client library
RdKafka::RebalanceCb Class Reference (abstract)

KafkaConsumer: Rebalance callback class

#include <rdkafkacpp.h>

Public Member Functions

virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition * > &partitions)=0
 Group rebalance callback for use with RdKafka::KafkaConsumer.

Detailed Description

KafkaConsumer: Rebalance callback class

Member Function Documentation

◆ rebalance_cb()

virtual void RdKafka::RebalanceCb::rebalance_cb ( RdKafka::KafkaConsumer *  consumer,
RdKafka::ErrorCode  err,
std::vector< TopicPartition * > &  partitions 
)
pure virtual

Group rebalance callback for use with RdKafka::KafkaConsumer.

Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that responsibility to the application's rebalance_cb.

The rebalance callback is responsible for updating librdkafka's assignment set in response to two events, RdKafka::ERR__ASSIGN_PARTITIONS and RdKafka::ERR__REVOKE_PARTITIONS, but it should also be able to handle arbitrary rebalancing failures where err is neither of those.

Remarks
In this latter case (arbitrary error), the application must call unassign() to synchronize state.

For eager/non-cooperative partition.assignment.strategy assignors, such as range and roundrobin, the application must use assign() to set and unassign() to clear the entire assignment. For the cooperative assignors, such as cooperative-sticky, the application must use incremental_assign() for ERR__ASSIGN_PARTITIONS and incremental_unassign() for ERR__REVOKE_PARTITIONS.

Without a rebalance callback this is done automatically by librdkafka, but registering a rebalance callback gives the application flexibility to perform other operations along with the assignment/revocation, such as fetching offsets from an alternate location (on assign) or manually committing offsets (on revoke).
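
As a rough illustration of the "fetching offsets from an alternate location" case, the sketch below applies externally stored offsets to the partition list before it is handed to assign() or incremental_assign(). OffsetStore and apply_stored_offsets are hypothetical names, not part of librdkafka. The helper is a template so that it assumes only the topic(), partition() and set_offset() accessors that RdKafka::TopicPartition provides.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical external offset storage keyed by (topic, partition).
typedef std::map<std::pair<std::string, int>, int64_t> OffsetStore;

// Overwrite the start offset of every partition found in `store`.
// Partitions without a stored offset keep the offset they arrived with.
// TP must offer topic(), partition() and set_offset(), which
// RdKafka::TopicPartition does. Returns the number of partitions updated.
template <typename TP>
std::size_t apply_stored_offsets(const OffsetStore &store,
                                 std::vector<TP *> &partitions) {
  std::size_t updated = 0;
  for (std::size_t i = 0; i < partitions.size(); i++) {
    assert(partitions[i] != NULL);
    OffsetStore::const_iterator it = store.find(std::make_pair(
        std::string(partitions[i]->topic()), partitions[i]->partition()));
    if (it == store.end())
      continue;  // no stored offset for this partition
    partitions[i]->set_offset(it->second);
    updated++;
  }
  return updated;
}
```

Inside rebalance_cb this would be called in the ERR__ASSIGN_PARTITIONS branch, just before assign(partitions) or incremental_assign(partitions).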

See also
RdKafka::KafkaConsumer::assign()
RdKafka::KafkaConsumer::incremental_assign()
RdKafka::KafkaConsumer::incremental_unassign()
RdKafka::KafkaConsumer::assignment_lost()
RdKafka::KafkaConsumer::rebalance_protocol()
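
To illustrate how assignment_lost() can fit in when committing offsets on revoke, the sketch below skips the commit when the assignment was lost, since lost partitions may already be owned by another group member and a commit for them may fail. commit_before_revoke is a hypothetical helper, not a librdkafka function. It is a template so that it assumes nothing beyond the assignment_lost() and commitSync() methods of RdKafka::KafkaConsumer.

```cpp
#include <cassert>

// Commit offsets synchronously ahead of a revocation, unless the
// assignment was lost. Returns true only if a commit was attempted
// and reported success.
template <typename Consumer>
bool commit_before_revoke(Consumer *consumer) {
  assert(consumer);
  if (consumer->assignment_lost())
    return false;  // partitions may belong to another member already
  // commitSync() returns an RdKafka::ErrorCode; ERR_NO_ERROR is 0.
  return static_cast<int>(consumer->commitSync()) == 0;
}
```

In a rebalance callback this would run in the ERR__REVOKE_PARTITIONS branch, before unassign() or incremental_unassign().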

The following example shows the application's responsibilities:

#include <iostream>
#include <vector>
#include <rdkafkacpp.h>

class MyRebalanceCb : public RdKafka::RebalanceCb {
 public:
  void rebalance_cb(RdKafka::KafkaConsumer *consumer,
                    RdKafka::ErrorCode err,
                    std::vector<RdKafka::TopicPartition*> &partitions) {
    if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
      // Application may load offsets from arbitrary external
      // storage here and update partitions
      if (consumer->rebalance_protocol() == "COOPERATIVE")
        consumer->incremental_assign(partitions);
      else
        consumer->assign(partitions);
    } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
      // Application may commit offsets manually here
      // if enable.auto.commit=false
      if (consumer->rebalance_protocol() == "COOPERATIVE")
        consumer->incremental_unassign(partitions);
      else
        consumer->unassign();
    } else {
      // Arbitrary rebalancing error: clear the assignment to
      // synchronize state
      std::cerr << "Rebalancing error: "
                << RdKafka::err2str(err) << std::endl;
      consumer->unassign();
    }
  }
};
Remarks
The above example lacks error handling for assign calls, see the examples/ directory.
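
As a partial illustration of that missing error handling: incremental_assign() and incremental_unassign() return an RdKafka::Error pointer, while assign() and unassign() return an RdKafka::ErrorCode. The sketch below covers the pointer case, on the understanding that NULL means success and that a non-NULL error object must be deleted by the application. check_and_free is a hypothetical helper name, written as a template so that it relies only on the str() method that RdKafka::Error provides.

```cpp
#include <cassert>
#include <iostream>
#include <string>

// Log and release an error object returned by an incremental
// assign/unassign call. Returns true if the call succeeded (NULL).
template <typename ErrorT>
bool check_and_free(const char *what, ErrorT *error) {
  assert(what);
  if (!error)
    return true;
  std::cerr << what << " failed: " << error->str() << std::endl;
  delete error;  // the caller owns the returned error object
  return false;
}
```

A call site might then read check_and_free("incremental assign", consumer->incremental_assign(partitions)). For assign() and unassign(), the returned ErrorCode can be compared against RdKafka::ERR_NO_ERROR and printed with RdKafka::err2str().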

The documentation for this class was generated from the following file: