librdkafka
The Apache Kafka C/C++ client library
rdkafkacpp.h
Go to the documentation of this file.
1 /*
2  * librdkafka - Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2014-2022, Magnus Edenhill
5  * 2023, Confluent Inc.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright notice,
12  * this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright notice,
14  * this list of conditions and the following disclaimer in the documentation
15  * and/or other materials provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #ifndef _RDKAFKACPP_H_
31 #define _RDKAFKACPP_H_
32 
51 #include <string>
52 #include <list>
53 #include <vector>
54 #include <cstdlib>
55 #include <cstring>
56 #include <stdint.h>
57 #include <sys/types.h>
58 
59 #ifdef _WIN32
60 #ifndef ssize_t
61 #ifndef _BASETSD_H_
62 #include <basetsd.h>
63 #endif
64 #ifndef _SSIZE_T_DEFINED
65 #define _SSIZE_T_DEFINED
66 typedef SSIZE_T ssize_t;
67 #endif
68 #endif
69 #undef RD_EXPORT
70 #ifdef LIBRDKAFKA_STATICLIB
71 #define RD_EXPORT
72 #else
73 #ifdef LIBRDKAFKACPP_EXPORTS
74 #define RD_EXPORT __declspec(dllexport)
75 #else
76 #define RD_EXPORT __declspec(dllimport)
77 #endif
78 #endif
79 #else
80 #define RD_EXPORT
81 #endif
82 
85 extern "C" {
86 /* Forward declarations */
87 struct rd_kafka_s;
88 struct rd_kafka_topic_s;
89 struct rd_kafka_message_s;
90 struct rd_kafka_conf_s;
91 struct rd_kafka_topic_conf_s;
92 }
93 
94 namespace RdKafka {
95 
/**
 * Library version encoded as one hex constant.
 * NOTE(review): presumably packed as 0xMMmmrrpp (here 2.4.0 with pre-release
 * id 0xff) -- the layout is not shown in this listing; confirm.
 */
115 #define RD_KAFKA_VERSION 0x020400ff
116 
/**
 * Library version as an int.
 * NOTE(review): presumably the run-time value in the same encoding as
 * RD_KAFKA_VERSION -- confirm.
 */
122 RD_EXPORT
123 int version();
124 
/** Library version as a std::string (returned by value). */
128 RD_EXPORT
129 std::string version_str();
130 
/**
 * Debug contexts as a std::string.
 * NOTE(review): the string format (e.g. separator) is not visible here.
 */
135 RD_EXPORT
136 std::string get_debug_contexts();
137 
/**
 * Takes a timeout in milliseconds and returns an int.
 * NOTE(review): presumably waits for all handles to be destroyed and reports
 * success/failure through the return value -- confirm the convention.
 */
147 RD_EXPORT
148 int wait_destroyed(int timeout_ms);
149 
/**
 * Allocates size bytes. Headers::Header uses this for its private copy of a
 * header value and releases it with mem_free().
 */
160 RD_EXPORT
161 void *mem_malloc(size_t size);
162 
/** Releases a pointer obtained from mem_malloc(). */
176 RD_EXPORT
177 void mem_free(void *ptr);
178 
/**
 * Error codes.
 *
 * Library-internal errors are negative and bracketed by ERR__BEGIN (-200) and
 * ERR__END (-100); broker errors follow.
 *
 * NOTE(review): this listing omits many enumerators -- the embedded source
 * line numbers jump (e.g. 206 -> 210, 334 -> 368 -> 548) and the
 * cross-reference index places further members in this enum (for example
 * ERR__TRANSPORT at 214 and ERR_NOT_COORDINATOR at 374). Do not treat the
 * list below as complete.
 */
201 enum ErrorCode {
202  /* Internal errors to rdkafka: */
204  ERR__BEGIN = -200,
206  ERR__BAD_MSG = -199,
210  ERR__DESTROY = -197,
212  ERR__FAIL = -196,
218  ERR__RESOLVE = -193,
229  ERR__FS = -189,
245  ERR__SSL = -181,
263  ERR__STATE = -172,
281  ERR__INTR = -163,
291  ERR__PARTIAL = -158,
295  ERR__NOENT = -156,
301  ERR__RETRY = -153,
307  ERR__FATAL = -150,
319  ERR__FENCED = -144,
325  ERR__NOOP = -141,
330 
332  ERR__END = -100,
333 
334  /* Kafka broker errors: */
/* The three macros below map the older GROUP-named identifiers onto the
 * COORDINATOR-named codes. Being preprocessor macros they are not scoped to
 * this enum or to the RdKafka namespace. */
368 #define ERR_GROUP_LOAD_IN_PROGRESS ERR_COORDINATOR_LOAD_IN_PROGRESS
369 
372 #define ERR_GROUP_COORDINATOR_NOT_AVAILABLE ERR_COORDINATOR_NOT_AVAILABLE
373 
376 #define ERR_NOT_COORDINATOR_FOR_GROUP ERR_NOT_COORDINATOR
377 
548 };
549 
550 
554 RD_EXPORT
555 std::string err2str(RdKafka::ErrorCode err);
556 
557 
558 
567  CERT__CNT
568 };
569 
578  CERT_ENC__CNT
579 };
580 
586 /* Forward declarations */
587 class Handle;
588 class Producer;
589 class Message;
590 class Headers;
591 class Queue;
592 class Event;
593 class Topic;
594 class TopicPartition;
595 class Metadata;
596 class KafkaConsumer;
/**
 * Abstract error object: carries an ErrorCode and exposes name/string
 * accessors plus classification predicates. Instances are obtained through
 * the static create() factory; every accessor is pure virtual.
 */
616 class RD_EXPORT Error {
617  public:
/**
 * Factory taking an error code and a pointer to an error string.
 * NOTE(review): whether errstr may be NULL and who owns the returned object
 * is not visible in this listing -- confirm.
 */
621  static Error *create(ErrorCode code, const std::string *errstr);
622 
623  virtual ~Error() {
624  }
625 
626  /*
627  * Error accessor methods
628  */
629 
/** The error code. */
633  virtual ErrorCode code() const = 0;
634 
/** Name of the error, as a string. */
638  virtual std::string name() const = 0;
639 
/** Error string. */
643  virtual std::string str() const = 0;
644 
/** Predicate: is this error fatal? */
649  virtual bool is_fatal() const = 0;
650 
/** Predicate: may the failed operation be retried? */
654  virtual bool is_retriable() const = 0;
655 
/** Predicate: does this error require the current transaction to be aborted? */
667  virtual bool txn_requires_abort() const = 0;
668 };
669 
/**
 * Delivery Report callback class.
 * Applications subclass it and implement dr_cb(); it is registered through
 * the Conf::set() overload that takes a DeliveryReportCb pointer.
 */
701 class RD_EXPORT DeliveryReportCb {
702  public:
/** Callback receiving the reported message by non-const reference. */
706  virtual void dr_cb(Message &message) = 0;
707 
708  virtual ~DeliveryReportCb() {
709  }
710 };
711 
712 
/**
 * SASL/OAUTHBEARER token refresh callback class.
 * Registered through the Conf::set() overload that takes an
 * OAuthBearerTokenRefreshCb pointer.
 */
740 class RD_EXPORT OAuthBearerTokenRefreshCb {
741  public:
/**
 * Callback receiving the client handle and the OAUTHBEARER configuration
 * string. Handle offers oauthbearer_set_token() and
 * oauthbearer_set_token_failure() for reporting the outcome.
 */
749  virtual void oauthbearer_token_refresh_cb(
750  RdKafka::Handle *handle,
751  const std::string &oauthbearer_config) = 0;
752 
753  virtual ~OAuthBearerTokenRefreshCb() {
754  }
755 };
756 
757 
/**
 * Partitioner callback interface taking the message key as a std::string
 * pointer. Registered through the Conf::set() overload that takes a
 * PartitionerCb pointer.
 */
765 class RD_EXPORT PartitionerCb {
766  public:
/**
 * Returns an int32_t partition for a message given its topic, key, the
 * partition count and the per-message opaque pointer.
 * NOTE(review): key is a pointer, so it is presumably NULL for key-less
 * messages, and the result is presumably expected in [0, partition_cnt)
 * -- confirm both against the full documentation.
 */
784  virtual int32_t partitioner_cb(const Topic *topic,
785  const std::string *key,
786  int32_t partition_cnt,
787  void *msg_opaque) = 0;
788 
789  virtual ~PartitionerCb() {
790  }
791 };
792 
798  public:
807  virtual int32_t partitioner_cb(const Topic *topic,
808  const void *key,
809  size_t key_len,
810  int32_t partition_cnt,
811  void *msg_opaque) = 0;
812 
813  virtual ~PartitionerKeyPointerCb() {
814  }
815 };
816 
817 
818 
/**
 * Event callback interface. Registered through the Conf::set() overload that
 * takes an EventCb pointer.
 */
827 class RD_EXPORT EventCb {
828  public:
/** Callback receiving the Event by non-const reference. */
834  virtual void event_cb(Event &event) = 0;
835 
836  virtual ~EventCb() {
837  }
838 };
839 
840 
/**
 * Event object handed to EventCb::event_cb(). Abstract: all accessors are
 * pure virtual.
 */
844 class RD_EXPORT Event {
845  public:
/**
 * Event type.
 * NOTE(review): enumerators are missing from this listing (embedded lines
 * 848-850 are absent; the cross-reference index places EVENT_LOG at 850), so
 * EVENT_THROTTLE is not the only member and not the first one.
 */
847  enum Type {
851  EVENT_THROTTLE
852  };
853 
/** EVENT_LOG severities (conforms to syslog(3) severities). */
855  enum Severity {
856  EVENT_SEVERITY_EMERG = 0,
857  EVENT_SEVERITY_ALERT = 1,
858  EVENT_SEVERITY_CRITICAL = 2,
859  EVENT_SEVERITY_ERROR = 3,
860  EVENT_SEVERITY_WARNING = 4,
861  EVENT_SEVERITY_NOTICE = 5,
862  EVENT_SEVERITY_INFO = 6,
863  EVENT_SEVERITY_DEBUG = 7
864  };
865 
866  virtual ~Event() {
867  }
868 
869  /*
870  * Event Accessor methods
871  */
872 
/** Event type. */
877  virtual Type type() const = 0;
878 
/** Error code carried by the event. */
883  virtual ErrorCode err() const = 0;
884 
/** Log severity (see Severity). */
889  virtual Severity severity() const = 0;
890 
/** Facility string. NOTE(review): presumably the log facility -- confirm. */
895  virtual std::string fac() const = 0;
896 
/** Event string. */
905  virtual std::string str() const = 0;
906 
/** Throttle time. NOTE(review): unit presumably milliseconds -- confirm. */
911  virtual int throttle_time() const = 0;
912 
/** Broker name associated with the event. */
917  virtual std::string broker_name() const = 0;
918 
/** Broker id associated with the event. */
923  virtual int broker_id() const = 0;
924 
925 
/** Predicate: is this a fatal event? */
931  virtual bool fatal() const = 0;
932 };
933 
934 
935 
/**
 * Consume callback interface. Passed to Consumer::consume_callback() and to
 * the Conf::set() overload that takes a ConsumeCb pointer.
 */
939 class RD_EXPORT ConsumeCb {
940  public:
/** Callback receiving a message and the caller-supplied opaque pointer. */
948  virtual void consume_cb(Message &message, void *opaque) = 0;
949 
950  virtual ~ConsumeCb() {
951  }
952 };
953 
954 
/**
 * KafkaConsumer: Rebalance callback class.
 * Registered through the Conf::set() overload that takes a RebalanceCb
 * pointer.
 */
958 class RD_EXPORT RebalanceCb {
959  public:
/**
 * Callback receiving the consumer, an error code and the affected partition
 * list (by non-const reference).
 * NOTE(review): presumably err distinguishes assignment from revocation
 * (cf. ERR__ASSIGN_PARTITIONS in the index) -- confirm.
 */
1028  virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer,
1029  RdKafka::ErrorCode err,
1030  std::vector<TopicPartition *> &partitions) = 0;
1031 
1032  virtual ~RebalanceCb() {
1033  }
1034 };
1035 
1036 
/**
 * Offset commit callback interface. Accepted by Conf::set() and by the
 * KafkaConsumer::commitSync() overloads that take an OffsetCommitCb pointer.
 */
1040 class RD_EXPORT OffsetCommitCb {
1041  public:
/** Callback receiving the commit result code and the offsets list. */
1057  virtual void offset_commit_cb(RdKafka::ErrorCode err,
1058  std::vector<TopicPartition *> &offsets) = 0;
1059 
1060  virtual ~OffsetCommitCb() {
1061  }
1062 };
1063 
1064 
1065 
/**
 * SSL certificate verification callback interface. Registered through the
 * Conf::set() overload that takes an SslCertificateVerifyCb pointer.
 */
1071 class RD_EXPORT SslCertificateVerifyCb {
1072  public:
/**
 * Returns a bool verdict for one certificate.
 * Inputs: broker name and id, a pointer to an X509 error value (writable),
 * the chain depth, and the certificate bytes as buf/size. errstr is an
 * output string.
 * NOTE(review): presumably true means "accept" and errstr is only consulted
 * on false; the encoding of buf is not visible here -- confirm.
 */
1109  virtual bool ssl_cert_verify_cb(const std::string &broker_name,
1110  int32_t broker_id,
1111  int *x509_error,
1112  int depth,
1113  const char *buf,
1114  size_t size,
1115  std::string &errstr) = 0;
1116 
1117  virtual ~SslCertificateVerifyCb() {
1118  }
1119 };
1120 
1121 
/**
 * Socket creation callback interface. Registered through the Conf::set()
 * overload that takes a SocketCb pointer.
 */
1126 class RD_EXPORT SocketCb {
1127  public:
/**
 * Takes the same three arguments as socket(2) and returns an int.
 * NOTE(review): presumably returns the new socket descriptor or -1 on
 * failure -- confirm.
 */
1141  virtual int socket_cb(int domain, int type, int protocol) = 0;
1142 
1143  virtual ~SocketCb() {
1144  }
1145 };
1146 
1147 
/**
 * File open callback interface. Registered through the Conf::set() overload
 * that takes an OpenCb pointer.
 */
1152 class RD_EXPORT OpenCb {
1153  public:
/**
 * Takes a path plus open(2)-style flags and mode and returns an int.
 * NOTE(review): presumably returns a file descriptor or -1 on failure
 * -- confirm.
 */
1165  virtual int open_cb(const std::string &path, int flags, int mode) = 0;
1166 
1167  virtual ~OpenCb() {
1168  }
1169 };
1170 
1171 
1191 class RD_EXPORT Conf {
1192  public:
1196  enum ConfType {
1198  CONF_TOPIC
1199  };
1200 
1204  enum ConfResult {
1205  CONF_UNKNOWN = -2,
1206  CONF_INVALID = -1,
1207  CONF_OK = 0
1208  };
1209 
1210 
1214  static Conf *create(ConfType type);
1215 
1216  virtual ~Conf() {
1217  }
1218 
1232  virtual Conf::ConfResult set(const std::string &name,
1233  const std::string &value,
1234  std::string &errstr) = 0;
1235 
1237  virtual Conf::ConfResult set(const std::string &name,
1238  DeliveryReportCb *dr_cb,
1239  std::string &errstr) = 0;
1240 
1242  virtual Conf::ConfResult set(
1243  const std::string &name,
1244  OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb,
1245  std::string &errstr) = 0;
1246 
1248  virtual Conf::ConfResult set(const std::string &name,
1249  EventCb *event_cb,
1250  std::string &errstr) = 0;
1251 
1259  virtual Conf::ConfResult set(const std::string &name,
1260  const Conf *topic_conf,
1261  std::string &errstr) = 0;
1262 
1264  virtual Conf::ConfResult set(const std::string &name,
1265  PartitionerCb *partitioner_cb,
1266  std::string &errstr) = 0;
1267 
1269  virtual Conf::ConfResult set(const std::string &name,
1270  PartitionerKeyPointerCb *partitioner_kp_cb,
1271  std::string &errstr) = 0;
1272 
1274  virtual Conf::ConfResult set(const std::string &name,
1275  SocketCb *socket_cb,
1276  std::string &errstr) = 0;
1277 
1279  virtual Conf::ConfResult set(const std::string &name,
1280  OpenCb *open_cb,
1281  std::string &errstr) = 0;
1282 
1284  virtual Conf::ConfResult set(const std::string &name,
1285  RebalanceCb *rebalance_cb,
1286  std::string &errstr) = 0;
1287 
1289  virtual Conf::ConfResult set(const std::string &name,
1290  OffsetCommitCb *offset_commit_cb,
1291  std::string &errstr) = 0;
1292 
1297  virtual Conf::ConfResult set(const std::string &name,
1298  SslCertificateVerifyCb *ssl_cert_verify_cb,
1299  std::string &errstr) = 0;
1300 
1341  virtual Conf::ConfResult set_ssl_cert(RdKafka::CertificateType cert_type,
1343  const void *buffer,
1344  size_t size,
1345  std::string &errstr) = 0;
1346 
1359  virtual Conf::ConfResult get(const std::string &name,
1360  std::string &value) const = 0;
1361 
1365  virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0;
1366 
1370  virtual Conf::ConfResult get(
1371  OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const = 0;
1372 
1376  virtual Conf::ConfResult get(EventCb *&event_cb) const = 0;
1377 
1381  virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0;
1382 
1386  virtual Conf::ConfResult get(
1387  PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0;
1388 
1392  virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0;
1393 
1397  virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0;
1398 
1402  virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0;
1403 
1407  virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0;
1408 
1410  virtual Conf::ConfResult get(
1411  SslCertificateVerifyCb *&ssl_cert_verify_cb) const = 0;
1412 
1415  virtual std::list<std::string> *dump() = 0;
1416 
1418  virtual Conf::ConfResult set(const std::string &name,
1419  ConsumeCb *consume_cb,
1420  std::string &errstr) = 0;
1421 
1438  virtual struct rd_kafka_conf_s *c_ptr_global() = 0;
1439 
1457  virtual struct rd_kafka_topic_conf_s *c_ptr_topic() = 0;
1458 
1471  virtual Conf::ConfResult set_engine_callback_data(void *value,
1472  std::string &errstr) = 0;
1473 
1474 
1497  virtual Conf::ConfResult enable_sasl_queue(bool enable,
1498  std::string &errstr) = 0;
1499 };
1500 
/**
 * Base interface of a client handle. KafkaConsumer, Consumer and Producer all
 * derive from it with virtual inheritance, so the operations below are
 * available on each of them.
 */
1513 class RD_EXPORT Handle {
1514  public:
1515  virtual ~Handle() {
1516  }
1517 
/** Name of the handle. */
1519  virtual std::string name() const = 0;
1520 
/** Member id string. NOTE(review): presumably the consumer-group member id -- confirm. */
1529  virtual std::string memberid() const = 0;
1530 
1531 
/**
 * Polls the handle for up to timeout_ms and returns an int.
 * NOTE(review): presumably serves queued callbacks and returns the number of
 * events served -- confirm.
 */
1556  virtual int poll(int timeout_ms) = 0;
1557 
/** Out-queue length. */
1564  virtual int outq_len() = 0;
1565 
/**
 * Requests metadata, either for all topics or for only_rkt. The result is
 * returned through metadatap.
 * NOTE(review): ownership of *metadatap is not visible here -- confirm.
 */
1581  virtual ErrorCode metadata(bool all_topics,
1582  const Topic *only_rkt,
1583  Metadata **metadatap,
1584  int timeout_ms) = 0;
1585 
1586 
/** Pauses the listed partitions; the vector is passed non-const. */
1596  virtual ErrorCode pause(std::vector<TopicPartition *> &partitions) = 0;
1597 
1598 
/** Resumes the listed partitions; the vector is passed non-const. */
1608  virtual ErrorCode resume(std::vector<TopicPartition *> &partitions) = 0;
1609 
1610 
/** Queries low/high watermark offsets for topic/partition, with a timeout. */
1619  virtual ErrorCode query_watermark_offsets(const std::string &topic,
1620  int32_t partition,
1621  int64_t *low,
1622  int64_t *high,
1623  int timeout_ms) = 0;
1624 
/**
 * Same outputs as query_watermark_offsets() but without a timeout parameter.
 * NOTE(review): presumably returns locally cached values -- confirm.
 */
1642  virtual ErrorCode get_watermark_offsets(const std::string &topic,
1643  int32_t partition,
1644  int64_t *low,
1645  int64_t *high) = 0;
1646 
1647 
/**
 * Offsets-for-times lookup over the given partition list (in/out).
 * NOTE(review): presumably each element's offset field carries the timestamp
 * on input and the resolved offset on output -- confirm.
 */
1669  virtual ErrorCode offsetsForTimes(std::vector<TopicPartition *> &offsets,
1670  int timeout_ms) = 0;
1671 
1672 
/** Queue for one partition. NOTE(review): ownership of the result not visible here. */
1681  virtual Queue *get_partition_queue(const TopicPartition *partition) = 0;
1682 
/** Directs logs to the given queue. */
1699  virtual ErrorCode set_log_queue(Queue *queue) = 0;
1700 
/** NOTE(review): presumably cancels the current callback dispatch -- confirm. */
1712  virtual void yield() = 0;
1713 
/** Cluster id string, obtained with a timeout. */
1728  virtual std::string clusterid(int timeout_ms) = 0;
1729 
/** Underlying C client handle (rd_kafka_s). */
1746  virtual struct rd_kafka_s *c_ptr() = 0;
1747 
/** Controller broker id, obtained with a timeout. */
1763  virtual int32_t controllerid(int timeout_ms) = 0;
1764 
1765 
/** Fatal error code; a description is written to errstr. */
1787  virtual ErrorCode fatal_error(std::string &errstr) const = 0;
1788 
/**
 * Supplies an OAUTHBEARER token together with its lifetime, principal name
 * and extensions (cf. OAuthBearerTokenRefreshCb). errstr is an output string.
 */
1828  virtual ErrorCode oauthbearer_set_token(
1829  const std::string &token_value,
1830  int64_t md_lifetime_ms,
1831  const std::string &md_principal_name,
1832  const std::list<std::string> &extensions,
1833  std::string &errstr) = 0;
1834 
/** Reports that obtaining an OAUTHBEARER token failed; errstr is an input here. */
1852  virtual ErrorCode oauthbearer_set_token_failure(
1853  const std::string &errstr) = 0;
1854 
/**
 * Enables SASL callbacks on the background thread.
 * NOTE(review): returns an Error pointer; presumably NULL on success and
 * caller-owned otherwise -- confirm.
 */
1862  virtual Error *sasl_background_callbacks_enable() = 0;
1863 
1864 
/** SASL queue (cf. Conf::enable_sasl_queue()). */
1870  virtual Queue *get_sasl_queue() = 0;
1871 
/** Background queue. */
1875  virtual Queue *get_background_queue() = 0;
1876 
1877 
1878 
/**
 * Member counterparts of the namespace-level mem_malloc()/mem_free().
 * NOTE(review): how they differ from the free functions is not visible here.
 */
1889  virtual void *mem_malloc(size_t size) = 0;
1890 
1904  virtual void mem_free(void *ptr) = 0;
1905 
/** Sets SASL username and password; returns an Error pointer (see note above). */
1920  virtual Error *sasl_set_credentials(const std::string &username,
1921  const std::string &password) = 0;
1922 };
1923 
1924 
/**
 * A topic + partition pair with an associated offset, error code, leader
 * epoch and metadata blob. Instances are created through the static create()
 * factories.
 */
1943 class RD_EXPORT TopicPartition {
1944  public:
/** Factory from topic name and partition number. */
1950  static TopicPartition *create(const std::string &topic, int partition);
1951 
/** Factory that additionally sets the offset. */
1958  static TopicPartition *create(const std::string &topic,
1959  int partition,
1960  int64_t offset);
1961 
/* Pure virtual destructor: the class is abstract, and the destructor's
 * definition has to be provided elsewhere (it is not in this header). */
1962  virtual ~TopicPartition() = 0;
1963 
/**
 * Static helper operating on a vector of TopicPartition pointers.
 * NOTE(review): presumably deletes every element and clears the vector
 * -- confirm.
 */
1968  static void destroy(std::vector<TopicPartition *> &partitions);
1969 
/** Topic name, returned by const reference. */
1971  virtual const std::string &topic() const = 0;
1972 
/** Partition number. */
1974  virtual int partition() const = 0;
1975 
/** Offset. */
1977  virtual int64_t offset() const = 0;
1978 
/** Sets the offset. */
1980  virtual void set_offset(int64_t offset) = 0;
1981 
/** Error code for this partition. */
1983  virtual ErrorCode err() const = 0;
1984 
/* Note: the two getters below are not const-qualified, unlike the accessors above. */
/** Leader epoch. */
1986  virtual int32_t get_leader_epoch() = 0;
1987 
/** Sets the leader epoch. */
1989  virtual void set_leader_epoch(int32_t leader_epoch) = 0;
1990 
/** Metadata bytes, returned by value. */
1992  virtual std::vector<unsigned char> get_metadata() = 0;
1993 
/** Sets the metadata bytes; the vector is taken by non-const reference. */
1995  virtual void set_metadata(std::vector<unsigned char> &metadata) = 0;
1996 };
1997 
1998 
1999 
/**
 * Topic handle. Created through the static create() factory from a client
 * Handle, a topic name and an optional configuration.
 */
2004 class RD_EXPORT Topic {
2005  public:
/* The constants below are only declared here; their values are defined
 * outside this header. */
/** Special partition value. NOTE(review): presumably "unassigned" -- confirm. */
2012  static const int32_t PARTITION_UA;
2013 
/** Special offset values. */
2015  static const int64_t OFFSET_BEGINNING;
2016  static const int64_t OFFSET_END;
2017  static const int64_t OFFSET_STORED;
2018  static const int64_t OFFSET_INVALID;
/**
 * Factory. errstr is an output string.
 * NOTE(review): presumably returns NULL on failure with errstr set, and conf
 * may be NULL for defaults -- confirm.
 */
2030  static Topic *create(Handle *base,
2031  const std::string &topic_str,
2032  const Conf *conf,
2033  std::string &errstr);
2034 
/* Pure virtual destructor; its definition is provided outside this header. */
2035  virtual ~Topic() = 0;
2036 
2037 
/** Topic name. */
2039  virtual std::string name() const = 0;
2040 
/** Predicate: is the given partition available? */
2046  virtual bool partition_available(int32_t partition) const = 0;
2047 
/** Stores offset for partition. */
2064  virtual ErrorCode offset_store(int32_t partition, int64_t offset) = 0;
2065 
/** Underlying C topic handle (rd_kafka_topic_s). */
2082  virtual struct rd_kafka_topic_s *c_ptr() = 0;
2083 };
2084 
2085 
/**
 * Message timestamp: a 64-bit timestamp value plus a timestamp-type
 * enumeration. Returned by value from Message::timestamp().
 *
 * NOTE(review): this listing is damaged here -- embedded source lines
 * 2109-2112 (the enum header and its leading enumerators) and 2116 (most
 * likely the member holding the enum value) are missing, which leaves a
 * dangling enumerator and an unbalanced brace below. The missing names
 * cannot be recovered from this file; restore them from the upstream header
 * before using this text as code.
 */
2107 class RD_EXPORT MessageTimestamp {
2108  public:
2113  MSG_TIMESTAMP_LOG_APPEND_TIME
2114  };
2115 
/** Timestamp value. NOTE(review): unit/epoch not visible here -- confirm. */
2117  int64_t timestamp;
2118 };
2119 
2120 
2130 class RD_EXPORT Headers {
2131  public:
2132  virtual ~Headers() = 0;
2133 
2142  class Header {
2143  public:
2154  Header(const std::string &key, const void *value, size_t value_size) :
2155  key_(key), err_(ERR_NO_ERROR), value_size_(value_size) {
2156  value_ = copy_value(value, value_size);
2157  }
2158 
2172  Header(const std::string &key,
2173  const void *value,
2174  size_t value_size,
2175  const RdKafka::ErrorCode err) :
2176  key_(key), err_(err), value_(NULL), value_size_(value_size) {
2177  if (err == ERR_NO_ERROR)
2178  value_ = copy_value(value, value_size);
2179  }
2180 
2186  Header(const Header &other) :
2187  key_(other.key_), err_(other.err_), value_size_(other.value_size_) {
2188  value_ = copy_value(other.value_, value_size_);
2189  }
2190 
2196  Header &operator=(const Header &other) {
2197  if (&other == this) {
2198  return *this;
2199  }
2200 
2201  key_ = other.key_;
2202  err_ = other.err_;
2203  value_size_ = other.value_size_;
2204 
2205  if (value_ != NULL)
2206  mem_free(value_);
2207 
2208  value_ = copy_value(other.value_, value_size_);
2209 
2210  return *this;
2211  }
2212 
2213  ~Header() {
2214  if (value_ != NULL)
2215  mem_free(value_);
2216  }
2217 
2219  std::string key() const {
2220  return key_;
2221  }
2222 
2224  const void *value() const {
2225  return value_;
2226  }
2227 
2230  const char *value_string() const {
2231  return static_cast<const char *>(value_);
2232  }
2233 
2235  size_t value_size() const {
2236  return value_size_;
2237  }
2238 
2241  return err_;
2242  }
2243 
2244  private:
2245  char *copy_value(const void *value, size_t value_size) {
2246  if (!value)
2247  return NULL;
2248 
2249  char *dest = (char *)mem_malloc(value_size + 1);
2250  memcpy(dest, (const char *)value, value_size);
2251  dest[value_size] = '\0';
2252 
2253  return dest;
2254  }
2255 
2256  std::string key_;
2257  RdKafka::ErrorCode err_;
2258  char *value_;
2259  size_t value_size_;
2260  void *operator new(size_t); /* Prevent dynamic allocation */
2261  };
2262 
2268  static Headers *create();
2269 
2278  static Headers *create(const std::vector<Header> &headers);
2279 
2289  virtual ErrorCode add(const std::string &key,
2290  const void *value,
2291  size_t value_size) = 0;
2292 
2303  virtual ErrorCode add(const std::string &key, const std::string &value) = 0;
2304 
2314  virtual ErrorCode add(const Header &header) = 0;
2315 
2323  virtual ErrorCode remove(const std::string &key) = 0;
2324 
2334  virtual std::vector<Header> get(const std::string &key) const = 0;
2335 
2346  virtual Header get_last(const std::string &key) const = 0;
2347 
2353  virtual std::vector<Header> get_all() const = 0;
2354 
2358  virtual size_t size() const = 0;
2359 };
2360 
2361 
/**
 * A message, as handed to delivery-report and consume callbacks and returned
 * by the consume() calls. Abstract: every accessor is pure virtual.
 */
2373 class RD_EXPORT Message {
2374  public:
/**
 * Persistence status of a message.
 * The last enumerator carries no trailing comma: that form is ill-formed
 * before C++11 and every other enum in this header avoids it.
 */
2377  enum Status {
2381  MSG_STATUS_NOT_PERSISTED = 0,
2382 
2386  MSG_STATUS_POSSIBLY_PERSISTED = 1,
2387 
2391  MSG_STATUS_PERSISTED = 2
2392  };
2393 
/** Error string for this message. */
2401  virtual std::string errstr() const = 0;
2402 
/** Error code for this message. */
2404  virtual ErrorCode err() const = 0;
2405 
/** Topic object. NOTE(review): may presumably be NULL -- confirm. */
2410  virtual Topic *topic() const = 0;
2411 
/** Topic name. */
2413  virtual std::string topic_name() const = 0;
2414 
/** Partition. */
2416  virtual int32_t partition() const = 0;
2417 
/** Payload pointer; its length is given by len(). */
2419  virtual void *payload() const = 0;
2420 
/** Payload length in bytes. */
2422  virtual size_t len() const = 0;
2423 
/** Key as a std::string pointer. */
2425  virtual const std::string *key() const = 0;
2426 
/** Key as a raw pointer; its length is given by key_len(). */
2428  virtual const void *key_pointer() const = 0;
2429 
/** Key length in bytes. */
2431  virtual size_t key_len() const = 0;
2432 
/** Offset. */
2434  virtual int64_t offset() const = 0;
2435 
/** Timestamp, returned by value. */
2437  virtual MessageTimestamp timestamp() const = 0;
2438 
/** Per-message opaque pointer. */
2440  virtual void *msg_opaque() const = 0;
2441 
/* Pure virtual destructor; its definition is provided outside this header. */
2442  virtual ~Message() = 0;
2443 
/** Latency. NOTE(review): unit presumably microseconds -- confirm. */
2446  virtual int64_t latency() const = 0;
2447 
/** Underlying C message (rd_kafka_message_s). */
2464  virtual struct rd_kafka_message_s *c_ptr() = 0;
2465 
/** Persistence status (see Status). */
2469  virtual Status status() const = 0;
2470 
/** Message headers. NOTE(review): ownership of the result not visible here. */
2475  virtual RdKafka::Headers *headers() = 0;
2476 
/** Message headers, with the outcome additionally reported through err. */
2483  virtual RdKafka::Headers *headers(RdKafka::ErrorCode *err) = 0;
2484 
/** Broker id. */
2487  virtual int32_t broker_id() const = 0;
2488 
/** Leader epoch. */
2491  virtual int32_t leader_epoch() const = 0;
2492 
/**
 * Stores this message's offset.
 * NOTE(review): returns an Error pointer; presumably NULL on success and
 * caller-owned otherwise -- confirm.
 */
2512  virtual Error *offset_store() = 0;
2513 };
2514 
/**
 * Queue interface. Created through the static create() factory from a client
 * Handle.
 */
2538 class RD_EXPORT Queue {
2539  public:
/** Factory creating a queue for the given handle. */
2543  static Queue *create(Handle *handle);
2544 
/**
 * Forwards this queue to dst.
 * NOTE(review): presumably a NULL dst removes the forwarding -- confirm.
 */
2555  virtual ErrorCode forward(Queue *dst) = 0;
2556 
2557 
/** Consumes one message from the queue, waiting up to timeout_ms. */
2569  virtual Message *consume(int timeout_ms) = 0;
2570 
/** Polls the queue for up to timeout_ms and returns an int. */
2578  virtual int poll(int timeout_ms) = 0;
2579 
/* Pure virtual destructor; its definition is provided outside this header. */
2580  virtual ~Queue() = 0;
2581 
/**
 * Enables IO event signalling on file descriptor fd using a payload of size
 * bytes.
 * NOTE(review): presumably the payload is written to fd when the queue
 * becomes non-empty -- confirm.
 */
2597  virtual void io_event_enable(int fd, const void *payload, size_t size) = 0;
2598 };
2599 
/**
 * Opaque consumer group metadata. It exposes no accessors; it is obtained
 * from KafkaConsumer::groupMetadata() and passed to
 * Producer::send_offsets_to_transaction().
 */
2613 class RD_EXPORT ConsumerGroupMetadata {
2614  public:
/* Pure virtual destructor; its definition is provided outside this header. */
2615  virtual ~ConsumerGroupMetadata() = 0;
2616 };
2617 
/**
 * High-level KafkaConsumer (for brokers 0.9 and later).
 * Inherits the common client operations from Handle (virtual inheritance).
 */
2635 class RD_EXPORT KafkaConsumer : public virtual Handle {
2636  public:
/**
 * Factory. errstr is an output string.
 * NOTE(review): presumably returns NULL on failure with errstr set
 * -- confirm.
 */
2648  static KafkaConsumer *create(const Conf *conf, std::string &errstr);
2649 
/* Pure virtual destructor; its definition is provided outside this header. */
2650  virtual ~KafkaConsumer() = 0;
2651 
2652 
/** Writes the current assignment into partitions. */
2655  virtual ErrorCode assignment(
2656  std::vector<RdKafka::TopicPartition *> &partitions) = 0;
2657 
/** Writes the current subscription into topics. */
2660  virtual ErrorCode subscription(std::vector<std::string> &topics) = 0;
2661 
/** Subscribes to the given topics. */
2696  virtual ErrorCode subscribe(const std::vector<std::string> &topics) = 0;
2697 
/** Drops the current subscription. */
2699  virtual ErrorCode unsubscribe() = 0;
2700 
/** Sets the assignment to the given partitions. */
2707  virtual ErrorCode assign(const std::vector<TopicPartition *> &partitions) = 0;
2708 
/** Drops the current assignment. */
2712  virtual ErrorCode unassign() = 0;
2713 
/**
 * Consumes one message, waiting up to timeout_ms.
 * NOTE(review): ownership of the returned Message and how errors/timeouts
 * are signalled are not visible here -- confirm.
 */
2738  virtual Message *consume(int timeout_ms) = 0;
2739 
/* Commit family: Sync/Async variants for the current assignment, for a
 * single message, and for an explicit offsets list. Note the asymmetry in
 * constness: commitSync() takes its offsets vector by non-const reference,
 * commitAsync() by const reference. */
2753  virtual ErrorCode commitSync() = 0;
2754 
2760  virtual ErrorCode commitAsync() = 0;
2761 
2771  virtual ErrorCode commitSync(Message *message) = 0;
2772 
2782  virtual ErrorCode commitAsync(Message *message) = 0;
2783 
2793  virtual ErrorCode commitSync(std::vector<TopicPartition *> &offsets) = 0;
2794 
2804  virtual ErrorCode commitAsync(
2805  const std::vector<TopicPartition *> &offsets) = 0;
2806 
/** Synchronous commit that also invokes the given OffsetCommitCb. */
2817  virtual ErrorCode commitSync(OffsetCommitCb *offset_commit_cb) = 0;
2818 
2829  virtual ErrorCode commitSync(std::vector<TopicPartition *> &offsets,
2830  OffsetCommitCb *offset_commit_cb) = 0;
2831 
2832 
2833 
/** Retrieves committed offsets for partitions (in/out), with a timeout. */
2842  virtual ErrorCode committed(std::vector<TopicPartition *> &partitions,
2843  int timeout_ms) = 0;
2844 
/** Retrieves current positions for partitions (in/out). */
2853  virtual ErrorCode position(std::vector<TopicPartition *> &partitions) = 0;
2854 
2855 
/** Closes the consumer; see also the close(Queue *) overload below. */
2878  virtual ErrorCode close() = 0;
2879 
2880 
/**
 * Seeks the given partition, with a timeout.
 * NOTE(review): presumably the target offset is taken from partition.offset()
 * -- confirm.
 */
2898  virtual ErrorCode seek(const TopicPartition &partition, int timeout_ms) = 0;
2899 
2900 
/** Stores the given offsets. */
2921  virtual ErrorCode offsets_store(std::vector<TopicPartition *> &offsets) = 0;
2922 
2923 
/** Group metadata object for use with Producer::send_offsets_to_transaction(). */
2934  virtual ConsumerGroupMetadata *groupMetadata() = 0;
2935 
2936 
/** Predicate: was the assignment lost? */
2951  virtual bool assignment_lost() = 0;
2952 
/** Rebalance protocol, as a string. */
2968  virtual std::string rebalance_protocol() = 0;
2969 
2970 
/* Incremental variants of assign/unassign. They report through an Error
 * pointer rather than an ErrorCode.
 * NOTE(review): presumably NULL on success and caller-owned otherwise. */
2986  virtual Error *incremental_assign(
2987  const std::vector<TopicPartition *> &partitions) = 0;
2988 
2989 
3005  virtual Error *incremental_unassign(
3006  const std::vector<TopicPartition *> &partitions) = 0;
3007 
/**
 * Close variant taking a queue and returning an Error pointer.
 * NOTE(review): presumably asynchronous, with completion observable through
 * closed() -- confirm.
 */
3025  virtual Error *close(Queue *queue) = 0;
3026 
3027 
/** Predicate: has the consumer been closed? */
3032  virtual bool closed() = 0;
3033 };
3034 
3035 
/**
 * Consumer operating on explicit topic + partition pairs: consumption is
 * started and stopped per partition, and messages are fetched per partition
 * or from a Queue. Inherits the common client operations from Handle
 * (virtual inheritance).
 */
3050 class RD_EXPORT Consumer : public virtual Handle {
3051  public:
/**
 * Factory. errstr is an output string.
 * NOTE(review): presumably returns NULL on failure with errstr set
 * -- confirm.
 */
3062  static Consumer *create(const Conf *conf, std::string &errstr);
3063 
/* Pure virtual destructor; its definition is provided outside this header. */
3064  virtual ~Consumer() = 0;
3065 
3066 
/** Starts consuming topic/partition at offset. */
3086  virtual ErrorCode start(Topic *topic, int32_t partition, int64_t offset) = 0;
3087 
/** As above, additionally naming the queue the messages are routed to. */
3094  virtual ErrorCode start(Topic *topic,
3095  int32_t partition,
3096  int64_t offset,
3097  Queue *queue) = 0;
3098 
/** Stops consuming topic/partition. */
3108  virtual ErrorCode stop(Topic *topic, int32_t partition) = 0;
3109 
/** Seeks topic/partition to offset, with a timeout. */
3124  virtual ErrorCode seek(Topic *topic,
3125  int32_t partition,
3126  int64_t offset,
3127  int timeout_ms) = 0;
3128 
/** Consumes one message from topic/partition, waiting up to timeout_ms. */
3146  virtual Message *consume(Topic *topic, int32_t partition, int timeout_ms) = 0;
3147 
/** Consumes one message from queue, waiting up to timeout_ms. */
3169  virtual Message *consume(Queue *queue, int timeout_ms) = 0;
3170 
/**
 * Callback-driven consumption from topic/partition: messages are passed to
 * consume_cb together with opaque.
 * NOTE(review): the int result is presumably the number of messages
 * processed -- confirm.
 */
3190  virtual int consume_callback(Topic *topic,
3191  int32_t partition,
3192  int timeout_ms,
3193  ConsumeCb *consume_cb,
3194  void *opaque) = 0;
3195 
/** Callback-driven consumption from a queue. */
3202  virtual int consume_callback(Queue *queue,
3203  int timeout_ms,
3204  RdKafka::ConsumeCb *consume_cb,
3205  void *opaque) = 0;
3206 
/**
 * Static helper converting an offset into another int64_t.
 * NOTE(review): presumably encodes "offset messages before the end" as a
 * special start offset -- confirm.
 */
3216  static int64_t OffsetTail(int64_t offset);
3217 };
3218 
/**
 * Producer. Offers several produce() overloads, flush/purge, and the
 * transactional API. Inherits the common client operations from Handle
 * (virtual inheritance).
 */
3232 class RD_EXPORT Producer : public virtual Handle {
3233  public:
/**
 * Factory. errstr is an output string.
 * NOTE(review): presumably returns NULL on failure with errstr set
 * -- confirm.
 */
3244  static Producer *create(const Conf *conf, std::string &errstr);
3245 
3246 
/* Pure virtual destructor; its definition is provided outside this header. */
3247  virtual ~Producer() = 0;
3248 
/**
 * Bit flags for the msgflags argument of produce().
 * The legacy MSG_FREE/MSG_COPY aliases are only declared when the MSG_COPY
 * macro (from sys/msg.h) is not defined. The separating comma sits inside the
 * #ifndef so the enumerator list never ends with a trailing comma.
 */
3254  enum {
3255  RK_MSG_FREE = 0x1,
3258  RK_MSG_COPY = 0x2,
3263  RK_MSG_BLOCK = 0x4
3280  /* For backwards compatibility: */
3281 #ifndef MSG_COPY /* defined in sys/msg.h */
3282  ,
3285  MSG_FREE = RK_MSG_FREE,
3286  MSG_COPY = RK_MSG_COPY
3287 #endif
3288 
3289  };
3290 
/** Produces payload/len to topic/partition with an optional std::string key. */
3347  virtual ErrorCode produce(Topic *topic,
3348  int32_t partition,
3349  int msgflags,
3350  void *payload,
3351  size_t len,
3352  const std::string *key,
3353  void *msg_opaque) = 0;
3354 
/** Variant taking the key as a raw pointer + length. */
3359  virtual ErrorCode produce(Topic *topic,
3360  int32_t partition,
3361  int msgflags,
3362  void *payload,
3363  size_t len,
3364  const void *key,
3365  size_t key_len,
3366  void *msg_opaque) = 0;
3367 
/**
 * Variant addressing the topic by name and taking a timestamp. topic_name is
 * passed by value (const std::string), so each call copies the string.
 */
3374  virtual ErrorCode produce(const std::string topic_name,
3375  int32_t partition,
3376  int msgflags,
3377  void *payload,
3378  size_t len,
3379  const void *key,
3380  size_t key_len,
3381  int64_t timestamp,
3382  void *msg_opaque) = 0;
3383 
/**
 * As above, additionally attaching message headers.
 * NOTE(review): ownership of headers after the call is not visible here
 * -- confirm.
 */
3391  virtual ErrorCode produce(const std::string topic_name,
3392  int32_t partition,
3393  int msgflags,
3394  void *payload,
3395  size_t len,
3396  const void *key,
3397  size_t key_len,
3398  int64_t timestamp,
3399  RdKafka::Headers *headers,
3400  void *msg_opaque) = 0;
3401 
3402 
/** Variant taking payload and key as std::vector<char> pointers; no msgflags. */
3407  virtual ErrorCode produce(Topic *topic,
3408  int32_t partition,
3409  const std::vector<char> *payload,
3410  const std::vector<char> *key,
3411  void *msg_opaque) = 0;
3412 
3413 
/** Flushes, waiting up to timeout_ms. */
3429  virtual ErrorCode flush(int timeout_ms) = 0;
3430 
3431 
/** Purges messages according to purge_flags (see the PURGE_* flags below). */
3459  virtual ErrorCode purge(int purge_flags) = 0;
3460 
/** Bit flags for purge(). */
3464  enum {
3465  PURGE_QUEUE = 0x1,
3467  PURGE_INFLIGHT = 0x2,
3474  PURGE_NON_BLOCKING = 0x4 /* Don't wait for background queue
3475  * purging to finish. */
3476  };
3477 
/* Transactional API. Every call reports through an Error pointer.
 * NOTE(review): presumably NULL on success and caller-owned otherwise; see
 * the Error predicates (is_fatal, is_retriable, txn_requires_abort). */
3504  virtual Error *init_transactions(int timeout_ms) = 0;
3505 
3506 
3519  virtual Error *begin_transaction() = 0;
3520 
/** Sends consumer offsets as part of the transaction (cf. KafkaConsumer::groupMetadata()). */
3567  virtual Error *send_offsets_to_transaction(
3568  const std::vector<TopicPartition *> &offsets,
3569  const ConsumerGroupMetadata *group_metadata,
3570  int timeout_ms) = 0;
3571 
3600  virtual Error *commit_transaction(int timeout_ms) = 0;
3601 
3632  virtual Error *abort_transaction(int timeout_ms) = 0;
3633 
3635 };
3636 
3651  public:
3653  virtual int32_t id() const = 0;
3654 
3656  virtual std::string host() const = 0;
3657 
3659  virtual int port() const = 0;
3660 
3661  virtual ~BrokerMetadata() = 0;
3662 };
3663 
3664 
3665 
3670  public:
3672  typedef std::vector<int32_t> ReplicasVector;
3674  typedef std::vector<int32_t> ISRSVector;
3675 
3677  typedef ReplicasVector::const_iterator ReplicasIterator;
3679  typedef ISRSVector::const_iterator ISRSIterator;
3680 
3681 
3683  virtual int32_t id() const = 0;
3684 
3686  virtual ErrorCode err() const = 0;
3687 
3689  virtual int32_t leader() const = 0;
3690 
3692  virtual const std::vector<int32_t> *replicas() const = 0;
3693 
3697  virtual const std::vector<int32_t> *isrs() const = 0;
3698 
3699  virtual ~PartitionMetadata() = 0;
3700 };
3701 
3702 
3703 
3708  public:
3710  typedef std::vector<const PartitionMetadata *> PartitionMetadataVector;
3712  typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;
3713 
3715  virtual std::string topic() const = 0;
3716 
3718  virtual const PartitionMetadataVector *partitions() const = 0;
3719 
3721  virtual ErrorCode err() const = 0;
3722 
3723  virtual ~TopicMetadata() = 0;
3724 };
3725 
3726 
/**
 * Metadata container: the broker list, the topic list and the id/name of the
 * broker the metadata originated from. Returned through
 * Handle::metadata(). Unlike most classes in this header it is declared
 * without RD_EXPORT.
 */
3730 class Metadata {
3731  public:
/** Brokers. */
3733  typedef std::vector<const BrokerMetadata *> BrokerMetadataVector;
/** Topics. */
3735  typedef std::vector<const TopicMetadata *> TopicMetadataVector;
3736 
/** Const iterators over the two vectors above. */
3738  typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
3740  typedef TopicMetadataVector::const_iterator TopicMetadataIterator;
3741 
3742 
/** Broker list. */
3748  virtual const BrokerMetadataVector *brokers() const = 0;
3749 
/** Topic list. */
3755  virtual const TopicMetadataVector *topics() const = 0;
3756 
/** Id of the broker the metadata originated from. */
3758  virtual int32_t orig_broker_id() const = 0;
3759 
/** Name of the broker the metadata originated from. */
3761  virtual std::string orig_broker_name() const = 0;
3762 
/* Pure virtual destructor; its definition is provided outside this header. */
3763  virtual ~Metadata() = 0;
3764 };
3765 
3768 } // namespace RdKafka
3769 
3770 
3771 #endif /* _RDKAFKACPP_H_ */
RdKafka::ERR_INVALID_REPLICA_ASSIGNMENT
@ ERR_INVALID_REPLICA_ASSIGNMENT
Definition: rdkafkacpp.h:422
RdKafka::ERR__TIMED_OUT_QUEUE
@ ERR__TIMED_OUT_QUEUE
Definition: rdkafkacpp.h:275
RdKafka::ERR_INVALID_SESSION_TIMEOUT
@ ERR_INVALID_SESSION_TIMEOUT
Definition: rdkafkacpp.h:396
RdKafka::BrokerMetadata::port
virtual int port() const =0
RdKafka::PartitionMetadata
Metadata: Partition information.
Definition: rdkafkacpp.h:3669
RdKafka::ERR__AUTO_OFFSET_RESET
@ ERR__AUTO_OFFSET_RESET
Definition: rdkafkacpp.h:327
RdKafka::ERR_NOT_COORDINATOR
@ ERR_NOT_COORDINATOR
Definition: rdkafkacpp.h:374
RdKafka::ERR_INVALID_PARTITIONS
@ ERR_INVALID_PARTITIONS
Definition: rdkafkacpp.h:418
RdKafka::PartitionMetadata::replicas
virtual const std::vector< int32_t > * replicas() const =0
RdKafka::ERR_INVALID_PRINCIPAL_TYPE
@ ERR_INVALID_PRINCIPAL_TYPE
Definition: rdkafkacpp.h:483
rd_kafka_message_s
A Kafka message as returned by the rd_kafka_consume*() family of functions as well as provided to the...
Definition: rdkafka.h:1453
RdKafka::CERT_CA
@ CERT_CA
Definition: rdkafkacpp.h:566
RdKafka::OAuthBearerTokenRefreshCb
SASL/OAUTHBEARER token refresh callback class.
Definition: rdkafkacpp.h:740
RdKafka::ERR_DUPLICATE_RESOURCE
@ ERR_DUPLICATE_RESOURCE
Definition: rdkafkacpp.h:536
RdKafka::TopicMetadata::err
virtual ErrorCode err() const =0
RdKafka::Topic::OFFSET_END
static const int64_t OFFSET_END
Definition: rdkafkacpp.h:2016
RdKafka::ERR_NOT_LEADER_FOR_PARTITION
@ ERR_NOT_LEADER_FOR_PARTITION
Definition: rdkafkacpp.h:350
RdKafka::PartitionMetadata::isrs
virtual const std::vector< int32_t > * isrs() const =0
RdKafka::ERR__UNDERFLOW
@ ERR__UNDERFLOW
Definition: rdkafkacpp.h:297
RdKafka::KafkaConsumer
High-level KafkaConsumer (for brokers 0.9 and later)
Definition: rdkafkacpp.h:2635
RdKafka::ERR_INCONSISTENT_GROUP_PROTOCOL
@ ERR_INCONSISTENT_GROUP_PROTOCOL
Definition: rdkafkacpp.h:390
RdKafka::Headers::Header::key
std::string key() const
Definition: rdkafkacpp.h:2219
RdKafka::ERR_ELECTION_NOT_NEEDED
@ ERR_ELECTION_NOT_NEEDED
Definition: rdkafkacpp.h:518
RdKafka::ERR__QUEUE_FULL
@ ERR__QUEUE_FULL
Definition: rdkafkacpp.h:239
RdKafka::ERR_OPERATION_NOT_ATTEMPTED
@ ERR_OPERATION_NOT_ATTEMPTED
Definition: rdkafkacpp.h:459
RdKafka::ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED
@ ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED
Definition: rdkafkacpp.h:477
RdKafka::ERR__CONFLICT
@ ERR__CONFLICT
Definition: rdkafkacpp.h:261
rd_kafka_message_s::err
rd_kafka_resp_err_t err
Definition: rdkafka.h:1454
RdKafka::Conf::CONF_GLOBAL
@ CONF_GLOBAL
Definition: rdkafkacpp.h:1197
RdKafka::DeliveryReportCb
Delivery Report callback class.
Definition: rdkafkacpp.h:701
RdKafka::ERR_INVALID_RECORD
@ ERR_INVALID_RECORD
Definition: rdkafkacpp.h:525
RdKafka::ERR__NOENT
@ ERR__NOENT
Definition: rdkafkacpp.h:295
RdKafka::ERR__INCONSISTENT
@ ERR__INCONSISTENT
Definition: rdkafkacpp.h:309
RdKafka::ERR_GROUP_ID_NOT_FOUND
@ ERR_GROUP_ID_NOT_FOUND
Definition: rdkafkacpp.h:487
RdKafka::ERR_ILLEGAL_SASL_STATE
@ ERR_ILLEGAL_SASL_STATE
Definition: rdkafkacpp.h:412
RdKafka::ERR_NETWORK_EXCEPTION
@ ERR_NETWORK_EXCEPTION
Definition: rdkafkacpp.h:364
RdKafka::RebalanceCb
KafkaConsumer: Rebalance callback class
Definition: rdkafkacpp.h:958
RdKafka::ERR__BAD_MSG
@ ERR__BAD_MSG
Definition: rdkafkacpp.h:206
RdKafka::Topic::OFFSET_STORED
static const int64_t OFFSET_STORED
Definition: rdkafkacpp.h:2017
RdKafka::ERR_CLUSTER_AUTHORIZATION_FAILED
@ ERR_CLUSTER_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:406
RdKafka::ERR__KEY_SERIALIZATION
@ ERR__KEY_SERIALIZATION
Definition: rdkafkacpp.h:283
RdKafka::ERR__CRIT_SYS_RESOURCE
@ ERR__CRIT_SYS_RESOURCE
Definition: rdkafkacpp.h:216
RdKafka::Queue
Queue interface.
Definition: rdkafkacpp.h:2538
RdKafka::ERR__FS
@ ERR__FS
Definition: rdkafkacpp.h:229
RdKafka::ERR_INVALID_TXN_STATE
@ ERR_INVALID_TXN_STATE
Definition: rdkafkacpp.h:440
RdKafka::ERR__UNSUPPORTED_FEATURE
@ ERR__UNSUPPORTED_FEATURE
Definition: rdkafkacpp.h:277
RdKafka::ERR_UNKNOWN_MEMBER_ID
@ ERR_UNKNOWN_MEMBER_ID
Definition: rdkafkacpp.h:394
RdKafka::ERR__PURGE_QUEUE
@ ERR__PURGE_QUEUE
Definition: rdkafkacpp.h:303
RdKafka::ERR_INVALID_GROUP_ID
@ ERR_INVALID_GROUP_ID
Definition: rdkafkacpp.h:392
RdKafka::ERR_THROTTLING_QUOTA_EXCEEDED
@ ERR_THROTTLING_QUOTA_EXCEEDED
Definition: rdkafkacpp.h:529
RdKafka::TopicMetadata::partitions
virtual const PartitionMetadataVector * partitions() const =0
RdKafka::ERR_INVALID_TIMESTAMP
@ ERR_INVALID_TIMESTAMP
Definition: rdkafkacpp.h:408
RdKafka::ERR__FAIL
@ ERR__FAIL
Definition: rdkafkacpp.h:212
RdKafka::Metadata::brokers
virtual const BrokerMetadataVector * brokers() const =0
Broker list.
RdKafka::Event::EVENT_LOG
@ EVENT_LOG
Definition: rdkafkacpp.h:850
RdKafka::ERR__MSG_TIMED_OUT
@ ERR__MSG_TIMED_OUT
Definition: rdkafkacpp.h:220
RdKafka::Metadata::TopicMetadataVector
std::vector< const TopicMetadata * > TopicMetadataVector
Topics.
Definition: rdkafkacpp.h:3735
RdKafka::ERR__VALUE_SERIALIZATION
@ ERR__VALUE_SERIALIZATION
Definition: rdkafkacpp.h:285
RdKafka::ERR__SSL
@ ERR__SSL
Definition: rdkafkacpp.h:245
RdKafka::Event::Severity
Severity
EVENT_LOG severities (conforms to syslog(3) severities)
Definition: rdkafkacpp.h:855
RdKafka::ErrorCode
ErrorCode
Error codes.
Definition: rdkafkacpp.h:201
RdKafka::ERR__RESOLVE
@ ERR__RESOLVE
Definition: rdkafkacpp.h:218
RdKafka::ERR__NOT_CONFIGURED
@ ERR__NOT_CONFIGURED
Definition: rdkafkacpp.h:317
RdKafka::ERR__TRANSPORT
@ ERR__TRANSPORT
Definition: rdkafkacpp.h:214
RdKafka::ERR__ISR_INSUFF
@ ERR__ISR_INSUFF
Definition: rdkafkacpp.h:241
RdKafka::ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND
@ ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND
Definition: rdkafkacpp.h:384
RdKafka::ERR__ASSIGN_PARTITIONS
@ ERR__ASSIGN_PARTITIONS
Definition: rdkafkacpp.h:257
RdKafka::ERR_OFFSET_NOT_AVAILABLE
@ ERR_OFFSET_NOT_AVAILABLE
Definition: rdkafkacpp.h:505
RdKafka::ERR_NOT_ENOUGH_REPLICAS
@ ERR_NOT_ENOUGH_REPLICAS
Definition: rdkafkacpp.h:382
RdKafka::Headers::Header::operator=
Header & operator=(const Header &other)
Assignment operator.
Definition: rdkafkacpp.h:2196
RdKafka::ERR__ALL_BROKERS_DOWN
@ ERR__ALL_BROKERS_DOWN
Definition: rdkafkacpp.h:233
RdKafka::ERR_RESOURCE_NOT_FOUND
@ ERR_RESOURCE_NOT_FOUND
Definition: rdkafkacpp.h:534
RdKafka::ERR__NODE_UPDATE
@ ERR__NODE_UPDATE
Definition: rdkafkacpp.h:243
RdKafka::Headers::Header::Header
Header(const std::string &key, const void *value, size_t value_size)
Header object to encapsulate a single Header.
Definition: rdkafkacpp.h:2154
RdKafka::ERR_DELEGATION_TOKEN_EXPIRED
@ ERR_DELEGATION_TOKEN_EXPIRED
Definition: rdkafkacpp.h:481
RdKafka::ERR_UNSUPPORTED_SASL_MECHANISM
@ ERR_UNSUPPORTED_SASL_MECHANISM
Definition: rdkafkacpp.h:410
RdKafka::ERR_TRANSACTION_COORDINATOR_FENCED
@ ERR_TRANSACTION_COORDINATOR_FENCED
Definition: rdkafkacpp.h:453
RdKafka::ERR__BAD_COMPRESSION
@ ERR__BAD_COMPRESSION
Definition: rdkafkacpp.h:208
RdKafka::Event::Type
Type
Event type.
Definition: rdkafkacpp.h:847
RdKafka::Handle
Base handle, super class for specific clients.
Definition: rdkafkacpp.h:1513
RdKafka::ERR__PREV_IN_PROGRESS
@ ERR__PREV_IN_PROGRESS
Definition: rdkafkacpp.h:253
RdKafka::BrokerMetadata::host
virtual std::string host() const =0
RdKafka::ERR_DELEGATION_TOKEN_OWNER_MISMATCH
@ ERR_DELEGATION_TOKEN_OWNER_MISMATCH
Definition: rdkafkacpp.h:475
RdKafka::ERR_SASL_AUTHENTICATION_FAILED
@ ERR_SASL_AUTHENTICATION_FAILED
Definition: rdkafkacpp.h:465
RdKafka::ERR__VALUE_DESERIALIZATION
@ ERR__VALUE_DESERIALIZATION
Definition: rdkafkacpp.h:289
RdKafka::ERR_INVALID_PRODUCER_ID_MAPPING
@ ERR_INVALID_PRODUCER_ID_MAPPING
Definition: rdkafkacpp.h:443
RdKafka::ERR__ASSIGNMENT_LOST
@ ERR__ASSIGNMENT_LOST
Definition: rdkafkacpp.h:323
RdKafka::ERR_PRODUCER_FENCED
@ ERR_PRODUCER_FENCED
Definition: rdkafkacpp.h:532
RdKafka::ERR__WAIT_CACHE
@ ERR__WAIT_CACHE
Definition: rdkafkacpp.h:279
RdKafka::ERR_UNKNOWN_PRODUCER_ID
@ ERR_UNKNOWN_PRODUCER_ID
Definition: rdkafkacpp.h:467
RdKafka::ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE
@ ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE
Definition: rdkafkacpp.h:516
RdKafka::OffsetCommitCb
Offset Commit callback class.
Definition: rdkafkacpp.h:1040
RdKafka::ERR_INVALID_COMMIT_OFFSET_SIZE
@ ERR_INVALID_COMMIT_OFFSET_SIZE
Definition: rdkafkacpp.h:400
RdKafka::Headers::Header
Header object.
Definition: rdkafkacpp.h:2142
RdKafka::MessageTimestamp::type
MessageTimestampType type
Definition: rdkafkacpp.h:2116
RdKafka::ERR_TOPIC_ALREADY_EXISTS
@ ERR_TOPIC_ALREADY_EXISTS
Definition: rdkafkacpp.h:416
RdKafka::ERR_FENCED_INSTANCE_ID
@ ERR_FENCED_INSTANCE_ID
Definition: rdkafkacpp.h:514
RdKafka::ERR_STALE_CTRL_EPOCH
@ ERR_STALE_CTRL_EPOCH
Definition: rdkafkacpp.h:360
RdKafka::ERR__OUTDATED
@ ERR__OUTDATED
Definition: rdkafkacpp.h:273
RdKafka::ERR_MSG_SIZE_TOO_LARGE
@ ERR_MSG_SIZE_TOO_LARGE
Definition: rdkafkacpp.h:358
RdKafka::ERR_COORDINATOR_NOT_AVAILABLE
@ ERR_COORDINATOR_NOT_AVAILABLE
Definition: rdkafkacpp.h:370
RdKafka::Headers::Header::err
RdKafka::ErrorCode err() const
Definition: rdkafkacpp.h:2240
RdKafka::ERR_INCONSISTENT_VOTER_SET
@ ERR_INCONSISTENT_VOTER_SET
Definition: rdkafkacpp.h:541
RdKafka::TopicMetadata::topic
virtual std::string topic() const =0
RdKafka::Event::EVENT_ERROR
@ EVENT_ERROR
Definition: rdkafkacpp.h:848
RdKafka::ERR_INVALID_TRANSACTION_TIMEOUT
@ ERR_INVALID_TRANSACTION_TIMEOUT
Definition: rdkafkacpp.h:446
RdKafka::Metadata::BrokerMetadataVector
std::vector< const BrokerMetadata * > BrokerMetadataVector
Brokers.
Definition: rdkafkacpp.h:3733
RdKafka::MessageTimestamp::timestamp
int64_t timestamp
Definition: rdkafkacpp.h:2117
RdKafka::ERR_UNKNOWN_LEADER_EPOCH
@ ERR_UNKNOWN_LEADER_EPOCH
Definition: rdkafkacpp.h:499
RdKafka::PartitionMetadata::id
virtual int32_t id() const =0
RdKafka::ERR_REASSIGNMENT_IN_PROGRESS
@ ERR_REASSIGNMENT_IN_PROGRESS
Definition: rdkafkacpp.h:469
RdKafka::ERR_INVALID_MSG_SIZE
@ ERR_INVALID_MSG_SIZE
Definition: rdkafkacpp.h:346
RdKafka::ERR__INVALID_ARG
@ ERR__INVALID_ARG
Definition: rdkafkacpp.h:235
RdKafka::ERR__APPLICATION
@ ERR__APPLICATION
Definition: rdkafkacpp.h:321
RdKafka::mem_malloc
RD_EXPORT void * mem_malloc(size_t size)
Allocate memory using the same allocator librdkafka uses.
RdKafka::ERR_OUT_OF_ORDER_SEQUENCE_NUMBER
@ ERR_OUT_OF_ORDER_SEQUENCE_NUMBER
Definition: rdkafkacpp.h:434
RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED
@ ERR_TOPIC_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:402
RdKafka::ERR__NOT_IMPLEMENTED
@ ERR__NOT_IMPLEMENTED
Definition: rdkafkacpp.h:267
RdKafka::ERR__INTR
@ ERR__INTR
Definition: rdkafkacpp.h:281
RdKafka::ERR_RECORD_LIST_TOO_LARGE
@ ERR_RECORD_LIST_TOO_LARGE
Definition: rdkafkacpp.h:380
RdKafka::PartitionMetadata::ReplicasIterator
ReplicasVector::const_iterator ReplicasIterator
Replicas iterator.
Definition: rdkafkacpp.h:3677
RdKafka::ERR__END
@ ERR__END
Definition: rdkafkacpp.h:332
RdKafka::ERR_DELEGATION_TOKEN_NOT_FOUND
@ ERR_DELEGATION_TOKEN_NOT_FOUND
Definition: rdkafkacpp.h:473
RdKafka::ERR_KAFKA_STORAGE_ERROR
@ ERR_KAFKA_STORAGE_ERROR
Definition: rdkafkacpp.h:461
RdKafka::ERR__IN_PROGRESS
@ ERR__IN_PROGRESS
Definition: rdkafkacpp.h:251
RdKafka::EventCb
Event callback class.
Definition: rdkafkacpp.h:827
RdKafka::ERR__UNKNOWN_PARTITION
@ ERR__UNKNOWN_PARTITION
Definition: rdkafkacpp.h:227
RdKafka::ERR__GAPLESS_GUARANTEE
@ ERR__GAPLESS_GUARANTEE
Definition: rdkafkacpp.h:311
RdKafka::ERR__EXISTING_SUBSCRIPTION
@ ERR__EXISTING_SUBSCRIPTION
Definition: rdkafkacpp.h:255
RdKafka::ERR__TIMED_OUT
@ ERR__TIMED_OUT
Definition: rdkafkacpp.h:237
RdKafka::Conf
Configuration interface.
Definition: rdkafkacpp.h:1191
RdKafka::Headers::Header::value_size
size_t value_size() const
Definition: rdkafkacpp.h:2235
RdKafka::ERR_INVALID_MSG
@ ERR_INVALID_MSG
Definition: rdkafkacpp.h:342
RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE
@ MSG_TIMESTAMP_NOT_AVAILABLE
Definition: rdkafkacpp.h:2111
RdKafka::ERR__STATE
@ ERR__STATE
Definition: rdkafkacpp.h:263
RdKafka::PartitionerKeyPointerCb
Variant partitioner with key pointer.
Definition: rdkafkacpp.h:797
RdKafka::ERR_INVALID_REQUEST
@ ERR_INVALID_REQUEST
Definition: rdkafkacpp.h:428
RdKafka::ERR__FENCED
@ ERR__FENCED
Definition: rdkafkacpp.h:319
RdKafka::ERR__REVOKE_PARTITIONS
@ ERR__REVOKE_PARTITIONS
Definition: rdkafkacpp.h:259
RdKafka::ERR_DUPLICATE_SEQUENCE_NUMBER
@ ERR_DUPLICATE_SEQUENCE_NUMBER
Definition: rdkafkacpp.h:436
RdKafka::CertificateEncoding
CertificateEncoding
SSL certificate encoding.
Definition: rdkafkacpp.h:574
RdKafka::ERR_UNSUPPORTED_COMPRESSION_TYPE
@ ERR_UNSUPPORTED_COMPRESSION_TYPE
Definition: rdkafkacpp.h:501
RdKafka::ConsumerGroupMetadata
ConsumerGroupMetadata holds a consumer instance's group metadata state.
Definition: rdkafkacpp.h:2613
RdKafka::ERR_GROUP_AUTHORIZATION_FAILED
@ ERR_GROUP_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:404
RdKafka::PartitionerCb
Partitioner callback class.
Definition: rdkafkacpp.h:765
RdKafka::Metadata::topics
virtual const TopicMetadataVector * topics() const =0
Topic list.
RdKafka::TopicMetadata::PartitionMetadataVector
std::vector< const PartitionMetadata * > PartitionMetadataVector
Partitions.
Definition: rdkafkacpp.h:3710
RdKafka::ERR_UNACCEPTABLE_CREDENTIAL
@ ERR_UNACCEPTABLE_CREDENTIAL
Definition: rdkafkacpp.h:538
RdKafka::ERR__UNKNOWN_BROKER
@ ERR__UNKNOWN_BROKER
Definition: rdkafkacpp.h:315
RdKafka::ERR_PREFERRED_LEADER_NOT_AVAILABLE
@ ERR_PREFERRED_LEADER_NOT_AVAILABLE
Definition: rdkafkacpp.h:509
RdKafka::ERR__BEGIN
@ ERR__BEGIN
Definition: rdkafkacpp.h:204
RdKafka::ERR_UNKNOWN_TOPIC_OR_PART
@ ERR_UNKNOWN_TOPIC_OR_PART
Definition: rdkafkacpp.h:344
RdKafka::ConsumeCb
Consume callback class.
Definition: rdkafkacpp.h:939
RdKafka::Conf::ConfType
ConfType
Configuration object type.
Definition: rdkafkacpp.h:1196
RdKafka::Consumer
Simple Consumer (legacy)
Definition: rdkafkacpp.h:3050
RdKafka::ERR_FETCH_SESSION_ID_NOT_FOUND
@ ERR_FETCH_SESSION_ID_NOT_FOUND
Definition: rdkafkacpp.h:489
RdKafka::ERR_REPLICA_NOT_AVAILABLE
@ ERR_REPLICA_NOT_AVAILABLE
Definition: rdkafkacpp.h:356
RdKafka::Message
Message object.
Definition: rdkafkacpp.h:2373
RdKafka::ERR_POLICY_VIOLATION
@ ERR_POLICY_VIOLATION
Definition: rdkafkacpp.h:432
RdKafka::TopicMetadata
Metadata: Topic information.
Definition: rdkafkacpp.h:3707
RdKafka::ERR_BROKER_NOT_AVAILABLE
@ ERR_BROKER_NOT_AVAILABLE
Definition: rdkafkacpp.h:354
RdKafka::ERR__DESTROY
@ ERR__DESTROY
Definition: rdkafkacpp.h:210
RdKafka::Headers::Header::Header
Header(const std::string &key, const void *value, size_t value_size, const RdKafka::ErrorCode err)
Header object to encapsulate a single Header.
Definition: rdkafkacpp.h:2172
RdKafka::SslCertificateVerifyCb
SSL broker certificate verification class.
Definition: rdkafkacpp.h:1071
RdKafka::ERR_OFFSET_METADATA_TOO_LARGE
@ ERR_OFFSET_METADATA_TOO_LARGE
Definition: rdkafkacpp.h:362
RdKafka::ERR__FATAL
@ ERR__FATAL
Definition: rdkafkacpp.h:307
RdKafka::Metadata::TopicMetadataIterator
TopicMetadataVector::const_iterator TopicMetadataIterator
Topics iterator.
Definition: rdkafkacpp.h:3740
RdKafka::Error
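The Error class is used as a return value from APIs to propagate an error. The error consists of an error code which is to be used programmatically, an error string for showing to the user, and various error flags that can be used programmatically to decide how to handle the error; e.g., should the operation be retried, was it a fatal error, etc.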
Definition: rdkafkacpp.h:616
RdKafka::PartitionMetadata::err
virtual ErrorCode err() const =0
RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME
@ MSG_TIMESTAMP_CREATE_TIME
Definition: rdkafkacpp.h:2112
RdKafka::MessageTimestamp::MessageTimestampType
MessageTimestampType
Definition: rdkafkacpp.h:2110
RdKafka::Topic::OFFSET_BEGINNING
static const int64_t OFFSET_BEGINNING
Special offsets.
Definition: rdkafkacpp.h:2015
RdKafka::Metadata::orig_broker_id
virtual int32_t orig_broker_id() const =0
Broker (id) originating this metadata.
RdKafka::Event
Event object class as passed to the EventCb callback.
Definition: rdkafkacpp.h:844
RdKafka::BrokerMetadata::id
virtual int32_t id() const =0
RdKafka::mem_free
RD_EXPORT void mem_free(void *ptr)
Free pointer returned by librdkafka.
RdKafka::ERR_UNSUPPORTED_VERSION
@ ERR_UNSUPPORTED_VERSION
Definition: rdkafkacpp.h:414
RdKafka::ERR__INVALID_TYPE
@ ERR__INVALID_TYPE
Definition: rdkafkacpp.h:299
RdKafka::ERR_REBALANCE_IN_PROGRESS
@ ERR_REBALANCE_IN_PROGRESS
Definition: rdkafkacpp.h:398
RdKafka::Topic
Topic handle.
Definition: rdkafkacpp.h:2004
RdKafka::PartitionMetadata::ISRSVector
std::vector< int32_t > ISRSVector
ISRs (In-Sync-Replicas)
Definition: rdkafkacpp.h:3674
RdKafka::Topic::PARTITION_UA
static const int32_t PARTITION_UA
Unassigned partition.
Definition: rdkafkacpp.h:2012
RdKafka::ERR_PRINCIPAL_DESERIALIZATION_FAILURE
@ ERR_PRINCIPAL_DESERIALIZATION_FAILURE
Definition: rdkafkacpp.h:547
RdKafka::ERR__UNKNOWN_GROUP
@ ERR__UNKNOWN_GROUP
Definition: rdkafkacpp.h:249
RdKafka::ERR__RETRY
@ ERR__RETRY
Definition: rdkafkacpp.h:301
RdKafka::Headers::Header::value
const void * value() const
Definition: rdkafkacpp.h:2224
RdKafka::ERR_NON_EMPTY_GROUP
@ ERR_NON_EMPTY_GROUP
Definition: rdkafkacpp.h:485
RdKafka::Conf::ConfResult
ConfResult
RdKafka::Conf::Set() result code.
Definition: rdkafkacpp.h:1204
RdKafka::TopicPartition
Topic+Partition.
Definition: rdkafkacpp.h:1943
RdKafka::ERR_MEMBER_ID_REQUIRED
@ ERR_MEMBER_ID_REQUIRED
Definition: rdkafkacpp.h:507
RdKafka::ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED
@ ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:479
RdKafka::ERR__WAIT_COORD
@ ERR__WAIT_COORD
Definition: rdkafkacpp.h:247
RdKafka::Message::Status
Status
Message persistence status can be used by the application to find out if a produced message was persisted in the topic log.
Definition: rdkafkacpp.h:2377
RdKafka::ERR__READ_ONLY
@ ERR__READ_ONLY
Definition: rdkafkacpp.h:293
RdKafka::ERR__NO_OFFSET
@ ERR__NO_OFFSET
Definition: rdkafkacpp.h:271
RdKafka::MessageTimestamp
Message timestamp object.
Definition: rdkafkacpp.h:2107
RdKafka::ERR_UNKNOWN
@ ERR_UNKNOWN
Definition: rdkafkacpp.h:336
RdKafka::ERR_LISTENER_NOT_FOUND
@ ERR_LISTENER_NOT_FOUND
Definition: rdkafkacpp.h:493
RdKafka::CERT_ENC_DER
@ CERT_ENC_DER
Definition: rdkafkacpp.h:576
RdKafka::ERR_LOG_DIR_NOT_FOUND
@ ERR_LOG_DIR_NOT_FOUND
Definition: rdkafkacpp.h:463
RdKafka::ERR_NO_ERROR
@ ERR_NO_ERROR
Definition: rdkafkacpp.h:338
RdKafka::ERR__NOOP
@ ERR__NOOP
Definition: rdkafkacpp.h:325
RdKafka::ERR__UNKNOWN_TOPIC
@ ERR__UNKNOWN_TOPIC
Definition: rdkafkacpp.h:231
RdKafka::BrokerMetadata
Metadata: Broker information.
Definition: rdkafkacpp.h:3650
RdKafka::ERR_OFFSET_OUT_OF_RANGE
@ ERR_OFFSET_OUT_OF_RANGE
Definition: rdkafkacpp.h:340
RdKafka::ERR__MAX_POLL_EXCEEDED
@ ERR__MAX_POLL_EXCEEDED
Definition: rdkafkacpp.h:313
RdKafka::ERR_NOT_CONTROLLER
@ ERR_NOT_CONTROLLER
Definition: rdkafkacpp.h:426
RdKafka::PartitionMetadata::leader
virtual int32_t leader() const =0
RdKafka::ERR__LOG_TRUNCATION
@ ERR__LOG_TRUNCATION
Definition: rdkafkacpp.h:329
RdKafka::CERT_ENC_PEM
@ CERT_ENC_PEM
Definition: rdkafkacpp.h:577
RdKafka::CERT_ENC_PKCS12
@ CERT_ENC_PKCS12
Definition: rdkafkacpp.h:575
RdKafka::ERR_ILLEGAL_GENERATION
@ ERR_ILLEGAL_GENERATION
Definition: rdkafkacpp.h:388
RdKafka::ERR_INVALID_PRODUCER_EPOCH
@ ERR_INVALID_PRODUCER_EPOCH
Definition: rdkafkacpp.h:438
RdKafka::ERR_SECURITY_DISABLED
@ ERR_SECURITY_DISABLED
Definition: rdkafkacpp.h:457
RdKafka::Metadata::orig_broker_name
virtual std::string orig_broker_name() const =0
Broker (name) originating this metadata.
RdKafka::ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED
@ ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:455
RdKafka::ERR__AUTHENTICATION
@ ERR__AUTHENTICATION
Definition: rdkafkacpp.h:269
RdKafka::CERT_PRIVATE_KEY
@ CERT_PRIVATE_KEY
Definition: rdkafkacpp.h:565
RdKafka::Producer
Producer.
Definition: rdkafkacpp.h:3232
RdKafka::ERR_LEADER_NOT_AVAILABLE
@ ERR_LEADER_NOT_AVAILABLE
Definition: rdkafkacpp.h:348
RdKafka::ERR_GROUP_SUBSCRIBED_TO_TOPIC
@ ERR_GROUP_SUBSCRIBED_TO_TOPIC
Definition: rdkafkacpp.h:523
RdKafka::ERR_FEATURE_UPDATE_FAILED
@ ERR_FEATURE_UPDATE_FAILED
Definition: rdkafkacpp.h:545
RdKafka::ERR_FENCED_LEADER_EPOCH
@ ERR_FENCED_LEADER_EPOCH
Definition: rdkafkacpp.h:497
RdKafka::ERR__PARTITION_EOF
@ ERR__PARTITION_EOF
Definition: rdkafkacpp.h:225
RdKafka::Metadata
Metadata container.
Definition: rdkafkacpp.h:3730
RdKafka::CERT_PUBLIC_KEY
@ CERT_PUBLIC_KEY
Definition: rdkafkacpp.h:564
RdKafka::Metadata::BrokerMetadataIterator
BrokerMetadataVector::const_iterator BrokerMetadataIterator
Brokers iterator.
Definition: rdkafkacpp.h:3738
RdKafka::ERR_DELEGATION_TOKEN_AUTH_DISABLED
@ ERR_DELEGATION_TOKEN_AUTH_DISABLED
Definition: rdkafkacpp.h:471
RdKafka::ERR_UNSTABLE_OFFSET_COMMIT
@ ERR_UNSTABLE_OFFSET_COMMIT
Definition: rdkafkacpp.h:527
RdKafka::ERR_INVALID_UPDATE_VERSION
@ ERR_INVALID_UPDATE_VERSION
Definition: rdkafkacpp.h:543
RdKafka::ERR_INVALID_FETCH_SESSION_EPOCH
@ ERR_INVALID_FETCH_SESSION_EPOCH
Definition: rdkafkacpp.h:491
RdKafka::ERR_TOPIC_EXCEPTION
@ ERR_TOPIC_EXCEPTION
Definition: rdkafkacpp.h:378
RdKafka::ERR__KEY_DESERIALIZATION
@ ERR__KEY_DESERIALIZATION
Definition: rdkafkacpp.h:287
RdKafka::ERR__UNKNOWN_PROTOCOL
@ ERR__UNKNOWN_PROTOCOL
Definition: rdkafkacpp.h:265
RdKafka::ERR_NO_REASSIGNMENT_IN_PROGRESS
@ ERR_NO_REASSIGNMENT_IN_PROGRESS
Definition: rdkafkacpp.h:520
RdKafka::ERR_COORDINATOR_LOAD_IN_PROGRESS
@ ERR_COORDINATOR_LOAD_IN_PROGRESS
Definition: rdkafkacpp.h:366
RdKafka::SocketCb
Portability: SocketCb callback class
Definition: rdkafkacpp.h:1126
RdKafka::ERR_TOPIC_DELETION_DISABLED
@ ERR_TOPIC_DELETION_DISABLED
Definition: rdkafkacpp.h:495
RdKafka::ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT
@ ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT
Definition: rdkafkacpp.h:430
RdKafka::TopicMetadata::PartitionMetadataIterator
PartitionMetadataVector::const_iterator PartitionMetadataIterator
Partitions iterator.
Definition: rdkafkacpp.h:3712
RdKafka::PartitionerKeyPointerCb::partitioner_cb
virtual int32_t partitioner_cb(const Topic *topic, const void *key, size_t key_len, int32_t partition_cnt, void *msg_opaque)=0
Variant partitioner callback that gets key as pointer and length instead of as a const std::string *.
RdKafka::ERR__PARTIAL
@ ERR__PARTIAL
Definition: rdkafkacpp.h:291
RdKafka::ERR_REQUEST_TIMED_OUT
@ ERR_REQUEST_TIMED_OUT
Definition: rdkafkacpp.h:352
RdKafka::ERR_GROUP_MAX_SIZE_REACHED
@ ERR_GROUP_MAX_SIZE_REACHED
Definition: rdkafkacpp.h:511
RdKafka::ERR_CONCURRENT_TRANSACTIONS
@ ERR_CONCURRENT_TRANSACTIONS
Definition: rdkafkacpp.h:449
RdKafka::Headers::Header::value_string
const char * value_string() const
Definition: rdkafkacpp.h:2230
RdKafka::Headers
Headers object.
Definition: rdkafkacpp.h:2130
RdKafka::Topic::OFFSET_INVALID
static const int64_t OFFSET_INVALID
Definition: rdkafkacpp.h:2018
RdKafka::Event::EVENT_STATS
@ EVENT_STATS
Definition: rdkafkacpp.h:849
RdKafka::Headers::Header::Header
Header(const Header &other)
Copy constructor.
Definition: rdkafkacpp.h:2186
RdKafka::ERR_INVALID_REPLICATION_FACTOR
@ ERR_INVALID_REPLICATION_FACTOR
Definition: rdkafkacpp.h:420
RdKafka::ERR_STALE_BROKER_EPOCH
@ ERR_STALE_BROKER_EPOCH
Definition: rdkafkacpp.h:503
RdKafka::ERR_INVALID_CONFIG
@ ERR_INVALID_CONFIG
Definition: rdkafkacpp.h:424
RdKafka::ERR_INVALID_REQUIRED_ACKS
@ ERR_INVALID_REQUIRED_ACKS
Definition: rdkafkacpp.h:386
RdKafka::ERR__PURGE_INFLIGHT
@ ERR__PURGE_INFLIGHT
Definition: rdkafkacpp.h:305
RdKafka::PartitionMetadata::ISRSIterator
ISRSVector::const_iterator ISRSIterator
ISRs iterator.
Definition: rdkafkacpp.h:3679
RdKafka::CertificateType
CertificateType
SSL certificate types.
Definition: rdkafkacpp.h:563
RdKafka::PartitionMetadata::ReplicasVector
std::vector< int32_t > ReplicasVector
Replicas.
Definition: rdkafkacpp.h:3672
RdKafka::OpenCb
Portability: OpenCb callback class
Definition: rdkafkacpp.h:1152