librdkafka
The Apache Kafka C/C++ client library
|
#include <rdkafkacpp.h>
Public Types | |
enum | { RK_MSG_FREE = 0x1, RK_MSG_COPY = 0x2, RK_MSG_BLOCK = 0x4 } |
RdKafka::Producer::produce() msgflags . More... | |
enum | { PURGE_QUEUE = 0x1, PURGE_INFLIGHT = 0x2, PURGE_NON_BLOCKING = 0x4 } |
RdKafka::Handle::purge() purge_flags . More... | |
Public Member Functions | |
virtual ErrorCode | produce (Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const std::string *key, void *msg_opaque)=0 |
Produce and send a single message to broker. More... | |
virtual ErrorCode | produce (Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, void *msg_opaque)=0 |
Variant produce() that passes the key as a pointer and length instead of as a const std::string *. | |
virtual ErrorCode | produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, int64_t timestamp, void *msg_opaque)=0 |
produce() variant that takes topic as a string (no need for creating a Topic object), and also allows providing the message timestamp (milliseconds since beginning of epoch, UTC). Otherwise identical to produce() above. | |
virtual ErrorCode | produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, int64_t timestamp, RdKafka::Headers *headers, void *msg_opaque)=0 |
produce() variant that that allows for Header support on produce Otherwise identical to produce() above. More... | |
virtual ErrorCode | produce (Topic *topic, int32_t partition, const std::vector< char > *payload, const std::vector< char > *key, void *msg_opaque)=0 |
Variant produce() that accepts vectors for key and payload. The vector data will be copied. | |
virtual ErrorCode | flush (int timeout_ms)=0 |
Wait until all outstanding produce requests, et.al, are completed. This should typically be done prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating. More... | |
virtual ErrorCode | purge (int purge_flags)=0 |
Purge messages currently handled by the producer instance. More... | |
Transactional API | |
Requires Kafka broker version v0.11.0 or later See the Transactional API documentation in rdkafka.h for more information. | |
virtual Error * | init_transactions (int timeout_ms)=0 |
Initialize transactions for the producer instance. More... | |
virtual Error * | begin_transaction ()=0 |
init_transactions() must have been called successfully (once) before this function is called. More... | |
virtual Error * | send_offsets_to_transaction (const std::vector< TopicPartition * > &offsets, const ConsumerGroupMetadata *group_metadata, int timeout_ms)=0 |
Sends a list of topic partition offsets to the consumer group coordinator for group_metadata , and marks the offsets as part part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully. More... | |
virtual Error * | commit_transaction (int timeout_ms)=0 |
Commit the current transaction as started with begin_transaction(). More... | |
virtual Error * | abort_transaction (int timeout_ms)=0 |
Aborts the ongoing transaction. More... | |
Public Member Functions inherited from RdKafka::Handle | |
virtual std::string | name () const =0 |
virtual std::string | memberid () const =0 |
Returns the client's broker-assigned group member id. More... | |
virtual int | poll (int timeout_ms)=0 |
Polls the provided kafka handle for events. More... | |
virtual int | outq_len ()=0 |
Returns the current out queue length. More... | |
virtual ErrorCode | metadata (bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms)=0 |
Request Metadata from broker. More... | |
virtual ErrorCode | pause (std::vector< TopicPartition * > &partitions)=0 |
Pause producing or consumption for the provided list of partitions. More... | |
virtual ErrorCode | resume (std::vector< TopicPartition * > &partitions)=0 |
Resume producing or consumption for the provided list of partitions. More... | |
virtual ErrorCode | query_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)=0 |
Query broker for low (oldest/beginning) and high (newest/end) offsets for partition. More... | |
virtual ErrorCode | get_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high)=0 |
Get last known low (oldest/beginning) and high (newest/end) offsets for partition. More... | |
virtual ErrorCode | offsetsForTimes (std::vector< TopicPartition * > &offsets, int timeout_ms)=0 |
Look up the offsets for the given partitions by timestamp. More... | |
virtual Queue * | get_partition_queue (const TopicPartition *partition)=0 |
Retrieve queue for a given partition. More... | |
virtual ErrorCode | set_log_queue (Queue *queue)=0 |
Forward librdkafka logs (and debug) to the specified queue for serving with one of the ..poll() calls. More... | |
virtual void | yield ()=0 |
Cancels the current callback dispatcher (Handle::poll(), KafkaConsumer::consume(), etc). More... | |
virtual std::string | clusterid (int timeout_ms)=0 |
Returns the ClusterId as reported in broker metadata. More... | |
virtual struct rd_kafka_s * | c_ptr ()=0 |
Returns the underlying librdkafka C rd_kafka_t handle. More... | |
virtual int32_t | controllerid (int timeout_ms)=0 |
Returns the current ControllerId (controller broker id) as reported in broker metadata. More... | |
virtual ErrorCode | fatal_error (std::string &errstr) const =0 |
Returns the first fatal error set on this client instance, or ERR_NO_ERROR if no fatal error has occurred. More... | |
virtual ErrorCode | oauthbearer_set_token (const std::string &token_value, int64_t md_lifetime_ms, const std::string &md_principal_name, const std::list< std::string > &extensions, std::string &errstr)=0 |
Set SASL/OAUTHBEARER token and metadata. More... | |
virtual ErrorCode | oauthbearer_set_token_failure (const std::string &errstr)=0 |
SASL/OAUTHBEARER token refresh failure indicator. More... | |
virtual Error * | sasl_background_callbacks_enable ()=0 |
Enable SASL OAUTHBEARER refresh callbacks on the librdkafka background thread. More... | |
virtual Queue * | get_sasl_queue ()=0 |
virtual Queue * | get_background_queue ()=0 |
virtual void * | mem_malloc (size_t size)=0 |
Allocate memory using the same allocator librdkafka uses. More... | |
virtual void | mem_free (void *ptr)=0 |
Free pointer returned by librdkafka. More... | |
virtual Error * | sasl_set_credentials (const std::string &username, const std::string &password)=0 |
Sets SASL credentials used for SASL PLAIN and SCRAM mechanisms by this Kafka client. More... | |
Static Public Member Functions | |
static Producer * | create (const Conf *conf, std::string &errstr) |
Creates a new Kafka producer handle. More... | |
anonymous enum |
RdKafka::Producer::produce() msgflags
.
These flags are optional.
Enumerator | |
---|---|
RK_MSG_FREE | rdkafka will free(3) |
RK_MSG_COPY | the |
RK_MSG_BLOCK | Block produce*() on message queue full. WARNING: If a delivery report callback is used the application MUST call rd_kafka_poll() (or equiv.) to make sure delivered messages are drained from the internal delivery report queue. Failure to do so will result in indefinately blocking on the produce() call when the message queue is full. |
anonymous enum |
RdKafka::Handle::purge() purge_flags
.
Creates a new Kafka producer handle.
conf
is an optional object that will be used instead of the default configuration. The conf
object is reusable after this call.
errstr
is set to a human readable error message.
|
pure virtual |
Produce and send a single message to broker.
This is an asynch non-blocking API.
partition
is the target partition, either:
msgflags
is zero or more of the following flags OR:ed together: RK_MSG_BLOCK - block produce*
() call if queue.buffering.max.messages
or queue.buffering.max.kbytes
are exceeded. Messages are considered in-queue from the point they are accepted by produce() until their corresponding delivery report callback/event returns. It is thus a requirement to call poll() (or equiv.) from a separate thread when RK_MSG_BLOCK is used. See WARNING on RK_MSG_BLOCK
above. RK_MSG_FREE - rdkafka will free(3) payload
when it is done with it. RK_MSG_COPY - the payload
data will be copied and the payload
pointer will not be used by rdkafka after the call returns.
NOTE: RK_MSG_FREE and RK_MSG_COPY are mutually exclusive.
If the function returns an error code and RK_MSG_FREE was specified, then the memory associated with the payload is still the caller's responsibility.
payload
is the message payload of size len
bytes.
key
is an optional message key, if non-NULL it will be passed to the topic partitioner as well as be sent with the message to the broker and passed on to the consumer.
msg_opaque
is an optional application-provided per-message opaque pointer that will provided in the delivery report callback (dr_cb
) for referencing this message.
queue.buffering.max.message
messages.max.bytes
partition
is unknown in the Kafka cluster.
|
pure virtual |
|
pure virtual |
Wait until all outstanding produce requests, et.al, are completed. This should typically be done prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating.
linger.ms
time will be ignored for the duration of the call, queued messages will be sent to the broker as soon as possible.timeout_ms
was reached before all outstanding requests were completed, else ERR_NO_ERROR
|
pure virtual |
Purge messages currently handled by the producer instance.
purge_flags | tells which messages should be purged and how. |
The application will need to call Handle::poll() or Producer::flush() afterwards to serve the delivery report callbacks of the purged messages.
Messages purged from internal queues fail with the delivery report error code set to ERR__PURGE_QUEUE, while purged messages that are in-flight to or from the broker will fail with the error code set to ERR__PURGE_INFLIGHT.
purge
flags are invalid or unknown, ERR__NOT_IMPLEMENTED if called on a non-producer client instance.
|
pure virtual |
Initialize transactions for the producer instance.
timeout_ms | The maximum time to block. On timeout the operation may continue in the background, depending on state, and it is okay to call init_transactions() again. |
See rd_kafka_init_transactions() in rdkafka.h for more information.
|
pure virtual |
init_transactions() must have been called successfully (once) before this function is called.
See rd_kafka_begin_transaction() in rdkafka.h for more information.
|
pure virtual |
Sends a list of topic partition offsets to the consumer group coordinator for group_metadata
, and marks the offsets as part part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully.
The offsets should be the next message your application will consume, i.e., the last processed message's offset + 1 for each partition. Either track the offsets manually during processing or use RdKafka::KafkaConsumer::position() (on the consumer) to get the current offsets for the partitions assigned to the consumer.
Use this method at the end of a consume-transform-produce loop prior to committing the transaction with commit_transaction().
offsets | List of offsets to commit to the consumer group upon successful commit of the transaction. Offsets should be the next message to consume, e.g., last processed message + 1. |
group_metadata | The current consumer group metadata as returned by RdKafka::KafkaConsumer::groupMetadata() on the consumer instance the provided offsets were consumed from. |
timeout_ms | Maximum time allowed to register the offsets on the broker. |
enable.auto.commit
to false on the consumer).See rd_kafka_send_offsets_to_transaction() in rdkafka.h for more information.
|
pure virtual |
Commit the current transaction as started with begin_transaction().
Any outstanding messages will be flushed (delivered) before actually committing the transaction.
timeout_ms | The maximum time to block. On timeout the operation may continue in the background, depending on state, and it is okay to call this function again. Pass -1 to use the remaining transaction timeout, this is the recommended use. |
timeout_ms
. Using other values risk internal state desynchronization in case any of the underlying protocol requests fail.See rd_kafka_commit_transaction() in rdkafka.h for more information.
|
pure virtual |
Aborts the ongoing transaction.
This function should also be used to recover from non-fatal
abortable transaction errors.
Any outstanding messages will be purged and fail with RdKafka::ERR__PURGE_INFLIGHT or RdKafka::ERR__PURGE_QUEUE. See RdKafka::Producer::purge() for details.
timeout_ms | The maximum time to block. On timeout the operation may continue in the background, depending on state, and it is okay to call this function again. Pass -1 to use the remaining transaction timeout, this is the recommended use. |
timeout_ms
. Using other values risk internal state desynchronization in case any of the underlying protocol requests fail.See rd_kafka_abort_transaction() in rdkafka.h for more information.