librdkafka
The Apache Kafka C/C++ client library
|
High-level KafkaConsumer (for brokers 0.9 and later) More...
#include <rdkafkacpp.h>
Public Member Functions | |
virtual ErrorCode | assignment (std::vector< RdKafka::TopicPartition * > &partitions)=0 |
Returns the current partition assignment as set by RdKafka::KafkaConsumer::assign() | |
virtual ErrorCode | subscription (std::vector< std::string > &topics)=0 |
Returns the current subscription as set by RdKafka::KafkaConsumer::subscribe() | |
virtual ErrorCode | subscribe (const std::vector< std::string > &topics)=0 |
Update the subscription set to topics . More... | |
virtual ErrorCode | unsubscribe ()=0 |
Unsubscribe from the current subscription set. | |
virtual ErrorCode | assign (const std::vector< TopicPartition * > &partitions)=0 |
Update the assignment set to partitions . More... | |
virtual ErrorCode | unassign ()=0 |
Stop consumption and remove the current assignment. | |
virtual Message * | consume (int timeout_ms)=0 |
Consume message or get error event, triggers callbacks. More... | |
virtual ErrorCode | commitSync ()=0 |
Commit offsets for the current assignment. More... | |
virtual ErrorCode | commitAsync ()=0 |
Asynchronous version of RdKafka::KafkaConsumer::CommitSync() More... | |
virtual ErrorCode | commitSync (Message *message)=0 |
Commit offset for a single topic+partition based on message . More... | |
virtual ErrorCode | commitAsync (Message *message)=0 |
Commit offset for a single topic+partition based on message . More... | |
virtual ErrorCode | commitSync (std::vector< TopicPartition * > &offsets)=0 |
Commit offsets for the provided list of partitions. More... | |
virtual ErrorCode | commitAsync (const std::vector< TopicPartition * > &offsets)=0 |
Commit offset for the provided list of partitions. More... | |
virtual ErrorCode | commitSync (OffsetCommitCb *offset_commit_cb)=0 |
Commit offsets for the current assignment. More... | |
virtual ErrorCode | commitSync (std::vector< TopicPartition * > &offsets, OffsetCommitCb *offset_commit_cb)=0 |
Commit offsets for the provided list of partitions. More... | |
virtual ErrorCode | committed (std::vector< TopicPartition * > &partitions, int timeout_ms)=0 |
Retrieve committed offsets for topics+partitions. More... | |
virtual ErrorCode | position (std::vector< TopicPartition * > &partitions)=0 |
Retrieve current positions (offsets) for topics+partitions. More... | |
virtual ErrorCode | close ()=0 |
Close and shut down the consumer. More... | |
virtual ErrorCode | seek (const TopicPartition &partition, int timeout_ms)=0 |
Seek consumer for topic+partition to offset which is either an absolute or logical offset. More... | |
virtual ErrorCode | offsets_store (std::vector< TopicPartition * > &offsets)=0 |
Store offset offset for topic partition partition . The offset will be committed (written) to the offset store according to auto.commit.interval.ms or the next manual offset-less commit*() More... | |
virtual ConsumerGroupMetadata * | groupMetadata ()=0 |
virtual bool | assignment_lost ()=0 |
Check whether the consumer considers the current assignment to have been lost involuntarily. This method is only applicable for use with a subscribing consumer. Assignments are revoked immediately when determined to have been lost, so this method is only useful within a rebalance callback. Partitions that have been lost may already be owned by other members in the group and therefore commiting offsets, for example, may fail. More... | |
virtual std::string | rebalance_protocol ()=0 |
The rebalance protocol currently in use. This will be "NONE" if the consumer has not (yet) joined a group, else it will match the rebalance protocol ("EAGER", "COOPERATIVE") of the configured and selected assignor(s). All configured assignors must have the same protocol type, meaning online migration of a consumer group from using one protocol to another (in particular upgading from EAGER to COOPERATIVE) without a restart is not currently supported. More... | |
virtual Error * | incremental_assign (const std::vector< TopicPartition * > &partitions)=0 |
Incrementally add partitions to the current assignment. More... | |
virtual Error * | incremental_unassign (const std::vector< TopicPartition * > &partitions)=0 |
Incrementally remove partitions from the current assignment. More... | |
virtual Error * | close (Queue *queue)=0 |
Close and shut down the consumer. More... | |
virtual bool | closed ()=0 |
Public Member Functions inherited from RdKafka::Handle | |
virtual std::string | name () const =0 |
virtual std::string | memberid () const =0 |
Returns the client's broker-assigned group member id. More... | |
virtual int | poll (int timeout_ms)=0 |
Polls the provided kafka handle for events. More... | |
virtual int | outq_len ()=0 |
Returns the current out queue length. More... | |
virtual ErrorCode | metadata (bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms)=0 |
Request Metadata from broker. More... | |
virtual ErrorCode | pause (std::vector< TopicPartition * > &partitions)=0 |
Pause producing or consumption for the provided list of partitions. More... | |
virtual ErrorCode | resume (std::vector< TopicPartition * > &partitions)=0 |
Resume producing or consumption for the provided list of partitions. More... | |
virtual ErrorCode | query_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)=0 |
Query broker for low (oldest/beginning) and high (newest/end) offsets for partition. More... | |
virtual ErrorCode | get_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high)=0 |
Get last known low (oldest/beginning) and high (newest/end) offsets for partition. More... | |
virtual ErrorCode | offsetsForTimes (std::vector< TopicPartition * > &offsets, int timeout_ms)=0 |
Look up the offsets for the given partitions by timestamp. More... | |
virtual Queue * | get_partition_queue (const TopicPartition *partition)=0 |
Retrieve queue for a given partition. More... | |
virtual ErrorCode | set_log_queue (Queue *queue)=0 |
Forward librdkafka logs (and debug) to the specified queue for serving with one of the ..poll() calls. More... | |
virtual void | yield ()=0 |
Cancels the current callback dispatcher (Handle::poll(), KafkaConsumer::consume(), etc). More... | |
virtual std::string | clusterid (int timeout_ms)=0 |
Returns the ClusterId as reported in broker metadata. More... | |
virtual struct rd_kafka_s * | c_ptr ()=0 |
Returns the underlying librdkafka C rd_kafka_t handle. More... | |
virtual int32_t | controllerid (int timeout_ms)=0 |
Returns the current ControllerId (controller broker id) as reported in broker metadata. More... | |
virtual ErrorCode | fatal_error (std::string &errstr) const =0 |
Returns the first fatal error set on this client instance, or ERR_NO_ERROR if no fatal error has occurred. More... | |
virtual ErrorCode | oauthbearer_set_token (const std::string &token_value, int64_t md_lifetime_ms, const std::string &md_principal_name, const std::list< std::string > &extensions, std::string &errstr)=0 |
Set SASL/OAUTHBEARER token and metadata. More... | |
virtual ErrorCode | oauthbearer_set_token_failure (const std::string &errstr)=0 |
SASL/OAUTHBEARER token refresh failure indicator. More... | |
virtual Error * | sasl_background_callbacks_enable ()=0 |
Enable SASL OAUTHBEARER refresh callbacks on the librdkafka background thread. More... | |
virtual Queue * | get_sasl_queue ()=0 |
virtual Queue * | get_background_queue ()=0 |
virtual void * | mem_malloc (size_t size)=0 |
Allocate memory using the same allocator librdkafka uses. More... | |
virtual void | mem_free (void *ptr)=0 |
Free pointer returned by librdkafka. More... | |
virtual Error * | sasl_set_credentials (const std::string &username, const std::string &password)=0 |
Sets SASL credentials used for SASL PLAIN and SCRAM mechanisms by this Kafka client. More... | |
Static Public Member Functions | |
static KafkaConsumer * | create (const Conf *conf, std::string &errstr) |
Creates a KafkaConsumer. More... | |
High-level KafkaConsumer (for brokers 0.9 and later)
Currently supports the range
and roundrobin
partition assignment strategies (see partition.assignment.strategy
)
|
static |
Creates a KafkaConsumer.
The conf
object must have group.id
set to the consumer group to join.
Use RdKafka::KafkaConsumer::close() to shut down the consumer.
group.id
, session.timeout.ms
, partition.assignment.strategy
, etc.
|
pure virtual |
Update the subscription set to topics
.
Any previous subscription will be unassigned and unsubscribed first.
The subscription set denotes the desired topics to consume and this set is provided to the partition assignor (one of the elected group members) for all clients which then uses the configured partition.assignment.strategy
to assign the subscription sets's topics's partitions to the consumers, depending on their subscription.
The result of such an assignment is a rebalancing which is either handled automatically in librdkafka or can be overridden by the application by providing a RdKafka::RebalanceCb.
The rebalancing passes the assigned partition set to RdKafka::KafkaConsumer::assign() to update what partitions are actually being fetched by the KafkaConsumer.
Regex pattern matching automatically performed for topics prefixed with "^"
(e.g. "^myPfx
[0-9]_.*"
topics
. The error will be ERR_UNKNOWN_TOPIC_OR_PART for non-existent topics, and ERR_TOPIC_AUTHORIZATION_FAILED for unauthorized topics. The consumer error will be raised through consume() (et.al.) with the RdKafka::Message::err()
returning one of the error codes mentioned above. The subscribe function itself is asynchronous and will not return an error on unavailable topics.
|
pure virtual |
Update the assignment set to partitions
.
The assignment set is the set of partitions actually being consumed by the KafkaConsumer.
|
pure virtual |
Consume message or get error event, triggers callbacks.
Will automatically call registered callbacks for any such queued events, including RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb, etc.
delete
to free the message.poll()
on KafkaConsumer objects.timeout_ms
(RdKafka::Message::err() is ERR__TIMED_OUT)
|
pure virtual |
Commit offsets for the current assignment.
|
pure virtual |
Asynchronous version of RdKafka::KafkaConsumer::CommitSync()
Commit offset for a single topic+partition based on message
.
Commit offset for a single topic+partition based on message
.
|
pure virtual |
Commit offsets for the provided list of partitions.
.offset of the partitions in offsets
should be the offset where consumption will resume, i.e., the last processed offset + 1.
|
pure virtual |
Commit offset for the provided list of partitions.
.offset of the partitions in offsets
should be the offset where consumption will resume, i.e., the last processed offset + 1.
|
pure virtual |
Commit offsets for the current assignment.
|
pure virtual |
Commit offsets for the provided list of partitions.
|
pure virtual |
Retrieve committed offsets for topics+partitions.
offset
or err
field of each partitions'
element is filled in with the stored offset, or a partition specific error. Else returns an error code.
|
pure virtual |
Retrieve current positions (offsets) for topics+partitions.
offset
or err
field of each partitions'
element is filled in with the stored offset, or a partition specific error. Else returns an error code.
|
pure virtual |
Close and shut down the consumer.
For pausing and resuming consumption, see
The maximum blocking time is roughly limited to session.timeout.ms.
delete
|
pure virtual |
Seek consumer for topic+partition to offset which is either an absolute or logical offset.
If timeout_ms
is not 0 the call will wait this long for the seek to be performed. If the timeout is reached the internal state will be unknown and this function returns ERR__TIMED_OUT
. If timeout_ms
is 0 it will initiate the seek but return immediately without any error reporting (e.g., async).
This call triggers a fetch queue barrier flush.
|
pure virtual |
Store offset offset
for topic partition partition
. The offset will be committed (written) to the offset store according to auto.commit.interval.ms
or the next manual offset-less commit*()
Per-partition success/error status propagated through TopicPartition.err()
.offset field is stored as is, it will NOT be + 1.enable.auto.offset.store
must be set to false
when using this API.enable.auto.offset.store
is true.
|
pure virtual |
group.id
. This metadata object should be passed to the transactional producer's RdKafka::Producer::send_offsets_to_transaction() API.
|
pure virtual |
Check whether the consumer considers the current assignment to have been lost involuntarily. This method is only applicable for use with a subscribing consumer. Assignments are revoked immediately when determined to have been lost, so this method is only useful within a rebalance callback. Partitions that have been lost may already be owned by other members in the group and therefore commiting offsets, for example, may fail.
|
pure virtual |
The rebalance protocol currently in use. This will be "NONE" if the consumer has not (yet) joined a group, else it will match the rebalance protocol ("EAGER", "COOPERATIVE") of the configured and selected assignor(s). All configured assignors must have the same protocol type, meaning online migration of a consumer group from using one protocol to another (in particular upgading from EAGER to COOPERATIVE) without a restart is not currently supported.
|
pure virtual |
Incrementally add partitions
to the current assignment.
If a COOPERATIVE assignor (i.e. incremental rebalancing) is being used, this method should be used in a rebalance callback to adjust the current assignment appropriately in the case where the rebalance type is ERR__ASSIGN_PARTITIONS. The application must pass the partition list passed to the callback (or a copy of it), even if the list is empty. This method may also be used outside the context of a rebalance callback.
|
pure virtual |
Incrementally remove partitions
from the current assignment.
If a COOPERATIVE assignor (i.e. incremental rebalancing) is being used, this method should be used in a rebalance callback to adjust the current assignment appropriately in the case where the rebalance type is ERR__REVOKE_PARTITIONS. The application must pass the partition list passed to the callback (or a copy of it), even if the list is empty. This method may also be used outside the context of a rebalance callback.
Close and shut down the consumer.
Performs the same actions as RdKafka::KafkaConsumer::close() but in a background thread.
Rebalance events/callbacks (etc) will be forwarded to the application-provided queue
. The application must poll this queue until RdKafka::KafkaConsumer::closed() returns true.
rkqu
.
|
pure virtual |