librdkafka
The Apache Kafka C/C++ client library
RdKafka::KafkaConsumer Class Reference (abstract)

High-level KafkaConsumer (for brokers 0.9 and later)

#include <rdkafkacpp.h>


Public Member Functions

virtual ErrorCode assignment (std::vector< RdKafka::TopicPartition * > &partitions)=0
 Returns the current partition assignment as set by RdKafka::KafkaConsumer::assign()
 
virtual ErrorCode subscription (std::vector< std::string > &topics)=0
 Returns the current subscription as set by RdKafka::KafkaConsumer::subscribe()
 
virtual ErrorCode subscribe (const std::vector< std::string > &topics)=0
 Update the subscription set to topics.
 
virtual ErrorCode unsubscribe ()=0
 Unsubscribe from the current subscription set.
 
virtual ErrorCode assign (const std::vector< TopicPartition * > &partitions)=0
 Update the assignment set to partitions.
 
virtual ErrorCode unassign ()=0
 Stop consumption and remove the current assignment.
 
virtual Message * consume (int timeout_ms)=0
 Consume message or get error event, triggers callbacks.
 
virtual ErrorCode commitSync ()=0
 Commit offsets for the current assignment.
 
virtual ErrorCode commitAsync ()=0
 Asynchronous version of RdKafka::KafkaConsumer::commitSync()
 
virtual ErrorCode commitSync (Message *message)=0
 Commit offset for a single topic+partition based on message.
 
virtual ErrorCode commitAsync (Message *message)=0
 Commit offset for a single topic+partition based on message.
 
virtual ErrorCode commitSync (std::vector< TopicPartition * > &offsets)=0
 Commit offsets for the provided list of partitions.
 
virtual ErrorCode commitAsync (const std::vector< TopicPartition * > &offsets)=0
 Commit offsets for the provided list of partitions.
 
virtual ErrorCode commitSync (OffsetCommitCb *offset_commit_cb)=0
 Commit offsets for the current assignment.
 
virtual ErrorCode commitSync (std::vector< TopicPartition * > &offsets, OffsetCommitCb *offset_commit_cb)=0
 Commit offsets for the provided list of partitions.
 
virtual ErrorCode committed (std::vector< TopicPartition * > &partitions, int timeout_ms)=0
 Retrieve committed offsets for topics+partitions.
 
virtual ErrorCode position (std::vector< TopicPartition * > &partitions)=0
 Retrieve current positions (offsets) for topics+partitions.
 
virtual ErrorCode close ()=0
 Close and shut down the consumer.
 
virtual ErrorCode seek (const TopicPartition &partition, int timeout_ms)=0
 Seek consumer for topic+partition to offset which is either an absolute or logical offset.
 
virtual ErrorCode offsets_store (std::vector< TopicPartition * > &offsets)=0
 Store the offsets given in offsets for their topic partitions. The offsets will be committed (written) to the offset store according to auto.commit.interval.ms or the next manual offset-less commit*().
 
virtual ConsumerGroupMetadata * groupMetadata ()=0
 
virtual bool assignment_lost ()=0
 Check whether the consumer considers the current assignment to have been lost involuntarily. This method is only applicable for use with a subscribing consumer. Assignments are revoked immediately when determined to have been lost, so this method is only useful within a rebalance callback. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.
 
virtual std::string rebalance_protocol ()=0
 The rebalance protocol currently in use. This will be "NONE" if the consumer has not (yet) joined a group, else it will match the rebalance protocol ("EAGER", "COOPERATIVE") of the configured and selected assignor(s). All configured assignors must have the same protocol type, meaning online migration of a consumer group from using one protocol to another (in particular upgrading from EAGER to COOPERATIVE) without a restart is not currently supported.
 
virtual Error * incremental_assign (const std::vector< TopicPartition * > &partitions)=0
 Incrementally add partitions to the current assignment.
 
virtual Error * incremental_unassign (const std::vector< TopicPartition * > &partitions)=0
 Incrementally remove partitions from the current assignment.
 
virtual Error * close (Queue *queue)=0
 Close and shut down the consumer.
 
virtual bool closed ()=0
 
- Public Member Functions inherited from RdKafka::Handle
virtual const std::string name () const =0
 
virtual const std::string memberid () const =0
 Returns the client's broker-assigned group member id.
 
virtual int poll (int timeout_ms)=0
 Polls the provided kafka handle for events.
 
virtual int outq_len ()=0
 Returns the current out queue length.
 
virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms)=0
 Request Metadata from broker.
 
virtual ErrorCode pause (std::vector< TopicPartition * > &partitions)=0
 Pause producing or consumption for the provided list of partitions.
 
virtual ErrorCode resume (std::vector< TopicPartition * > &partitions)=0
 Resume producing or consumption for the provided list of partitions.
 
virtual ErrorCode query_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)=0
 Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.
 
virtual ErrorCode get_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high)=0
 Get last known low (oldest/beginning) and high (newest/end) offsets for partition.
 
virtual ErrorCode offsetsForTimes (std::vector< TopicPartition * > &offsets, int timeout_ms)=0
 Look up the offsets for the given partitions by timestamp.
 
virtual Queue * get_partition_queue (const TopicPartition *partition)=0
 Retrieve queue for a given partition.
 
virtual ErrorCode set_log_queue (Queue *queue)=0
 Forward librdkafka logs (and debug) to the specified queue for serving with one of the ..poll() calls.
 
virtual void yield ()=0
 Cancels the current callback dispatcher (Handle::poll(), KafkaConsumer::consume(), etc).
 
virtual const std::string clusterid (int timeout_ms)=0
 Returns the ClusterId as reported in broker metadata.
 
virtual struct rd_kafka_s * c_ptr ()=0
 Returns the underlying librdkafka C rd_kafka_t handle.
 
virtual int32_t controllerid (int timeout_ms)=0
 Returns the current ControllerId (controller broker id) as reported in broker metadata.
 
virtual ErrorCode fatal_error (std::string &errstr) const =0
 Returns the first fatal error set on this client instance, or ERR_NO_ERROR if no fatal error has occurred.
 
virtual ErrorCode oauthbearer_set_token (const std::string &token_value, int64_t md_lifetime_ms, const std::string &md_principal_name, const std::list< std::string > &extensions, std::string &errstr)=0
 Set SASL/OAUTHBEARER token and metadata.
 
virtual ErrorCode oauthbearer_set_token_failure (const std::string &errstr)=0
 SASL/OAUTHBEARER token refresh failure indicator.
 
virtual Error * sasl_background_callbacks_enable ()=0
 Enable SASL OAUTHBEARER refresh callbacks on the librdkafka background thread.
 
virtual Queue * get_sasl_queue ()=0
 
virtual Queue * get_background_queue ()=0
 
virtual void * mem_malloc (size_t size)=0
 Allocate memory using the same allocator librdkafka uses.
 
virtual void mem_free (void *ptr)=0
 Free pointer returned by librdkafka.
 

Static Public Member Functions

static KafkaConsumer * create (const Conf *conf, std::string &errstr)
 Creates a KafkaConsumer.
 

Detailed Description

High-level KafkaConsumer (for brokers 0.9 and later)

Remarks
Requires Apache Kafka >= 0.9.0 brokers

Currently supports the range and roundrobin partition assignment strategies (see partition.assignment.strategy)

Member Function Documentation

◆ create()

static KafkaConsumer* RdKafka::KafkaConsumer::create ( const Conf *  conf,
std::string &  errstr 
)
static

Creates a KafkaConsumer.

The conf object must have group.id set to the consumer group to join.

Use RdKafka::KafkaConsumer::close() to shut down the consumer.

See also
RdKafka::RebalanceCb
CONFIGURATION.md for group.id, session.timeout.ms, partition.assignment.strategy, etc.

◆ subscribe()

virtual ErrorCode RdKafka::KafkaConsumer::subscribe ( const std::vector< std::string > &  topics)
pure virtual

Update the subscription set to topics.

Any previous subscription will be unassigned and unsubscribed first.

The subscription set denotes the desired topics to consume. This set is provided, for all clients, to the partition assignor (one of the elected group members), which then uses the configured partition.assignment.strategy to assign the partitions of the subscribed topics to the consumers, depending on their subscriptions.

The result of such an assignment is a rebalancing which is either handled automatically in librdkafka or can be overridden by the application by providing a RdKafka::RebalanceCb.

The rebalancing passes the assigned partition set to RdKafka::KafkaConsumer::assign() to update what partitions are actually being fetched by the KafkaConsumer.

Regex pattern matching is automatically performed for topics prefixed with "^" (e.g. "^myPfx[0-9]_.*").

Remarks
A consumer error will be raised for each unavailable topic in topics. The error will be ERR_UNKNOWN_TOPIC_OR_PART for non-existent topics, and ERR_TOPIC_AUTHORIZATION_FAILED for unauthorized topics. The consumer error will be raised through consume() (et al.) with RdKafka::Message::err() returning one of the error codes mentioned above. The subscribe function itself is asynchronous and will not return an error on unavailable topics.
Returns
an error if the provided list of topics is invalid.

◆ assign()

virtual ErrorCode RdKafka::KafkaConsumer::assign ( const std::vector< TopicPartition * > &  partitions)
pure virtual

Update the assignment set to partitions.

The assignment set is the set of partitions actually being consumed by the KafkaConsumer.

◆ consume()

virtual Message* RdKafka::KafkaConsumer::consume ( int  timeout_ms)
pure virtual

Consume message or get error event, triggers callbacks.

Will automatically call registered callbacks for any such queued events, including RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb, etc.

Remarks
Use delete to free the message.
An application should make sure to call consume() at regular intervals, even if no messages are expected, to serve any queued callbacks waiting to be called. This is especially important when a RebalanceCb has been registered as it needs to be called and handled properly to synchronize internal consumer state.
Application MUST NOT call poll() on KafkaConsumer objects.
Returns
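One of:

  • a proper message (RdKafka::Message::err() is ERR_NO_ERROR)
  • an error event (RdKafka::Message::err() is not ERR_NO_ERROR)
  • a timeout, when no message or event arrived within timeout_ms (RdKafka::Message::err() is ERR__TIMED_OUT)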

◆ commitSync() [1/5]

virtual ErrorCode RdKafka::KafkaConsumer::commitSync ( )
pure virtual

Commit offsets for the current assignment.

Remarks
This is the synchronous variant that blocks until offsets are committed or the commit fails (see return value).
If a RdKafka::OffsetCommitCb callback is registered it will be called with commit details on a future call to RdKafka::KafkaConsumer::consume()
Returns
ERR_NO_ERROR or error code.

◆ commitAsync() [1/3]

virtual ErrorCode RdKafka::KafkaConsumer::commitAsync ( )
pure virtual

Asynchronous version of RdKafka::KafkaConsumer::commitSync()

See also
RdKafka::KafkaConsumer::commitSync()

◆ commitSync() [2/5]

virtual ErrorCode RdKafka::KafkaConsumer::commitSync ( Message *  message)
pure virtual

Commit offset for a single topic+partition based on message.

Remarks
The offset committed will be the message's offset + 1.
This is the synchronous variant.
See also
RdKafka::KafkaConsumer::commitSync()

◆ commitAsync() [2/3]

virtual ErrorCode RdKafka::KafkaConsumer::commitAsync ( Message *  message)
pure virtual

Commit offset for a single topic+partition based on message.

Remarks
The offset committed will be the message's offset + 1.
This is the asynchronous variant.
See also
RdKafka::KafkaConsumer::commitSync()

◆ commitSync() [3/5]

virtual ErrorCode RdKafka::KafkaConsumer::commitSync ( std::vector< TopicPartition * > &  offsets)
pure virtual

Commit offsets for the provided list of partitions.

Remarks
The .offset of the partitions in offsets should be the offset where consumption will resume, i.e., the last processed offset + 1.
This is the synchronous variant.

◆ commitAsync() [3/3]

virtual ErrorCode RdKafka::KafkaConsumer::commitAsync ( const std::vector< TopicPartition * > &  offsets)
pure virtual

Commit offsets for the provided list of partitions.

Remarks
The .offset of the partitions in offsets should be the offset where consumption will resume, i.e., the last processed offset + 1.
This is the asynchronous variant.

◆ commitSync() [4/5]

virtual ErrorCode RdKafka::KafkaConsumer::commitSync ( OffsetCommitCb *  offset_commit_cb)
pure virtual

Commit offsets for the current assignment.

Remarks
This is the synchronous variant that blocks until offsets are committed or the commit fails (see return value).
The provided callback will be called from this function.
Returns
ERR_NO_ERROR or error code.

◆ commitSync() [5/5]

virtual ErrorCode RdKafka::KafkaConsumer::commitSync ( std::vector< TopicPartition * > &  offsets,
OffsetCommitCb *  offset_commit_cb 
)
pure virtual

Commit offsets for the provided list of partitions.

Remarks
This is the synchronous variant that blocks until offsets are committed or the commit fails (see return value).
The provided callback will be called from this function.
Returns
ERR_NO_ERROR or error code.

◆ committed()

virtual ErrorCode RdKafka::KafkaConsumer::committed ( std::vector< TopicPartition * > &  partitions,
int  timeout_ms 
)
pure virtual

Retrieve committed offsets for topics+partitions.

Returns
ERR_NO_ERROR on success, in which case the offset or err field of each element of partitions is filled in with the stored offset or a partition-specific error. Otherwise returns an error code.

◆ position()

virtual ErrorCode RdKafka::KafkaConsumer::position ( std::vector< TopicPartition * > &  partitions)
pure virtual

Retrieve current positions (offsets) for topics+partitions.

Returns
ERR_NO_ERROR on success, in which case the offset or err field of each element of partitions is filled in with the current position or a partition-specific error. Otherwise returns an error code.

◆ close() [1/2]

virtual ErrorCode RdKafka::KafkaConsumer::close ( )
pure virtual

Close and shut down the consumer.

For pausing and resuming consumption, see RdKafka::Handle::pause() and RdKafka::Handle::resume().

This call will block until the following operations are finished:

  • Trigger a local rebalance to void the current assignment (if any).
  • Stop consumption for current assignment (if any).
  • Commit offsets (if any).
  • Leave group (if applicable).

The maximum blocking time is roughly limited to session.timeout.ms.

Remarks
Callbacks, such as RdKafka::RebalanceCb and RdKafka::OffsetCommitCb, etc, may be called.
The consumer object must later be freed with delete.

◆ seek()

virtual ErrorCode RdKafka::KafkaConsumer::seek ( const TopicPartition &  partition,
int  timeout_ms 
)
pure virtual

Seek consumer for topic+partition to offset which is either an absolute or logical offset.

If timeout_ms is not 0, the call will wait this long for the seek to be performed. If the timeout is reached, the internal state will be unknown and this function returns ERR__TIMED_OUT. If timeout_ms is 0, it will initiate the seek but return immediately without any error reporting (i.e., asynchronously).

This call triggers a fetch queue barrier flush.

Remarks
Consumption for the given partition must have started for the seek to work. Use assign() to set the starting offset.
Returns
an ErrorCode to indicate success or failure.

◆ offsets_store()

virtual ErrorCode RdKafka::KafkaConsumer::offsets_store ( std::vector< TopicPartition * > &  offsets)
pure virtual

Store the offsets given in offsets for their topic partitions. The offsets will be committed (written) to the offset store according to auto.commit.interval.ms or the next manual offset-less commit*().

Per-partition success/error status propagated through TopicPartition.err()

Remarks
The .offset field is stored as-is; it is NOT incremented by 1.
enable.auto.offset.store must be set to false when using this API.
Returns
RdKafka::ERR_NO_ERROR on success, or RdKafka::ERR__UNKNOWN_PARTITION if none of the offsets could be stored, or RdKafka::ERR__INVALID_ARG if enable.auto.offset.store is true.

◆ groupMetadata()

virtual ConsumerGroupMetadata* RdKafka::KafkaConsumer::groupMetadata ( )
pure virtual
Returns
the current consumer group metadata associated with this consumer, or NULL if the consumer is not configured with a group.id. This metadata object should be passed to the transactional producer's RdKafka::Producer::send_offsets_to_transaction() API.
Remarks
The returned object must be deleted by the application.
See also
RdKafka::Producer::send_offsets_to_transaction()

◆ assignment_lost()

virtual bool RdKafka::KafkaConsumer::assignment_lost ( )
pure virtual

Check whether the consumer considers the current assignment to have been lost involuntarily. This method is only applicable for use with a subscribing consumer. Assignments are revoked immediately when determined to have been lost, so this method is only useful within a rebalance callback. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.

Remarks
Calling assign(), incremental_assign() or incremental_unassign() resets this flag.
Returns
Returns true if the current partition assignment is considered lost, false otherwise.

◆ rebalance_protocol()

virtual std::string RdKafka::KafkaConsumer::rebalance_protocol ( )
pure virtual

The rebalance protocol currently in use. This will be "NONE" if the consumer has not (yet) joined a group, else it will match the rebalance protocol ("EAGER", "COOPERATIVE") of the configured and selected assignor(s). All configured assignors must have the same protocol type, meaning online migration of a consumer group from using one protocol to another (in particular upgrading from EAGER to COOPERATIVE) without a restart is not currently supported.

Returns
an empty string on error, or one of "NONE", "EAGER", "COOPERATIVE" on success.

◆ incremental_assign()

virtual Error* RdKafka::KafkaConsumer::incremental_assign ( const std::vector< TopicPartition * > &  partitions)
pure virtual

Incrementally add partitions to the current assignment.

If a COOPERATIVE assignor (i.e. incremental rebalancing) is being used, this method should be used in a rebalance callback to adjust the current assignment appropriately in the case where the rebalance type is ERR__ASSIGN_PARTITIONS. The application must pass the partition list passed to the callback (or a copy of it), even if the list is empty. This method may also be used outside the context of a rebalance callback.

Returns
NULL on success, or an error object if the operation was unsuccessful.
Remarks
The returned object must be deleted by the application.

◆ incremental_unassign()

virtual Error* RdKafka::KafkaConsumer::incremental_unassign ( const std::vector< TopicPartition * > &  partitions)
pure virtual

Incrementally remove partitions from the current assignment.

If a COOPERATIVE assignor (i.e. incremental rebalancing) is being used, this method should be used in a rebalance callback to adjust the current assignment appropriately in the case where the rebalance type is ERR__REVOKE_PARTITIONS. The application must pass the partition list passed to the callback (or a copy of it), even if the list is empty. This method may also be used outside the context of a rebalance callback.

Returns
NULL on success, or an error object if the operation was unsuccessful.
Remarks
The returned object must be deleted by the application.

◆ close() [2/2]

virtual Error* RdKafka::KafkaConsumer::close ( Queue *  queue)
pure virtual

Close and shut down the consumer.

Performs the same actions as RdKafka::KafkaConsumer::close() but in a background thread.

Rebalance events/callbacks (etc) will be forwarded to the application-provided queue. The application must poll this queue until RdKafka::KafkaConsumer::closed() returns true.

Remarks
Depending on consumer group join state there may or may not be rebalance events emitted on queue.
Returns
an error object if the consumer close failed, else NULL.
See also
RdKafka::KafkaConsumer::closed()

◆ closed()

virtual bool RdKafka::KafkaConsumer::closed ( )
pure virtual
Returns
true if the consumer is closed, else false.
See also
RdKafka::KafkaConsumer::close()
