librdkafka
The Apache Kafka C/C++ client library
#include <rdkafkacpp.h>
Public Types | |
enum | { RK_MSG_FREE = 0x1, RK_MSG_COPY = 0x2, RK_MSG_BLOCK = 0x4 } |
RdKafka::Producer::produce() msgflags. More... | |
Public Member Functions | |
virtual ErrorCode | produce (Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const std::string *key, void *msg_opaque)=0 |
Produce and send a single message to broker. More... | |
virtual ErrorCode | produce (Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, void *msg_opaque)=0 |
Variant produce() that passes the key as a pointer and length instead of as a const std::string *. | |
virtual ErrorCode | produce (const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, int64_t timestamp, void *msg_opaque)=0 |
produce() variant that takes the topic as a string (no need to create a Topic object) and also allows providing the message timestamp (milliseconds since beginning of epoch, UTC). Otherwise identical to produce() above. | |
virtual ErrorCode | produce (Topic *topic, int32_t partition, const std::vector< char > *payload, const std::vector< char > *key, void *msg_opaque)=0 |
Variant produce() that accepts vectors for key and payload. The vector data will be copied. | |
virtual ErrorCode | flush (int timeout_ms)=0 |
Wait until all outstanding produce requests, et al., are completed. This should typically be done prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating. More... | |
Public Member Functions inherited from RdKafka::Handle | |
virtual const std::string | name () const =0 |
virtual const std::string | memberid () const =0 |
Returns the client's broker-assigned group member id. More... | |
virtual int | poll (int timeout_ms)=0 |
Polls the provided kafka handle for events. More... | |
virtual int | outq_len ()=0 |
Returns the current out queue length. More... | |
virtual ErrorCode | metadata (bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms)=0 |
Request Metadata from broker. More... | |
virtual ErrorCode | pause (std::vector< TopicPartition * > &partitions)=0 |
Pause producing or consumption for the provided list of partitions. More... | |
virtual ErrorCode | resume (std::vector< TopicPartition * > &partitions)=0 |
Resume producing or consumption for the provided list of partitions. More... | |
virtual ErrorCode | query_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)=0 |
Query broker for low (oldest/beginning) and high (newest/end) offsets for partition. More... | |
virtual ErrorCode | get_watermark_offsets (const std::string &topic, int32_t partition, int64_t *low, int64_t *high)=0 |
Get last known low (oldest/beginning) and high (newest/end) offsets for partition. More... | |
virtual ErrorCode | offsetsForTimes (std::vector< TopicPartition * > &offsets, int timeout_ms)=0 |
Look up the offsets for the given partitions by timestamp. More... | |
virtual Queue * | get_partition_queue (const TopicPartition *partition)=0 |
Retrieve queue for a given partition. More... | |
virtual ErrorCode | set_log_queue (Queue *queue)=0 |
Forward librdkafka logs (and debug) to the specified queue for serving with one of the ..poll() calls. More... | |
virtual void | yield ()=0 |
Cancels the current callback dispatcher (Producer::poll(), Consumer::poll(), KafkaConsumer::consume(), etc). More... | |
virtual const std::string | clusterid (int timeout_ms)=0 |
Returns the ClusterId as reported in broker metadata. More... | |
virtual struct rd_kafka_s * | c_ptr ()=0 |
Returns the underlying librdkafka C rd_kafka_t handle. More... | |
Static Public Member Functions | |
static Producer * | create (Conf *conf, std::string &errstr) |
Creates a new Kafka producer handle. More... | |
anonymous enum |
RdKafka::Producer::produce() msgflags.
These flags are optional. RK_MSG_FREE and RK_MSG_COPY are mutually exclusive.
Enumerator | |
---|---|
RK_MSG_FREE | rdkafka will free(3) payload when it is done with it. |
RK_MSG_COPY | The payload data will be copied and the payload pointer will not be used by rdkafka after the call returns. |
RK_MSG_BLOCK | Block produce*() on message queue full. WARNING: If a delivery report callback is used, the application MUST call rd_kafka_poll() (or equiv.) to make sure delivered messages are drained from the internal delivery report queue. Failure to do so will result in blocking indefinitely on the produce() call when the message queue is full. |
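The flag rules above can be checked before calling produce(). The following is a minimal sketch, not part of librdkafka: the MsgFlag enum is a local mirror of the documented values (0x1, 0x2, 0x4) so the snippet compiles on its own, and valid_msgflags() is a hypothetical helper. Rejecting unknown bits is an extra sanity check assumed here, not something the documentation states.

```cpp
// Local mirror of the documented RdKafka::Producer msgflags values, so this
// sketch compiles without librdkafka. Real code would use the enum itself.
enum MsgFlag { MSG_FREE = 0x1, MSG_COPY = 0x2, MSG_BLOCK = 0x4 };

// Hypothetical helper: returns true if `flags` uses only the three known
// bits and does not set both MSG_FREE and MSG_COPY, which are documented
// as mutually exclusive. MSG_BLOCK may be combined with either.
inline bool valid_msgflags(int flags) {
  const int known = MSG_FREE | MSG_COPY | MSG_BLOCK;
  if (flags & ~known) return false;  // assumption: unknown bits are a bug
  const bool free_and_copy = (flags & MSG_FREE) && (flags & MSG_COPY);
  return !free_and_copy;
}
```

A caller could assert valid_msgflags(flags) in debug builds before handing the flags to produce().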
Creates a new Kafka producer handle.
conf is an optional object that will be used instead of the default configuration. The conf object is reusable after this call.
Returns the new handle on success or NULL on error, in which case errstr is set to a human-readable error message.
|
pure virtual |
Produce and send a single message to broker.
This is an asynchronous, non-blocking API.
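partition is the target partition, either:
- RdKafka::Topic::PARTITION_UA (unassigned) for automatic partitioning using the topic's partitioner function, or
- a fixed partition (0..N)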
msgflags is zero or more of the following flags ORed together:
- RK_MSG_BLOCK - block the produce*() call if queue.buffering.max.messages or queue.buffering.max.kbytes are exceeded. Messages are considered in-queue from the point they are accepted by produce() until their corresponding delivery report callback/event returns. It is thus a requirement to call poll() (or equiv.) from a separate thread when RK_MSG_BLOCK is used. See WARNING on RK_MSG_BLOCK above.
- RK_MSG_FREE - rdkafka will free(3) payload when it is done with it.
- RK_MSG_COPY - the payload data will be copied and the payload pointer will not be used by rdkafka after the call returns.
NOTE: RK_MSG_FREE and RK_MSG_COPY are mutually exclusive.
If the function returns an error code and RK_MSG_FREE was specified, then the memory associated with the payload is still the caller's responsibility.
payload is the message payload of size len bytes.
key is an optional message key. If non-NULL, it will be passed to the topic partitioner, sent with the message to the broker, and passed on to the consumer.
msg_opaque is an optional application-provided per-message opaque pointer that will be provided in the delivery report callback (dr_cb) for referencing this message.
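Returns an ErrorCode to indicate success or failure:
- ERR_NO_ERROR - message successfully enqueued for transmission.
- ERR__QUEUE_FULL - maximum number of outstanding messages has been reached: queue.buffering.max.messages
- ERR_MSG_SIZE_TOO_LARGE - message is larger than configured max size: message.max.bytes
- ERR__UNKNOWN_PARTITION - requested partition is unknown in the Kafka cluster.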
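The ownership rule for RK_MSG_FREE (rdkafka frees the payload on success, the caller keeps it on error) can be sketched as follows. This is an illustrative helper under stated assumptions, not library code: produce_with_free() and the try_produce callable are hypothetical names, and the callable stands in for a produce() call so that the snippet compiles without librdkafka.

```cpp
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <string>

// Hypothetical helper. `try_produce(payload, len)` is any callable that
// forwards to produce() with RK_MSG_FREE and returns true when produce()
// returned ERR_NO_ERROR. With a real producer it might be a lambda such as:
//   [&](void *p, std::size_t n) {
//     return producer->produce(topic, partition,
//                              RdKafka::Producer::RK_MSG_FREE,
//                              p, n, NULL, NULL) == RdKafka::ERR_NO_ERROR;
//   }
template <typename TryProduce>
bool produce_with_free(TryProduce try_produce, const std::string &data) {
  // RK_MSG_FREE means rdkafka will free(3) the buffer, so it has to come
  // from malloc(), not from new[] or the stack.
  const std::size_t len = data.size();
  void *payload = std::malloc(len ? len : 1);
  if (!payload) return false;
  std::memcpy(payload, data.data(), len);

  if (try_produce(payload, len)) return true;  // rdkafka owns payload now

  std::free(payload);  // produce() failed: the buffer is still ours to free
  return false;
}
```

The same shape applies to the other produce() overloads. With RK_MSG_COPY the caller always keeps ownership and no cleanup branch is needed.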
|
pure virtual |
Wait until all outstanding produce requests, et al., are completed. This should typically be done prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating.
Returns ERR__TIMED_OUT if timeout_ms was reached before all outstanding requests were completed, else ERR_NO_ERROR.
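A shutdown path might call flush() a bounded number of times before giving up. The sketch below is one possible approach, not a prescribed pattern: drain() is a hypothetical helper, written as a template over any type that exposes flush(int) and outq_len() (both listed for RdKafka::Producer above), so it compiles without librdkafka. It checks outq_len() rather than the returned ErrorCode to stay independent of the library's types.

```cpp
// Hypothetical helper: call flush() up to `attempts` times with a per-call
// timeout of `timeout_ms`, stopping as soon as outq_len() reports an empty
// out queue. Returns true if the queue was drained, false otherwise.
template <typename ProducerT>
bool drain(ProducerT &producer, int timeout_ms, int attempts) {
  for (int i = 0; i < attempts; ++i) {
    producer.flush(timeout_ms);
    if (producer.outq_len() == 0) return true;
  }
  return false;
}
```

If drain() returns false, the application can log how many messages remain (via outq_len()) before destroying the producer, since those messages may not have been delivered.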