30 #ifndef _RDKAFKACPP_H_ 
   31 #define _RDKAFKACPP_H_ 
   57 #include <sys/types.h> 
   64 #ifndef _SSIZE_T_DEFINED 
   65 #define _SSIZE_T_DEFINED 
   66 typedef SSIZE_T ssize_t;
 
   70 #ifdef LIBRDKAFKA_STATICLIB 
   73 #ifdef LIBRDKAFKACPP_EXPORTS 
   74 #define RD_EXPORT __declspec(dllexport) 
   76 #define RD_EXPORT __declspec(dllimport) 
   88 struct rd_kafka_topic_s;
 
   89 struct rd_kafka_message_s;
 
   90 struct rd_kafka_conf_s;
 
   91 struct rd_kafka_topic_conf_s;
 
  115 #define RD_KAFKA_VERSION 0x020601ff 
  368 #define ERR_GROUP_LOAD_IN_PROGRESS ERR_COORDINATOR_LOAD_IN_PROGRESS 
  372 #define ERR_GROUP_COORDINATOR_NOT_AVAILABLE ERR_COORDINATOR_NOT_AVAILABLE 
  376 #define ERR_NOT_COORDINATOR_FOR_GROUP ERR_NOT_COORDINATOR 
  594 class TopicPartition;
 
  638   virtual std::string 
name() 
const = 0;
 
  643   virtual std::string 
str() 
const = 0;
 
  751       const std::string &oauthbearer_config) = 0;
 
  785                                  const std::string *key,
 
  786                                  int32_t partition_cnt,
 
  787                                  void *msg_opaque) = 0;
 
  810                                  int32_t partition_cnt,
 
  811                                  void *msg_opaque) = 0;
 
  856     EVENT_SEVERITY_EMERG    = 0,
 
  857     EVENT_SEVERITY_ALERT    = 1,
 
  858     EVENT_SEVERITY_CRITICAL = 2,
 
  859     EVENT_SEVERITY_ERROR    = 3,
 
  860     EVENT_SEVERITY_WARNING  = 4,
 
  861     EVENT_SEVERITY_NOTICE   = 5,
 
  862     EVENT_SEVERITY_INFO     = 6,
 
  863     EVENT_SEVERITY_DEBUG    = 7
 
  895   virtual std::string 
fac() 
const = 0;
 
  905   virtual std::string 
str() 
const = 0;
 
 1030                             std::vector<TopicPartition *> &partitions) = 0;
 
 1058                                 std::vector<TopicPartition *> &offsets) = 0;
 
 1115                                   std::string &errstr) = 0;
 
 1141   virtual int socket_cb(
int domain, 
int type, 
int protocol) = 0;
 
 1165   virtual int open_cb(
const std::string &path, 
int flags, 
int mode) = 0;
 
 1233                                const std::string &value,
 
 1234                                std::string &errstr) = 0;
 
 1239                                std::string &errstr) = 0;
 
 1243       const std::string &name,
 
 1245       std::string &errstr) = 0;
 
 1250                                std::string &errstr) = 0;
 
 1260                                const Conf *topic_conf,
 
 1261                                std::string &errstr) = 0;
 
 1266                                std::string &errstr) = 0;
 
 1271                                std::string &errstr) = 0;
 
 1276                                std::string &errstr) = 0;
 
 1281                                std::string &errstr) = 0;
 
 1286                                std::string &errstr) = 0;
 
 1291                                std::string &errstr) = 0;
 
 1299                                std::string &errstr) = 0;
 
 1345                                         std::string &errstr) = 0;
 
 1360                                std::string &value) 
const = 0;
 
 1415   virtual std::list<std::string> *
dump() = 0;
 
 1420                                std::string &errstr) = 0;
 
 1472                                                     std::string &errstr) = 0;
 
 1498                                              std::string &errstr) = 0;
 
 1519   virtual std::string 
name() 
const = 0;
 
 1556   virtual int poll(
int timeout_ms) = 0;
 
 1582                              const Topic *only_rkt,
 
 1584                              int timeout_ms) = 0;
 
 1623                                             int timeout_ms) = 0;
 
 1670                                     int timeout_ms) = 0;
 
 1829       const std::string &token_value,
 
 1830       int64_t md_lifetime_ms,
 
 1831       const std::string &md_principal_name,
 
 1832       const std::list<std::string> &extensions,
 
 1833       std::string &errstr) = 0;
 
 1853       const std::string &errstr) = 0;
 
 1921                                       const std::string &password) = 0;
 
 1968   static void destroy(std::vector<TopicPartition *> &partitions);
 
 1971   virtual const std::string &
topic() 
const = 0;
 
 2031                        const std::string &topic_str,
 
 2033                        std::string &errstr);
 
 2035   virtual ~
Topic() = 0;
 
 2039   virtual std::string 
name() 
const = 0;
 
 2082   virtual struct rd_kafka_topic_s *
c_ptr() = 0;
 
 2113     MSG_TIMESTAMP_LOG_APPEND_TIME 
 
 2154     Header(
const std::string &key, 
const void *value, 
size_t value_size) :
 
 2155         key_(key), err_(
ERR_NO_ERROR), value_size_(value_size) {
 
 2156       value_ = copy_value(value, value_size);
 
 2176         key_(key), err_(err), value_(NULL), value_size_(value_size) {
 
 2178         value_ = copy_value(value, value_size);
 
 2187         key_(other.key_), err_(other.err_), value_size_(other.value_size_) {
 
 2188       value_ = copy_value(other.value_, value_size_);
 
 2197       if (&other == 
this) {
 
 2203       value_size_ = other.value_size_;
 
 2208       value_ = copy_value(other.value_, value_size_);
 
 2231       return static_cast<const char *
>(value_);
 
 2245     char *copy_value(
const void *value, 
size_t value_size) {
 
 2249       char *dest = (
char *)
mem_malloc(value_size + 1);
 
 2250       memcpy(dest, (
const char *)value, value_size);
 
 2251       dest[value_size] = 
'\0';
 
 2260     void *
operator new(size_t); 
 
 2291                         size_t value_size) = 0;
 
 2303   virtual ErrorCode add(
const std::string &key, 
const std::string &value) = 0;
 
 2334   virtual std::vector<Header> 
get(
const std::string &key) 
const = 0;
 
 2381     MSG_STATUS_NOT_PERSISTED = 0,
 
 2386     MSG_STATUS_POSSIBLY_PERSISTED = 1,
 
 2391     MSG_STATUS_PERSISTED = 2,
 
 2422   virtual size_t len() 
const = 0;
 
 2425   virtual const std::string *
key() 
const = 0;
 
 2464   virtual struct rd_kafka_message_s *
c_ptr() = 0;
 
 2578   virtual int poll(
int timeout_ms) = 0;
 
 2580   virtual ~
Queue() = 0;
 
 2656       std::vector<RdKafka::TopicPartition *> &partitions) = 0;
 
 2805       const std::vector<TopicPartition *> &offsets) = 0;
 
 2843                               int timeout_ms) = 0;
 
 2987       const std::vector<TopicPartition *> &partitions) = 0;
 
 3006       const std::vector<TopicPartition *> &partitions) = 0;
 
 3127                          int timeout_ms) = 0;
 
 3285     MSG_FREE = RK_MSG_FREE,
 
 3286     MSG_COPY = RK_MSG_COPY
 
 3352                             const std::string *key,
 
 3353                             void *msg_opaque) = 0;
 
 3366                             void *msg_opaque) = 0;
 
 3382                             void *msg_opaque) = 0;
 
 3400                             void *msg_opaque) = 0;
 
 3409                             const std::vector<char> *payload,
 
 3410                             const std::vector<char> *key,
 
 3411                             void *msg_opaque) = 0;
 
 3467     PURGE_INFLIGHT = 0x2, 
 
 3474     PURGE_NON_BLOCKING = 0x4 
 
 3568       const std::vector<TopicPartition *> &offsets,
 
 3570       int timeout_ms) = 0;
 
 3653   virtual int32_t 
id() 
const = 0;
 
 3656   virtual std::string 
host() 
const = 0;
 
 3683   virtual int32_t 
id() 
const = 0;
 
 3692   virtual const std::vector<int32_t> *
replicas() 
const = 0;
 
 3697   virtual const std::vector<int32_t> *
isrs() 
const = 0;
 
Configuration interface.
Definition: rdkafkacpp.h:1191
 
virtual Conf::ConfResult get(OpenCb *&open_cb) const =0
Query single configuration value.
 
ConfType
Configuration object type.
Definition: rdkafkacpp.h:1196
 
@ CONF_GLOBAL
Definition: rdkafkacpp.h:1197
 
virtual Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr)=0
Use with name = "event_cb".
 
virtual struct rd_kafka_topic_conf_s * c_ptr_topic()=0
Returns the underlying librdkafka C rd_kafka_topic_conf_t handle.
 
virtual Conf::ConfResult set(const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr)=0
Use with name = "offset_commit_cb".
 
ConfResult
RdKafka::Conf::set() result code.
Definition: rdkafkacpp.h:1204
 
virtual Conf::ConfResult set(const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb, std::string &errstr)=0
Use with name = "partitioner_key_pointer_cb".
 
virtual Conf::ConfResult get(EventCb *&event_cb) const =0
Query single configuration value.
 
virtual Conf::ConfResult set_engine_callback_data(void *value, std::string &errstr)=0
Set callback_data for ssl engine.
 
static Conf * create(ConfType type)
Create configuration object.
 
virtual Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr)=0
Use with name = "dr_cb".
 
virtual Conf::ConfResult get(SocketCb *&socket_cb) const =0
Query single configuration value.
 
virtual Conf::ConfResult set(const std::string &name, const Conf *topic_conf, std::string &errstr)=0
Use with name = "default_topic_conf".
 
virtual Conf::ConfResult get(OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const =0
Query single configuration value.
 
virtual Conf::ConfResult set(const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr)=0
Use with name = "rebalance_cb".
 
virtual Conf::ConfResult set(const std::string &name, ConsumeCb *consume_cb, std::string &errstr)=0
Use with name = "consume_cb".
 
virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const =0
Query single configuration value.
 
virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const =0
Query single configuration value.
 
virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const =0
Query single configuration value.
 
virtual Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const =0
Query single configuration value.
 
virtual std::list< std::string > * dump()=0
Dump configuration names and values to list containing name,value tuples.
 
virtual Conf::ConfResult set_ssl_cert(RdKafka::CertificateType cert_type, RdKafka::CertificateEncoding cert_enc, const void *buffer, size_t size, std::string &errstr)=0
Set certificate/key cert_type from the cert_enc encoded memory at buffer of size bytes.
 
virtual Conf::ConfResult set(const std::string &name, SocketCb *socket_cb, std::string &errstr)=0
Use with name = "socket_cb".
 
virtual Conf::ConfResult set(const std::string &name, OpenCb *open_cb, std::string &errstr)=0
Use with name = "open_cb".
 
virtual Conf::ConfResult enable_sasl_queue(bool enable, std::string &errstr)=0
Enable/disable creation of a queue specific to SASL events and callbacks.
 
virtual Conf::ConfResult set(const std::string &name, SslCertificateVerifyCb *ssl_cert_verify_cb, std::string &errstr)=0
Use with name = "ssl_cert_verify_cb".
 
virtual Conf::ConfResult set(const std::string &name, OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb, std::string &errstr)=0
Use with name = "oauthbearer_token_refresh_cb".
 
virtual Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr)=0
Set configuration property name to value value.
 
virtual Conf::ConfResult get(SslCertificateVerifyCb *&ssl_cert_verify_cb) const =0
Query single configuration value.
 
virtual struct rd_kafka_conf_s * c_ptr_global()=0
Returns the underlying librdkafka C rd_kafka_conf_t handle.
 
virtual Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr)=0
Use with name = "partitioner_cb".
 
virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const =0
Query single configuration value.
 
virtual Conf::ConfResult get(const std::string &name, std::string &value) const =0
Query single configuration value.
 
Consume callback class.
Definition: rdkafkacpp.h:939
 
virtual void consume_cb(Message &message, void *opaque)=0
The consume callback is used with RdKafka::Consumer::consume_callback() methods and will be called fo...
 
Simple Consumer (legacy)
Definition: rdkafkacpp.h:3050
 
virtual ErrorCode start(Topic *topic, int32_t partition, int64_t offset, Queue *queue)=0
Start consuming messages for topic and partition on queue queue.
 
virtual Message * consume(Topic *topic, int32_t partition, int timeout_ms)=0
Consume a single message from topic and partition.
 
virtual ErrorCode stop(Topic *topic, int32_t partition)=0
Stop consuming messages for topic and partition, purging all messages currently in the local queue.
 
virtual int consume_callback(Topic *topic, int32_t partition, int timeout_ms, ConsumeCb *consume_cb, void *opaque)=0
Consumes messages from topic and partition, calling the provided callback for each consumed message.
 
virtual Message * consume(Queue *queue, int timeout_ms)=0
Consume a single message from the specified queue.
 
static int64_t OffsetTail(int64_t offset)
Converts an offset into the logical offset from the tail of a topic.
 
virtual int consume_callback(Queue *queue, int timeout_ms, RdKafka::ConsumeCb *consume_cb, void *opaque)=0
Consumes messages from queue, calling the provided callback for each consumed message.
 
static Consumer * create(const Conf *conf, std::string &errstr)
Creates a new Kafka consumer handle.
 
virtual ErrorCode seek(Topic *topic, int32_t partition, int64_t offset, int timeout_ms)=0
Seek consumer for topic+partition to offset which is either an absolute or logical offset.
 
virtual ErrorCode start(Topic *topic, int32_t partition, int64_t offset)=0
Start consuming messages for topic and partition at offset offset which may either be a proper offset...
 
Delivery Report callback class.
Definition: rdkafkacpp.h:701
 
virtual void dr_cb(Message &message)=0
Delivery report callback.
 
The Error class is used as a return value from APIs to propagate an error. The error consists of an e...
Definition: rdkafkacpp.h:616
 
virtual ErrorCode code() const =0
 
virtual std::string name() const =0
 
virtual std::string str() const =0
 
virtual bool is_fatal() const =0
 
static Error * create(ErrorCode code, const std::string *errstr)
Create error object.
 
virtual bool is_retriable() const =0
 
virtual bool txn_requires_abort() const =0
 
Event callback class.
Definition: rdkafkacpp.h:827
 
virtual void event_cb(Event &event)=0
Event callback.
 
Event object class as passed to the EventCb callback.
Definition: rdkafkacpp.h:844
 
virtual int throttle_time() const =0
 
virtual std::string broker_name() const =0
 
virtual bool fatal() const =0
 
virtual std::string fac() const =0
 
virtual ErrorCode err() const =0
 
virtual Type type() const =0
 
virtual std::string str() const =0
 
Severity
EVENT_LOG severities (conforms to syslog(3) severities)
Definition: rdkafkacpp.h:855
 
Type
Event type.
Definition: rdkafkacpp.h:847
 
@ EVENT_ERROR
Definition: rdkafkacpp.h:848
 
@ EVENT_STATS
Definition: rdkafkacpp.h:849
 
@ EVENT_LOG
Definition: rdkafkacpp.h:850
 
virtual Severity severity() const =0
 
virtual int broker_id() const =0
 
Base handle, super class for specific clients.
Definition: rdkafkacpp.h:1513
 
virtual Queue * get_partition_queue(const TopicPartition *partition)=0
Retrieve queue for a given partition.
 
virtual ErrorCode offsetsForTimes(std::vector< TopicPartition * > &offsets, int timeout_ms)=0
Look up the offsets for the given partitions by timestamp.
 
virtual ErrorCode oauthbearer_set_token(const std::string &token_value, int64_t md_lifetime_ms, const std::string &md_principal_name, const std::list< std::string > &extensions, std::string &errstr)=0
Set SASL/OAUTHBEARER token and metadata.
 
virtual ErrorCode resume(std::vector< TopicPartition * > &partitions)=0
Resume producing or consumption for the provided list of partitions.
 
virtual Error * sasl_background_callbacks_enable()=0
Enable SASL OAUTHBEARER refresh callbacks on the librdkafka background thread.
 
virtual std::string memberid() const =0
Returns the client's broker-assigned group member id.
 
virtual void mem_free(void *ptr)=0
Free pointer returned by librdkafka.
 
virtual int outq_len()=0
Returns the current out queue length.
 
virtual void * mem_malloc(size_t size)=0
Allocate memory using the same allocator librdkafka uses.
 
virtual void yield()=0
Cancels the current callback dispatcher (Handle::poll(), KafkaConsumer::consume(),...
 
virtual ErrorCode pause(std::vector< TopicPartition * > &partitions)=0
Pause producing or consumption for the provided list of partitions.
 
virtual int32_t controllerid(int timeout_ms)=0
Returns the current ControllerId (controller broker id) as reported in broker metadata.
 
virtual struct rd_kafka_s * c_ptr()=0
Returns the underlying librdkafka C rd_kafka_t handle.
 
virtual Queue * get_sasl_queue()=0
 
virtual int poll(int timeout_ms)=0
Polls the provided kafka handle for events.
 
virtual ErrorCode oauthbearer_set_token_failure(const std::string &errstr)=0
SASL/OAUTHBEARER token refresh failure indicator.
 
virtual ErrorCode query_watermark_offsets(const std::string &topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)=0
Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.
 
virtual ErrorCode get_watermark_offsets(const std::string &topic, int32_t partition, int64_t *low, int64_t *high)=0
Get last known low (oldest/beginning) and high (newest/end) offsets for partition.
 
virtual Error * sasl_set_credentials(const std::string &username, const std::string &password)=0
Sets SASL credentials used for SASL PLAIN and SCRAM mechanisms by this Kafka client.
 
virtual ErrorCode fatal_error(std::string &errstr) const =0
Returns the first fatal error set on this client instance, or ERR_NO_ERROR if no fatal error has occu...
 
virtual ErrorCode metadata(bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms)=0
Request Metadata from broker.
 
virtual ErrorCode set_log_queue(Queue *queue)=0
Forward librdkafka logs (and debug) to the specified queue for serving with one of the ....
 
virtual std::string name() const =0
 
virtual Queue * get_background_queue()=0
 
virtual std::string clusterid(int timeout_ms)=0
Returns the ClusterId as reported in broker metadata.
 
High-level KafkaConsumer (for brokers 0.9 and later)
Definition: rdkafkacpp.h:2635
 
virtual ErrorCode commitSync()=0
Commit offsets for the current assignment.
 
static KafkaConsumer * create(const Conf *conf, std::string &errstr)
Creates a KafkaConsumer.
 
virtual ErrorCode unassign()=0
Stop consumption and remove the current assignment.
 
virtual ErrorCode commitAsync(Message *message)=0
Commit offset for a single topic+partition based on message.
 
virtual Error * incremental_assign(const std::vector< TopicPartition * > &partitions)=0
Incrementally add partitions to the current assignment.
 
virtual ErrorCode subscription(std::vector< std::string > &topics)=0
Returns the current subscription as set by RdKafka::KafkaConsumer::subscribe()
 
virtual ErrorCode position(std::vector< TopicPartition * > &partitions)=0
Retrieve current positions (offsets) for topics+partitions.
 
virtual std::string rebalance_protocol()=0
The rebalance protocol currently in use. This will be "NONE" if the consumer has not (yet) joined a g...
 
virtual ErrorCode close()=0
Close and shut down the consumer.
 
virtual ErrorCode commitAsync(const std::vector< TopicPartition * > &offsets)=0
Commit offset for the provided list of partitions.
 
virtual ErrorCode commitSync(OffsetCommitCb *offset_commit_cb)=0
Commit offsets for the current assignment.
 
virtual bool assignment_lost()=0
Check whether the consumer considers the current assignment to have been lost involuntarily....
 
virtual ErrorCode commitAsync()=0
Asynchronous version of RdKafka::KafkaConsumer::commitSync()
 
virtual ErrorCode subscribe(const std::vector< std::string > &topics)=0
Update the subscription set to topics.
 
virtual ConsumerGroupMetadata * groupMetadata()=0
 
virtual Error * close(Queue *queue)=0
Close and shut down the consumer.
 
virtual Message * consume(int timeout_ms)=0
Consume message or get error event, triggers callbacks.
 
virtual ErrorCode commitSync(std::vector< TopicPartition * > &offsets)=0
Commit offsets for the provided list of partitions.
 
virtual ErrorCode unsubscribe()=0
Unsubscribe from the current subscription set.
 
virtual ErrorCode commitSync(std::vector< TopicPartition * > &offsets, OffsetCommitCb *offset_commit_cb)=0
Commit offsets for the provided list of partitions.
 
virtual ErrorCode committed(std::vector< TopicPartition * > &partitions, int timeout_ms)=0
Retrieve committed offsets for topics+partitions.
 
virtual ErrorCode seek(const TopicPartition &partition, int timeout_ms)=0
Seek consumer for topic+partition to offset which is either an absolute or logical offset.
 
virtual ErrorCode offsets_store(std::vector< TopicPartition * > &offsets)=0
Store offset offset for topic partition partition. The offset will be committed (written) to the offs...
 
virtual ErrorCode commitSync(Message *message)=0
Commit offset for a single topic+partition based on message.
 
virtual Error * incremental_unassign(const std::vector< TopicPartition * > &partitions)=0
Incrementally remove partitions from the current assignment.
 
virtual ErrorCode assign(const std::vector< TopicPartition * > &partitions)=0
Update the assignment set to partitions.
 
virtual ErrorCode assignment(std::vector< RdKafka::TopicPartition * > &partitions)=0
Returns the current partition assignment as set by RdKafka::KafkaConsumer::assign()
 
Message timestamp object.
Definition: rdkafkacpp.h:2107
 
int64_t timestamp
Definition: rdkafkacpp.h:2117
 
MessageTimestampType
Definition: rdkafkacpp.h:2110
 
@ MSG_TIMESTAMP_CREATE_TIME
Definition: rdkafkacpp.h:2112
 
@ MSG_TIMESTAMP_NOT_AVAILABLE
Definition: rdkafkacpp.h:2111
 
MessageTimestampType type
Definition: rdkafkacpp.h:2116
 
Message object.
Definition: rdkafkacpp.h:2373
 
virtual std::string topic_name() const =0
 
virtual Status status() const =0
Returns the message's persistence status in the topic log.
 
virtual Error * offset_store()=0
Store offset +1 for the consumed message.
 
virtual const std::string * key() const =0
 
virtual std::string errstr() const =0
Accessor functions*.
 
virtual int32_t broker_id() const =0
 
virtual const void * key_pointer() const =0
 
virtual RdKafka::Headers * headers()=0
 
virtual int64_t offset() const =0
 
virtual size_t len() const =0
 
virtual struct rd_kafka_message_s * c_ptr()=0
Returns the underlying librdkafka C rd_kafka_message_t handle.
 
virtual Topic * topic() const =0
 
virtual MessageTimestamp timestamp() const =0
 
virtual int64_t latency() const =0
 
virtual ErrorCode err() const =0
 
virtual int32_t leader_epoch() const =0
 
Status
Message persistence status can be used by the application to find out if a produced message was persi...
Definition: rdkafkacpp.h:2377
 
virtual size_t key_len() const =0
 
virtual RdKafka::Headers * headers(RdKafka::ErrorCode *err)=0
 
virtual void * msg_opaque() const =0
 
virtual void * payload() const =0
 
virtual int32_t partition() const =0
 
SASL/OAUTHBEARER token refresh callback class.
Definition: rdkafkacpp.h:740
 
virtual void oauthbearer_token_refresh_cb(RdKafka::Handle *handle, const std::string &oauthbearer_config)=0
SASL/OAUTHBEARER token refresh callback.
 
Offset Commit callback class.
Definition: rdkafkacpp.h:1040
 
virtual void offset_commit_cb(RdKafka::ErrorCode err, std::vector< TopicPartition * > &offsets)=0
Set offset commit callback for use with consumer groups.
 
Portability: OpenCb callback class
Definition: rdkafkacpp.h:1152
 
virtual int open_cb(const std::string &path, int flags, int mode)=0
Open callback. The open callback is responsible for opening the file specified by pathname,...
 
Partitioner callback class.
Definition: rdkafkacpp.h:765
 
virtual int32_t partitioner_cb(const Topic *topic, const std::string *key, int32_t partition_cnt, void *msg_opaque)=0
Partitioner callback.
 
Variant partitioner with key pointer.
Definition: rdkafkacpp.h:797
 
virtual int32_t partitioner_cb(const Topic *topic, const void *key, size_t key_len, int32_t partition_cnt, void *msg_opaque)=0
Variant partitioner callback that gets key as pointer and length instead of as a const std::string *.
 
Producer.
Definition: rdkafkacpp.h:3232
 
virtual Error * abort_transaction(int timeout_ms)=0
Aborts the ongoing transaction.
 
virtual Error * init_transactions(int timeout_ms)=0
Initialize transactions for the producer instance.
 
virtual ErrorCode produce(const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, int64_t timestamp, void *msg_opaque)=0
produce() variant that takes topic as a string (no need for creating a Topic object),...
 
virtual Error * commit_transaction(int timeout_ms)=0
Commit the current transaction as started with begin_transaction().
 
virtual ErrorCode produce(Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, void *msg_opaque)=0
Variant produce() that passes the key as a pointer and length instead of as a const std::string *.
 
virtual Error * send_offsets_to_transaction(const std::vector< TopicPartition * > &offsets, const ConsumerGroupMetadata *group_metadata, int timeout_ms)=0
Sends a list of topic partition offsets to the consumer group coordinator for group_metadata,...
 
virtual ErrorCode purge(int purge_flags)=0
Purge messages currently handled by the producer instance.
 
virtual ErrorCode produce(const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, int64_t timestamp, RdKafka::Headers *headers, void *msg_opaque)=0
produce() variant that allows for Header support on produce. Otherwise identical to produce() abo...
 
virtual ErrorCode produce(Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const std::string *key, void *msg_opaque)=0
Produce and send a single message to broker.
 
virtual ErrorCode produce(Topic *topic, int32_t partition, const std::vector< char > *payload, const std::vector< char > *key, void *msg_opaque)=0
Variant produce() that accepts vectors for key and payload. The vector data will be copied.
 
virtual Error * begin_transaction()=0
init_transactions() must have been called successfully (once) before this function is called.
 
static Producer * create(const Conf *conf, std::string &errstr)
Creates a new Kafka producer handle.
 
virtual ErrorCode flush(int timeout_ms)=0
Wait until all outstanding produce requests, et al., are completed. This should typically be done prio...
 
Queue interface.
Definition: rdkafkacpp.h:2538
 
static Queue * create(Handle *handle)
Create Queue object.
 
virtual ErrorCode forward(Queue *dst)=0
Forward/re-route queue to dst. If dst is NULL, the forwarding is removed.
 
virtual int poll(int timeout_ms)=0
Poll queue, serving any enqueued callbacks.
 
virtual void io_event_enable(int fd, const void *payload, size_t size)=0
Enable IO event triggering for queue.
 
virtual Message * consume(int timeout_ms)=0
Consume message or get error event from the queue.
 
KafkaConsumer: Rebalance callback class
Definition: rdkafkacpp.h:958
 
virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition * > &partitions)=0
Group rebalance callback for use with RdKafka::KafkaConsumer.
 
Portability: SocketCb callback class
Definition: rdkafkacpp.h:1126
 
virtual int socket_cb(int domain, int type, int protocol)=0
Socket callback.
 
SSL broker certificate verification class.
Definition: rdkafkacpp.h:1071
 
virtual bool ssl_cert_verify_cb(const std::string &broker_name, int32_t broker_id, int *x509_error, int depth, const char *buf, size_t size, std::string &errstr)=0
SSL broker certificate verification callback.
 
Topic+Partition.
Definition: rdkafkacpp.h:1943
 
static void destroy(std::vector< TopicPartition * > &partitions)
Destroy/delete the TopicPartitions in partitions and clear the vector.
 
virtual void set_leader_epoch(int32_t leader_epoch)=0
Set partition leader epoch.
 
virtual const std::string & topic() const =0
 
virtual std::vector< unsigned char > get_metadata()=0
Get partition metadata.
 
static TopicPartition * create(const std::string &topic, int partition, int64_t offset)
Create topic+partition object for topic and partition with offset offset.
 
virtual void set_offset(int64_t offset)=0
Set offset.
 
virtual void set_metadata(std::vector< unsigned char > &metadata)=0
Set partition metadata.
 
virtual int partition() const =0
 
static TopicPartition * create(const std::string &topic, int partition)
Create topic+partition object for topic and partition.
 
virtual int64_t offset() const =0
 
virtual ErrorCode err() const =0
 
virtual int32_t get_leader_epoch()=0
Get partition leader epoch, or -1 if not known or relevant.
 
Topic handle.
Definition: rdkafkacpp.h:2004
 
static const int64_t OFFSET_INVALID
Definition: rdkafkacpp.h:2018
 
virtual std::string name() const =0
 
virtual ErrorCode offset_store(int32_t partition, int64_t offset)=0
Store offset offset + 1 for topic partition partition. The offset will be committed (written) to the ...
 
static const int64_t OFFSET_BEGINNING
Special offsets.
Definition: rdkafkacpp.h:2015
 
static Topic * create(Handle *base, const std::string &topic_str, const Conf *conf, std::string &errstr)
Creates a new topic handle for topic named topic_str.
 
virtual bool partition_available(int32_t partition) const =0
 
static const int64_t OFFSET_STORED
Definition: rdkafkacpp.h:2017
 
virtual struct rd_kafka_topic_s * c_ptr()=0
Returns the underlying librdkafka C rd_kafka_topic_t handle.
 
static const int32_t PARTITION_UA
Unassigned partition.
Definition: rdkafkacpp.h:2012
 
static const int64_t OFFSET_END
Definition: rdkafkacpp.h:2016
 
RD_EXPORT std::string get_debug_contexts()
Returns a CSV list of the supported debug contexts for use with Conf::set("debug",...
 
RD_EXPORT std::string version_str()
Returns the librdkafka version as string.
 
CertificateEncoding
SSL certificate encoding.
Definition: rdkafkacpp.h:574
 
@ CERT_ENC_PKCS12
Definition: rdkafkacpp.h:575
 
@ CERT_ENC_DER
Definition: rdkafkacpp.h:576
 
@ CERT_ENC_PEM
Definition: rdkafkacpp.h:577
 
RD_EXPORT int wait_destroyed(int timeout_ms)
Wait for all rd_kafka_t objects to be destroyed.
 
RD_EXPORT std::string err2str(RdKafka::ErrorCode err)
Returns a human readable representation of a kafka error.
 
ErrorCode
Error codes.
Definition: rdkafkacpp.h:201
 
@ ERR__APPLICATION
Definition: rdkafkacpp.h:321
 
@ ERR_PRINCIPAL_DESERIALIZATION_FAILURE
Definition: rdkafkacpp.h:547
 
@ ERR_NOT_LEADER_FOR_PARTITION
Definition: rdkafkacpp.h:350
 
@ ERR_NON_EMPTY_GROUP
Definition: rdkafkacpp.h:485
 
@ ERR__DESTROY
Definition: rdkafkacpp.h:210
 
@ ERR_INVALID_REQUEST
Definition: rdkafkacpp.h:428
 
@ ERR_GROUP_ID_NOT_FOUND
Definition: rdkafkacpp.h:487
 
@ ERR_INVALID_SESSION_TIMEOUT
Definition: rdkafkacpp.h:396
 
@ ERR__UNDERFLOW
Definition: rdkafkacpp.h:297
 
@ ERR__ASSIGN_PARTITIONS
Definition: rdkafkacpp.h:257
 
@ ERR__UNKNOWN_BROKER
Definition: rdkafkacpp.h:315
 
@ ERR_CLUSTER_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:406
 
@ ERR_UNKNOWN
Definition: rdkafkacpp.h:336
 
@ ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE
Definition: rdkafkacpp.h:516
 
@ ERR__REVOKE_PARTITIONS
Definition: rdkafkacpp.h:259
 
@ ERR__FATAL
Definition: rdkafkacpp.h:307
 
@ ERR_SASL_AUTHENTICATION_FAILED
Definition: rdkafkacpp.h:465
 
@ ERR_FEATURE_UPDATE_FAILED
Definition: rdkafkacpp.h:545
 
@ ERR__CRIT_SYS_RESOURCE
Definition: rdkafkacpp.h:216
 
@ ERR__LOG_TRUNCATION
Definition: rdkafkacpp.h:329
 
@ ERR__ASSIGNMENT_LOST
Definition: rdkafkacpp.h:323
 
@ ERR_STALE_BROKER_EPOCH
Definition: rdkafkacpp.h:503
 
@ ERR_INVALID_TXN_STATE
Definition: rdkafkacpp.h:440
 
@ ERR__UNKNOWN_PARTITION
Definition: rdkafkacpp.h:227
 
@ ERR__WAIT_CACHE
Definition: rdkafkacpp.h:279
 
@ ERR_UNSTABLE_OFFSET_COMMIT
Definition: rdkafkacpp.h:527
 
@ ERR__NODE_UPDATE
Definition: rdkafkacpp.h:243
 
@ ERR_NO_ERROR
Definition: rdkafkacpp.h:338
 
@ ERR_INCONSISTENT_GROUP_PROTOCOL
Definition: rdkafkacpp.h:390
 
@ ERR__UNKNOWN_GROUP
Definition: rdkafkacpp.h:249
 
@ ERR_INVALID_PARTITIONS
Definition: rdkafkacpp.h:418
 
@ ERR_DELEGATION_TOKEN_NOT_FOUND
Definition: rdkafkacpp.h:473
 
@ ERR__NOOP
Definition: rdkafkacpp.h:325
 
@ ERR_NO_REASSIGNMENT_IN_PROGRESS
Definition: rdkafkacpp.h:520
 
@ ERR__NOT_IMPLEMENTED
Definition: rdkafkacpp.h:267
 
@ ERR__MSG_TIMED_OUT
Definition: rdkafkacpp.h:220
 
@ ERR_INVALID_REPLICATION_FACTOR
Definition: rdkafkacpp.h:420
 
@ ERR__READ_ONLY
Definition: rdkafkacpp.h:293
 
@ ERR_UNKNOWN_MEMBER_ID
Definition: rdkafkacpp.h:394
 
@ ERR_OUT_OF_ORDER_SEQUENCE_NUMBER
Definition: rdkafkacpp.h:434
 
@ ERR_INVALID_GROUP_ID
Definition: rdkafkacpp.h:392
 
@ ERR__RETRY
Definition: rdkafkacpp.h:301
 
@ ERR__TIMED_OUT_QUEUE
Definition: rdkafkacpp.h:275
 
@ ERR__NOT_CONFIGURED
Definition: rdkafkacpp.h:317
 
@ ERR_DUPLICATE_RESOURCE
Definition: rdkafkacpp.h:536
 
@ ERR_LEADER_NOT_AVAILABLE
Definition: rdkafkacpp.h:348
 
@ ERR_INVALID_FETCH_SESSION_EPOCH
Definition: rdkafkacpp.h:491
 
@ ERR_KAFKA_STORAGE_ERROR
Definition: rdkafkacpp.h:461
 
@ ERR_ILLEGAL_GENERATION
Definition: rdkafkacpp.h:388
 
@ ERR_TOPIC_ALREADY_EXISTS
Definition: rdkafkacpp.h:416
 
@ ERR__AUTHENTICATION
Definition: rdkafkacpp.h:269
 
@ ERR_TOPIC_EXCEPTION
Definition: rdkafkacpp.h:378
 
@ ERR__RESOLVE
Definition: rdkafkacpp.h:218
 
@ ERR_INVALID_REQUIRED_ACKS
Definition: rdkafkacpp.h:386
 
@ ERR_FETCH_SESSION_ID_NOT_FOUND
Definition: rdkafkacpp.h:489
 
@ ERR__SSL
Definition: rdkafkacpp.h:245
 
@ ERR_COORDINATOR_NOT_AVAILABLE
Definition: rdkafkacpp.h:370
 
@ ERR__CONFLICT
Definition: rdkafkacpp.h:261
 
@ ERR_GROUP_SUBSCRIBED_TO_TOPIC
Definition: rdkafkacpp.h:523
 
@ ERR__STATE
Definition: rdkafkacpp.h:263
 
@ ERR__KEY_SERIALIZATION
Definition: rdkafkacpp.h:283
 
@ ERR_INVALID_MSG_SIZE
Definition: rdkafkacpp.h:346
 
@ ERR_DELEGATION_TOKEN_OWNER_MISMATCH
Definition: rdkafkacpp.h:475
 
@ ERR_REASSIGNMENT_IN_PROGRESS
Definition: rdkafkacpp.h:469
 
@ ERR__OUTDATED
Definition: rdkafkacpp.h:273
 
@ ERR_OFFSET_NOT_AVAILABLE
Definition: rdkafkacpp.h:505
 
@ ERR_THROTTLING_QUOTA_EXCEEDED
Definition: rdkafkacpp.h:529
 
@ ERR_REQUEST_TIMED_OUT
Definition: rdkafkacpp.h:352
 
@ ERR_INVALID_TRANSACTION_TIMEOUT
Definition: rdkafkacpp.h:446
 
@ ERR_CONCURRENT_TRANSACTIONS
Definition: rdkafkacpp.h:449
 
@ ERR_STALE_CTRL_EPOCH
Definition: rdkafkacpp.h:360
 
@ ERR_UNSUPPORTED_VERSION
Definition: rdkafkacpp.h:414
 
@ ERR__WAIT_COORD
Definition: rdkafkacpp.h:247
 
@ ERR_ELECTION_NOT_NEEDED
Definition: rdkafkacpp.h:518
 
@ ERR_INVALID_COMMIT_OFFSET_SIZE
Definition: rdkafkacpp.h:400
 
@ ERR_DELEGATION_TOKEN_EXPIRED
Definition: rdkafkacpp.h:481
 
@ ERR_TOPIC_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:402
 
@ ERR_MSG_SIZE_TOO_LARGE
Definition: rdkafkacpp.h:358
 
@ ERR_NETWORK_EXCEPTION
Definition: rdkafkacpp.h:364
 
@ ERR_INVALID_PRINCIPAL_TYPE
Definition: rdkafkacpp.h:483
 
@ ERR__MAX_POLL_EXCEEDED
Definition: rdkafkacpp.h:313
 
@ ERR_FENCED_LEADER_EPOCH
Definition: rdkafkacpp.h:497
 
@ ERR__BEGIN
Definition: rdkafkacpp.h:204
 
@ ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:479
 
@ ERR_UNACCEPTABLE_CREDENTIAL
Definition: rdkafkacpp.h:538
 
@ ERR_RESOURCE_NOT_FOUND
Definition: rdkafkacpp.h:534
 
@ ERR_GROUP_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:404
 
@ ERR_LOG_DIR_NOT_FOUND
Definition: rdkafkacpp.h:463
 
@ ERR__AUTO_OFFSET_RESET
Definition: rdkafkacpp.h:327
 
@ ERR__NO_OFFSET
Definition: rdkafkacpp.h:271
 
@ ERR__PURGE_INFLIGHT
Definition: rdkafkacpp.h:305
 
@ ERR_INVALID_RECORD
Definition: rdkafkacpp.h:525
 
@ ERR_FENCED_INSTANCE_ID
Definition: rdkafkacpp.h:514
 
@ ERR_ILLEGAL_SASL_STATE
Definition: rdkafkacpp.h:412
 
@ ERR__FAIL
Definition: rdkafkacpp.h:212
 
@ ERR_MEMBER_ID_REQUIRED
Definition: rdkafkacpp.h:507
 
@ ERR__EXISTING_SUBSCRIPTION
Definition: rdkafkacpp.h:255
 
@ ERR__PREV_IN_PROGRESS
Definition: rdkafkacpp.h:253
 
@ ERR_PRODUCER_FENCED
Definition: rdkafkacpp.h:532
 
@ ERR__TIMED_OUT
Definition: rdkafkacpp.h:237
 
@ ERR_PREFERRED_LEADER_NOT_AVAILABLE
Definition: rdkafkacpp.h:509
 
@ ERR__PARTITION_EOF
Definition: rdkafkacpp.h:225
 
@ ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND
Definition: rdkafkacpp.h:384
 
@ ERR__QUEUE_FULL
Definition: rdkafkacpp.h:239
 
@ ERR_INVALID_MSG
Definition: rdkafkacpp.h:342
 
@ ERR_TOPIC_DELETION_DISABLED
Definition: rdkafkacpp.h:495
 
@ ERR__VALUE_DESERIALIZATION
Definition: rdkafkacpp.h:289
 
@ ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED
Definition: rdkafkacpp.h:477
 
@ ERR__GAPLESS_GUARANTEE
Definition: rdkafkacpp.h:311
 
@ ERR_NOT_COORDINATOR
Definition: rdkafkacpp.h:374
 
@ ERR__UNSUPPORTED_FEATURE
Definition: rdkafkacpp.h:277
 
@ ERR_INVALID_PRODUCER_EPOCH
Definition: rdkafkacpp.h:438
 
@ ERR_DUPLICATE_SEQUENCE_NUMBER
Definition: rdkafkacpp.h:436
 
@ ERR_OFFSET_METADATA_TOO_LARGE
Definition: rdkafkacpp.h:362
 
@ ERR_NOT_ENOUGH_REPLICAS
Definition: rdkafkacpp.h:382
 
@ ERR_UNSUPPORTED_SASL_MECHANISM
Definition: rdkafkacpp.h:410
 
@ ERR__INCONSISTENT
Definition: rdkafkacpp.h:309
 
@ ERR_UNKNOWN_PRODUCER_ID
Definition: rdkafkacpp.h:467
 
@ ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:455
 
@ ERR_UNSUPPORTED_COMPRESSION_TYPE
Definition: rdkafkacpp.h:501
 
@ ERR_UNKNOWN_LEADER_EPOCH
Definition: rdkafkacpp.h:499
 
@ ERR_LISTENER_NOT_FOUND
Definition: rdkafkacpp.h:493
 
@ ERR__PURGE_QUEUE
Definition: rdkafkacpp.h:303
 
@ ERR_SECURITY_DISABLED
Definition: rdkafkacpp.h:457
 
@ ERR__ISR_INSUFF
Definition: rdkafkacpp.h:241
 
@ ERR__FENCED
Definition: rdkafkacpp.h:319
 
@ ERR_INCONSISTENT_VOTER_SET
Definition: rdkafkacpp.h:541
 
@ ERR__BAD_COMPRESSION
Definition: rdkafkacpp.h:208
 
@ ERR__ALL_BROKERS_DOWN
Definition: rdkafkacpp.h:233
 
@ ERR__VALUE_SERIALIZATION
Definition: rdkafkacpp.h:285
 
@ ERR_OPERATION_NOT_ATTEMPTED
Definition: rdkafkacpp.h:459
 
@ ERR__END
Definition: rdkafkacpp.h:332
 
@ ERR__IN_PROGRESS
Definition: rdkafkacpp.h:251
 
@ ERR_INVALID_CONFIG
Definition: rdkafkacpp.h:424
 
@ ERR_REPLICA_NOT_AVAILABLE
Definition: rdkafkacpp.h:356
 
@ ERR_INVALID_REPLICA_ASSIGNMENT
Definition: rdkafkacpp.h:422
 
@ ERR_BROKER_NOT_AVAILABLE
Definition: rdkafkacpp.h:354
 
@ ERR__UNKNOWN_PROTOCOL
Definition: rdkafkacpp.h:265
 
@ ERR__BAD_MSG
Definition: rdkafkacpp.h:206
 
@ ERR__NOENT
Definition: rdkafkacpp.h:295
 
@ ERR_REBALANCE_IN_PROGRESS
Definition: rdkafkacpp.h:398
 
@ ERR_COORDINATOR_LOAD_IN_PROGRESS
Definition: rdkafkacpp.h:366
 
@ ERR_RECORD_LIST_TOO_LARGE
Definition: rdkafkacpp.h:380
 
@ ERR_INVALID_PRODUCER_ID_MAPPING
Definition: rdkafkacpp.h:443
 
@ ERR__PARTIAL
Definition: rdkafkacpp.h:291
 
@ ERR_TRANSACTION_COORDINATOR_FENCED
Definition: rdkafkacpp.h:453
 
@ ERR_UNKNOWN_TOPIC_OR_PART
Definition: rdkafkacpp.h:344
 
@ ERR__UNKNOWN_TOPIC
Definition: rdkafkacpp.h:231
 
@ ERR__INVALID_ARG
Definition: rdkafkacpp.h:235
 
@ ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT
Definition: rdkafkacpp.h:430
 
@ ERR_INVALID_TIMESTAMP
Definition: rdkafkacpp.h:408
 
@ ERR__INVALID_TYPE
Definition: rdkafkacpp.h:299
 
@ ERR_OFFSET_OUT_OF_RANGE
Definition: rdkafkacpp.h:340
 
@ ERR_INVALID_UPDATE_VERSION
Definition: rdkafkacpp.h:543
 
@ ERR_DELEGATION_TOKEN_AUTH_DISABLED
Definition: rdkafkacpp.h:471
 
@ ERR_POLICY_VIOLATION
Definition: rdkafkacpp.h:432
 
@ ERR__KEY_DESERIALIZATION
Definition: rdkafkacpp.h:287
 
@ ERR__FS
Definition: rdkafkacpp.h:229
 
@ ERR_GROUP_MAX_SIZE_REACHED
Definition: rdkafkacpp.h:511
 
@ ERR_NOT_CONTROLLER
Definition: rdkafkacpp.h:426
 
@ ERR__INTR
Definition: rdkafkacpp.h:281
 
@ ERR__TRANSPORT
Definition: rdkafkacpp.h:214
 
RD_EXPORT int version()
Returns the librdkafka version as an integer.
 
RD_EXPORT void * mem_malloc(size_t size)
Allocate memory using the same allocator librdkafka uses.
 
CertificateType
SSL certificate types.
Definition: rdkafkacpp.h:563
 
@ CERT_PRIVATE_KEY
Definition: rdkafkacpp.h:565
 
@ CERT_PUBLIC_KEY
Definition: rdkafkacpp.h:564
 
@ CERT_CA
Definition: rdkafkacpp.h:566
 
RD_EXPORT void mem_free(void *ptr)
Free pointer returned by librdkafka.