librdkafka
The Apache Kafka C/C++ client library
RdKafka::Producer Class Reference (abstract)

Producer.

#include <rdkafkacpp.h>


Public Types

enum  {
  RK_MSG_FREE = 0x1,
  RK_MSG_COPY = 0x2,
  RK_MSG_BLOCK = 0x4
}
 RdKafka::Producer::produce() msgflags.
 

Public Member Functions

virtual ErrorCode produce (Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const std::string *key, void *msg_opaque)=0
 Produce and send a single message to broker.
 
virtual ErrorCode produce (Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, void *msg_opaque)=0
 Variant produce() that passes the key as a pointer and length instead of as a const std::string *.
 
virtual ErrorCode produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, int64_t timestamp, void *msg_opaque)=0
 produce() variant that takes the topic as a string (no Topic object needs to be created) and also allows providing the message timestamp (milliseconds since the beginning of the epoch, UTC). Otherwise identical to produce() above.
 
virtual ErrorCode produce (Topic *topic, int32_t partition, const std::vector< char > *payload, const std::vector< char > *key, void *msg_opaque)=0
 Variant produce() that accepts vectors for key and payload. The vector data will be copied.
 
virtual ErrorCode flush (int timeout_ms)=0
 Wait until all outstanding produce requests, et al., are completed. This should typically be done prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating.
 
Public Member Functions inherited from RdKafka::Handle
virtual const std::string name () const =0
 
virtual const std::string memberid () const =0
 Returns the client's broker-assigned group member id.
 
virtual int poll (int timeout_ms)=0
 Polls the provided kafka handle for events.
 
virtual int outq_len ()=0
 Returns the current out queue length.
 
virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms)=0
 Request Metadata from broker.
 
virtual ErrorCode pause (std::vector< TopicPartition * > &partitions)=0
 Pause producing or consumption for the provided list of partitions.
 
virtual ErrorCode resume (std::vector< TopicPartition * > &partitions)=0
 Resume producing or consumption for the provided list of partitions.
 
virtual ErrorCode query_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)=0
 Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.
 
virtual ErrorCode get_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high)=0
 Get last known low (oldest/beginning) and high (newest/end) offsets for partition.
 
virtual ErrorCode offsetsForTimes (std::vector< TopicPartition * > &offsets, int timeout_ms)=0
 Look up the offsets for the given partitions by timestamp.
 
virtual Queue * get_partition_queue (const TopicPartition *partition)=0
 Retrieve queue for a given partition.
 
virtual ErrorCode set_log_queue (Queue *queue)=0
 Forward librdkafka logs (and debug) to the specified queue for serving with one of the ..poll() calls.
 
virtual void yield ()=0
 Cancels the current callback dispatcher (Producer::poll(), Consumer::poll(), KafkaConsumer::consume(), etc).
 
virtual const std::string clusterid (int timeout_ms)=0
 Returns the ClusterId as reported in broker metadata.
 
virtual struct rd_kafka_s * c_ptr ()=0
 Returns the underlying librdkafka C rd_kafka_t handle.
 
virtual int32_t controllerid (int timeout_ms)=0
 Returns the current ControllerId (controller broker id) as reported in broker metadata.
 

Static Public Member Functions

static Producer * create (Conf *conf, std::string &errstr)
 Creates a new Kafka producer handle.
 

Detailed Description

Member Enumeration Documentation

anonymous enum

RdKafka::Producer::produce() msgflags.

These flags are optional. RK_MSG_FREE and RK_MSG_COPY are mutually exclusive; RK_MSG_BLOCK may be combined with either of them.

Enumerator
RK_MSG_FREE 

rdkafka will free(3) payload when it is done with it.

RK_MSG_COPY 

the payload data will be copied and the payload pointer will not be used by rdkafka after the call returns.

RK_MSG_BLOCK 

Block produce*() on message queue full. WARNING: If a delivery report callback is used, the application MUST call rd_kafka_poll() (or equivalent) to make sure delivered messages are drained from the internal delivery report queue. Failure to do so will result in produce() blocking indefinitely when the message queue is full.
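
The flag rules above can be checked before calling produce(). The following is a minimal sketch, not part of librdkafka: the enum is a local mirror of the values listed above so that the snippet compiles on its own, and msgflags_valid() and checked_msgflags() are hypothetical helper names. Real code would use the RdKafka::Producer::RK_MSG_* constants from rdkafkacpp.h.

```cpp
#include <cassert>

// Local mirror of the msgflags values documented above (assumption: real
// code uses RdKafka::Producer::RK_MSG_FREE etc. from <rdkafkacpp.h>).
enum MsgFlags {
  RK_MSG_FREE  = 0x1,
  RK_MSG_COPY  = 0x2,
  RK_MSG_BLOCK = 0x4
};

// Hypothetical helper: returns true if the flag combination is allowed.
inline bool msgflags_valid(int msgflags) {
  const int known = RK_MSG_FREE | RK_MSG_COPY | RK_MSG_BLOCK;
  if (msgflags & ~known) {
    return false;  // bits outside the documented flags
  }
  const int ownership = msgflags & (RK_MSG_FREE | RK_MSG_COPY);
  // RK_MSG_FREE and RK_MSG_COPY must not be set together.
  return ownership != (RK_MSG_FREE | RK_MSG_COPY);
}

// Hypothetical call-site guard: checks the flags in debug builds and
// passes them through unchanged.
inline int checked_msgflags(int msgflags) {
  assert(msgflags_valid(msgflags));
  return msgflags;
}
```

A call site could then pass checked_msgflags(RK_MSG_COPY | RK_MSG_BLOCK) as the msgflags argument.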

Member Function Documentation

static Producer* RdKafka::Producer::create ( Conf *  conf,
std::string &  errstr 
)
static

Creates a new Kafka producer handle.

conf is an optional object that will be used instead of the default configuration. The conf object is reusable after this call.

Returns
the new handle on success, or NULL on error, in which case errstr is set to a human-readable error message.
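
Because create() reports failure through a NULL return plus errstr, callers often wrap it. The sketch below is one possible wrapper, not part of librdkafka: make_producer() is a hypothetical name, and it is written as a template so that it compiles without the librdkafka headers. It assumes the caller owns the returned handle and releases it with delete, which is how the librdkafka example programs treat it.

```cpp
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical wrapper around a static create(conf, errstr) factory.
// Intended use in real code: make_producer<RdKafka::Producer>(conf).
template <class ProducerT, class ConfT>
std::unique_ptr<ProducerT> make_producer(ConfT *conf) {
  std::string errstr;
  ProducerT *raw = ProducerT::create(conf, errstr);
  if (raw == nullptr) {
    // create() returned NULL: errstr holds the human-readable reason.
    throw std::runtime_error("Producer creation failed: " + errstr);
  }
  // Assumption: the caller owns the handle and deletes it when done.
  return std::unique_ptr<ProducerT>(raw);
}
```

Holding the handle in a std::unique_ptr means it is destroyed automatically when it goes out of scope; flush() should still be called first so that queued messages are not lost.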
virtual ErrorCode RdKafka::Producer::produce ( Topic *  topic,
int32_t  partition,
int  msgflags,
void *  payload,
size_t  len,
const std::string *  key,
void *  msg_opaque 
)
pure virtual

Produce and send a single message to broker.

This is an asynchronous, non-blocking API.

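partition is the target partition, either:
  • RdKafka::Topic::PARTITION_UA (unassigned) for automatic partitioning using the topic's partitioner function, or
  • a fixed partition (0..N).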

msgflags is zero or more of the following flags OR:ed together:
  • RK_MSG_BLOCK - block produce*() call if queue.buffering.max.messages or queue.buffering.max.kbytes are exceeded. Messages are considered in-queue from the point they are accepted by produce() until their corresponding delivery report callback/event returns. It is thus a requirement to call poll() (or equiv.) from a separate thread when RK_MSG_BLOCK is used. See WARNING on RK_MSG_BLOCK above.
  • RK_MSG_FREE - rdkafka will free(3) payload when it is done with it.
  • RK_MSG_COPY - the payload data will be copied and the payload pointer will not be used by rdkafka after the call returns.

NOTE: RK_MSG_FREE and RK_MSG_COPY are mutually exclusive.

If the function returns an error code and RK_MSG_FREE was specified, then the memory associated with the payload is still the caller's responsibility.

payload is the message payload of size len bytes.

key is an optional message key; if non-NULL, it will be passed to the topic partitioner, sent with the message to the broker, and passed on to the consumer.

msg_opaque is an optional application-provided per-message opaque pointer that will be provided in the delivery report callback (dr_cb) for referencing this message.

Returns
an ErrorCode to indicate success or failure:
  • ERR_NO_ERROR - message successfully enqueued for transmission.
  • ERR__QUEUE_FULL - maximum number of outstanding messages has been reached: queue.buffering.max.messages
  • ERR_MSG_SIZE_TOO_LARGE - message is larger than configured max size: message.max.bytes
  • ERR__UNKNOWN_PARTITION - requested partition is unknown in the Kafka cluster.
  • ERR__UNKNOWN_TOPIC - topic is unknown in the Kafka cluster.
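
Of the return codes above, ERR__QUEUE_FULL is the one that is usually worth retrying, since serving delivery reports with poll() frees queue space. The following is a minimal sketch of that retry loop, not part of librdkafka: Attempt and produce_with_retry() are hypothetical names, and the producer calls are passed in as callables so that the snippet compiles without the library headers. In real code, try_produce would call producer->produce(...) and map ERR_NO_ERROR to Attempt::Enqueued, ERR__QUEUE_FULL to Attempt::QueueFull, and any other code to Attempt::Failed; serve_reports would call producer->poll() with a short timeout.

```cpp
#include <cassert>
#include <functional>

// Outcome of one produce() attempt, as classified by the caller.
enum class Attempt { Enqueued, QueueFull, Failed };

// Hypothetical helper: retries while the queue is full, serving delivery
// reports between attempts. Returns true once the message is enqueued.
inline bool produce_with_retry(const std::function<Attempt()> &try_produce,
                               const std::function<void()> &serve_reports,
                               int max_attempts) {
  assert(max_attempts > 0);
  for (int i = 0; i < max_attempts; ++i) {
    switch (try_produce()) {
      case Attempt::Enqueued:
        return true;
      case Attempt::Failed:
        return false;  // e.g. message too large: retrying cannot help
      case Attempt::QueueFull:
        serve_reports();  // drain delivery reports to free queue space
        break;            // leaves the switch; the loop tries again
    }
  }
  return false;  // queue still full after max_attempts
}
```

If RK_MSG_FREE was passed and the helper returns false, the payload remains the caller's responsibility, as noted above.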
virtual ErrorCode RdKafka::Producer::flush ( int  timeout_ms)
pure virtual

Wait until all outstanding produce requests, et al., are completed. This should typically be done prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating.

Remarks
This function will call poll() and thus trigger callbacks.
Returns
ERR__TIMED_OUT if timeout_ms was reached before all outstanding requests were completed, else ERR_NO_ERROR.
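
Since flush() can time out, a shutdown path may want to retry a few times and report what is left. The sketch below shows one way to do that and is not part of librdkafka: flush_before_destroy() is a hypothetical name, and the producer calls are passed in as callables so that the snippet compiles without the library headers. In real code, flush_once would return producer->flush(timeout_ms) != RdKafka::ERR__TIMED_OUT and outq_len would return producer->outq_len().

```cpp
#include <cassert>
#include <functional>

// Hypothetical helper: flushes up to max_attempts times.
// Returns 0 if a flush completed, otherwise the remaining out queue length.
inline int flush_before_destroy(const std::function<bool()> &flush_once,
                                const std::function<int()> &outq_len,
                                int max_attempts) {
  assert(max_attempts > 0);
  for (int i = 0; i < max_attempts; ++i) {
    if (flush_once()) {
      return 0;  // all outstanding requests completed
    }
  }
  return outq_len();  // messages still queued or in flight
}
```

A non-zero result means that destroying the producer at this point would drop that many messages, so the caller can log it or keep waiting.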

The documentation for this class was generated from the following file: