librdkafka
The Apache Kafka C/C++ client library
rdkafkacpp.h
Go to the documentation of this file.
1 /*
2  * librdkafka - Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2014-2022 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  * this list of conditions and the following disclaimer in the documentation
14  * and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #ifndef _RDKAFKACPP_H_
30 #define _RDKAFKACPP_H_
31 
50 #include <string>
51 #include <list>
52 #include <vector>
53 #include <cstdlib>
54 #include <cstring>
55 #include <stdint.h>
56 #include <sys/types.h>
57 
58 #ifdef _WIN32
59 #ifndef ssize_t
60 #ifndef _BASETSD_H_
61 #include <basetsd.h>
62 #endif
63 #ifndef _SSIZE_T_DEFINED
64 #define _SSIZE_T_DEFINED
65 typedef SSIZE_T ssize_t;
66 #endif
67 #endif
68 #undef RD_EXPORT
69 #ifdef LIBRDKAFKA_STATICLIB
70 #define RD_EXPORT
71 #else
72 #ifdef LIBRDKAFKACPP_EXPORTS
73 #define RD_EXPORT __declspec(dllexport)
74 #else
75 #define RD_EXPORT __declspec(dllimport)
76 #endif
77 #endif
78 #else
79 #define RD_EXPORT
80 #endif
81 
84 extern "C" {
85 /* Forward declarations */
86 struct rd_kafka_s;
87 struct rd_kafka_topic_s;
88 struct rd_kafka_message_s;
89 struct rd_kafka_conf_s;
90 struct rd_kafka_topic_conf_s;
91 }
92 
93 namespace RdKafka {
94 
114 #define RD_KAFKA_VERSION 0x020002ff
115 
/**
 * Free helper functions exported from the RdKafka namespace.
 * Only the declarations are visible here; the definitions live elsewhere.
 */

/** Returns the library version as an int.
 *  NOTE(review): presumably encoded like the RD_KAFKA_VERSION hex value -
 *  confirm against the implementation. */
121 RD_EXPORT
122 int version();
123 
/** Returns a std::string; going by the name, the textual form of the
 *  library version - confirm. */
127 RD_EXPORT
128 std::string version_str();
129 
/** Returns a std::string describing the debug contexts.
 *  NOTE(review): the exact format of the string is not shown here. */
134 RD_EXPORT
135 std::string get_debug_contexts();
136 
/** Takes a timeout in milliseconds and returns an int.
 *  NOTE(review): presumably waits for outstanding handles to be destroyed;
 *  the meaning of the return value is not shown here - confirm. */
146 RD_EXPORT
147 int wait_destroyed(int timeout_ms);
148 
/** Allocates \p size bytes and returns the pointer.
 *  Headers::Header::copy_value() uses it for the private value copy. */
159 RD_EXPORT
160 void *mem_malloc(size_t size);
161 
/** Frees \p ptr. Headers::Header releases its value copy through this
 *  function in its destructor and copy assignment. */
175 RD_EXPORT
176 void mem_free(void *ptr);
177 
200 enum ErrorCode {
201  /* Internal errors to rdkafka: */
203  ERR__BEGIN = -200,
205  ERR__BAD_MSG = -199,
209  ERR__DESTROY = -197,
211  ERR__FAIL = -196,
217  ERR__RESOLVE = -193,
228  ERR__FS = -189,
244  ERR__SSL = -181,
262  ERR__STATE = -172,
280  ERR__INTR = -163,
290  ERR__PARTIAL = -158,
294  ERR__NOENT = -156,
300  ERR__RETRY = -153,
306  ERR__FATAL = -150,
318  ERR__FENCED = -144,
324  ERR__NOOP = -141,
327 
329  ERR__END = -100,
330 
331  /* Kafka broker errors: */
365 #define ERR_GROUP_LOAD_IN_PROGRESS ERR_COORDINATOR_LOAD_IN_PROGRESS
369 #define ERR_GROUP_COORDINATOR_NOT_AVAILABLE ERR_COORDINATOR_NOT_AVAILABLE
373 #define ERR_NOT_COORDINATOR_FOR_GROUP ERR_NOT_COORDINATOR
545 };
546 
547 
551 RD_EXPORT
552 std::string err2str(RdKafka::ErrorCode err);
553 
554 
555 
564  CERT__CNT
565 };
566 
575  CERT_ENC__CNT
576 };
577 
583 /* Forward declarations */
584 class Handle;
585 class Producer;
586 class Message;
587 class Headers;
588 class Queue;
589 class Event;
590 class Topic;
591 class TopicPartition;
592 class Metadata;
593 class KafkaConsumer;
/**
 * Abstract error object. It is used as a return value from APIs to
 * propagate an error (several Producer/KafkaConsumer/Handle methods in this
 * header return Error*).
 *
 * Objects are obtained through the static create() factory; every accessor
 * is pure virtual, so the concrete type is defined elsewhere.
 */
613 class RD_EXPORT Error {
614  public:
    /** Create error object from an error code and a pointer to an error
     *  string. NOTE(review): whether \p errstr may be NULL is not shown
     *  here - confirm. */
618  static Error *create(ErrorCode code, const std::string *errstr);
619 
    /** Virtual destructor with an empty inline body. */
620  virtual ~Error() {
621  }
622 
623  /*
624  * Error accessor methods
625  */
626 
    /** Returns the ErrorCode carried by this error. */
630  virtual ErrorCode code() const = 0;
631 
    /** Returns a std::string; presumably the symbolic name of the error
     *  code - confirm. */
635  virtual std::string name() const = 0;
636 
    /** Returns a std::string; presumably the human-readable error
     *  description - confirm. */
640  virtual std::string str() const = 0;
641 
    /** Boolean classification of the error as fatal. */
646  virtual bool is_fatal() const = 0;
647 
    /** Boolean classification of the error as retriable. */
651  virtual bool is_retriable() const = 0;
652 
    /** Boolean flag. NOTE(review): going by the name, tells whether the
     *  current transaction must be aborted - confirm. */
664  virtual bool txn_requires_abort() const = 0;
665 };
666 
/**
 * Delivery Report callback class.
 *
 * Applications derive from it and implement dr_cb(). The object is
 * registered through Conf::set() with name = "dr_cb".
 */
698 class RD_EXPORT DeliveryReportCb {
699  public:
    /** Delivery report callback; receives the Message being reported. */
703  virtual void dr_cb(Message &message) = 0;
704 
    /** Virtual destructor with an empty inline body. */
705  virtual ~DeliveryReportCb() {
706  }
707 };
708 
709 
737 class RD_EXPORT OAuthBearerTokenRefreshCb {
738  public:
747  RdKafka::Handle *handle,
748  const std::string &oauthbearer_config) = 0;
749 
750  virtual ~OAuthBearerTokenRefreshCb() {
751  }
752 };
753 
754 
/**
 * Partitioner callback interface.
 *
 * Applications derive from it and implement partitioner_cb(). The object
 * is registered through Conf::set() with name = "partitioner_cb".
 */
762 class RD_EXPORT PartitionerCb {
763  public:
    /**
     * Receives the topic, a pointer to the message key, the partition
     * count and the per-message opaque pointer, and returns an int32_t.
     * NOTE(review): the return value is presumably the chosen partition
     * in the range 0..partition_cnt-1, and \p key may presumably be NULL -
     * confirm both against the upstream documentation.
     */
781  virtual int32_t partitioner_cb(const Topic *topic,
782  const std::string *key,
783  int32_t partition_cnt,
784  void *msg_opaque) = 0;
785 
    /** Virtual destructor with an empty inline body. */
786  virtual ~PartitionerCb() {
787  }
788 };
789 
795  public:
804  virtual int32_t partitioner_cb(const Topic *topic,
805  const void *key,
806  size_t key_len,
807  int32_t partition_cnt,
808  void *msg_opaque) = 0;
809 
810  virtual ~PartitionerKeyPointerCb() {
811  }
812 };
813 
814 
815 
/**
 * Event callback class.
 *
 * Applications derive from it and implement event_cb(). The object is
 * registered through Conf::set() with name = "event_cb".
 */
824 class RD_EXPORT EventCb {
825  public:
    /** Event callback; receives the Event object by reference. */
831  virtual void event_cb(Event &event) = 0;
832 
    /** Virtual destructor with an empty inline body. */
833  virtual ~EventCb() {
834  }
835 };
836 
837 
841 class RD_EXPORT Event {
842  public:
844  enum Type {
848  EVENT_THROTTLE
849  };
850 
852  enum Severity {
853  EVENT_SEVERITY_EMERG = 0,
854  EVENT_SEVERITY_ALERT = 1,
855  EVENT_SEVERITY_CRITICAL = 2,
856  EVENT_SEVERITY_ERROR = 3,
857  EVENT_SEVERITY_WARNING = 4,
858  EVENT_SEVERITY_NOTICE = 5,
859  EVENT_SEVERITY_INFO = 6,
860  EVENT_SEVERITY_DEBUG = 7
861  };
862 
863  virtual ~Event() {
864  }
865 
866  /*
867  * Event Accessor methods
868  */
869 
874  virtual Type type() const = 0;
875 
880  virtual ErrorCode err() const = 0;
881 
886  virtual Severity severity() const = 0;
887 
892  virtual std::string fac() const = 0;
893 
902  virtual std::string str() const = 0;
903 
908  virtual int throttle_time() const = 0;
909 
914  virtual std::string broker_name() const = 0;
915 
920  virtual int broker_id() const = 0;
921 
922 
928  virtual bool fatal() const = 0;
929 };
930 
931 
932 
/**
 * Consume callback class.
 *
 * The callback is used with the RdKafka::Consumer::consume_callback()
 * methods, which take a ConsumeCb pointer and an opaque pointer.
 */
936 class RD_EXPORT ConsumeCb {
937  public:
    /** Consume callback; receives a Message and an opaque pointer
     *  (consume_callback() also takes an opaque argument; presumably the
     *  same pointer is passed through - confirm). */
945  virtual void consume_cb(Message &message, void *opaque) = 0;
946 
    /** Virtual destructor with an empty inline body. */
947  virtual ~ConsumeCb() {
948  }
949 };
950 
951 
/**
 * Rebalance callback interface for KafkaConsumer.
 *
 * Applications derive from it and implement rebalance_cb(). The object is
 * registered through Conf::set() with name = "rebalance_cb".
 */
955 class RD_EXPORT RebalanceCb {
956  public:
    /**
     * Receives the consumer, an ErrorCode and a mutable vector of
     * TopicPartition pointers.
     * NOTE(review): \p err presumably tells whether partitions are being
     * assigned or revoked, and the callback is presumably expected to call
     * assign()/unassign() on \p consumer - confirm against the upstream
     * documentation.
     */
1025  virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer,
1026  RdKafka::ErrorCode err,
1027  std::vector<TopicPartition *> &partitions) = 0;
1028 
    /** Virtual destructor with an empty inline body. */
1029  virtual ~RebalanceCb() {
1030  }
1031 };
1032 
1033 
1037 class RD_EXPORT OffsetCommitCb {
1038  public:
1055  std::vector<TopicPartition *> &offsets) = 0;
1056 
1057  virtual ~OffsetCommitCb() {
1058  }
1059 };
1060 
1061 
1062 
/**
 * SSL certificate verification callback interface.
 *
 * Applications derive from it and implement ssl_cert_verify_cb(). The
 * object is registered through Conf::set() with
 * name = "ssl_cert_verify_cb".
 */
1068 class RD_EXPORT SslCertificateVerifyCb {
1069  public:
    /**
     * Receives the broker name and id, a writable pointer to an X509 error
     * value, a depth, a buffer of \p size bytes and a writable error
     * string; returns a bool.
     * NOTE(review): presumably \p buf holds the certificate being checked,
     * \p depth is its position in the chain, and a false return rejects the
     * certificate with the reason in \p errstr - confirm against the
     * upstream documentation.
     */
1106  virtual bool ssl_cert_verify_cb(const std::string &broker_name,
1107  int32_t broker_id,
1108  int *x509_error,
1109  int depth,
1110  const char *buf,
1111  size_t size,
1112  std::string &errstr) = 0;
1113 
    /** Virtual destructor with an empty inline body. */
1114  virtual ~SslCertificateVerifyCb() {
1115  }
1116 };
1117 
1118 
/**
 * Socket creation callback interface.
 *
 * Applications derive from it and implement socket_cb(). The object is
 * registered through Conf::set() with name = "socket_cb".
 */
1123 class RD_EXPORT SocketCb {
1124  public:
    /** Receives domain, type and protocol and returns an int.
     *  NOTE(review): the arguments look like those of socket(2) and the
     *  return value is presumably the new socket descriptor - confirm. */
1138  virtual int socket_cb(int domain, int type, int protocol) = 0;
1139 
    /** Virtual destructor with an empty inline body. */
1140  virtual ~SocketCb() {
1141  }
1142 };
1143 
1144 
/**
 * File open callback interface.
 *
 * Applications derive from it and implement open_cb(). The object is
 * registered through Conf::set() with name = "open_cb".
 */
1149 class RD_EXPORT OpenCb {
1150  public:
    /** Receives a path, flags and a mode and returns an int.
     *  NOTE(review): the arguments look like those of open(2) and the
     *  return value is presumably the file descriptor - confirm. */
1162  virtual int open_cb(const std::string &path, int flags, int mode) = 0;
1163 
    /** Virtual destructor with an empty inline body. */
1164  virtual ~OpenCb() {
1165  }
1166 };
1167 
1168 
1188 class RD_EXPORT Conf {
1189  public:
1193  enum ConfType {
1195  CONF_TOPIC
1196  };
1197 
1201  enum ConfResult {
1202  CONF_UNKNOWN = -2,
1203  CONF_INVALID = -1,
1204  CONF_OK = 0
1205  };
1206 
1207 
1211  static Conf *create(ConfType type);
1212 
1213  virtual ~Conf() {
1214  }
1215 
1229  virtual Conf::ConfResult set(const std::string &name,
1230  const std::string &value,
1231  std::string &errstr) = 0;
1232 
1234  virtual Conf::ConfResult set(const std::string &name,
1235  DeliveryReportCb *dr_cb,
1236  std::string &errstr) = 0;
1237 
1240  const std::string &name,
1241  OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb,
1242  std::string &errstr) = 0;
1243 
1245  virtual Conf::ConfResult set(const std::string &name,
1246  EventCb *event_cb,
1247  std::string &errstr) = 0;
1248 
1256  virtual Conf::ConfResult set(const std::string &name,
1257  const Conf *topic_conf,
1258  std::string &errstr) = 0;
1259 
1261  virtual Conf::ConfResult set(const std::string &name,
1262  PartitionerCb *partitioner_cb,
1263  std::string &errstr) = 0;
1264 
1266  virtual Conf::ConfResult set(const std::string &name,
1267  PartitionerKeyPointerCb *partitioner_kp_cb,
1268  std::string &errstr) = 0;
1269 
1271  virtual Conf::ConfResult set(const std::string &name,
1272  SocketCb *socket_cb,
1273  std::string &errstr) = 0;
1274 
1276  virtual Conf::ConfResult set(const std::string &name,
1277  OpenCb *open_cb,
1278  std::string &errstr) = 0;
1279 
1281  virtual Conf::ConfResult set(const std::string &name,
1282  RebalanceCb *rebalance_cb,
1283  std::string &errstr) = 0;
1284 
1286  virtual Conf::ConfResult set(const std::string &name,
1287  OffsetCommitCb *offset_commit_cb,
1288  std::string &errstr) = 0;
1289 
1294  virtual Conf::ConfResult set(const std::string &name,
1295  SslCertificateVerifyCb *ssl_cert_verify_cb,
1296  std::string &errstr) = 0;
1297 
1340  const void *buffer,
1341  size_t size,
1342  std::string &errstr) = 0;
1343 
1356  virtual Conf::ConfResult get(const std::string &name,
1357  std::string &value) const = 0;
1358 
1362  virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0;
1363 
1368  OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const = 0;
1369 
1373  virtual Conf::ConfResult get(EventCb *&event_cb) const = 0;
1374 
1378  virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0;
1379 
1384  PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0;
1385 
1389  virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0;
1390 
1394  virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0;
1395 
1399  virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0;
1400 
1404  virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0;
1405 
1408  SslCertificateVerifyCb *&ssl_cert_verify_cb) const = 0;
1409 
1412  virtual std::list<std::string> *dump() = 0;
1413 
1415  virtual Conf::ConfResult set(const std::string &name,
1416  ConsumeCb *consume_cb,
1417  std::string &errstr) = 0;
1418 
1435  virtual struct rd_kafka_conf_s *c_ptr_global() = 0;
1436 
1454  virtual struct rd_kafka_topic_conf_s *c_ptr_topic() = 0;
1455 
1469  std::string &errstr) = 0;
1470 
1471 
1495  std::string &errstr) = 0;
1496 };
1497 
1510 class RD_EXPORT Handle {
1511  public:
1512  virtual ~Handle() {
1513  }
1514 
1516  virtual std::string name() const = 0;
1517 
1526  virtual std::string memberid() const = 0;
1527 
1528 
1553  virtual int poll(int timeout_ms) = 0;
1554 
1561  virtual int outq_len() = 0;
1562 
1578  virtual ErrorCode metadata(bool all_topics,
1579  const Topic *only_rkt,
1580  Metadata **metadatap,
1581  int timeout_ms) = 0;
1582 
1583 
1593  virtual ErrorCode pause(std::vector<TopicPartition *> &partitions) = 0;
1594 
1595 
1605  virtual ErrorCode resume(std::vector<TopicPartition *> &partitions) = 0;
1606 
1607 
1616  virtual ErrorCode query_watermark_offsets(const std::string &topic,
1617  int32_t partition,
1618  int64_t *low,
1619  int64_t *high,
1620  int timeout_ms) = 0;
1621 
1639  virtual ErrorCode get_watermark_offsets(const std::string &topic,
1640  int32_t partition,
1641  int64_t *low,
1642  int64_t *high) = 0;
1643 
1644 
1666  virtual ErrorCode offsetsForTimes(std::vector<TopicPartition *> &offsets,
1667  int timeout_ms) = 0;
1668 
1669 
1678  virtual Queue *get_partition_queue(const TopicPartition *partition) = 0;
1679 
1696  virtual ErrorCode set_log_queue(Queue *queue) = 0;
1697 
1709  virtual void yield() = 0;
1710 
1725  virtual std::string clusterid(int timeout_ms) = 0;
1726 
1743  virtual struct rd_kafka_s *c_ptr() = 0;
1744 
1760  virtual int32_t controllerid(int timeout_ms) = 0;
1761 
1762 
1784  virtual ErrorCode fatal_error(std::string &errstr) const = 0;
1785 
1826  const std::string &token_value,
1827  int64_t md_lifetime_ms,
1828  const std::string &md_principal_name,
1829  const std::list<std::string> &extensions,
1830  std::string &errstr) = 0;
1831 
1850  const std::string &errstr) = 0;
1851 
1860 
1861 
1867  virtual Queue *get_sasl_queue() = 0;
1868 
1872  virtual Queue *get_background_queue() = 0;
1873 
1874 
1875 
1886  virtual void *mem_malloc(size_t size) = 0;
1887 
1901  virtual void mem_free(void *ptr) = 0;
1902 
1917  virtual Error *sasl_set_credentials(const std::string &username,
1918  const std::string &password) = 0;
1919 };
1920 
1921 
/**
 * Abstract topic+partition value with an offset and an error code.
 *
 * Objects are obtained through the static create() factories and are
 * passed around as std::vector<TopicPartition *> by Handle, KafkaConsumer
 * and Producer methods in this header.
 */
1940 class RD_EXPORT TopicPartition {
1941  public:
    /** Creates an object for \p topic and \p partition. */
1947  static TopicPartition *create(const std::string &topic, int partition);
1948 
    /** Creates an object for \p topic and \p partition with \p offset. */
1955  static TopicPartition *create(const std::string &topic,
1956  int partition,
1957  int64_t offset);
1958 
    /** Pure virtual destructor; its definition is not part of this
     *  listing. */
1959  virtual ~TopicPartition() = 0;
1960 
    /** Static helper taking a vector of TopicPartition pointers.
     *  NOTE(review): presumably destroys every element and empties the
     *  vector - confirm. */
1965  static void destroy(std::vector<TopicPartition *> &partitions);
1966 
    /** Returns the topic name by const reference. */
1968  virtual const std::string &topic() const = 0;
1969 
    /** Returns the partition as an int. */
1971  virtual int partition() const = 0;
1972 
    /** Returns the offset. */
1974  virtual int64_t offset() const = 0;
1975 
    /** Sets the offset. */
1977  virtual void set_offset(int64_t offset) = 0;
1978 
    /** Returns the ErrorCode associated with this object. */
1980  virtual ErrorCode err() const = 0;
1981 };
1982 
1983 
1984 
/**
 * Abstract topic handle.
 *
 * Objects are obtained through the static create() factory. The class also
 * declares the special partition/offset constants used with the produce
 * and consume APIs; their values are defined elsewhere.
 */
1989 class RD_EXPORT Topic {
1990  public:
    /** Special partition constant. NOTE(review): "UA" presumably stands
     *  for "unassigned" - confirm. */
1997  static const int32_t PARTITION_UA;
1998 
    /** Special offset constants (declared here, defined elsewhere). */
2000  static const int64_t OFFSET_BEGINNING;
2001  static const int64_t OFFSET_END;
2002  static const int64_t OFFSET_STORED;
2003  static const int64_t OFFSET_INVALID;
    /** Creates a topic handle for \p topic_str on the client \p base using
     *  \p conf; \p errstr is a writable string.
     *  NOTE(review): presumably returns NULL on failure with the reason in
     *  \p errstr - confirm. */
2015  static Topic *create(Handle *base,
2016  const std::string &topic_str,
2017  const Conf *conf,
2018  std::string &errstr);
2019 
    /** Pure virtual destructor; its definition is not part of this
     *  listing. */
2020  virtual ~Topic() = 0;
2021 
2022 
    /** Returns the topic name. */
2024  virtual std::string name() const = 0;
2025 
    /** Returns a bool for the given partition.
     *  NOTE(review): the exact availability criterion is not shown here. */
2031  virtual bool partition_available(int32_t partition) const = 0;
2032 
    /** Takes a partition and an offset and returns an ErrorCode.
     *  NOTE(review): presumably stores the offset for a later commit -
     *  confirm. */
2044  virtual ErrorCode offset_store(int32_t partition, int64_t offset) = 0;
2045 
    /** Returns a pointer to the C struct rd_kafka_topic_s, which is only
     *  forward-declared in this header. */
2062  virtual struct rd_kafka_topic_s *c_ptr() = 0;
2063 };
2064 
2065 
2087 class RD_EXPORT MessageTimestamp {
2088  public:
2093  MSG_TIMESTAMP_LOG_APPEND_TIME
2094  };
2095 
2097  int64_t timestamp;
2098 };
2099 
2100 
2110 class RD_EXPORT Headers {
2111  public:
2112  virtual ~Headers() = 0;
2113 
2122  class Header {
2123  public:
2134  Header(const std::string &key, const void *value, size_t value_size) :
2135  key_(key), err_(ERR_NO_ERROR), value_size_(value_size) {
2136  value_ = copy_value(value, value_size);
2137  }
2138 
2152  Header(const std::string &key,
2153  const void *value,
2154  size_t value_size,
2155  const RdKafka::ErrorCode err) :
2156  key_(key), err_(err), value_(NULL), value_size_(value_size) {
2157  if (err == ERR_NO_ERROR)
2158  value_ = copy_value(value, value_size);
2159  }
2160 
2166  Header(const Header &other) :
2167  key_(other.key_), err_(other.err_), value_size_(other.value_size_) {
2168  value_ = copy_value(other.value_, value_size_);
2169  }
2170 
2176  Header &operator=(const Header &other) {
2177  if (&other == this) {
2178  return *this;
2179  }
2180 
2181  key_ = other.key_;
2182  err_ = other.err_;
2183  value_size_ = other.value_size_;
2184 
2185  if (value_ != NULL)
2186  mem_free(value_);
2187 
2188  value_ = copy_value(other.value_, value_size_);
2189 
2190  return *this;
2191  }
2192 
2193  ~Header() {
2194  if (value_ != NULL)
2195  mem_free(value_);
2196  }
2197 
2199  std::string key() const {
2200  return key_;
2201  }
2202 
2204  const void *value() const {
2205  return value_;
2206  }
2207 
2210  const char *value_string() const {
2211  return static_cast<const char *>(value_);
2212  }
2213 
2215  size_t value_size() const {
2216  return value_size_;
2217  }
2218 
2221  return err_;
2222  }
2223 
2224  private:
2225  char *copy_value(const void *value, size_t value_size) {
2226  if (!value)
2227  return NULL;
2228 
2229  char *dest = (char *)mem_malloc(value_size + 1);
2230  memcpy(dest, (const char *)value, value_size);
2231  dest[value_size] = '\0';
2232 
2233  return dest;
2234  }
2235 
2236  std::string key_;
2237  RdKafka::ErrorCode err_;
2238  char *value_;
2239  size_t value_size_;
2240  void *operator new(size_t); /* Prevent dynamic allocation */
2241  };
2242 
2248  static Headers *create();
2249 
2258  static Headers *create(const std::vector<Header> &headers);
2259 
2269  virtual ErrorCode add(const std::string &key,
2270  const void *value,
2271  size_t value_size) = 0;
2272 
2283  virtual ErrorCode add(const std::string &key, const std::string &value) = 0;
2284 
2294  virtual ErrorCode add(const Header &header) = 0;
2295 
2303  virtual ErrorCode remove(const std::string &key) = 0;
2304 
2314  virtual std::vector<Header> get(const std::string &key) const = 0;
2315 
2326  virtual Header get_last(const std::string &key) const = 0;
2327 
2333  virtual std::vector<Header> get_all() const = 0;
2334 
2338  virtual size_t size() const = 0;
2339 };
2340 
2341 
2353 class RD_EXPORT Message {
2354  public:
2357  enum Status {
2361  MSG_STATUS_NOT_PERSISTED = 0,
2362 
2366  MSG_STATUS_POSSIBLY_PERSISTED = 1,
2367 
2371  MSG_STATUS_PERSISTED = 2,
2372  };
2373 
2381  virtual std::string errstr() const = 0;
2382 
2384  virtual ErrorCode err() const = 0;
2385 
2390  virtual Topic *topic() const = 0;
2391 
2393  virtual std::string topic_name() const = 0;
2394 
2396  virtual int32_t partition() const = 0;
2397 
2399  virtual void *payload() const = 0;
2400 
2402  virtual size_t len() const = 0;
2403 
2405  virtual const std::string *key() const = 0;
2406 
2408  virtual const void *key_pointer() const = 0;
2409 
2411  virtual size_t key_len() const = 0;
2412 
2414  virtual int64_t offset() const = 0;
2415 
2417  virtual MessageTimestamp timestamp() const = 0;
2418 
2420  virtual void *msg_opaque() const = 0;
2421 
2422  virtual ~Message() = 0;
2423 
2426  virtual int64_t latency() const = 0;
2427 
2444  virtual struct rd_kafka_message_s *c_ptr() = 0;
2445 
2449  virtual Status status() const = 0;
2450 
2455  virtual RdKafka::Headers *headers() = 0;
2456 
2464 
2467  virtual int32_t broker_id() const = 0;
2468 };
2469 
/**
 * Abstract queue object.
 *
 * Objects are obtained through the static create() factory; Handle also
 * returns Queue pointers from get_partition_queue(), get_sasl_queue() and
 * get_background_queue().
 */
2493 class RD_EXPORT Queue {
2494  public:
    /** Creates a queue for the client \p handle. */
2498  static Queue *create(Handle *handle);
2499 
    /** Takes a destination queue and returns an ErrorCode.
     *  NOTE(review): presumably forwards this queue's contents to \p dst,
     *  and a NULL \p dst presumably stops forwarding - confirm. */
2510  virtual ErrorCode forward(Queue *dst) = 0;
2511 
2512 
    /** Takes a timeout in milliseconds and returns a Message pointer.
     *  NOTE(review): ownership of the returned Message is not shown
     *  here - confirm. */
2524  virtual Message *consume(int timeout_ms) = 0;
2525 
    /** Takes a timeout in milliseconds and returns an int.
     *  NOTE(review): presumably the number of events served - confirm. */
2533  virtual int poll(int timeout_ms) = 0;
2534 
    /** Pure virtual destructor; its definition is not part of this
     *  listing. */
2535  virtual ~Queue() = 0;
2536 
    /** Takes a file descriptor and a payload of \p size bytes.
     *  NOTE(review): presumably the payload is written to \p fd when the
     *  queue becomes non-empty - confirm. */
2552  virtual void io_event_enable(int fd, const void *payload, size_t size) = 0;
2553 };
2554 
/**
 * ConsumerGroupMetadata holds a consumer instance's group metadata state.
 *
 * The class exposes no members besides its destructor; a pointer to it is
 * one of the parameters in the Producer transaction API fragment of this
 * listing.
 */
2568 class RD_EXPORT ConsumerGroupMetadata {
2569  public:
    /** Pure virtual destructor; its definition is not part of this
     *  listing. */
2570  virtual ~ConsumerGroupMetadata() = 0;
2571 };
2572 
2590 class RD_EXPORT KafkaConsumer : public virtual Handle {
2591  public:
2603  static KafkaConsumer *create(const Conf *conf, std::string &errstr);
2604 
2605  virtual ~KafkaConsumer() = 0;
2606 
2607 
2611  std::vector<RdKafka::TopicPartition *> &partitions) = 0;
2612 
2615  virtual ErrorCode subscription(std::vector<std::string> &topics) = 0;
2616 
2651  virtual ErrorCode subscribe(const std::vector<std::string> &topics) = 0;
2652 
2654  virtual ErrorCode unsubscribe() = 0;
2655 
2662  virtual ErrorCode assign(const std::vector<TopicPartition *> &partitions) = 0;
2663 
2667  virtual ErrorCode unassign() = 0;
2668 
2693  virtual Message *consume(int timeout_ms) = 0;
2694 
2708  virtual ErrorCode commitSync() = 0;
2709 
2715  virtual ErrorCode commitAsync() = 0;
2716 
2726  virtual ErrorCode commitSync(Message *message) = 0;
2727 
2737  virtual ErrorCode commitAsync(Message *message) = 0;
2738 
2748  virtual ErrorCode commitSync(std::vector<TopicPartition *> &offsets) = 0;
2749 
2760  const std::vector<TopicPartition *> &offsets) = 0;
2761 
2772  virtual ErrorCode commitSync(OffsetCommitCb *offset_commit_cb) = 0;
2773 
2784  virtual ErrorCode commitSync(std::vector<TopicPartition *> &offsets,
2785  OffsetCommitCb *offset_commit_cb) = 0;
2786 
2787 
2788 
2797  virtual ErrorCode committed(std::vector<TopicPartition *> &partitions,
2798  int timeout_ms) = 0;
2799 
2808  virtual ErrorCode position(std::vector<TopicPartition *> &partitions) = 0;
2809 
2810 
2833  virtual ErrorCode close() = 0;
2834 
2835 
2853  virtual ErrorCode seek(const TopicPartition &partition, int timeout_ms) = 0;
2854 
2855 
2873  virtual ErrorCode offsets_store(std::vector<TopicPartition *> &offsets) = 0;
2874 
2875 
2887 
2888 
2903  virtual bool assignment_lost() = 0;
2904 
2920  virtual std::string rebalance_protocol() = 0;
2921 
2922 
2939  const std::vector<TopicPartition *> &partitions) = 0;
2940 
2941 
2958  const std::vector<TopicPartition *> &partitions) = 0;
2959 
2977  virtual Error *close(Queue *queue) = 0;
2978 
2979 
2984  virtual bool closed() = 0;
2985 };
2986 
2987 
/**
 * Simple Consumer (legacy).
 *
 * Derives virtually from Handle. Objects are obtained through the static
 * create() factory.
 */
3002 class RD_EXPORT Consumer : public virtual Handle {
3003  public:
    /** Creates a new Kafka consumer handle from \p conf; \p errstr is a
     *  writable string.
     *  NOTE(review): presumably returns NULL on failure with the reason in
     *  \p errstr - confirm. */
3014  static Consumer *create(const Conf *conf, std::string &errstr);
3015 
    /** Pure virtual destructor; its definition is not part of this
     *  listing. */
3016  virtual ~Consumer() = 0;
3017 
3018 
    /** Start consuming messages for \p topic and \p partition at
     *  \p offset. */
3038  virtual ErrorCode start(Topic *topic, int32_t partition, int64_t offset) = 0;
3039 
    /** Start consuming messages for \p topic and \p partition on queue
     *  \p queue. */
3046  virtual ErrorCode start(Topic *topic,
3047  int32_t partition,
3048  int64_t offset,
3049  Queue *queue) = 0;
3050 
    /** Stop consuming messages for \p topic and \p partition, purging all
     *  messages currently in the local queue. */
3060  virtual ErrorCode stop(Topic *topic, int32_t partition) = 0;
3061 
    /** Seek consumer for topic+partition to \p offset, which is either an
     *  absolute or logical offset. */
3076  virtual ErrorCode seek(Topic *topic,
3077  int32_t partition,
3078  int64_t offset,
3079  int timeout_ms) = 0;
3080 
    /** Consume a single message from \p topic and \p partition. */
3098  virtual Message *consume(Topic *topic, int32_t partition, int timeout_ms) = 0;
3099 
    /** Consume a single message from the specified queue. */
3121  virtual Message *consume(Queue *queue, int timeout_ms) = 0;
3122 
    /** Consumes messages from \p topic and \p partition, calling the
     *  provided callback for each consumed message.
     *  NOTE(review): the meaning of the int return value is not shown
     *  here - confirm. */
3142  virtual int consume_callback(Topic *topic,
3143  int32_t partition,
3144  int timeout_ms,
3145  ConsumeCb *consume_cb,
3146  void *opaque) = 0;
3147 
    /** Consumes messages from \p queue, calling the provided callback for
     *  each consumed message. */
3154  virtual int consume_callback(Queue *queue,
3155  int timeout_ms,
3156  RdKafka::ConsumeCb *consume_cb,
3157  void *opaque) = 0;
3158 
    /** Converts an offset into the logical offset from the tail of a
     *  topic. */
3168  static int64_t OffsetTail(int64_t offset);
3169 };
3170 
3184 class RD_EXPORT Producer : public virtual Handle {
3185  public:
3196  static Producer *create(const Conf *conf, std::string &errstr);
3197 
3198 
3199  virtual ~Producer() = 0;
3200 
3206  enum {
3207  RK_MSG_FREE = 0x1,
3210  RK_MSG_COPY = 0x2,
3215  RK_MSG_BLOCK = 0x4
3232  /* For backwards compatibility: */
3233 #ifndef MSG_COPY /* defined in sys/msg.h */
3234  ,
3237  MSG_FREE = RK_MSG_FREE,
3238  MSG_COPY = RK_MSG_COPY
3239 #endif
3241  };
3242 
3299  virtual ErrorCode produce(Topic *topic,
3300  int32_t partition,
3301  int msgflags,
3302  void *payload,
3303  size_t len,
3304  const std::string *key,
3305  void *msg_opaque) = 0;
3306 
3311  virtual ErrorCode produce(Topic *topic,
3312  int32_t partition,
3313  int msgflags,
3314  void *payload,
3315  size_t len,
3316  const void *key,
3317  size_t key_len,
3318  void *msg_opaque) = 0;
3319 
3326  virtual ErrorCode produce(const std::string topic_name,
3327  int32_t partition,
3328  int msgflags,
3329  void *payload,
3330  size_t len,
3331  const void *key,
3332  size_t key_len,
3333  int64_t timestamp,
3334  void *msg_opaque) = 0;
3335 
3343  virtual ErrorCode produce(const std::string topic_name,
3344  int32_t partition,
3345  int msgflags,
3346  void *payload,
3347  size_t len,
3348  const void *key,
3349  size_t key_len,
3350  int64_t timestamp,
3351  RdKafka::Headers *headers,
3352  void *msg_opaque) = 0;
3353 
3354 
3359  virtual ErrorCode produce(Topic *topic,
3360  int32_t partition,
3361  const std::vector<char> *payload,
3362  const std::vector<char> *key,
3363  void *msg_opaque) = 0;
3364 
3365 
3381  virtual ErrorCode flush(int timeout_ms) = 0;
3382 
3383 
3411  virtual ErrorCode purge(int purge_flags) = 0;
3412 
3416  enum {
3417  PURGE_QUEUE = 0x1,
3419  PURGE_INFLIGHT = 0x2,
3426  PURGE_NON_BLOCKING = 0x4 /* Don't wait for background queue
3427  * purging to finish. */
3428  };
3429 
3456  virtual Error *init_transactions(int timeout_ms) = 0;
3457 
3458 
3471  virtual Error *begin_transaction() = 0;
3472 
3520  const std::vector<TopicPartition *> &offsets,
3521  const ConsumerGroupMetadata *group_metadata,
3522  int timeout_ms) = 0;
3523 
3552  virtual Error *commit_transaction(int timeout_ms) = 0;
3553 
3584  virtual Error *abort_transaction(int timeout_ms) = 0;
3585 
3587 };
3588 
3603  public:
3605  virtual int32_t id() const = 0;
3606 
3608  virtual std::string host() const = 0;
3609 
3611  virtual int port() const = 0;
3612 
3613  virtual ~BrokerMetadata() = 0;
3614 };
3615 
3616 
3617 
3622  public:
3624  typedef std::vector<int32_t> ReplicasVector;
3626  typedef std::vector<int32_t> ISRSVector;
3627 
3629  typedef ReplicasVector::const_iterator ReplicasIterator;
3631  typedef ISRSVector::const_iterator ISRSIterator;
3632 
3633 
3635  virtual int32_t id() const = 0;
3636 
3638  virtual ErrorCode err() const = 0;
3639 
3641  virtual int32_t leader() const = 0;
3642 
3644  virtual const std::vector<int32_t> *replicas() const = 0;
3645 
3649  virtual const std::vector<int32_t> *isrs() const = 0;
3650 
3651  virtual ~PartitionMetadata() = 0;
3652 };
3653 
3654 
3655 
3660  public:
3662  typedef std::vector<const PartitionMetadata *> PartitionMetadataVector;
3664  typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;
3665 
3667  virtual std::string topic() const = 0;
3668 
3670  virtual const PartitionMetadataVector *partitions() const = 0;
3671 
3673  virtual ErrorCode err() const = 0;
3674 
3675  virtual ~TopicMetadata() = 0;
3676 };
3677 
3678 
/**
 * Abstract metadata container giving access to broker and topic metadata.
 *
 * Handle::metadata() hands out a Metadata pointer through its
 * \c metadatap out-parameter. Unlike the other classes in this header the
 * class is declared without RD_EXPORT.
 */
3682 class Metadata {
3683  public:
    /** Vector types holding const pointers to broker and topic metadata. */
3685  typedef std::vector<const BrokerMetadata *> BrokerMetadataVector;
3687  typedef std::vector<const TopicMetadata *> TopicMetadataVector;
3688 
    /** Const iterators over the two vector types above. */
3690  typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
3692  typedef TopicMetadataVector::const_iterator TopicMetadataIterator;
3693 
3694 
    /** Returns a pointer to the vector of broker metadata. */
3700  virtual const BrokerMetadataVector *brokers() const = 0;
3701 
    /** Returns a pointer to the vector of topic metadata. */
3707  virtual const TopicMetadataVector *topics() const = 0;
3708 
    /** Returns an int32_t broker id.
     *  NOTE(review): presumably the broker the metadata originated from -
     *  confirm. */
3710  virtual int32_t orig_broker_id() const = 0;
3711 
    /** Returns a broker name string; presumably for the same originating
     *  broker - confirm. */
3713  virtual std::string orig_broker_name() const = 0;
3714 
    /** Pure virtual destructor; its definition is not part of this
     *  listing. */
3715  virtual ~Metadata() = 0;
3716 };
3717 
3720 } // namespace RdKafka
3721 
3722 
3723 #endif /* _RDKAFKACPP_H_ */
Metadata: Broker information.
Definition: rdkafkacpp.h:3602
virtual int port() const =0
virtual int32_t id() const =0
virtual std::string host() const =0
Configuration interface.
Definition: rdkafkacpp.h:1188
virtual Conf::ConfResult get(OpenCb *&open_cb) const =0
Query single configuration value.
ConfType
Configuration object type.
Definition: rdkafkacpp.h:1193
@ CONF_GLOBAL
Definition: rdkafkacpp.h:1194
virtual Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr)=0
Use with name = "event_cb".
virtual struct rd_kafka_topic_conf_s * c_ptr_topic()=0
Returns the underlying librdkafka C rd_kafka_topic_conf_t handle.
virtual Conf::ConfResult set(const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr)=0
Use with name = "offset_commit_cb".
ConfResult
RdKafka::Conf::set() result code.
Definition: rdkafkacpp.h:1201
virtual Conf::ConfResult set(const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb, std::string &errstr)=0
Use with name = "partitioner_key_pointer_cb".
virtual Conf::ConfResult get(EventCb *&event_cb) const =0
Query single configuration value.
virtual Conf::ConfResult set_engine_callback_data(void *value, std::string &errstr)=0
Set callback_data for ssl engine.
static Conf * create(ConfType type)
Create configuration object.
virtual Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr)=0
Use with name = "dr_cb".
virtual Conf::ConfResult get(SocketCb *&socket_cb) const =0
Query single configuration value.
virtual Conf::ConfResult set(const std::string &name, const Conf *topic_conf, std::string &errstr)=0
Use with name = "default_topic_conf".
virtual Conf::ConfResult get(OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const =0
Query single configuration value.
virtual Conf::ConfResult set(const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr)=0
Use with name = "rebalance_cb".
virtual Conf::ConfResult set(const std::string &name, ConsumeCb *consume_cb, std::string &errstr)=0
Use with name = "consume_cb".
virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const =0
Query single configuration value.
virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const =0
Query single configuration value.
virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const =0
Query single configuration value.
virtual Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const =0
Query single configuration value.
virtual std::list< std::string > * dump()=0
Dump configuration names and values to list containing name,value tuples.
virtual Conf::ConfResult set_ssl_cert(RdKafka::CertificateType cert_type, RdKafka::CertificateEncoding cert_enc, const void *buffer, size_t size, std::string &errstr)=0
Set certificate/key cert_type from the cert_enc encoded memory at buffer of size bytes.
virtual Conf::ConfResult set(const std::string &name, SocketCb *socket_cb, std::string &errstr)=0
Use with name = "socket_cb".
virtual Conf::ConfResult set(const std::string &name, OpenCb *open_cb, std::string &errstr)=0
Use with name = "open_cb".
virtual Conf::ConfResult enable_sasl_queue(bool enable, std::string &errstr)=0
Enable/disable creation of a queue specific to SASL events and callbacks.
virtual Conf::ConfResult set(const std::string &name, SslCertificateVerifyCb *ssl_cert_verify_cb, std::string &errstr)=0
Use with name = "ssl_cert_verify_cb".
virtual Conf::ConfResult set(const std::string &name, OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb, std::string &errstr)=0
Use with name = "oauthbearer_token_refresh_cb".
virtual Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr)=0
Set configuration property name to value value.
virtual Conf::ConfResult get(SslCertificateVerifyCb *&ssl_cert_verify_cb) const =0
Query single configuration value (the "ssl_cert_verify_cb" callback).
virtual struct rd_kafka_conf_s * c_ptr_global()=0
Returns the underlying librdkafka C rd_kafka_conf_t handle.
virtual Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr)=0
Use with name = "partitioner_cb".
virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const =0
Query single configuration value.
virtual Conf::ConfResult get(const std::string &name, std::string &value) const =0
Query single configuration value.
Consume callback class.
Definition: rdkafkacpp.h:936
virtual void consume_cb(Message &message, void *opaque)=0
The consume callback is used with RdKafka::Consumer::consume_callback() methods and will be called fo...
ConsumerGroupMetadata holds a consumer instance's group metadata state.
Definition: rdkafkacpp.h:2568
Simple Consumer (legacy)
Definition: rdkafkacpp.h:3002
virtual ErrorCode start(Topic *topic, int32_t partition, int64_t offset, Queue *queue)=0
Start consuming messages for topic and partition on queue queue.
virtual Message * consume(Topic *topic, int32_t partition, int timeout_ms)=0
Consume a single message from topic and partition.
virtual ErrorCode stop(Topic *topic, int32_t partition)=0
Stop consuming messages for topic and partition, purging all messages currently in the local queue.
virtual int consume_callback(Topic *topic, int32_t partition, int timeout_ms, ConsumeCb *consume_cb, void *opaque)=0
Consumes messages from topic and partition, calling the provided callback for each consumed message.
virtual Message * consume(Queue *queue, int timeout_ms)=0
Consume a single message from the specified queue.
static int64_t OffsetTail(int64_t offset)
Converts an offset into the logical offset from the tail of a topic.
virtual int consume_callback(Queue *queue, int timeout_ms, RdKafka::ConsumeCb *consume_cb, void *opaque)=0
Consumes messages from queue, calling the provided callback for each consumed message.
static Consumer * create(const Conf *conf, std::string &errstr)
Creates a new Kafka consumer handle.
virtual ErrorCode seek(Topic *topic, int32_t partition, int64_t offset, int timeout_ms)=0
Seek consumer for topic+partition to offset which is either an absolute or logical offset.
virtual ErrorCode start(Topic *topic, int32_t partition, int64_t offset)=0
Start consuming messages for topic and partition at offset offset which may either be a proper offset...
Delivery Report callback class.
Definition: rdkafkacpp.h:698
virtual void dr_cb(Message &message)=0
Delivery report callback.
The Error class is used as a return value from APIs to propagate an error. The error consists of an e...
Definition: rdkafkacpp.h:613
virtual ErrorCode code() const =0
virtual std::string name() const =0
virtual std::string str() const =0
virtual bool is_fatal() const =0
static Error * create(ErrorCode code, const std::string *errstr)
Create error object.
virtual bool is_retriable() const =0
virtual bool txn_requires_abort() const =0
Event callback class.
Definition: rdkafkacpp.h:824
virtual void event_cb(Event &event)=0
Event callback.
Event object class as passed to the EventCb callback.
Definition: rdkafkacpp.h:841
virtual int throttle_time() const =0
virtual std::string broker_name() const =0
virtual bool fatal() const =0
virtual std::string fac() const =0
virtual ErrorCode err() const =0
virtual Type type() const =0
virtual std::string str() const =0
Severity
EVENT_LOG severities (conforms to syslog(3) severities)
Definition: rdkafkacpp.h:852
Type
Event type.
Definition: rdkafkacpp.h:844
@ EVENT_ERROR
Definition: rdkafkacpp.h:845
@ EVENT_STATS
Definition: rdkafkacpp.h:846
@ EVENT_LOG
Definition: rdkafkacpp.h:847
virtual Severity severity() const =0
virtual int broker_id() const =0
Base handle, super class for specific clients.
Definition: rdkafkacpp.h:1510
virtual Queue * get_partition_queue(const TopicPartition *partition)=0
Retrieve queue for a given partition.
virtual ErrorCode offsetsForTimes(std::vector< TopicPartition * > &offsets, int timeout_ms)=0
Look up the offsets for the given partitions by timestamp.
virtual ErrorCode oauthbearer_set_token(const std::string &token_value, int64_t md_lifetime_ms, const std::string &md_principal_name, const std::list< std::string > &extensions, std::string &errstr)=0
Set SASL/OAUTHBEARER token and metadata.
virtual ErrorCode resume(std::vector< TopicPartition * > &partitions)=0
Resume producing or consumption for the provided list of partitions.
virtual Error * sasl_background_callbacks_enable()=0
Enable SASL OAUTHBEARER refresh callbacks on the librdkafka background thread.
virtual std::string memberid() const =0
Returns the client's broker-assigned group member id.
virtual void mem_free(void *ptr)=0
Free pointer returned by librdkafka.
virtual int outq_len()=0
Returns the current out queue length.
virtual void * mem_malloc(size_t size)=0
Allocate memory using the same allocator librdkafka uses.
virtual void yield()=0
Cancels the current callback dispatcher (Handle::poll(), KafkaConsumer::consume(),...
virtual ErrorCode pause(std::vector< TopicPartition * > &partitions)=0
Pause producing or consumption for the provided list of partitions.
virtual int32_t controllerid(int timeout_ms)=0
Returns the current ControllerId (controller broker id) as reported in broker metadata.
virtual struct rd_kafka_s * c_ptr()=0
Returns the underlying librdkafka C rd_kafka_t handle.
virtual Queue * get_sasl_queue()=0
virtual int poll(int timeout_ms)=0
Polls the provided kafka handle for events.
virtual ErrorCode oauthbearer_set_token_failure(const std::string &errstr)=0
SASL/OAUTHBEARER token refresh failure indicator.
virtual ErrorCode query_watermark_offsets(const std::string &topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)=0
Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.
virtual ErrorCode get_watermark_offsets(const std::string &topic, int32_t partition, int64_t *low, int64_t *high)=0
Get last known low (oldest/beginning) and high (newest/end) offsets for partition.
virtual Error * sasl_set_credentials(const std::string &username, const std::string &password)=0
Sets SASL credentials used for SASL PLAIN and SCRAM mechanisms by this Kafka client.
virtual ErrorCode fatal_error(std::string &errstr) const =0
Returns the first fatal error set on this client instance, or ERR_NO_ERROR if no fatal error has occu...
virtual ErrorCode metadata(bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms)=0
Request Metadata from broker.
virtual ErrorCode set_log_queue(Queue *queue)=0
Forward librdkafka logs (and debug) to the specified queue for serving with one of the ....
virtual std::string name() const =0
virtual Queue * get_background_queue()=0
virtual std::string clusterid(int timeout_ms)=0
Returns the ClusterId as reported in broker metadata.
Header object.
Definition: rdkafkacpp.h:2122
size_t value_size() const
Definition: rdkafkacpp.h:2215
Header(const std::string &key, const void *value, size_t value_size)
Header object to encapsulate a single Header.
Definition: rdkafkacpp.h:2134
std::string key() const
Definition: rdkafkacpp.h:2199
const char * value_string() const
Definition: rdkafkacpp.h:2210
Header(const std::string &key, const void *value, size_t value_size, const RdKafka::ErrorCode err)
Header object to encapsulate a single Header.
Definition: rdkafkacpp.h:2152
const void * value() const
Definition: rdkafkacpp.h:2204
Header(const Header &other)
Copy constructor.
Definition: rdkafkacpp.h:2166
RdKafka::ErrorCode err() const
Definition: rdkafkacpp.h:2220
Header & operator=(const Header &other)
Assignment operator.
Definition: rdkafkacpp.h:2176
Headers object.
Definition: rdkafkacpp.h:2110
virtual std::vector< Header > get(const std::string &key) const =0
Gets all of the Headers of a given key.
virtual std::vector< Header > get_all() const =0
Returns all Headers.
virtual ErrorCode remove(const std::string &key)=0
Removes all the Headers of a given key.
static Headers * create()
Create a new instance of the Headers object.
virtual ErrorCode add(const std::string &key, const void *value, size_t value_size)=0
Adds a Header to the end of the list.
virtual ErrorCode add(const Header &header)=0
Adds a Header to the end of the list.
virtual ErrorCode add(const std::string &key, const std::string &value)=0
Adds a Header to the end of the list.
virtual Header get_last(const std::string &key) const =0
Gets the last occurrence of a Header of a given key.
static Headers * create(const std::vector< Header > &headers)
Create a new instance of the Headers object from a std::vector.
virtual size_t size() const =0
High-level KafkaConsumer (for brokers 0.9 and later)
Definition: rdkafkacpp.h:2590
virtual ErrorCode commitSync()=0
Commit offsets for the current assignment.
static KafkaConsumer * create(const Conf *conf, std::string &errstr)
Creates a KafkaConsumer.
virtual ErrorCode unassign()=0
Stop consumption and remove the current assignment.
virtual ErrorCode commitAsync(Message *message)=0
Commit offset for a single topic+partition based on message.
virtual Error * incremental_assign(const std::vector< TopicPartition * > &partitions)=0
Incrementally add partitions to the current assignment.
virtual ErrorCode subscription(std::vector< std::string > &topics)=0
Returns the current subscription as set by RdKafka::KafkaConsumer::subscribe()
virtual ErrorCode position(std::vector< TopicPartition * > &partitions)=0
Retrieve current positions (offsets) for topics+partitions.
virtual std::string rebalance_protocol()=0
The rebalance protocol currently in use. This will be "NONE" if the consumer has not (yet) joined a g...
virtual ErrorCode close()=0
Close and shut down the consumer.
virtual ErrorCode commitAsync(const std::vector< TopicPartition * > &offsets)=0
Commit offset for the provided list of partitions.
virtual ErrorCode commitSync(OffsetCommitCb *offset_commit_cb)=0
Commit offsets for the current assignment.
virtual bool assignment_lost()=0
Check whether the consumer considers the current assignment to have been lost involuntarily....
virtual ErrorCode commitAsync()=0
Asynchronous version of RdKafka::KafkaConsumer::commitSync()
virtual ErrorCode subscribe(const std::vector< std::string > &topics)=0
Update the subscription set to topics.
virtual ConsumerGroupMetadata * groupMetadata()=0
virtual Error * close(Queue *queue)=0
Close and shut down the consumer.
virtual bool closed()=0
virtual Message * consume(int timeout_ms)=0
Consume message or get error event, triggers callbacks.
virtual ErrorCode commitSync(std::vector< TopicPartition * > &offsets)=0
Commit offsets for the provided list of partitions.
virtual ErrorCode unsubscribe()=0
Unsubscribe from the current subscription set.
virtual ErrorCode commitSync(std::vector< TopicPartition * > &offsets, OffsetCommitCb *offset_commit_cb)=0
Commit offsets for the provided list of partitions.
virtual ErrorCode committed(std::vector< TopicPartition * > &partitions, int timeout_ms)=0
Retrieve committed offsets for topics+partitions.
virtual ErrorCode seek(const TopicPartition &partition, int timeout_ms)=0
Seek consumer for topic+partition to offset which is either an absolute or logical offset.
virtual ErrorCode offsets_store(std::vector< TopicPartition * > &offsets)=0
Store offset offset for topic partition partition. The offset will be committed (written) to the offs...
virtual ErrorCode commitSync(Message *message)=0
Commit offset for a single topic+partition based on message.
virtual Error * incremental_unassign(const std::vector< TopicPartition * > &partitions)=0
Incrementally remove partitions from the current assignment.
virtual ErrorCode assign(const std::vector< TopicPartition * > &partitions)=0
Update the assignment set to partitions.
virtual ErrorCode assignment(std::vector< RdKafka::TopicPartition * > &partitions)=0
Returns the current partition assignment as set by RdKafka::KafkaConsumer::assign()
Message timestamp object.
Definition: rdkafkacpp.h:2087
int64_t timestamp
Definition: rdkafkacpp.h:2097
MessageTimestampType
Definition: rdkafkacpp.h:2090
@ MSG_TIMESTAMP_CREATE_TIME
Definition: rdkafkacpp.h:2092
@ MSG_TIMESTAMP_NOT_AVAILABLE
Definition: rdkafkacpp.h:2091
MessageTimestampType type
Definition: rdkafkacpp.h:2096
Message object.
Definition: rdkafkacpp.h:2353
virtual std::string topic_name() const =0
virtual Status status() const =0
Returns the message's persistence status in the topic log.
virtual const std::string * key() const =0
virtual std::string errstr() const =0
Accessor functions.
virtual int32_t broker_id() const =0
virtual const void * key_pointer() const =0
virtual RdKafka::Headers * headers()=0
virtual int64_t offset() const =0
virtual size_t len() const =0
virtual struct rd_kafka_message_s * c_ptr()=0
Returns the underlying librdkafka C rd_kafka_message_t handle.
virtual Topic * topic() const =0
virtual MessageTimestamp timestamp() const =0
virtual int64_t latency() const =0
virtual ErrorCode err() const =0
Status
Message persistence status can be used by the application to find out if a produced message was persi...
Definition: rdkafkacpp.h:2357
virtual size_t key_len() const =0
virtual RdKafka::Headers * headers(RdKafka::ErrorCode *err)=0
virtual void * msg_opaque() const =0
virtual void * payload() const =0
virtual int32_t partition() const =0
Metadata container.
Definition: rdkafkacpp.h:3682
virtual const TopicMetadataVector * topics() const =0
Topic list.
virtual int32_t orig_broker_id() const =0
Broker (id) originating this metadata.
virtual const BrokerMetadataVector * brokers() const =0
Broker list.
std::vector< const TopicMetadata * > TopicMetadataVector
Topics.
Definition: rdkafkacpp.h:3687
std::vector< const BrokerMetadata * > BrokerMetadataVector
Brokers.
Definition: rdkafkacpp.h:3685
virtual std::string orig_broker_name() const =0
Broker (name) originating this metadata.
BrokerMetadataVector::const_iterator BrokerMetadataIterator
Brokers iterator.
Definition: rdkafkacpp.h:3690
TopicMetadataVector::const_iterator TopicMetadataIterator
Topics iterator.
Definition: rdkafkacpp.h:3692
SASL/OAUTHBEARER token refresh callback class.
Definition: rdkafkacpp.h:737
virtual void oauthbearer_token_refresh_cb(RdKafka::Handle *handle, const std::string &oauthbearer_config)=0
SASL/OAUTHBEARER token refresh callback.
Offset Commit callback class.
Definition: rdkafkacpp.h:1037
virtual void offset_commit_cb(RdKafka::ErrorCode err, std::vector< TopicPartition * > &offsets)=0
Set offset commit callback for use with consumer groups.
Portability: OpenCb callback class
Definition: rdkafkacpp.h:1149
virtual int open_cb(const std::string &path, int flags, int mode)=0
Open callback. The open callback is responsible for opening the file specified by pathname,...
Metadata: Partition information.
Definition: rdkafkacpp.h:3621
std::vector< int32_t > ISRSVector
ISRs (In-Sync-Replicas)
Definition: rdkafkacpp.h:3626
ISRSVector::const_iterator ISRSIterator
ISRs iterator.
Definition: rdkafkacpp.h:3631
virtual int32_t id() const =0
ReplicasVector::const_iterator ReplicasIterator
Replicas iterator.
Definition: rdkafkacpp.h:3629
virtual ErrorCode err() const =0
virtual const std::vector< int32_t > * isrs() const =0
virtual const std::vector< int32_t > * replicas() const =0
virtual int32_t leader() const =0
std::vector< int32_t > ReplicasVector
Replicas.
Definition: rdkafkacpp.h:3624
Partitioner callback class.
Definition: rdkafkacpp.h:762
virtual int32_t partitioner_cb(const Topic *topic, const std::string *key, int32_t partition_cnt, void *msg_opaque)=0
Partitioner callback.
Variant partitioner with key pointer.
Definition: rdkafkacpp.h:794
virtual int32_t partitioner_cb(const Topic *topic, const void *key, size_t key_len, int32_t partition_cnt, void *msg_opaque)=0
Variant partitioner callback that gets key as pointer and length instead of as a const std::string *.
Producer.
Definition: rdkafkacpp.h:3184
virtual Error * abort_transaction(int timeout_ms)=0
Aborts the ongoing transaction.
virtual Error * init_transactions(int timeout_ms)=0
Initialize transactions for the producer instance.
virtual ErrorCode produce(const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, int64_t timestamp, void *msg_opaque)=0
produce() variant that takes topic as a string (no need for creating a Topic object),...
virtual Error * commit_transaction(int timeout_ms)=0
Commit the current transaction as started with begin_transaction().
virtual ErrorCode produce(Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, void *msg_opaque)=0
Variant produce() that passes the key as a pointer and length instead of as a const std::string *.
virtual Error * send_offsets_to_transaction(const std::vector< TopicPartition * > &offsets, const ConsumerGroupMetadata *group_metadata, int timeout_ms)=0
Sends a list of topic partition offsets to the consumer group coordinator for group_metadata,...
virtual ErrorCode purge(int purge_flags)=0
Purge messages currently handled by the producer instance.
virtual ErrorCode produce(const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, int64_t timestamp, RdKafka::Headers *headers, void *msg_opaque)=0
produce() variant that allows for Header support on produce. Otherwise identical to produce() abo...
virtual ErrorCode produce(Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const std::string *key, void *msg_opaque)=0
Produce and send a single message to broker.
virtual ErrorCode produce(Topic *topic, int32_t partition, const std::vector< char > *payload, const std::vector< char > *key, void *msg_opaque)=0
Variant produce() that accepts vectors for key and payload. The vector data will be copied.
virtual Error * begin_transaction()=0
init_transactions() must have been called successfully (once) before this function is called.
static Producer * create(const Conf *conf, std::string &errstr)
Creates a new Kafka producer handle.
virtual ErrorCode flush(int timeout_ms)=0
Wait until all outstanding produce requests, et al., are completed. This should typically be done prio...
Queue interface.
Definition: rdkafkacpp.h:2493
static Queue * create(Handle *handle)
Create Queue object.
virtual ErrorCode forward(Queue *dst)=0
Forward/re-route queue to dst. If dst is NULL, the forwarding is removed.
virtual int poll(int timeout_ms)=0
Poll queue, serving any enqueued callbacks.
virtual void io_event_enable(int fd, const void *payload, size_t size)=0
Enable IO event triggering for queue.
virtual Message * consume(int timeout_ms)=0
Consume message or get error event from the queue.
KafkaConsumer: Rebalance callback class
Definition: rdkafkacpp.h:955
virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition * > &partitions)=0
Group rebalance callback for use with RdKafka::KafkaConsumer.
Portability: SocketCb callback class
Definition: rdkafkacpp.h:1123
virtual int socket_cb(int domain, int type, int protocol)=0
Socket callback.
SSL broker certificate verification class.
Definition: rdkafkacpp.h:1068
virtual bool ssl_cert_verify_cb(const std::string &broker_name, int32_t broker_id, int *x509_error, int depth, const char *buf, size_t size, std::string &errstr)=0
SSL broker certificate verification callback.
Metadata: Topic information.
Definition: rdkafkacpp.h:3659
virtual ErrorCode err() const =0
virtual const PartitionMetadataVector * partitions() const =0
std::vector< const PartitionMetadata * > PartitionMetadataVector
Partitions.
Definition: rdkafkacpp.h:3662
PartitionMetadataVector::const_iterator PartitionMetadataIterator
Partitions iterator.
Definition: rdkafkacpp.h:3664
virtual std::string topic() const =0
Topic+Partition.
Definition: rdkafkacpp.h:1940
static void destroy(std::vector< TopicPartition * > &partitions)
Destroy/delete the TopicPartitions in partitions and clear the vector.
virtual const std::string & topic() const =0
static TopicPartition * create(const std::string &topic, int partition, int64_t offset)
Create topic+partition object for topic and partition with offset offset.
virtual void set_offset(int64_t offset)=0
Set offset.
virtual int partition() const =0
static TopicPartition * create(const std::string &topic, int partition)
Create topic+partition object for topic and partition.
virtual int64_t offset() const =0
virtual ErrorCode err() const =0
Topic handle.
Definition: rdkafkacpp.h:1989
static const int64_t OFFSET_INVALID
Definition: rdkafkacpp.h:2003
virtual std::string name() const =0
virtual ErrorCode offset_store(int32_t partition, int64_t offset)=0
Store offset offset + 1 for topic partition partition. The offset will be committed (written) to the ...
static const int64_t OFFSET_BEGINNING
Special offsets.
Definition: rdkafkacpp.h:2000
static Topic * create(Handle *base, const std::string &topic_str, const Conf *conf, std::string &errstr)
Creates a new topic handle for topic named topic_str.
virtual bool partition_available(int32_t partition) const =0
static const int64_t OFFSET_STORED
Definition: rdkafkacpp.h:2002
virtual struct rd_kafka_topic_s * c_ptr()=0
Returns the underlying librdkafka C rd_kafka_topic_t handle.
static const int32_t PARTITION_UA
Unassigned partition.
Definition: rdkafkacpp.h:1997
static const int64_t OFFSET_END
Definition: rdkafkacpp.h:2001
RD_EXPORT std::string get_debug_contexts()
Returns a CSV list of the supported debug contexts for use with Conf::set("debug",...
RD_EXPORT std::string version_str()
Returns the librdkafka version as string.
CertificateEncoding
SSL certificate encoding.
Definition: rdkafkacpp.h:571
@ CERT_ENC_PKCS12
Definition: rdkafkacpp.h:572
@ CERT_ENC_DER
Definition: rdkafkacpp.h:573
@ CERT_ENC_PEM
Definition: rdkafkacpp.h:574
RD_EXPORT int wait_destroyed(int timeout_ms)
Wait for all rd_kafka_t objects to be destroyed.
RD_EXPORT std::string err2str(RdKafka::ErrorCode err)
Returns a human readable representation of a kafka error.
ErrorCode
Error codes.
Definition: rdkafkacpp.h:200
@ ERR__APPLICATION
Definition: rdkafkacpp.h:320
@ ERR_PRINCIPAL_DESERIALIZATION_FAILURE
Definition: rdkafkacpp.h:544
@ ERR_NOT_LEADER_FOR_PARTITION
Definition: rdkafkacpp.h:347
@ ERR_NON_EMPTY_GROUP
Definition: rdkafkacpp.h:482
@ ERR__DESTROY
Definition: rdkafkacpp.h:209
@ ERR_INVALID_REQUEST
Definition: rdkafkacpp.h:425
@ ERR_GROUP_ID_NOT_FOUND
Definition: rdkafkacpp.h:484
@ ERR_INVALID_SESSION_TIMEOUT
Definition: rdkafkacpp.h:393
@ ERR__UNDERFLOW
Definition: rdkafkacpp.h:296
@ ERR__ASSIGN_PARTITIONS
Definition: rdkafkacpp.h:256
@ ERR__UNKNOWN_BROKER
Definition: rdkafkacpp.h:314
@ ERR_CLUSTER_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:403
@ ERR_UNKNOWN
Definition: rdkafkacpp.h:333
@ ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE
Definition: rdkafkacpp.h:513
@ ERR__REVOKE_PARTITIONS
Definition: rdkafkacpp.h:258
@ ERR__FATAL
Definition: rdkafkacpp.h:306
@ ERR_SASL_AUTHENTICATION_FAILED
Definition: rdkafkacpp.h:462
@ ERR_FEATURE_UPDATE_FAILED
Definition: rdkafkacpp.h:542
@ ERR__CRIT_SYS_RESOURCE
Definition: rdkafkacpp.h:215
@ ERR__ASSIGNMENT_LOST
Definition: rdkafkacpp.h:322
@ ERR_STALE_BROKER_EPOCH
Definition: rdkafkacpp.h:500
@ ERR_INVALID_TXN_STATE
Definition: rdkafkacpp.h:437
@ ERR__UNKNOWN_PARTITION
Definition: rdkafkacpp.h:226
@ ERR__WAIT_CACHE
Definition: rdkafkacpp.h:278
@ ERR_UNSTABLE_OFFSET_COMMIT
Definition: rdkafkacpp.h:524
@ ERR__NODE_UPDATE
Definition: rdkafkacpp.h:242
@ ERR_NO_ERROR
Definition: rdkafkacpp.h:335
@ ERR_INCONSISTENT_GROUP_PROTOCOL
Definition: rdkafkacpp.h:387
@ ERR__UNKNOWN_GROUP
Definition: rdkafkacpp.h:248
@ ERR_INVALID_PARTITIONS
Definition: rdkafkacpp.h:415
@ ERR_DELEGATION_TOKEN_NOT_FOUND
Definition: rdkafkacpp.h:470
@ ERR__NOOP
Definition: rdkafkacpp.h:324
@ ERR_NO_REASSIGNMENT_IN_PROGRESS
Definition: rdkafkacpp.h:517
@ ERR__NOT_IMPLEMENTED
Definition: rdkafkacpp.h:266
@ ERR__MSG_TIMED_OUT
Definition: rdkafkacpp.h:219
@ ERR_INVALID_REPLICATION_FACTOR
Definition: rdkafkacpp.h:417
@ ERR__READ_ONLY
Definition: rdkafkacpp.h:292
@ ERR_UNKNOWN_MEMBER_ID
Definition: rdkafkacpp.h:391
@ ERR_OUT_OF_ORDER_SEQUENCE_NUMBER
Definition: rdkafkacpp.h:431
@ ERR_INVALID_GROUP_ID
Definition: rdkafkacpp.h:389
@ ERR__RETRY
Definition: rdkafkacpp.h:300
@ ERR__TIMED_OUT_QUEUE
Definition: rdkafkacpp.h:274
@ ERR__NOT_CONFIGURED
Definition: rdkafkacpp.h:316
@ ERR_DUPLICATE_RESOURCE
Definition: rdkafkacpp.h:533
@ ERR_LEADER_NOT_AVAILABLE
Definition: rdkafkacpp.h:345
@ ERR_INVALID_FETCH_SESSION_EPOCH
Definition: rdkafkacpp.h:488
@ ERR_KAFKA_STORAGE_ERROR
Definition: rdkafkacpp.h:458
@ ERR_ILLEGAL_GENERATION
Definition: rdkafkacpp.h:385
@ ERR_TOPIC_ALREADY_EXISTS
Definition: rdkafkacpp.h:413
@ ERR__AUTHENTICATION
Definition: rdkafkacpp.h:268
@ ERR_TOPIC_EXCEPTION
Definition: rdkafkacpp.h:375
@ ERR__RESOLVE
Definition: rdkafkacpp.h:217
@ ERR_INVALID_REQUIRED_ACKS
Definition: rdkafkacpp.h:383
@ ERR_FETCH_SESSION_ID_NOT_FOUND
Definition: rdkafkacpp.h:486
@ ERR__SSL
Definition: rdkafkacpp.h:244
@ ERR_COORDINATOR_NOT_AVAILABLE
Definition: rdkafkacpp.h:367
@ ERR__CONFLICT
Definition: rdkafkacpp.h:260
@ ERR_GROUP_SUBSCRIBED_TO_TOPIC
Definition: rdkafkacpp.h:520
@ ERR__STATE
Definition: rdkafkacpp.h:262
@ ERR__KEY_SERIALIZATION
Definition: rdkafkacpp.h:282
@ ERR_INVALID_MSG_SIZE
Definition: rdkafkacpp.h:343
@ ERR_DELEGATION_TOKEN_OWNER_MISMATCH
Definition: rdkafkacpp.h:472
@ ERR_REASSIGNMENT_IN_PROGRESS
Definition: rdkafkacpp.h:466
@ ERR__OUTDATED
Definition: rdkafkacpp.h:272
@ ERR_OFFSET_NOT_AVAILABLE
Definition: rdkafkacpp.h:502
@ ERR_THROTTLING_QUOTA_EXCEEDED
Definition: rdkafkacpp.h:526
@ ERR_REQUEST_TIMED_OUT
Definition: rdkafkacpp.h:349
@ ERR_INVALID_TRANSACTION_TIMEOUT
Definition: rdkafkacpp.h:443
@ ERR_CONCURRENT_TRANSACTIONS
Definition: rdkafkacpp.h:446
@ ERR_STALE_CTRL_EPOCH
Definition: rdkafkacpp.h:357
@ ERR_UNSUPPORTED_VERSION
Definition: rdkafkacpp.h:411
@ ERR__WAIT_COORD
Definition: rdkafkacpp.h:246
@ ERR_ELECTION_NOT_NEEDED
Definition: rdkafkacpp.h:515
@ ERR_INVALID_COMMIT_OFFSET_SIZE
Definition: rdkafkacpp.h:397
@ ERR_DELEGATION_TOKEN_EXPIRED
Definition: rdkafkacpp.h:478
@ ERR_TOPIC_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:399
@ ERR_MSG_SIZE_TOO_LARGE
Definition: rdkafkacpp.h:355
@ ERR_NETWORK_EXCEPTION
Definition: rdkafkacpp.h:361
@ ERR_INVALID_PRINCIPAL_TYPE
Definition: rdkafkacpp.h:480
@ ERR__MAX_POLL_EXCEEDED
Definition: rdkafkacpp.h:312
@ ERR_FENCED_LEADER_EPOCH
Definition: rdkafkacpp.h:494
@ ERR__BEGIN
Definition: rdkafkacpp.h:203
@ ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:476
@ ERR_UNACCEPTABLE_CREDENTIAL
Definition: rdkafkacpp.h:535
@ ERR_RESOURCE_NOT_FOUND
Definition: rdkafkacpp.h:531
@ ERR_GROUP_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:401
@ ERR_LOG_DIR_NOT_FOUND
Definition: rdkafkacpp.h:460
@ ERR__AUTO_OFFSET_RESET
Definition: rdkafkacpp.h:326
@ ERR__NO_OFFSET
Definition: rdkafkacpp.h:270
@ ERR__PURGE_INFLIGHT
Definition: rdkafkacpp.h:304
@ ERR_INVALID_RECORD
Definition: rdkafkacpp.h:522
@ ERR_FENCED_INSTANCE_ID
Definition: rdkafkacpp.h:511
@ ERR_ILLEGAL_SASL_STATE
Definition: rdkafkacpp.h:409
@ ERR__FAIL
Definition: rdkafkacpp.h:211
@ ERR_MEMBER_ID_REQUIRED
Definition: rdkafkacpp.h:504
@ ERR__EXISTING_SUBSCRIPTION
Definition: rdkafkacpp.h:254
@ ERR__PREV_IN_PROGRESS
Definition: rdkafkacpp.h:252
@ ERR_PRODUCER_FENCED
Definition: rdkafkacpp.h:529
@ ERR__TIMED_OUT
Definition: rdkafkacpp.h:236
@ ERR_PREFERRED_LEADER_NOT_AVAILABLE
Definition: rdkafkacpp.h:506
@ ERR__PARTITION_EOF
Definition: rdkafkacpp.h:224
@ ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND
Definition: rdkafkacpp.h:381
@ ERR__QUEUE_FULL
Definition: rdkafkacpp.h:238
@ ERR_INVALID_MSG
Definition: rdkafkacpp.h:339
@ ERR_TOPIC_DELETION_DISABLED
Definition: rdkafkacpp.h:492
@ ERR__VALUE_DESERIALIZATION
Definition: rdkafkacpp.h:288
@ ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED
Definition: rdkafkacpp.h:474
@ ERR__GAPLESS_GUARANTEE
Definition: rdkafkacpp.h:310
@ ERR_NOT_COORDINATOR
Definition: rdkafkacpp.h:371
@ ERR__UNSUPPORTED_FEATURE
Definition: rdkafkacpp.h:276
@ ERR_INVALID_PRODUCER_EPOCH
Definition: rdkafkacpp.h:435
@ ERR_DUPLICATE_SEQUENCE_NUMBER
Definition: rdkafkacpp.h:433
@ ERR_OFFSET_METADATA_TOO_LARGE
Definition: rdkafkacpp.h:359
@ ERR_NOT_ENOUGH_REPLICAS
Definition: rdkafkacpp.h:379
@ ERR_UNSUPPORTED_SASL_MECHANISM
Definition: rdkafkacpp.h:407
@ ERR__INCONSISTENT
Definition: rdkafkacpp.h:308
@ ERR_UNKNOWN_PRODUCER_ID
Definition: rdkafkacpp.h:464
@ ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:452
@ ERR_UNSUPPORTED_COMPRESSION_TYPE
Definition: rdkafkacpp.h:498
@ ERR_UNKNOWN_LEADER_EPOCH
Definition: rdkafkacpp.h:496
@ ERR_LISTENER_NOT_FOUND
Definition: rdkafkacpp.h:490
@ ERR__PURGE_QUEUE
Definition: rdkafkacpp.h:302
@ ERR_SECURITY_DISABLED
Definition: rdkafkacpp.h:454
@ ERR__ISR_INSUFF
Definition: rdkafkacpp.h:240
@ ERR__FENCED
Definition: rdkafkacpp.h:318
@ ERR_INCONSISTENT_VOTER_SET
Definition: rdkafkacpp.h:538
@ ERR__BAD_COMPRESSION
Definition: rdkafkacpp.h:207
@ ERR__ALL_BROKERS_DOWN
Definition: rdkafkacpp.h:232
@ ERR__VALUE_SERIALIZATION
Definition: rdkafkacpp.h:284
@ ERR_OPERATION_NOT_ATTEMPTED
Definition: rdkafkacpp.h:456
@ ERR__END
Definition: rdkafkacpp.h:329
@ ERR__IN_PROGRESS
Definition: rdkafkacpp.h:250
@ ERR_INVALID_CONFIG
Definition: rdkafkacpp.h:421
@ ERR_REPLICA_NOT_AVAILABLE
Definition: rdkafkacpp.h:353
@ ERR_INVALID_REPLICA_ASSIGNMENT
Definition: rdkafkacpp.h:419
@ ERR_BROKER_NOT_AVAILABLE
Definition: rdkafkacpp.h:351
@ ERR__UNKNOWN_PROTOCOL
Definition: rdkafkacpp.h:264
@ ERR__BAD_MSG
Definition: rdkafkacpp.h:205
@ ERR__NOENT
Definition: rdkafkacpp.h:294
@ ERR_REBALANCE_IN_PROGRESS
Definition: rdkafkacpp.h:395
@ ERR_COORDINATOR_LOAD_IN_PROGRESS
Definition: rdkafkacpp.h:363
@ ERR_RECORD_LIST_TOO_LARGE
Definition: rdkafkacpp.h:377
@ ERR_INVALID_PRODUCER_ID_MAPPING
Definition: rdkafkacpp.h:440
@ ERR__PARTIAL
Definition: rdkafkacpp.h:290
@ ERR_TRANSACTION_COORDINATOR_FENCED
Definition: rdkafkacpp.h:450
@ ERR_UNKNOWN_TOPIC_OR_PART
Definition: rdkafkacpp.h:341
@ ERR__UNKNOWN_TOPIC
Definition: rdkafkacpp.h:230
@ ERR__INVALID_ARG
Definition: rdkafkacpp.h:234
@ ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT
Definition: rdkafkacpp.h:427
@ ERR_INVALID_TIMESTAMP
Definition: rdkafkacpp.h:405
@ ERR__INVALID_TYPE
Definition: rdkafkacpp.h:298
@ ERR_OFFSET_OUT_OF_RANGE
Definition: rdkafkacpp.h:337
@ ERR_INVALID_UPDATE_VERSION
Definition: rdkafkacpp.h:540
@ ERR_DELEGATION_TOKEN_AUTH_DISABLED
Definition: rdkafkacpp.h:468
@ ERR_POLICY_VIOLATION
Definition: rdkafkacpp.h:429
@ ERR__KEY_DESERIALIZATION
Definition: rdkafkacpp.h:286
@ ERR__FS
Definition: rdkafkacpp.h:228
@ ERR_GROUP_MAX_SIZE_REACHED
Definition: rdkafkacpp.h:508
@ ERR_NOT_CONTROLLER
Definition: rdkafkacpp.h:423
@ ERR__INTR
Definition: rdkafkacpp.h:280
@ ERR__TRANSPORT
Definition: rdkafkacpp.h:213
RD_EXPORT int version()
Returns the librdkafka version as integer.
RD_EXPORT void * mem_malloc(size_t size)
Allocate memory using the same allocator librdkafka uses.
CertificateType
SSL certificate types.
Definition: rdkafkacpp.h:560
@ CERT_PRIVATE_KEY
Definition: rdkafkacpp.h:562
@ CERT_PUBLIC_KEY
Definition: rdkafkacpp.h:561
@ CERT_CA
Definition: rdkafkacpp.h:563
RD_EXPORT void mem_free(void *ptr)
Free pointer returned by librdkafka.