librdkafka
The Apache Kafka C/C++ client library
RdKafka::Producer Class Reference (abstract)

Producer.

#include <rdkafkacpp.h>


Public Types

enum  {
  RK_MSG_FREE = 0x1 ,
  RK_MSG_COPY = 0x2 ,
  RK_MSG_BLOCK = 0x4
}
 RdKafka::Producer::produce() msgflags.
 
enum  {
  PURGE_QUEUE = 0x1 ,
  PURGE_INFLIGHT = 0x2 ,
  PURGE_NON_BLOCKING = 0x4
}
 RdKafka::Handle::purge() purge_flags.
 

Public Member Functions

virtual ErrorCode produce (Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const std::string *key, void *msg_opaque)=0
 Produce and send a single message to broker.
 
virtual ErrorCode produce (Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, void *msg_opaque)=0
 Variant produce() that passes the key as a pointer and length instead of as a const std::string *.
 
virtual ErrorCode produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, int64_t timestamp, void *msg_opaque)=0
 produce() variant that takes topic as a string (no need for creating a Topic object), and also allows providing the message timestamp (milliseconds since beginning of epoch, UTC). Otherwise identical to produce() above.
 
virtual ErrorCode produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, int64_t timestamp, RdKafka::Headers *headers, void *msg_opaque)=0
 produce() variant that allows for Header support on produce. Otherwise identical to produce() above.
 
virtual ErrorCode produce (Topic *topic, int32_t partition, const std::vector< char > *payload, const std::vector< char > *key, void *msg_opaque)=0
 Variant produce() that accepts vectors for key and payload. The vector data will be copied.
 
virtual ErrorCode flush (int timeout_ms)=0
 Wait until all outstanding produce requests, et al., are completed. This should typically be done prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating.
 
virtual ErrorCode purge (int purge_flags)=0
 Purge messages currently handled by the producer instance.
 
Transactional API

Requires Kafka broker version v0.11.0 or later

See the Transactional API documentation in rdkafka.h for more information.

virtual Error * init_transactions (int timeout_ms)=0
 Initialize transactions for the producer instance.
 
virtual Error * begin_transaction ()=0
 init_transactions() must have been called successfully (once) before this function is called.
 
virtual Error * send_offsets_to_transaction (const std::vector< TopicPartition * > &offsets, const ConsumerGroupMetadata *group_metadata, int timeout_ms)=0
 Sends a list of topic partition offsets to the consumer group coordinator for group_metadata, and marks the offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully.
 
virtual Error * commit_transaction (int timeout_ms)=0
 Commit the current transaction as started with begin_transaction().
 
virtual Error * abort_transaction (int timeout_ms)=0
 Aborts the ongoing transaction.
 
- Public Member Functions inherited from RdKafka::Handle
virtual const std::string name () const =0
 
virtual const std::string memberid () const =0
 Returns the client's broker-assigned group member id.
 
virtual int poll (int timeout_ms)=0
 Polls the provided kafka handle for events.
 
virtual int outq_len ()=0
 Returns the current out queue length.
 
virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms)=0
 Request Metadata from broker.
 
virtual ErrorCode pause (std::vector< TopicPartition * > &partitions)=0
 Pause producing or consumption for the provided list of partitions.
 
virtual ErrorCode resume (std::vector< TopicPartition * > &partitions)=0
 Resume producing or consumption for the provided list of partitions.
 
virtual ErrorCode query_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)=0
 Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.
 
virtual ErrorCode get_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high)=0
 Get last known low (oldest/beginning) and high (newest/end) offsets for partition.
 
virtual ErrorCode offsetsForTimes (std::vector< TopicPartition * > &offsets, int timeout_ms)=0
 Look up the offsets for the given partitions by timestamp.
 
virtual Queue * get_partition_queue (const TopicPartition *partition)=0
 Retrieve queue for a given partition.
 
virtual ErrorCode set_log_queue (Queue *queue)=0
 Forward librdkafka logs (and debug) to the specified queue for serving with one of the ..poll() calls.
 
virtual void yield ()=0
 Cancels the current callback dispatcher (Handle::poll(), KafkaConsumer::consume(), etc).
 
virtual const std::string clusterid (int timeout_ms)=0
 Returns the ClusterId as reported in broker metadata.
 
virtual struct rd_kafka_s * c_ptr ()=0
 Returns the underlying librdkafka C rd_kafka_t handle.
 
virtual int32_t controllerid (int timeout_ms)=0
 Returns the current ControllerId (controller broker id) as reported in broker metadata.
 
virtual ErrorCode fatal_error (std::string &errstr) const =0
 Returns the first fatal error set on this client instance, or ERR_NO_ERROR if no fatal error has occurred.
 
virtual ErrorCode oauthbearer_set_token (const std::string &token_value, int64_t md_lifetime_ms, const std::string &md_principal_name, const std::list< std::string > &extensions, std::string &errstr)=0
 Set SASL/OAUTHBEARER token and metadata.
 
virtual ErrorCode oauthbearer_set_token_failure (const std::string &errstr)=0
 SASL/OAUTHBEARER token refresh failure indicator.
 
virtual Error * sasl_background_callbacks_enable ()=0
 Enable SASL OAUTHBEARER refresh callbacks on the librdkafka background thread.
 
virtual Queue * get_sasl_queue ()=0
 
virtual Queue * get_background_queue ()=0
 
virtual void * mem_malloc (size_t size)=0
 Allocate memory using the same allocator librdkafka uses.
 
virtual void mem_free (void *ptr)=0
 Free pointer returned by librdkafka.
 

Static Public Member Functions

static Producer * create (const Conf *conf, std::string &errstr)
 Creates a new Kafka producer handle.
 

Detailed Description

Member Enumeration Documentation

◆ anonymous enum

anonymous enum

RdKafka::Producer::produce() msgflags.

These flags are optional.

Enumerator
RK_MSG_FREE 

rdkafka will free(3) payload when it is done with it. Mutually exclusive with RK_MSG_COPY.

RK_MSG_COPY 

the payload data will be copied and the payload pointer will not be used by rdkafka after the call returns. Mutually exclusive with RK_MSG_FREE.

RK_MSG_BLOCK 

Block produce*() on message queue full. WARNING: If a delivery report callback is used, the application MUST call rd_kafka_poll() (or equiv.) to make sure delivered messages are drained from the internal delivery report queue. Failure to do so will result in indefinitely blocking on the produce() call when the message queue is full.
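
The mutual-exclusion rule between RK_MSG_FREE and RK_MSG_COPY can be checked before calling produce(). The following is a minimal sketch of such a check, not part of librdkafka: the helper names msgflags_valid() and copy_flags() are hypothetical, and the flag values are redefined locally (copied from the enum listing above) so that the sketch compiles without the library headers. Rejecting bits outside the three documented flags is an assumption of this sketch, not documented behavior.

```cpp
#include <cassert>

// Local stand-ins for the documented RdKafka::Producer msgflags values.
// Assumption: illustration only; real code should use the
// RdKafka::Producer::RK_MSG_* enumerators from rdkafkacpp.h.
constexpr int kMsgFree  = 0x1;
constexpr int kMsgCopy  = 0x2;
constexpr int kMsgBlock = 0x4;

// Hypothetical helper: true if msgflags only uses the documented flags
// and does not combine the mutually exclusive FREE and COPY flags.
bool msgflags_valid(int msgflags) {
  const int known = kMsgFree | kMsgCopy | kMsgBlock;
  if (msgflags & ~known) return false;  // bits outside the documented set
  const bool free_set = (msgflags & kMsgFree) != 0;
  const bool copy_set = (msgflags & kMsgCopy) != 0;
  return !(free_set && copy_set);       // FREE and COPY are mutually exclusive
}

// Hypothetical helper: build flags for a copied payload, optionally
// blocking when the message queue is full.
int copy_flags(bool block_when_full) {
  const int flags = kMsgCopy | (block_when_full ? kMsgBlock : 0);
  assert(msgflags_valid(flags));
  return flags;
}
```

Since the flags are optional, passing 0 is also valid.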

◆ anonymous enum

anonymous enum

RdKafka::Handle::purge() purge_flags.

Enumerator
PURGE_QUEUE 

Purge messages in internal queues.

PURGE_INFLIGHT 

Purge messages in-flight to or from the broker. Purging these messages will void any future acknowledgements from the broker, making it impossible for the application to know if these messages were successfully delivered or not. Retrying these messages may lead to duplicates.

PURGE_NON_BLOCKING 

Don't wait for background queue purging to finish.

Member Function Documentation

◆ create()

static Producer* RdKafka::Producer::create ( const Conf *  conf,
std::string &  errstr 
)
static

Creates a new Kafka producer handle.

conf is an optional object that will be used instead of the default configuration. The conf object is reusable after this call.

Returns
the new handle on success, or NULL on error, in which case errstr is set to a human-readable error message.

◆ produce() [1/2]

virtual ErrorCode RdKafka::Producer::produce ( Topic *  topic,
int32_t  partition,
int  msgflags,
void *  payload,
size_t  len,
const std::string *  key,
void *  msg_opaque 
)
pure virtual

Produce and send a single message to broker.

This is an asynchronous, non-blocking API.

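partition is the target partition, either:
  • RdKafka::Topic::PARTITION_UA (unassigned) for automatic partitioning using the topic's partitioner function, or
  • a fixed partition (0..N).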

msgflags is zero or more of the following flags OR:ed together:
  • RK_MSG_BLOCK - block produce*() call if queue.buffering.max.messages or queue.buffering.max.kbytes are exceeded. Messages are considered in-queue from the point they are accepted by produce() until their corresponding delivery report callback/event returns. It is thus a requirement to call poll() (or equiv.) from a separate thread when RK_MSG_BLOCK is used. See WARNING on RK_MSG_BLOCK above.
  • RK_MSG_FREE - rdkafka will free(3) payload when it is done with it.
  • RK_MSG_COPY - the payload data will be copied and the payload pointer will not be used by rdkafka after the call returns.

NOTE: RK_MSG_FREE and RK_MSG_COPY are mutually exclusive.

If the function returns an error code and RK_MSG_FREE was specified, then the memory associated with the payload is still the caller's responsibility.

payload is the message payload of size len bytes.

key is an optional message key. If non-NULL, it will be passed to the topic partitioner, sent with the message to the broker, and passed on to the consumer.

msg_opaque is an optional application-provided per-message opaque pointer that will be provided in the delivery report callback (dr_cb) for referencing this message.

Returns
an ErrorCode to indicate success or failure:
  • ERR_NO_ERROR - message successfully enqueued for transmission.
  • ERR__QUEUE_FULL - maximum number of outstanding messages has been reached: queue.buffering.max.messages
  • ERR_MSG_SIZE_TOO_LARGE - message is larger than configured max size: message.max.bytes
  • ERR__UNKNOWN_PARTITION - requested partition is unknown in the Kafka cluster.
  • ERR__UNKNOWN_TOPIC - topic is unknown in the Kafka cluster.
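
A common way to react to ERR__QUEUE_FULL is to serve delivery reports (for example with poll()) so that queue space is released, and then retry the produce() call. The sketch below shows only that control flow, under stated assumptions: EnqueueResult and produce_with_retry() are hypothetical names, and the two callables stand in for a real produce() call (mapped onto the three outcomes) and a real poll() call, so that the block compiles without the library.

```cpp
#include <cassert>
#include <functional>

// Hypothetical, library-independent view of a produce() outcome.
// A real caller would map ERR_NO_ERROR, ERR__QUEUE_FULL and every
// other ErrorCode onto these three values.
enum class EnqueueResult { Ok, QueueFull, OtherError };

// Try to enqueue one message, draining delivery reports between
// attempts while the queue is full. Returns true once enqueued.
bool produce_with_retry(const std::function<EnqueueResult()> &try_produce,
                        const std::function<void()> &serve_delivery_reports,
                        int max_attempts) {
  assert(max_attempts > 0);
  for (int attempt = 0; attempt < max_attempts; ++attempt) {
    switch (try_produce()) {
      case EnqueueResult::Ok:
        return true;               // message accepted for transmission
      case EnqueueResult::QueueFull:
        serve_delivery_reports();  // e.g. poll() to free queue space
        break;                     // then try again
      case EnqueueResult::OtherError:
        return false;              // retrying will not help
    }
  }
  return false;                    // queue still full after max_attempts
}
```

Bounding the number of attempts keeps the caller from spinning forever if the broker is unreachable; the right bound and poll timeout depend on the application.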

◆ produce() [2/2]

virtual ErrorCode RdKafka::Producer::produce ( const std::string  topic_name,
int32_t  partition,
int  msgflags,
void *  payload,
size_t  len,
const void *  key,
size_t  key_len,
int64_t  timestamp,
RdKafka::Headers *  headers,
void *  msg_opaque 
)
pure virtual

produce() variant that allows for Header support on produce. Otherwise identical to produce() above.

Warning
The headers will be freed/deleted if the produce() call succeeds, or left untouched if produce() fails.

◆ flush()

virtual ErrorCode RdKafka::Producer::flush ( int  timeout_ms)
pure virtual

Wait until all outstanding produce requests, et al., are completed. This should typically be done prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating.

Remarks
The linger.ms time will be ignored for the duration of the call; queued messages will be sent to the broker as soon as possible.
This function will call Producer::poll() and thus trigger callbacks.
Returns
ERR__TIMED_OUT if timeout_ms was reached before all outstanding requests were completed, else ERR_NO_ERROR
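
Because flush() can return ERR__TIMED_OUT while requests are still outstanding, shutdown code often calls it in a bounded loop. The following is a minimal sketch of that loop; flush_with_retries() is a hypothetical helper, written as a template so that it compiles without the library headers and can be exercised with a stub producer.

```cpp
#include <cassert>

// Hypothetical helper. ProducerT is any type with a member
// `ErrorCodeT flush(int timeout_ms)`, for example RdKafka::Producer
// with ErrorCodeT = RdKafka::ErrorCode and no_error = RdKafka::ERR_NO_ERROR.
template <typename ProducerT, typename ErrorCodeT>
bool flush_with_retries(ProducerT &producer, ErrorCodeT no_error,
                        int timeout_ms, int max_attempts) {
  assert(timeout_ms >= 0 && max_attempts > 0);
  for (int attempt = 0; attempt < max_attempts; ++attempt) {
    if (producer.flush(timeout_ms) == no_error)
      return true;  // all outstanding requests completed
    // Any other result is treated as "not finished yet"; try again.
  }
  return false;     // still not drained after max_attempts
}
```

With a real producer this would be called as flush_with_retries(*producer, RdKafka::ERR_NO_ERROR, 1000, 10). If it returns false, outq_len() can be used to report how many messages remain.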

◆ purge()

virtual ErrorCode RdKafka::Producer::purge ( int  purge_flags)
pure virtual

Purge messages currently handled by the producer instance.

Parameters
purge_flags - tells which messages should be purged and how.

The application will need to call Handle::poll() or Producer::flush() afterwards to serve the delivery report callbacks of the purged messages.

Messages purged from internal queues fail with the delivery report error code set to ERR__PURGE_QUEUE, while purged messages that are in-flight to or from the broker will fail with the error code set to ERR__PURGE_INFLIGHT.

Warning
Purging messages that are in-flight to or from the broker will ignore any subsequent acknowledgement for these messages received from the broker, effectively making it impossible for the application to know if the messages were successfully produced or not. This may result in duplicate messages if the application retries these messages at a later time.
Remarks
This call may block for a short time while background thread queues are purged.
Returns
ERR_NO_ERROR on success, ERR__INVALID_ARG if the purge flags are invalid or unknown, ERR__NOT_IMPLEMENTED if called on a non-producer client instance.

◆ init_transactions()

virtual Error* RdKafka::Producer::init_transactions ( int  timeout_ms)
pure virtual

Initialize transactions for the producer instance.

Parameters
timeout_ms - The maximum time to block. On timeout the operation may continue in the background, depending on state, and it is okay to call init_transactions() again.
Returns
an RdKafka::Error object on error, or NULL on success. Check whether the returned error object permits retrying by calling RdKafka::Error::is_retriable(), or whether a fatal error has been raised by calling RdKafka::Error::is_fatal().
Remarks
The returned error object (if not NULL) must be deleted.

See rd_kafka_init_transactions() in rdkafka.h for more information.

◆ begin_transaction()

virtual Error* RdKafka::Producer::begin_transaction ( )
pure virtual

init_transactions() must have been called successfully (once) before this function is called.

Returns
an RdKafka::Error object on error, or NULL on success. Check whether a fatal error has been raised by calling RdKafka::Error::is_fatal().
Remarks
The returned error object (if not NULL) must be deleted.

See rd_kafka_begin_transaction() in rdkafka.h for more information.

◆ send_offsets_to_transaction()

virtual Error* RdKafka::Producer::send_offsets_to_transaction ( const std::vector< TopicPartition * > &  offsets,
const ConsumerGroupMetadata *  group_metadata,
int  timeout_ms 
)
pure virtual

Sends a list of topic partition offsets to the consumer group coordinator for group_metadata, and marks the offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully.

The offsets should be the next message your application will consume, i.e., the last processed message's offset + 1 for each partition. Either track the offsets manually during processing or use RdKafka::KafkaConsumer::position() (on the consumer) to get the current offsets for the partitions assigned to the consumer.

Use this method at the end of a consume-transform-produce loop prior to committing the transaction with commit_transaction().

Parameters
offsets - List of offsets to commit to the consumer group upon successful commit of the transaction. Offsets should be the next message to consume, e.g., last processed message + 1.
group_metadata - The current consumer group metadata as returned by RdKafka::KafkaConsumer::groupMetadata() on the consumer instance the provided offsets were consumed from.
timeout_ms - Maximum time allowed to register the offsets on the broker.
Remarks
This function must be called on the transactional producer instance, not the consumer.
The consumer must disable auto commits (set enable.auto.commit to false on the consumer).
Returns
an RdKafka::Error object on error, or NULL on success. Check whether the returned error object permits retrying by calling RdKafka::Error::is_retriable(), or whether an abortable or fatal error has been raised by calling RdKafka::Error::txn_requires_abort() or RdKafka::Error::is_fatal() respectively.
Remarks
The returned error object (if not NULL) must be deleted.

See rd_kafka_send_offsets_to_transaction() in rdkafka.h for more information.
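
The "last processed offset + 1" rule is easy to get wrong by one when offsets are tracked manually. The sketch below shows one possible bookkeeping scheme using only the standard library; OffsetMap, mark_processed() and offsets_to_commit() are hypothetical names and not part of librdkafka.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>

// Hypothetical bookkeeping type: (topic, partition) -> offset.
using PartitionKey = std::pair<std::string, int32_t>;
using OffsetMap = std::map<PartitionKey, int64_t>;

// Record that the message at `offset` has been fully processed.
// Keeps the highest processed offset seen per partition.
void mark_processed(OffsetMap &last_processed, const std::string &topic,
                    int32_t partition, int64_t offset) {
  assert(offset >= 0);
  int64_t &slot =
      last_processed.emplace(PartitionKey(topic, partition), offset)
          .first->second;
  if (offset > slot) slot = offset;
}

// Offsets to hand to send_offsets_to_transaction(): the next message
// to consume, i.e. last processed offset + 1 for each partition.
OffsetMap offsets_to_commit(const OffsetMap &last_processed) {
  OffsetMap next;
  for (const auto &entry : last_processed)
    next[entry.first] = entry.second + 1;
  return next;
}
```

Each resulting entry would then be turned into a TopicPartition carrying that offset before being placed in the offsets vector; that conversion is left out here to keep the sketch independent of the library headers.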

◆ commit_transaction()

virtual Error* RdKafka::Producer::commit_transaction ( int  timeout_ms)
pure virtual

Commit the current transaction as started with begin_transaction().

Any outstanding messages will be flushed (delivered) before actually committing the transaction.
Parameters
timeout_ms - The maximum time to block. On timeout the operation may continue in the background, depending on state, and it is okay to call this function again. Pass -1 to use the remaining transaction timeout; this is the recommended use.
Remarks
It is strongly recommended to always pass -1 (remaining transaction time) as the timeout_ms. Using other values risks internal state desynchronization in case any of the underlying protocol requests fail.
Returns
an RdKafka::Error object on error, or NULL on success. Check whether the returned error object permits retrying by calling RdKafka::Error::is_retriable(), or whether an abortable or fatal error has been raised by calling RdKafka::Error::txn_requires_abort() or RdKafka::Error::is_fatal() respectively.
Remarks
The returned error object (if not NULL) must be deleted.

See rd_kafka_commit_transaction() in rdkafka.h for more information.
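
The three predicates named under Returns (is_retriable(), txn_requires_abort() and is_fatal()) suggest a small decision procedure for a failed transactional call. The following sketch shows one possible policy, not the library's prescribed one: TxnErrorView, TxnAction and next_action() are hypothetical, and the struct simply holds the results of calling the three predicates on the returned RdKafka::Error before deleting it.

```cpp
#include <cassert>

// Hypothetical snapshot of the three predicate results read from an
// RdKafka::Error object (which must still be deleted by the caller).
struct TxnErrorView {
  bool fatal;           // result of is_fatal()
  bool requires_abort;  // result of txn_requires_abort()
  bool retriable;       // result of is_retriable()
};

enum class TxnAction { Retry, AbortTransaction, GiveUp };

// One possible policy: fatal errors win, abortable errors lead to
// abort_transaction(), retriable errors are retried while budget remains.
TxnAction next_action(const TxnErrorView &err, int retries_left) {
  assert(retries_left >= 0);
  if (err.fatal) return TxnAction::GiveUp;
  if (err.requires_abort) return TxnAction::AbortTransaction;
  if (err.retriable && retries_left > 0) return TxnAction::Retry;
  return TxnAction::GiveUp;  // unclassified or out of retries
}
```

Treating unclassified errors as non-recoverable is the conservative choice in this sketch; an application may prefer to abort the transaction instead when its retry budget runs out.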

◆ abort_transaction()

virtual Error* RdKafka::Producer::abort_transaction ( int  timeout_ms)
pure virtual

Aborts the ongoing transaction.

This function should also be used to recover from non-fatal abortable transaction errors.

Any outstanding messages will be purged and fail with RdKafka::ERR__PURGE_INFLIGHT or RdKafka::ERR__PURGE_QUEUE. See RdKafka::Producer::purge() for details.
Parameters
timeout_ms - The maximum time to block. On timeout the operation may continue in the background, depending on state, and it is okay to call this function again. Pass -1 to use the remaining transaction timeout; this is the recommended use.
Remarks
It is strongly recommended to always pass -1 (remaining transaction time) as the timeout_ms. Using other values risks internal state desynchronization in case any of the underlying protocol requests fail.
Returns
an RdKafka::Error object on error, or NULL on success. Check whether the returned error object permits retrying by calling RdKafka::Error::is_retriable(), or whether a fatal error has been raised by calling RdKafka::Error::is_fatal().
Remarks
The returned error object (if not NULL) must be deleted.

See rd_kafka_abort_transaction() in rdkafka.h for more information.


The documentation for this class was generated from the following file: rdkafkacpp.h