librdkafka
The Apache Kafka C/C++ client library
rdkafkacpp.h
Go to the documentation of this file.
1 /*
2  * librdkafka - Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2014-2022 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  * this list of conditions and the following disclaimer in the documentation
14  * and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #ifndef _RDKAFKACPP_H_
30 #define _RDKAFKACPP_H_
31 
50 #include <string>
51 #include <list>
52 #include <vector>
53 #include <cstdlib>
54 #include <cstring>
55 #include <stdint.h>
56 #include <sys/types.h>
57 
58 #ifdef _WIN32
59 #ifndef ssize_t
60 #ifndef _BASETSD_H_
61 #include <basetsd.h>
62 #endif
63 #ifndef _SSIZE_T_DEFINED
64 #define _SSIZE_T_DEFINED
65 typedef SSIZE_T ssize_t;
66 #endif
67 #endif
68 #undef RD_EXPORT
69 #ifdef LIBRDKAFKA_STATICLIB
70 #define RD_EXPORT
71 #else
72 #ifdef LIBRDKAFKACPP_EXPORTS
73 #define RD_EXPORT __declspec(dllexport)
74 #else
75 #define RD_EXPORT __declspec(dllimport)
76 #endif
77 #endif
78 #else
79 #define RD_EXPORT
80 #endif
81 
84 extern "C" {
85 /* Forward declarations */
86 struct rd_kafka_s;
87 struct rd_kafka_topic_s;
88 struct rd_kafka_message_s;
89 struct rd_kafka_conf_s;
90 struct rd_kafka_topic_conf_s;
91 }
92 
93 namespace RdKafka {
94 
114 #define RD_KAFKA_VERSION 0x020100ff
115 
/**
 * @brief Free functions exported at RdKafka namespace scope.
 *
 * Only the declarations are visible in this listing, so the notes below are
 * limited to what the signatures show.
 */
/* Version query returning an int.
 * NOTE(review): presumably the runtime counterpart of the RD_KAFKA_VERSION
 * macro defined just above - confirm against the implementation. */
121 RD_EXPORT
122 int version();
123 
/* Version query returning a std::string. */
127 RD_EXPORT
128 std::string version_str();
129 
/* Returns a std::string; NOTE(review): the separator/format of the returned
 * debug context list is not visible here. */
134 RD_EXPORT
135 std::string get_debug_contexts();
136 
/* Takes a timeout in milliseconds and returns an int status.
 * NOTE(review): meaning of the return value is not visible here - confirm. */
146 RD_EXPORT
147 int wait_destroyed(int timeout_ms);
148 
/* Allocation helper. Within this header it is used by
 * Headers::Header::copy_value(), and the result is released with
 * mem_free(). */
159 RD_EXPORT
160 void *mem_malloc(size_t size);
161 
/* Releases a pointer; Headers::Header passes it buffers obtained from
 * mem_malloc(). */
175 RD_EXPORT
176 void mem_free(void *ptr);
177 
/**
 * @brief Error codes.
 *
 * The internal (library-side) errors are negative and bracketed by
 * ERR__BEGIN (-200) and ERR__END (-100); the broker errors follow.
 * NOTE(review): this listing shows only a subset of the enumerators (the
 * embedded line numbers skip), so do not treat it as the complete list.
 */
200 enum ErrorCode {
201  /* Internal errors to rdkafka: */
203  ERR__BEGIN = -200,
205  ERR__BAD_MSG = -199,
209  ERR__DESTROY = -197,
211  ERR__FAIL = -196,
217  ERR__RESOLVE = -193,
228  ERR__FS = -189,
244  ERR__SSL = -181,
262  ERR__STATE = -172,
280  ERR__INTR = -163,
290  ERR__PARTIAL = -158,
294  ERR__NOENT = -156,
300  ERR__RETRY = -153,
306  ERR__FATAL = -150,
318  ERR__FENCED = -144,
324  ERR__NOOP = -141,
329 
331  ERR__END = -100,
332 
333  /* Kafka broker errors: */
 /* Alias macros: each GROUP-based name expands to the corresponding
  * COORDINATOR-based enumerator name. Being preprocessor macros they are
  * not scoped to the RdKafka namespace. */
367 #define ERR_GROUP_LOAD_IN_PROGRESS ERR_COORDINATOR_LOAD_IN_PROGRESS
368 
371 #define ERR_GROUP_COORDINATOR_NOT_AVAILABLE ERR_COORDINATOR_NOT_AVAILABLE
372 
375 #define ERR_NOT_COORDINATOR_FOR_GROUP ERR_NOT_COORDINATOR
376 
547 };
548 
549 
/**
 * @brief Maps an RdKafka::ErrorCode to a std::string.
 *
 * NOTE(review): presumably a human-readable description of \p err; the
 * exact text is produced by the implementation, which is not visible here.
 */
553 RD_EXPORT
554 std::string err2str(RdKafka::ErrorCode err);
555 
556 
557 
566  CERT__CNT
567 };
568 
577  CERT_ENC__CNT
578 };
579 
585 /* Forward declarations */
586 class Handle;
587 class Producer;
588 class Message;
589 class Headers;
590 class Queue;
591 class Event;
592 class Topic;
593 class TopicPartition;
594 class Metadata;
595 class KafkaConsumer;
/**
 * @brief Abstract error object.
 *
 * All accessors are pure virtual, so instances can only be obtained through
 * an implementation, e.g. via the static create() factory. Several methods
 * in this header (for example the Producer transaction calls) return
 * an Error pointer.
 * NOTE(review): ownership of returned Error pointers is not visible in this
 * listing - confirm whether the caller must delete them.
 */
615 class RD_EXPORT Error {
616  public:
  /* Factory: builds an Error from a code and an optional (pointer) message
   * string. */
620  static Error *create(ErrorCode code, const std::string *errstr);
621 
622  virtual ~Error() {
623  }
624 
625  /*
626  * Error accessor methods
627  */
628 
  /* The error code of this error object. */
632  virtual ErrorCode code() const = 0;
633 
  /* NOTE(review): presumably the symbolic name of code() - confirm. */
637  virtual std::string name() const = 0;
638 
  /* NOTE(review): presumably the human-readable error string - confirm. */
642  virtual std::string str() const = 0;
643 
  /* Boolean classification flags; their exact semantics are defined by the
   * implementation. */
648  virtual bool is_fatal() const = 0;
649 
653  virtual bool is_retriable() const = 0;
654 
666  virtual bool txn_requires_abort() const = 0;
667 };
668 
/**
 * @brief Delivery Report callback class.
 *
 * Applications derive from this class and implement dr_cb(); a pointer to
 * the derived object is registered through the Conf::set() overload that
 * takes a DeliveryReportCb pointer.
 */
700 class RD_EXPORT DeliveryReportCb {
701  public:
  /* Invoked with the Message the report refers to. */
705  virtual void dr_cb(Message &message) = 0;
706 
707  virtual ~DeliveryReportCb() {
708  }
709 };
710 
711 
/**
 * @brief SASL/OAUTHBEARER token refresh callback class.
 *
 * Registered through the Conf::set() overload taking an
 * OAuthBearerTokenRefreshCb pointer.
 */
739 class RD_EXPORT OAuthBearerTokenRefreshCb {
740  public:
  /* Receives the client handle and a configuration string.
   * NOTE(review): presumably the callback answers through
   * Handle::oauthbearer_set_token() or
   * Handle::oauthbearer_set_token_failure() - confirm. */
748  virtual void oauthbearer_token_refresh_cb(
749  RdKafka::Handle *handle,
750  const std::string &oauthbearer_config) = 0;
751 
752  virtual ~OAuthBearerTokenRefreshCb() {
753  }
754 };
755 
756 
/**
 * @brief Partitioner callback interface taking the key as a std::string
 *        pointer.
 *
 * Registered through the Conf::set() overload taking a PartitionerCb
 * pointer.
 */
764 class RD_EXPORT PartitionerCb {
765  public:
  /* Receives the topic, the message key, the partition count and the
   * per-message opaque pointer.
   * NOTE(review): presumably returns the selected partition index - confirm
   * the valid range and the failure value. */
783  virtual int32_t partitioner_cb(const Topic *topic,
784  const std::string *key,
785  int32_t partition_cnt,
786  void *msg_opaque) = 0;
787 
788  virtual ~PartitionerCb() {
789  }
790 };
791 
797  public:
806  virtual int32_t partitioner_cb(const Topic *topic,
807  const void *key,
808  size_t key_len,
809  int32_t partition_cnt,
810  void *msg_opaque) = 0;
811 
812  virtual ~PartitionerKeyPointerCb() {
813  }
814 };
815 
816 
817 
/**
 * @brief Event callback interface.
 *
 * Applications implement event_cb() and register the object through the
 * Conf::set() overload taking an EventCb pointer.
 */
826 class RD_EXPORT EventCb {
827  public:
  /* Invoked with the Event object to inspect (see class Event). */
833  virtual void event_cb(Event &event) = 0;
834 
835  virtual ~EventCb() {
836  }
837 };
838 
839 
/**
 * @brief Event object handed to EventCb::event_cb().
 *
 * Pure-virtual accessor interface.
 */
843 class RD_EXPORT Event {
844  public:
  /* Event type.
   * NOTE(review): only EVENT_THROTTLE survives in this listing; the symbol
   * index of the same page also references Event::EVENT_LOG, so further
   * enumerators precede it in the real header. */
846  enum Type {
850  EVENT_THROTTLE
851  };
852 
  /* EVENT_LOG severities (conforms to syslog(3) severities). */
854  enum Severity {
855  EVENT_SEVERITY_EMERG = 0,
856  EVENT_SEVERITY_ALERT = 1,
857  EVENT_SEVERITY_CRITICAL = 2,
858  EVENT_SEVERITY_ERROR = 3,
859  EVENT_SEVERITY_WARNING = 4,
860  EVENT_SEVERITY_NOTICE = 5,
861  EVENT_SEVERITY_INFO = 6,
862  EVENT_SEVERITY_DEBUG = 7
863  };
864 
865  virtual ~Event() {
866  }
867 
868  /*
869  * Event Accessor methods
870  */
871 
  /* NOTE(review): which accessors are meaningful for which Type is not
   * visible in this listing - confirm before relying on a value. */
876  virtual Type type() const = 0;
877 
882  virtual ErrorCode err() const = 0;
883 
888  virtual Severity severity() const = 0;
889 
894  virtual std::string fac() const = 0;
895 
904  virtual std::string str() const = 0;
905 
  /* NOTE(review): unit of the returned int is presumably milliseconds -
   * confirm. */
910  virtual int throttle_time() const = 0;
911 
916  virtual std::string broker_name() const = 0;
917 
922  virtual int broker_id() const = 0;
923 
924 
930  virtual bool fatal() const = 0;
931 };
932 
933 
934 
/**
 * @brief Consume callback interface.
 *
 * Passed to Consumer::consume_callback() and accepted by the Conf::set()
 * overload taking a ConsumeCb pointer.
 */
938 class RD_EXPORT ConsumeCb {
939  public:
  /* Invoked with a Message and the opaque pointer supplied by the caller of
   * Consumer::consume_callback(). */
947  virtual void consume_cb(Message &message, void *opaque) = 0;
948 
949  virtual ~ConsumeCb() {
950  }
951 };
952 
953 
/**
 * @brief KafkaConsumer: Rebalance callback class
 *
 * Registered through the Conf::set() overload taking a RebalanceCb pointer.
 */
957 class RD_EXPORT RebalanceCb {
958  public:
  /* Receives the consumer, an error code and a mutable vector of
   * partitions.
   * NOTE(review): presumably \p err distinguishes assignment from
   * revocation - confirm which ErrorCode values are used. */
1027  virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer,
1028  RdKafka::ErrorCode err,
1029  std::vector<TopicPartition *> &partitions) = 0;
1030 
1031  virtual ~RebalanceCb() {
1032  }
1033 };
1034 
1035 
/**
 * @brief Offset commit callback interface.
 *
 * Accepted by the Conf::set() overload taking an OffsetCommitCb pointer and
 * by the KafkaConsumer::commitSync() overloads that take a callback.
 */
1039 class RD_EXPORT OffsetCommitCb {
1040  public:
  /* Receives the commit result code and the affected offsets. */
1056  virtual void offset_commit_cb(RdKafka::ErrorCode err,
1057  std::vector<TopicPartition *> &offsets) = 0;
1058 
1059  virtual ~OffsetCommitCb() {
1060  }
1061 };
1062 
1063 
1064 
/**
 * @brief SSL certificate verification callback interface.
 *
 * Registered through the Conf::set() overload taking an
 * SslCertificateVerifyCb pointer.
 */
1070 class RD_EXPORT SslCertificateVerifyCb {
1071  public:
  /* \p x509_error is a pointer and \p errstr a non-const reference, so
   * both can be written by the callback; \p buf / \p size describe a raw
   * buffer.
   * NOTE(review): presumably returning true accepts the certificate and
   * \p buf holds the certificate bytes - confirm encoding and semantics. */
1108  virtual bool ssl_cert_verify_cb(const std::string &broker_name,
1109  int32_t broker_id,
1110  int *x509_error,
1111  int depth,
1112  const char *buf,
1113  size_t size,
1114  std::string &errstr) = 0;
1115 
1116  virtual ~SslCertificateVerifyCb() {
1117  }
1118 };
1119 
1120 
/**
 * @brief Socket creation callback interface.
 *
 * Registered through the Conf::set() overload taking a SocketCb pointer.
 */
1125 class RD_EXPORT SocketCb {
1126  public:
  /* The parameters mirror the arguments of socket(2).
   * NOTE(review): presumably returns the new socket descriptor, or a
   * negative value on failure - confirm. */
1140  virtual int socket_cb(int domain, int type, int protocol) = 0;
1141 
1142  virtual ~SocketCb() {
1143  }
1144 };
1145 
1146 
/**
 * @brief File open callback interface.
 *
 * Registered through the Conf::set() overload taking an OpenCb pointer.
 */
1151 class RD_EXPORT OpenCb {
1152  public:
  /* The parameters mirror the arguments of open(2).
   * NOTE(review): presumably returns the opened file descriptor, or a
   * negative value on failure - confirm. */
1164  virtual int open_cb(const std::string &path, int flags, int mode) = 0;
1165 
1166  virtual ~OpenCb() {
1167  }
1168 };
1169 
1170 
/**
 * @brief Configuration interface.
 *
 * A Conf object is obtained from create() and is later passed (as a const
 * pointer) to KafkaConsumer::create(), Consumer::create(),
 * Producer::create() and Topic::create(). Properties are set by name; the
 * callback objects declared earlier in this header are attached through
 * the typed set() overloads and can be read back with the typed get()
 * overloads.
 */
1190 class RD_EXPORT Conf {
1191  public:
  /* Kind of configuration object.
   * NOTE(review): only CONF_TOPIC survives in this listing; the symbol
   * index of the same page references Conf::CONF_GLOBAL on the preceding
   * source line. */
1195  enum ConfType {
1197  CONF_TOPIC
1198  };
1199 
  /* Result of set()/get(): CONF_OK is 0, failures are negative. */
1203  enum ConfResult {
1204  CONF_UNKNOWN = -2,
1205  CONF_INVALID = -1,
1206  CONF_OK = 0
1207  };
1208 
1209 
  /* Factory. NOTE(review): ownership of the returned pointer is not visible
   * here - confirm whether the caller deletes it. */
1213  static Conf *create(ConfType type);
1214 
1215  virtual ~Conf() {
1216  }
1217 
  /* Sets the string property \p name to \p value; \p errstr is a non-const
   * reference, i.e. an output for an error description. */
1231  virtual Conf::ConfResult set(const std::string &name,
1232  const std::string &value,
1233  std::string &errstr) = 0;
1234 
  /* Typed overloads: each attaches one callback object (or, for the
   * const Conf* overload, a topic configuration) under property \p name. */
1236  virtual Conf::ConfResult set(const std::string &name,
1237  DeliveryReportCb *dr_cb,
1238  std::string &errstr) = 0;
1239 
1241  virtual Conf::ConfResult set(
1242  const std::string &name,
1243  OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb,
1244  std::string &errstr) = 0;
1245 
1247  virtual Conf::ConfResult set(const std::string &name,
1248  EventCb *event_cb,
1249  std::string &errstr) = 0;
1250 
1258  virtual Conf::ConfResult set(const std::string &name,
1259  const Conf *topic_conf,
1260  std::string &errstr) = 0;
1261 
1263  virtual Conf::ConfResult set(const std::string &name,
1264  PartitionerCb *partitioner_cb,
1265  std::string &errstr) = 0;
1266 
1268  virtual Conf::ConfResult set(const std::string &name,
1269  PartitionerKeyPointerCb *partitioner_kp_cb,
1270  std::string &errstr) = 0;
1271 
1273  virtual Conf::ConfResult set(const std::string &name,
1274  SocketCb *socket_cb,
1275  std::string &errstr) = 0;
1276 
1278  virtual Conf::ConfResult set(const std::string &name,
1279  OpenCb *open_cb,
1280  std::string &errstr) = 0;
1281 
1283  virtual Conf::ConfResult set(const std::string &name,
1284  RebalanceCb *rebalance_cb,
1285  std::string &errstr) = 0;
1286 
1288  virtual Conf::ConfResult set(const std::string &name,
1289  OffsetCommitCb *offset_commit_cb,
1290  std::string &errstr) = 0;
1291 
1296  virtual Conf::ConfResult set(const std::string &name,
1297  SslCertificateVerifyCb *ssl_cert_verify_cb,
1298  std::string &errstr) = 0;
1299 
  /* Sets a certificate from an in-memory buffer.
   * NOTE(review): the listing jumps from source line 1340 to 1342 inside
   * this parameter list, so one parameter line is missing from this view;
   * check the real header before calling or overriding. */
1340  virtual Conf::ConfResult set_ssl_cert(RdKafka::CertificateType cert_type,
1342  const void *buffer,
1343  size_t size,
1344  std::string &errstr) = 0;
1345 
  /* Reads the string property \p name into \p value. */
1358  virtual Conf::ConfResult get(const std::string &name,
1359  std::string &value) const = 0;
1360 
  /* Typed getters: the callback pointer is returned through the
   * reference-to-pointer argument. */
1364  virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0;
1365 
1369  virtual Conf::ConfResult get(
1370  OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const = 0;
1371 
1375  virtual Conf::ConfResult get(EventCb *&event_cb) const = 0;
1376 
1380  virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0;
1381 
1385  virtual Conf::ConfResult get(
1386  PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0;
1387 
1391  virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0;
1392 
1396  virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0;
1397 
1401  virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0;
1402 
1406  virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0;
1407 
1409  virtual Conf::ConfResult get(
1410  SslCertificateVerifyCb *&ssl_cert_verify_cb) const = 0;
1411 
  /* Returns a pointer to a list of strings.
   * NOTE(review): list layout (presumably alternating name/value) and
   * ownership of the returned list are not visible here - confirm. */
1414  virtual std::list<std::string> *dump() = 0;
1415 
1417  virtual Conf::ConfResult set(const std::string &name,
1418  ConsumeCb *consume_cb,
1419  std::string &errstr) = 0;
1420 
  /* Accessors for the underlying C configuration structs, which are only
   * forward-declared in the extern "C" block at the top of this header. */
1437  virtual struct rd_kafka_conf_s *c_ptr_global() = 0;
1438 
1456  virtual struct rd_kafka_topic_conf_s *c_ptr_topic() = 0;
1457 
1470  virtual Conf::ConfResult set_engine_callback_data(void *value,
1471  std::string &errstr) = 0;
1472 
1473 
1496  virtual Conf::ConfResult enable_sasl_queue(bool enable,
1497  std::string &errstr) = 0;
1498 };
1499 
/**
 * @brief Base client handle interface.
 *
 * KafkaConsumer, Consumer and Producer all derive from Handle using
 * virtual inheritance, so the methods declared here are available on every
 * client type.
 */
1512 class RD_EXPORT Handle {
1513  public:
1514  virtual ~Handle() {
1515  }
1516 
1518  virtual std::string name() const = 0;
1519 
1528  virtual std::string memberid() const = 0;
1529 
1530 
  /* NOTE(review): presumably serves queued callbacks/events for up to
   * \p timeout_ms and returns the number served - confirm. */
1555  virtual int poll(int timeout_ms) = 0;
1556 
1563  virtual int outq_len() = 0;
1564 
  /* The Metadata object is returned through \p metadatap.
   * NOTE(review): ownership of *metadatap is not visible here - confirm
   * whether the caller must delete it. */
1580  virtual ErrorCode metadata(bool all_topics,
1581  const Topic *only_rkt,
1582  Metadata **metadatap,
1583  int timeout_ms) = 0;
1584 
1585 
1595  virtual ErrorCode pause(std::vector<TopicPartition *> &partitions) = 0;
1596 
1597 
1607  virtual ErrorCode resume(std::vector<TopicPartition *> &partitions) = 0;
1608 
1609 
  /* Both watermark calls return the offsets through \p low and \p high;
   * only the query variant takes a timeout. */
1618  virtual ErrorCode query_watermark_offsets(const std::string &topic,
1619  int32_t partition,
1620  int64_t *low,
1621  int64_t *high,
1622  int timeout_ms) = 0;
1623 
1641  virtual ErrorCode get_watermark_offsets(const std::string &topic,
1642  int32_t partition,
1643  int64_t *low,
1644  int64_t *high) = 0;
1645 
1646 
  /* \p offsets is a non-const vector, i.e. it is both input and output. */
1668  virtual ErrorCode offsetsForTimes(std::vector<TopicPartition *> &offsets,
1669  int timeout_ms) = 0;
1670 
1671 
1680  virtual Queue *get_partition_queue(const TopicPartition *partition) = 0;
1681 
1698  virtual ErrorCode set_log_queue(Queue *queue) = 0;
1699 
1711  virtual void yield() = 0;
1712 
1727  virtual std::string clusterid(int timeout_ms) = 0;
1728 
  /* Accessor for the underlying C handle, which is only forward-declared
   * in the extern "C" block at the top of this header. */
1745  virtual struct rd_kafka_s *c_ptr() = 0;
1746 
1762  virtual int32_t controllerid(int timeout_ms) = 0;
1763 
1764 
  /* Returns an error code and writes a description to \p errstr. */
1786  virtual ErrorCode fatal_error(std::string &errstr) const = 0;
1787 
  /* NOTE(review): presumably meant to be called from
   * OAuthBearerTokenRefreshCb::oauthbearer_token_refresh_cb(), which is
   * handed this Handle - confirm. */
1827  virtual ErrorCode oauthbearer_set_token(
1828  const std::string &token_value,
1829  int64_t md_lifetime_ms,
1830  const std::string &md_principal_name,
1831  const std::list<std::string> &extensions,
1832  std::string &errstr) = 0;
1833 
1851  virtual ErrorCode oauthbearer_set_token_failure(
1852  const std::string &errstr) = 0;
1853 
1861  virtual Error *sasl_background_callbacks_enable() = 0;
1862 
1863 
1869  virtual Queue *get_sasl_queue() = 0;
1870 
1874  virtual Queue *get_background_queue() = 0;
1875 
1876 
1877 
  /* Per-handle counterparts of the namespace-level mem_malloc() and
   * mem_free() declared near the top of this header. */
1888  virtual void *mem_malloc(size_t size) = 0;
1889 
1903  virtual void mem_free(void *ptr) = 0;
1904 
1919  virtual Error *sasl_set_credentials(const std::string &username,
1920  const std::string &password) = 0;
1921 };
1922 
1923 
/**
 * @brief Topic + partition pair with an optional offset, error and leader
 *        epoch.
 *
 * Instances are created through the static create() factories and are
 * passed around as vectors of TopicPartition pointers throughout this
 * header.
 */
1942 class RD_EXPORT TopicPartition {
1943  public:
1949  static TopicPartition *create(const std::string &topic, int partition);
1950 
1957  static TopicPartition *create(const std::string &topic,
1958  int partition,
1959  int64_t offset);
1960 
  /* Pure virtual destructor: the class is abstract, and the destructor
   * must still be defined out of line by the implementation. */
1961  virtual ~TopicPartition() = 0;
1962 
  /* NOTE(review): presumably deletes every element of \p partitions and
   * empties the vector - confirm. */
1967  static void destroy(std::vector<TopicPartition *> &partitions);
1968 
1970  virtual const std::string &topic() const = 0;
1971 
1973  virtual int partition() const = 0;
1974 
1976  virtual int64_t offset() const = 0;
1977 
1979  virtual void set_offset(int64_t offset) = 0;
1980 
1982  virtual ErrorCode err() const = 0;
1983 
  /* Note: unlike the other getters this one is not const-qualified. */
1985  virtual int32_t get_leader_epoch() = 0;
1986 
1988  virtual void set_leader_epoch(int32_t leader_epoch) = 0;
1989 };
1990 
1991 
1992 
/**
 * @brief Topic handle interface.
 *
 * Created through the static create() factory from a Handle, a topic name
 * and a Conf.
 */
1997 class RD_EXPORT Topic {
1998  public:
  /* Special partition/offset constants. They are only declared here; the
   * values are defined in the implementation. */
2005  static const int32_t PARTITION_UA;
2006 
2008  static const int64_t OFFSET_BEGINNING;
2009  static const int64_t OFFSET_END;
2010  static const int64_t OFFSET_STORED;
2011  static const int64_t OFFSET_INVALID;
  /* Factory; \p errstr is an output for an error description.
   * NOTE(review): presumably returns NULL on failure - confirm. */
2023  static Topic *create(Handle *base,
2024  const std::string &topic_str,
2025  const Conf *conf,
2026  std::string &errstr);
2027 
2028  virtual ~Topic() = 0;
2029 
2030 
2032  virtual std::string name() const = 0;
2033 
2039  virtual bool partition_available(int32_t partition) const = 0;
2040 
2057  virtual ErrorCode offset_store(int32_t partition, int64_t offset) = 0;
2058 
  /* Accessor for the underlying C topic struct, which is only
   * forward-declared in the extern "C" block at the top of this header. */
2075  virtual struct rd_kafka_topic_s *c_ptr() = 0;
2076 };
2077 
2078 
2100 class RD_EXPORT MessageTimestamp {
2101  public:
2106  MSG_TIMESTAMP_LOG_APPEND_TIME
2107  };
2108 
2110  int64_t timestamp;
2111 };
2112 
2113 
2123 class RD_EXPORT Headers {
2124  public:
2125  virtual ~Headers() = 0;
2126 
2135  class Header {
2136  public:
2147  Header(const std::string &key, const void *value, size_t value_size) :
2148  key_(key), err_(ERR_NO_ERROR), value_size_(value_size) {
2149  value_ = copy_value(value, value_size);
2150  }
2151 
2165  Header(const std::string &key,
2166  const void *value,
2167  size_t value_size,
2168  const RdKafka::ErrorCode err) :
2169  key_(key), err_(err), value_(NULL), value_size_(value_size) {
2170  if (err == ERR_NO_ERROR)
2171  value_ = copy_value(value, value_size);
2172  }
2173 
2179  Header(const Header &other) :
2180  key_(other.key_), err_(other.err_), value_size_(other.value_size_) {
2181  value_ = copy_value(other.value_, value_size_);
2182  }
2183 
2189  Header &operator=(const Header &other) {
2190  if (&other == this) {
2191  return *this;
2192  }
2193 
2194  key_ = other.key_;
2195  err_ = other.err_;
2196  value_size_ = other.value_size_;
2197 
2198  if (value_ != NULL)
2199  mem_free(value_);
2200 
2201  value_ = copy_value(other.value_, value_size_);
2202 
2203  return *this;
2204  }
2205 
2206  ~Header() {
2207  if (value_ != NULL)
2208  mem_free(value_);
2209  }
2210 
2212  std::string key() const {
2213  return key_;
2214  }
2215 
2217  const void *value() const {
2218  return value_;
2219  }
2220 
2223  const char *value_string() const {
2224  return static_cast<const char *>(value_);
2225  }
2226 
2228  size_t value_size() const {
2229  return value_size_;
2230  }
2231 
2234  return err_;
2235  }
2236 
2237  private:
2238  char *copy_value(const void *value, size_t value_size) {
2239  if (!value)
2240  return NULL;
2241 
2242  char *dest = (char *)mem_malloc(value_size + 1);
2243  memcpy(dest, (const char *)value, value_size);
2244  dest[value_size] = '\0';
2245 
2246  return dest;
2247  }
2248 
2249  std::string key_;
2250  RdKafka::ErrorCode err_;
2251  char *value_;
2252  size_t value_size_;
2253  void *operator new(size_t); /* Prevent dynamic allocation */
2254  };
2255 
2261  static Headers *create();
2262 
2271  static Headers *create(const std::vector<Header> &headers);
2272 
2282  virtual ErrorCode add(const std::string &key,
2283  const void *value,
2284  size_t value_size) = 0;
2285 
2296  virtual ErrorCode add(const std::string &key, const std::string &value) = 0;
2297 
2307  virtual ErrorCode add(const Header &header) = 0;
2308 
2316  virtual ErrorCode remove(const std::string &key) = 0;
2317 
2327  virtual std::vector<Header> get(const std::string &key) const = 0;
2328 
2339  virtual Header get_last(const std::string &key) const = 0;
2340 
2346  virtual std::vector<Header> get_all() const = 0;
2347 
2351  virtual size_t size() const = 0;
2352 };
2353 
2354 
/**
 * @brief Message object interface.
 *
 * Handed to DeliveryReportCb::dr_cb() and ConsumeCb::consume_cb(), and
 * returned by the consume() methods of KafkaConsumer, Consumer and Queue.
 */
2366 class RD_EXPORT Message {
2367  public:
  /* Persistence status values, as returned by status().
   * NOTE(review): the trailing comma after the last enumerator is only
   * well-formed from C++11 on; the rest of this header avoids C++11
   * syntax - confirm the minimum language level before keeping it. */
2370  enum Status {
2374  MSG_STATUS_NOT_PERSISTED = 0,
2375 
2379  MSG_STATUS_POSSIBLY_PERSISTED = 1,
2380 
2384  MSG_STATUS_PERSISTED = 2,
2385  };
2386 
2394  virtual std::string errstr() const = 0;
2395 
2397  virtual ErrorCode err() const = 0;
2398 
2403  virtual Topic *topic() const = 0;
2404 
2406  virtual std::string topic_name() const = 0;
2407 
2409  virtual int32_t partition() const = 0;
2410 
  /* Payload pointer and its length in bytes. */
2412  virtual void *payload() const = 0;
2413 
2415  virtual size_t len() const = 0;
2416 
  /* The key is available either as a std::string pointer or as a raw
   * pointer plus length. */
2418  virtual const std::string *key() const = 0;
2419 
2421  virtual const void *key_pointer() const = 0;
2422 
2424  virtual size_t key_len() const = 0;
2425 
2427  virtual int64_t offset() const = 0;
2428 
2430  virtual MessageTimestamp timestamp() const = 0;
2431 
2433  virtual void *msg_opaque() const = 0;
2434 
2435  virtual ~Message() = 0;
2436 
  /* NOTE(review): unit of the returned latency is not visible here -
   * confirm. */
2439  virtual int64_t latency() const = 0;
2440 
  /* Accessor for the underlying C message struct ("A Kafka message as
   * returned by the rd_kafka_consume*() family of functions", per the
   * symbol index of this page). */
2457  virtual struct rd_kafka_message_s *c_ptr() = 0;
2458 
2462  virtual Status status() const = 0;
2463 
  /* Two header accessors; the second additionally reports an error code
   * through \p err. */
2468  virtual RdKafka::Headers *headers() = 0;
2469 
2476  virtual RdKafka::Headers *headers(RdKafka::ErrorCode *err) = 0;
2477 
2480  virtual int32_t broker_id() const = 0;
2481 
2484  virtual int32_t leader_epoch() const = 0;
2485 
2505  virtual Error *offset_store() = 0;
2506 };
2507 
/**
 * @brief Queue interface.
 *
 * Created from a Handle via create(); Queue pointers are also returned by
 * Handle::get_partition_queue(), Handle::get_sasl_queue() and
 * Handle::get_background_queue().
 */
2531 class RD_EXPORT Queue {
2532  public:
2536  static Queue *create(Handle *handle);
2537 
  /* NOTE(review): presumably redirects this queue's contents to \p dst -
   * confirm, including the meaning of a NULL \p dst. */
2548  virtual ErrorCode forward(Queue *dst) = 0;
2549 
2550 
2562  virtual Message *consume(int timeout_ms) = 0;
2563 
2571  virtual int poll(int timeout_ms) = 0;
2572 
2573  virtual ~Queue() = 0;
2574 
  /* Takes a file descriptor plus a payload buffer of \p size bytes.
   * NOTE(review): presumably the payload is written to \p fd when the
   * queue becomes non-empty - confirm. */
2590  virtual void io_event_enable(int fd, const void *payload, size_t size) = 0;
2591 };
2592 
/**
 * @brief Opaque consumer group metadata object.
 *
 * Exposes no accessors; it is returned by KafkaConsumer::groupMetadata()
 * and passed to Producer::send_offsets_to_transaction().
 */
2606 class RD_EXPORT ConsumerGroupMetadata {
2607  public:
2608  virtual ~ConsumerGroupMetadata() = 0;
2609 };
2610 
/**
 * @brief High-level KafkaConsumer (for brokers 0.9 and later)
 *
 * Derives virtually from Handle, so all Handle methods are available too.
 */
2628 class RD_EXPORT KafkaConsumer : public virtual Handle {
2629  public:
  /* Factory; \p errstr is an output for an error description.
   * NOTE(review): presumably returns NULL on failure - confirm. */
2641  static KafkaConsumer *create(const Conf *conf, std::string &errstr);
2642 
2643  virtual ~KafkaConsumer() = 0;
2644 
2645 
  /* Getters: the current assignment / subscription is written into the
   * vector passed by reference. */
2648  virtual ErrorCode assignment(
2649  std::vector<RdKafka::TopicPartition *> &partitions) = 0;
2650 
2653  virtual ErrorCode subscription(std::vector<std::string> &topics) = 0;
2654 
2689  virtual ErrorCode subscribe(const std::vector<std::string> &topics) = 0;
2690 
2692  virtual ErrorCode unsubscribe() = 0;
2693 
2700  virtual ErrorCode assign(const std::vector<TopicPartition *> &partitions) = 0;
2701 
2705  virtual ErrorCode unassign() = 0;
2706 
  /* NOTE(review): ownership of the returned Message is not visible here -
   * confirm whether the caller must delete it. */
2731  virtual Message *consume(int timeout_ms) = 0;
2732 
  /* Commit family: no-argument, per-message and per-offset-list variants,
   * each in a Sync and an Async flavour. Note that the offset-list
   * commitSync() takes a non-const vector while commitAsync() takes a
   * const one. */
2746  virtual ErrorCode commitSync() = 0;
2747 
2753  virtual ErrorCode commitAsync() = 0;
2754 
2764  virtual ErrorCode commitSync(Message *message) = 0;
2765 
2775  virtual ErrorCode commitAsync(Message *message) = 0;
2776 
2786  virtual ErrorCode commitSync(std::vector<TopicPartition *> &offsets) = 0;
2787 
2797  virtual ErrorCode commitAsync(
2798  const std::vector<TopicPartition *> &offsets) = 0;
2799 
2810  virtual ErrorCode commitSync(OffsetCommitCb *offset_commit_cb) = 0;
2811 
2822  virtual ErrorCode commitSync(std::vector<TopicPartition *> &offsets,
2823  OffsetCommitCb *offset_commit_cb) = 0;
2824 
2825 
2826 
  /* Both take a non-const vector, i.e. the partitions passed in are also
   * where the results are written. */
2835  virtual ErrorCode committed(std::vector<TopicPartition *> &partitions,
2836  int timeout_ms) = 0;
2837 
2846  virtual ErrorCode position(std::vector<TopicPartition *> &partitions) = 0;
2847 
2848 
  /* Two close() overloads exist: this one returns an ErrorCode, the
   * Queue-taking one further below returns an Error pointer. */
2871  virtual ErrorCode close() = 0;
2872 
2873 
2891  virtual ErrorCode seek(const TopicPartition &partition, int timeout_ms) = 0;
2892 
2893 
2914  virtual ErrorCode offsets_store(std::vector<TopicPartition *> &offsets) = 0;
2915 
2916 
  /* Returns the object to pass to
   * Producer::send_offsets_to_transaction(). */
2927  virtual ConsumerGroupMetadata *groupMetadata() = 0;
2928 
2929 
2944  virtual bool assignment_lost() = 0;
2945 
2961  virtual std::string rebalance_protocol() = 0;
2962 
2963 
2979  virtual Error *incremental_assign(
2980  const std::vector<TopicPartition *> &partitions) = 0;
2981 
2982 
2998  virtual Error *incremental_unassign(
2999  const std::vector<TopicPartition *> &partitions) = 0;
3000 
3018  virtual Error *close(Queue *queue) = 0;
3019 
3020 
3025  virtual bool closed() = 0;
3026 };
3027 
3028 
/**
 * @brief Consumer interface operating on explicit Topic + partition pairs.
 *
 * Derives virtually from Handle. In contrast to KafkaConsumer, every call
 * names the topic and partition (or a Queue) it applies to.
 */
3043 class RD_EXPORT Consumer : public virtual Handle {
3044  public:
  /* Factory; \p errstr is an output for an error description.
   * NOTE(review): presumably returns NULL on failure - confirm. */
3055  static Consumer *create(const Conf *conf, std::string &errstr);
3056 
3057  virtual ~Consumer() = 0;
3058 
3059 
  /* start() overloads: the second additionally takes the Queue to use for
   * the partition. */
3079  virtual ErrorCode start(Topic *topic, int32_t partition, int64_t offset) = 0;
3080 
3087  virtual ErrorCode start(Topic *topic,
3088  int32_t partition,
3089  int64_t offset,
3090  Queue *queue) = 0;
3091 
3101  virtual ErrorCode stop(Topic *topic, int32_t partition) = 0;
3102 
3117  virtual ErrorCode seek(Topic *topic,
3118  int32_t partition,
3119  int64_t offset,
3120  int timeout_ms) = 0;
3121 
  /* consume() overloads: from a topic+partition or from a Queue.
   * NOTE(review): ownership of the returned Message is not visible here -
   * confirm whether the caller must delete it. */
3139  virtual Message *consume(Topic *topic, int32_t partition, int timeout_ms) = 0;
3140 
3162  virtual Message *consume(Queue *queue, int timeout_ms) = 0;
3163 
  /* Callback-driven variants: messages are delivered to
   * ConsumeCb::consume_cb() together with \p opaque.
   * NOTE(review): meaning of the int return value is not visible here -
   * confirm. */
3183  virtual int consume_callback(Topic *topic,
3184  int32_t partition,
3185  int timeout_ms,
3186  ConsumeCb *consume_cb,
3187  void *opaque) = 0;
3188 
3195  virtual int consume_callback(Queue *queue,
3196  int timeout_ms,
3197  RdKafka::ConsumeCb *consume_cb,
3198  void *opaque) = 0;
3199 
  /* NOTE(review): presumably converts a count-from-the-end into the
   * special offset value expected by start() - confirm. */
3209  static int64_t OffsetTail(int64_t offset);
3210 };
3211 
/**
 * @brief Producer interface.
 *
 * Derives virtually from Handle. Offers several produce() overloads, queue
 * flushing/purging and the transactional API.
 */
3225 class RD_EXPORT Producer : public virtual Handle {
3226  public:
  /* Factory; \p errstr is an output for an error description.
   * NOTE(review): presumably returns NULL on failure - confirm. */
3237  static Producer *create(const Conf *conf, std::string &errstr);
3238 
3239 
3240  virtual ~Producer() = 0;
3241 
  /* Bit flags for the \p msgflags argument of produce(). The unprefixed
   * MSG_FREE / MSG_COPY aliases are only provided when no MSG_COPY macro is
   * already defined (sys/msg.h defines one). */
3247  enum {
3248  RK_MSG_FREE = 0x1,
3251  RK_MSG_COPY = 0x2,
3256  RK_MSG_BLOCK = 0x4
3273  /* For backwards compatibility: */
3274 #ifndef MSG_COPY /* defined in sys/msg.h */
3275  ,
3278  MSG_FREE = RK_MSG_FREE,
3279  MSG_COPY = RK_MSG_COPY
3280 #endif
3281 
3282  };
3283 
  /* produce() on a Topic object, key given as std::string pointer. */
3340  virtual ErrorCode produce(Topic *topic,
3341  int32_t partition,
3342  int msgflags,
3343  void *payload,
3344  size_t len,
3345  const std::string *key,
3346  void *msg_opaque) = 0;
3347 
  /* As above, key given as raw pointer plus length. */
3352  virtual ErrorCode produce(Topic *topic,
3353  int32_t partition,
3354  int msgflags,
3355  void *payload,
3356  size_t len,
3357  const void *key,
3358  size_t key_len,
3359  void *msg_opaque) = 0;
3360 
  /* produce() by topic name with an explicit timestamp. The name is taken
   * by value (const std::string), so the string is copied on each call;
   * the signature is part of the virtual interface and must stay as is. */
3367  virtual ErrorCode produce(const std::string topic_name,
3368  int32_t partition,
3369  int msgflags,
3370  void *payload,
3371  size_t len,
3372  const void *key,
3373  size_t key_len,
3374  int64_t timestamp,
3375  void *msg_opaque) = 0;
3376 
  /* As above, additionally taking message headers.
   * NOTE(review): ownership of \p headers after the call is not visible
   * here - confirm for both the success and the failure case. */
3384  virtual ErrorCode produce(const std::string topic_name,
3385  int32_t partition,
3386  int msgflags,
3387  void *payload,
3388  size_t len,
3389  const void *key,
3390  size_t key_len,
3391  int64_t timestamp,
3392  RdKafka::Headers *headers,
3393  void *msg_opaque) = 0;
3394 
3395 
  /* produce() taking payload and key as std::vector<char> pointers; this
   * overload has no msgflags argument. */
3400  virtual ErrorCode produce(Topic *topic,
3401  int32_t partition,
3402  const std::vector<char> *payload,
3403  const std::vector<char> *key,
3404  void *msg_opaque) = 0;
3405 
3406 
3422  virtual ErrorCode flush(int timeout_ms) = 0;
3423 
3424 
  /* \p purge_flags is a bitwise combination of the PURGE_* values declared
   * right below. */
3452  virtual ErrorCode purge(int purge_flags) = 0;
3453 
3457  enum {
3458  PURGE_QUEUE = 0x1,
3460  PURGE_INFLIGHT = 0x2,
3467  PURGE_NON_BLOCKING = 0x4 /* Don't wait for background queue
3468  * purging to finish. */
3469  };
3470 
  /* Transactional API: every call reports its outcome as an Error pointer.
   * NOTE(review): presumably NULL means success and the caller owns a
   * non-NULL result - confirm. */
3497  virtual Error *init_transactions(int timeout_ms) = 0;
3498 
3499 
3512  virtual Error *begin_transaction() = 0;
3513 
  /* \p group_metadata is the object returned by
   * KafkaConsumer::groupMetadata(). */
3560  virtual Error *send_offsets_to_transaction(
3561  const std::vector<TopicPartition *> &offsets,
3562  const ConsumerGroupMetadata *group_metadata,
3563  int timeout_ms) = 0;
3564 
3593  virtual Error *commit_transaction(int timeout_ms) = 0;
3594 
3625  virtual Error *abort_transaction(int timeout_ms) = 0;
3626 
3628 };
3629 
3644  public:
3646  virtual int32_t id() const = 0;
3647 
3649  virtual std::string host() const = 0;
3650 
3652  virtual int port() const = 0;
3653 
3654  virtual ~BrokerMetadata() = 0;
3655 };
3656 
3657 
3658 
3663  public:
3665  typedef std::vector<int32_t> ReplicasVector;
3667  typedef std::vector<int32_t> ISRSVector;
3668 
3670  typedef ReplicasVector::const_iterator ReplicasIterator;
3672  typedef ISRSVector::const_iterator ISRSIterator;
3673 
3674 
3676  virtual int32_t id() const = 0;
3677 
3679  virtual ErrorCode err() const = 0;
3680 
3682  virtual int32_t leader() const = 0;
3683 
3685  virtual const std::vector<int32_t> *replicas() const = 0;
3686 
3690  virtual const std::vector<int32_t> *isrs() const = 0;
3691 
3692  virtual ~PartitionMetadata() = 0;
3693 };
3694 
3695 
3696 
3701  public:
3703  typedef std::vector<const PartitionMetadata *> PartitionMetadataVector;
3705  typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;
3706 
3708  virtual std::string topic() const = 0;
3709 
3711  virtual const PartitionMetadataVector *partitions() const = 0;
3712 
3714  virtual ErrorCode err() const = 0;
3715 
3716  virtual ~TopicMetadata() = 0;
3717 };
3718 
3719 
/**
 * @brief Metadata container interface.
 *
 * Obtained through Handle::metadata(), which returns it via a Metadata**
 * output argument. Note that, unlike most classes in this header, the
 * declaration carries no RD_EXPORT.
 */
3723 class Metadata {
3724  public:
  /* Container and iterator typedefs for the broker and topic lists. */
3726  typedef std::vector<const BrokerMetadata *> BrokerMetadataVector;
3728  typedef std::vector<const TopicMetadata *> TopicMetadataVector;
3729 
3731  typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
3733  typedef TopicMetadataVector::const_iterator TopicMetadataIterator;
3734 
3735 
  /* Broker list. */
3741  virtual const BrokerMetadataVector *brokers() const = 0;
3742 
  /* Topic list. */
3748  virtual const TopicMetadataVector *topics() const = 0;
3749 
  /* NOTE(review): presumably the id/name of the broker that supplied this
   * metadata - confirm. */
3751  virtual int32_t orig_broker_id() const = 0;
3752 
3754  virtual std::string orig_broker_name() const = 0;
3755 
3756  virtual ~Metadata() = 0;
3757 };
3758 
3761 } // namespace RdKafka
3762 
3763 
3764 #endif /* _RDKAFKACPP_H_ */
RdKafka::ERR_INVALID_REPLICA_ASSIGNMENT
@ ERR_INVALID_REPLICA_ASSIGNMENT
Definition: rdkafkacpp.h:421
RdKafka::ERR__TIMED_OUT_QUEUE
@ ERR__TIMED_OUT_QUEUE
Definition: rdkafkacpp.h:274
RdKafka::ERR_INVALID_SESSION_TIMEOUT
@ ERR_INVALID_SESSION_TIMEOUT
Definition: rdkafkacpp.h:395
RdKafka::BrokerMetadata::port
virtual int port() const =0
RdKafka::PartitionMetadata
Metadata: Partition information.
Definition: rdkafkacpp.h:3662
RdKafka::ERR__AUTO_OFFSET_RESET
@ ERR__AUTO_OFFSET_RESET
Definition: rdkafkacpp.h:326
RdKafka::ERR_NOT_COORDINATOR
@ ERR_NOT_COORDINATOR
Definition: rdkafkacpp.h:373
RdKafka::ERR_INVALID_PARTITIONS
@ ERR_INVALID_PARTITIONS
Definition: rdkafkacpp.h:417
RdKafka::PartitionMetadata::replicas
virtual const std::vector< int32_t > * replicas() const =0
RdKafka::ERR_INVALID_PRINCIPAL_TYPE
@ ERR_INVALID_PRINCIPAL_TYPE
Definition: rdkafkacpp.h:482
rd_kafka_message_s
A Kafka message as returned by the rd_kafka_consume*() family of functions as well as provided to the...
Definition: rdkafka.h:1440
RdKafka::CERT_CA
@ CERT_CA
Definition: rdkafkacpp.h:565
RdKafka::OAuthBearerTokenRefreshCb
SASL/OAUTHBEARER token refresh callback class.
Definition: rdkafkacpp.h:739
RdKafka::ERR_DUPLICATE_RESOURCE
@ ERR_DUPLICATE_RESOURCE
Definition: rdkafkacpp.h:535
RdKafka::TopicMetadata::err
virtual ErrorCode err() const =0
RdKafka::Topic::OFFSET_END
static const int64_t OFFSET_END
Definition: rdkafkacpp.h:2009
RdKafka::ERR_NOT_LEADER_FOR_PARTITION
@ ERR_NOT_LEADER_FOR_PARTITION
Definition: rdkafkacpp.h:349
RdKafka::PartitionMetadata::isrs
virtual const std::vector< int32_t > * isrs() const =0
RdKafka::ERR__UNDERFLOW
@ ERR__UNDERFLOW
Definition: rdkafkacpp.h:296
RdKafka::KafkaConsumer
High-level KafkaConsumer (for brokers 0.9 and later)
Definition: rdkafkacpp.h:2628
RdKafka::ERR_INCONSISTENT_GROUP_PROTOCOL
@ ERR_INCONSISTENT_GROUP_PROTOCOL
Definition: rdkafkacpp.h:389
RdKafka::Headers::Header::key
std::string key() const
Definition: rdkafkacpp.h:2212
RdKafka::ERR_ELECTION_NOT_NEEDED
@ ERR_ELECTION_NOT_NEEDED
Definition: rdkafkacpp.h:517
RdKafka::ERR__QUEUE_FULL
@ ERR__QUEUE_FULL
Definition: rdkafkacpp.h:238
RdKafka::ERR_OPERATION_NOT_ATTEMPTED
@ ERR_OPERATION_NOT_ATTEMPTED
Definition: rdkafkacpp.h:458
RdKafka::ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED
@ ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED
Definition: rdkafkacpp.h:476
RdKafka::ERR__CONFLICT
@ ERR__CONFLICT
Definition: rdkafkacpp.h:260
rd_kafka_message_s::err
rd_kafka_resp_err_t err
Definition: rdkafka.h:1441
RdKafka::Conf::CONF_GLOBAL
@ CONF_GLOBAL
Definition: rdkafkacpp.h:1196
RdKafka::DeliveryReportCb
Delivery Report callback class.
Definition: rdkafkacpp.h:700
RdKafka::ERR_INVALID_RECORD
@ ERR_INVALID_RECORD
Definition: rdkafkacpp.h:524
RdKafka::ERR__NOENT
@ ERR__NOENT
Definition: rdkafkacpp.h:294
RdKafka::ERR__INCONSISTENT
@ ERR__INCONSISTENT
Definition: rdkafkacpp.h:308
RdKafka::ERR_GROUP_ID_NOT_FOUND
@ ERR_GROUP_ID_NOT_FOUND
Definition: rdkafkacpp.h:486
RdKafka::ERR_ILLEGAL_SASL_STATE
@ ERR_ILLEGAL_SASL_STATE
Definition: rdkafkacpp.h:411
RdKafka::ERR_NETWORK_EXCEPTION
@ ERR_NETWORK_EXCEPTION
Definition: rdkafkacpp.h:363
RdKafka::RebalanceCb
KafkaConsumer: Rebalance callback class
Definition: rdkafkacpp.h:957
RdKafka::ERR__BAD_MSG
@ ERR__BAD_MSG
Definition: rdkafkacpp.h:205
RdKafka::Topic::OFFSET_STORED
static const int64_t OFFSET_STORED
Definition: rdkafkacpp.h:2010
RdKafka::ERR_CLUSTER_AUTHORIZATION_FAILED
@ ERR_CLUSTER_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:405
RdKafka::ERR__KEY_SERIALIZATION
@ ERR__KEY_SERIALIZATION
Definition: rdkafkacpp.h:282
RdKafka::ERR__CRIT_SYS_RESOURCE
@ ERR__CRIT_SYS_RESOURCE
Definition: rdkafkacpp.h:215
RdKafka::Queue
Queue interface.
Definition: rdkafkacpp.h:2531
RdKafka::ERR__FS
@ ERR__FS
Definition: rdkafkacpp.h:228
RdKafka::ERR_INVALID_TXN_STATE
@ ERR_INVALID_TXN_STATE
Definition: rdkafkacpp.h:439
RdKafka::ERR__UNSUPPORTED_FEATURE
@ ERR__UNSUPPORTED_FEATURE
Definition: rdkafkacpp.h:276
RdKafka::ERR_UNKNOWN_MEMBER_ID
@ ERR_UNKNOWN_MEMBER_ID
Definition: rdkafkacpp.h:393
RdKafka::ERR__PURGE_QUEUE
@ ERR__PURGE_QUEUE
Definition: rdkafkacpp.h:302
RdKafka::ERR_INVALID_GROUP_ID
@ ERR_INVALID_GROUP_ID
Definition: rdkafkacpp.h:391
RdKafka::ERR_THROTTLING_QUOTA_EXCEEDED
@ ERR_THROTTLING_QUOTA_EXCEEDED
Definition: rdkafkacpp.h:528
RdKafka::TopicMetadata::partitions
virtual const PartitionMetadataVector * partitions() const =0
RdKafka::ERR_INVALID_TIMESTAMP
@ ERR_INVALID_TIMESTAMP
Definition: rdkafkacpp.h:407
RdKafka::ERR__FAIL
@ ERR__FAIL
Definition: rdkafkacpp.h:211
RdKafka::Metadata::brokers
virtual const BrokerMetadataVector * brokers() const =0
Broker list.
RdKafka::Event::EVENT_LOG
@ EVENT_LOG
Definition: rdkafkacpp.h:849
RdKafka::ERR__MSG_TIMED_OUT
@ ERR__MSG_TIMED_OUT
Definition: rdkafkacpp.h:219
RdKafka::Metadata::TopicMetadataVector
std::vector< const TopicMetadata * > TopicMetadataVector
Topics.
Definition: rdkafkacpp.h:3728
RdKafka::ERR__VALUE_SERIALIZATION
@ ERR__VALUE_SERIALIZATION
Definition: rdkafkacpp.h:284
RdKafka::ERR__SSL
@ ERR__SSL
Definition: rdkafkacpp.h:244
RdKafka::Event::Severity
Severity
EVENT_LOG severities (conforms to syslog(3) severities)
Definition: rdkafkacpp.h:854
RdKafka::ErrorCode
ErrorCode
Error codes.
Definition: rdkafkacpp.h:200
RdKafka::ERR__RESOLVE
@ ERR__RESOLVE
Definition: rdkafkacpp.h:217
RdKafka::ERR__NOT_CONFIGURED
@ ERR__NOT_CONFIGURED
Definition: rdkafkacpp.h:316
RdKafka::ERR__TRANSPORT
@ ERR__TRANSPORT
Definition: rdkafkacpp.h:213
RdKafka::ERR__ISR_INSUFF
@ ERR__ISR_INSUFF
Definition: rdkafkacpp.h:240
RdKafka::ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND
@ ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND
Definition: rdkafkacpp.h:383
RdKafka::ERR__ASSIGN_PARTITIONS
@ ERR__ASSIGN_PARTITIONS
Definition: rdkafkacpp.h:256
RdKafka::ERR_OFFSET_NOT_AVAILABLE
@ ERR_OFFSET_NOT_AVAILABLE
Definition: rdkafkacpp.h:504
RdKafka::ERR_NOT_ENOUGH_REPLICAS
@ ERR_NOT_ENOUGH_REPLICAS
Definition: rdkafkacpp.h:381
RdKafka::Headers::Header::operator=
Header & operator=(const Header &other)
Assignment operator.
Definition: rdkafkacpp.h:2189
RdKafka::ERR__ALL_BROKERS_DOWN
@ ERR__ALL_BROKERS_DOWN
Definition: rdkafkacpp.h:232
RdKafka::ERR_RESOURCE_NOT_FOUND
@ ERR_RESOURCE_NOT_FOUND
Definition: rdkafkacpp.h:533
RdKafka::ERR__NODE_UPDATE
@ ERR__NODE_UPDATE
Definition: rdkafkacpp.h:242
RdKafka::Headers::Header::Header
Header(const std::string &key, const void *value, size_t value_size)
Header object to encapsulate a single Header.
Definition: rdkafkacpp.h:2147
RdKafka::ERR_DELEGATION_TOKEN_EXPIRED
@ ERR_DELEGATION_TOKEN_EXPIRED
Definition: rdkafkacpp.h:480
RdKafka::ERR_UNSUPPORTED_SASL_MECHANISM
@ ERR_UNSUPPORTED_SASL_MECHANISM
Definition: rdkafkacpp.h:409
RdKafka::ERR_TRANSACTION_COORDINATOR_FENCED
@ ERR_TRANSACTION_COORDINATOR_FENCED
Definition: rdkafkacpp.h:452
RdKafka::ERR__BAD_COMPRESSION
@ ERR__BAD_COMPRESSION
Definition: rdkafkacpp.h:207
RdKafka::Event::Type
Type
Event type.
Definition: rdkafkacpp.h:846
RdKafka::Handle
Base handle, super class for specific clients.
Definition: rdkafkacpp.h:1512
RdKafka::ERR__PREV_IN_PROGRESS
@ ERR__PREV_IN_PROGRESS
Definition: rdkafkacpp.h:252
RdKafka::BrokerMetadata::host
virtual std::string host() const =0
RdKafka::ERR_DELEGATION_TOKEN_OWNER_MISMATCH
@ ERR_DELEGATION_TOKEN_OWNER_MISMATCH
Definition: rdkafkacpp.h:474
RdKafka::ERR_SASL_AUTHENTICATION_FAILED
@ ERR_SASL_AUTHENTICATION_FAILED
Definition: rdkafkacpp.h:464
RdKafka::ERR__VALUE_DESERIALIZATION
@ ERR__VALUE_DESERIALIZATION
Definition: rdkafkacpp.h:288
RdKafka::ERR_INVALID_PRODUCER_ID_MAPPING
@ ERR_INVALID_PRODUCER_ID_MAPPING
Definition: rdkafkacpp.h:442
RdKafka::ERR__ASSIGNMENT_LOST
@ ERR__ASSIGNMENT_LOST
Definition: rdkafkacpp.h:322
RdKafka::ERR_PRODUCER_FENCED
@ ERR_PRODUCER_FENCED
Definition: rdkafkacpp.h:531
RdKafka::ERR__WAIT_CACHE
@ ERR__WAIT_CACHE
Definition: rdkafkacpp.h:278
RdKafka::ERR_UNKNOWN_PRODUCER_ID
@ ERR_UNKNOWN_PRODUCER_ID
Definition: rdkafkacpp.h:466
RdKafka::ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE
@ ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE
Definition: rdkafkacpp.h:515
RdKafka::OffsetCommitCb
Offset Commit callback class.
Definition: rdkafkacpp.h:1039
RdKafka::ERR_INVALID_COMMIT_OFFSET_SIZE
@ ERR_INVALID_COMMIT_OFFSET_SIZE
Definition: rdkafkacpp.h:399
RdKafka::Headers::Header
Header object.
Definition: rdkafkacpp.h:2135
RdKafka::MessageTimestamp::type
MessageTimestampType type
Definition: rdkafkacpp.h:2109
RdKafka::ERR_TOPIC_ALREADY_EXISTS
@ ERR_TOPIC_ALREADY_EXISTS
Definition: rdkafkacpp.h:415
RdKafka::ERR_FENCED_INSTANCE_ID
@ ERR_FENCED_INSTANCE_ID
Definition: rdkafkacpp.h:513
RdKafka::ERR_STALE_CTRL_EPOCH
@ ERR_STALE_CTRL_EPOCH
Definition: rdkafkacpp.h:359
RdKafka::ERR__OUTDATED
@ ERR__OUTDATED
Definition: rdkafkacpp.h:272
RdKafka::ERR_MSG_SIZE_TOO_LARGE
@ ERR_MSG_SIZE_TOO_LARGE
Definition: rdkafkacpp.h:357
RdKafka::ERR_COORDINATOR_NOT_AVAILABLE
@ ERR_COORDINATOR_NOT_AVAILABLE
Definition: rdkafkacpp.h:369
RdKafka::Headers::Header::err
RdKafka::ErrorCode err() const
Definition: rdkafkacpp.h:2233
RdKafka::ERR_INCONSISTENT_VOTER_SET
@ ERR_INCONSISTENT_VOTER_SET
Definition: rdkafkacpp.h:540
RdKafka::TopicMetadata::topic
virtual std::string topic() const =0
RdKafka::Event::EVENT_ERROR
@ EVENT_ERROR
Definition: rdkafkacpp.h:847
RdKafka::ERR_INVALID_TRANSACTION_TIMEOUT
@ ERR_INVALID_TRANSACTION_TIMEOUT
Definition: rdkafkacpp.h:445
RdKafka::Metadata::BrokerMetadataVector
std::vector< const BrokerMetadata * > BrokerMetadataVector
Brokers.
Definition: rdkafkacpp.h:3726
RdKafka::MessageTimestamp::timestamp
int64_t timestamp
Definition: rdkafkacpp.h:2110
RdKafka::ERR_UNKNOWN_LEADER_EPOCH
@ ERR_UNKNOWN_LEADER_EPOCH
Definition: rdkafkacpp.h:498
RdKafka::PartitionMetadata::id
virtual int32_t id() const =0
RdKafka::ERR_REASSIGNMENT_IN_PROGRESS
@ ERR_REASSIGNMENT_IN_PROGRESS
Definition: rdkafkacpp.h:468
RdKafka::ERR_INVALID_MSG_SIZE
@ ERR_INVALID_MSG_SIZE
Definition: rdkafkacpp.h:345
RdKafka::ERR__INVALID_ARG
@ ERR__INVALID_ARG
Definition: rdkafkacpp.h:234
RdKafka::ERR__APPLICATION
@ ERR__APPLICATION
Definition: rdkafkacpp.h:320
RdKafka::mem_malloc
RD_EXPORT void * mem_malloc(size_t size)
Allocate memory using the same allocator librdkafka uses.
RdKafka::ERR_OUT_OF_ORDER_SEQUENCE_NUMBER
@ ERR_OUT_OF_ORDER_SEQUENCE_NUMBER
Definition: rdkafkacpp.h:433
RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED
@ ERR_TOPIC_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:401
RdKafka::ERR__NOT_IMPLEMENTED
@ ERR__NOT_IMPLEMENTED
Definition: rdkafkacpp.h:266
RdKafka::ERR__INTR
@ ERR__INTR
Definition: rdkafkacpp.h:280
RdKafka::ERR_RECORD_LIST_TOO_LARGE
@ ERR_RECORD_LIST_TOO_LARGE
Definition: rdkafkacpp.h:379
RdKafka::PartitionMetadata::ReplicasIterator
ReplicasVector::const_iterator ReplicasIterator
Replicas iterator.
Definition: rdkafkacpp.h:3670
RdKafka::ERR__END
@ ERR__END
Definition: rdkafkacpp.h:331
RdKafka::ERR_DELEGATION_TOKEN_NOT_FOUND
@ ERR_DELEGATION_TOKEN_NOT_FOUND
Definition: rdkafkacpp.h:472
RdKafka::ERR_KAFKA_STORAGE_ERROR
@ ERR_KAFKA_STORAGE_ERROR
Definition: rdkafkacpp.h:460
RdKafka::ERR__IN_PROGRESS
@ ERR__IN_PROGRESS
Definition: rdkafkacpp.h:250
RdKafka::EventCb
Event callback class.
Definition: rdkafkacpp.h:826
RdKafka::ERR__UNKNOWN_PARTITION
@ ERR__UNKNOWN_PARTITION
Definition: rdkafkacpp.h:226
RdKafka::ERR__GAPLESS_GUARANTEE
@ ERR__GAPLESS_GUARANTEE
Definition: rdkafkacpp.h:310
RdKafka::ERR__EXISTING_SUBSCRIPTION
@ ERR__EXISTING_SUBSCRIPTION
Definition: rdkafkacpp.h:254
RdKafka::ERR__TIMED_OUT
@ ERR__TIMED_OUT
Definition: rdkafkacpp.h:236
RdKafka::Conf
Configuration interface.
Definition: rdkafkacpp.h:1190
RdKafka::Headers::Header::value_size
size_t value_size() const
Definition: rdkafkacpp.h:2228
RdKafka::ERR_INVALID_MSG
@ ERR_INVALID_MSG
Definition: rdkafkacpp.h:341
RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE
@ MSG_TIMESTAMP_NOT_AVAILABLE
Definition: rdkafkacpp.h:2104
RdKafka::ERR__STATE
@ ERR__STATE
Definition: rdkafkacpp.h:262
RdKafka::PartitionerKeyPointerCb
Variant partitioner with key pointer.
Definition: rdkafkacpp.h:796
RdKafka::ERR_INVALID_REQUEST
@ ERR_INVALID_REQUEST
Definition: rdkafkacpp.h:427
RdKafka::ERR__FENCED
@ ERR__FENCED
Definition: rdkafkacpp.h:318
RdKafka::ERR__REVOKE_PARTITIONS
@ ERR__REVOKE_PARTITIONS
Definition: rdkafkacpp.h:258
RdKafka::ERR_DUPLICATE_SEQUENCE_NUMBER
@ ERR_DUPLICATE_SEQUENCE_NUMBER
Definition: rdkafkacpp.h:435
RdKafka::CertificateEncoding
CertificateEncoding
SSL certificate encoding.
Definition: rdkafkacpp.h:573
RdKafka::ERR_UNSUPPORTED_COMPRESSION_TYPE
@ ERR_UNSUPPORTED_COMPRESSION_TYPE
Definition: rdkafkacpp.h:500
RdKafka::ConsumerGroupMetadata
ConsumerGroupMetadata holds a consumer instance's group metadata state.
Definition: rdkafkacpp.h:2606
RdKafka::ERR_GROUP_AUTHORIZATION_FAILED
@ ERR_GROUP_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:403
RdKafka::PartitionerCb
Partitioner callback class.
Definition: rdkafkacpp.h:764
RdKafka::Metadata::topics
virtual const TopicMetadataVector * topics() const =0
Topic list.
RdKafka::TopicMetadata::PartitionMetadataVector
std::vector< const PartitionMetadata * > PartitionMetadataVector
Partitions.
Definition: rdkafkacpp.h:3703
RdKafka::ERR_UNACCEPTABLE_CREDENTIAL
@ ERR_UNACCEPTABLE_CREDENTIAL
Definition: rdkafkacpp.h:537
RdKafka::ERR__UNKNOWN_BROKER
@ ERR__UNKNOWN_BROKER
Definition: rdkafkacpp.h:314
RdKafka::ERR_PREFERRED_LEADER_NOT_AVAILABLE
@ ERR_PREFERRED_LEADER_NOT_AVAILABLE
Definition: rdkafkacpp.h:508
RdKafka::ERR__BEGIN
@ ERR__BEGIN
Definition: rdkafkacpp.h:203
RdKafka::ERR_UNKNOWN_TOPIC_OR_PART
@ ERR_UNKNOWN_TOPIC_OR_PART
Definition: rdkafkacpp.h:343
RdKafka::ConsumeCb
Consume callback class.
Definition: rdkafkacpp.h:938
RdKafka::Conf::ConfType
ConfType
Configuration object type.
Definition: rdkafkacpp.h:1195
RdKafka::Consumer
Simple Consumer (legacy)
Definition: rdkafkacpp.h:3043
RdKafka::ERR_FETCH_SESSION_ID_NOT_FOUND
@ ERR_FETCH_SESSION_ID_NOT_FOUND
Definition: rdkafkacpp.h:488
RdKafka::ERR_REPLICA_NOT_AVAILABLE
@ ERR_REPLICA_NOT_AVAILABLE
Definition: rdkafkacpp.h:355
RdKafka::Message
Message object.
Definition: rdkafkacpp.h:2366
RdKafka::ERR_POLICY_VIOLATION
@ ERR_POLICY_VIOLATION
Definition: rdkafkacpp.h:431
RdKafka::TopicMetadata
Metadata: Topic information.
Definition: rdkafkacpp.h:3700
RdKafka::ERR_BROKER_NOT_AVAILABLE
@ ERR_BROKER_NOT_AVAILABLE
Definition: rdkafkacpp.h:353
RdKafka::ERR__DESTROY
@ ERR__DESTROY
Definition: rdkafkacpp.h:209
RdKafka::Headers::Header::Header
Header(const std::string &key, const void *value, size_t value_size, const RdKafka::ErrorCode err)
Header object to encapsulate a single Header.
Definition: rdkafkacpp.h:2165
RdKafka::SslCertificateVerifyCb
SSL broker certificate verification class.
Definition: rdkafkacpp.h:1070
RdKafka::ERR_OFFSET_METADATA_TOO_LARGE
@ ERR_OFFSET_METADATA_TOO_LARGE
Definition: rdkafkacpp.h:361
RdKafka::ERR__FATAL
@ ERR__FATAL
Definition: rdkafkacpp.h:306
RdKafka::Metadata::TopicMetadataIterator
TopicMetadataVector::const_iterator TopicMetadataIterator
Topics iterator.
Definition: rdkafkacpp.h:3733
RdKafka::Error
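The Error class is used as a return value from APIs to propagate an error. The error consists of an error code which is to be used programmatically, an error string for showing to the user, and various error flags that can be used programmatically to decide how to handle the error.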
Definition: rdkafkacpp.h:615
RdKafka::PartitionMetadata::err
virtual ErrorCode err() const =0
RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME
@ MSG_TIMESTAMP_CREATE_TIME
Definition: rdkafkacpp.h:2105
RdKafka::MessageTimestamp::MessageTimestampType
MessageTimestampType
Definition: rdkafkacpp.h:2103
RdKafka::Topic::OFFSET_BEGINNING
static const int64_t OFFSET_BEGINNING
Special offsets.
Definition: rdkafkacpp.h:2008
RdKafka::Metadata::orig_broker_id
virtual int32_t orig_broker_id() const =0
Broker (id) originating this metadata.
RdKafka::Event
Event object class as passed to the EventCb callback.
Definition: rdkafkacpp.h:843
RdKafka::BrokerMetadata::id
virtual int32_t id() const =0
RdKafka::mem_free
RD_EXPORT void mem_free(void *ptr)
Free pointer returned by librdkafka.
RdKafka::ERR_UNSUPPORTED_VERSION
@ ERR_UNSUPPORTED_VERSION
Definition: rdkafkacpp.h:413
RdKafka::ERR__INVALID_TYPE
@ ERR__INVALID_TYPE
Definition: rdkafkacpp.h:298
RdKafka::ERR_REBALANCE_IN_PROGRESS
@ ERR_REBALANCE_IN_PROGRESS
Definition: rdkafkacpp.h:397
RdKafka::Topic
Topic handle.
Definition: rdkafkacpp.h:1997
RdKafka::PartitionMetadata::ISRSVector
std::vector< int32_t > ISRSVector
ISRs (In-Sync-Replicas)
Definition: rdkafkacpp.h:3667
RdKafka::Topic::PARTITION_UA
static const int32_t PARTITION_UA
Unassigned partition.
Definition: rdkafkacpp.h:2005
RdKafka::ERR_PRINCIPAL_DESERIALIZATION_FAILURE
@ ERR_PRINCIPAL_DESERIALIZATION_FAILURE
Definition: rdkafkacpp.h:546
RdKafka::ERR__UNKNOWN_GROUP
@ ERR__UNKNOWN_GROUP
Definition: rdkafkacpp.h:248
RdKafka::ERR__RETRY
@ ERR__RETRY
Definition: rdkafkacpp.h:300
RdKafka::Headers::Header::value
const void * value() const
Definition: rdkafkacpp.h:2217
RdKafka::ERR_NON_EMPTY_GROUP
@ ERR_NON_EMPTY_GROUP
Definition: rdkafkacpp.h:484
RdKafka::Conf::ConfResult
ConfResult
RdKafka::Conf::Set() result code.
Definition: rdkafkacpp.h:1203
RdKafka::TopicPartition
Topic+Partition.
Definition: rdkafkacpp.h:1942
RdKafka::ERR_MEMBER_ID_REQUIRED
@ ERR_MEMBER_ID_REQUIRED
Definition: rdkafkacpp.h:506
RdKafka::ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED
@ ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:478
RdKafka::ERR__WAIT_COORD
@ ERR__WAIT_COORD
Definition: rdkafkacpp.h:246
RdKafka::Message::Status
Status
Message persistence status can be used by the application to find out if a produced message was persisted in the topic log.
Definition: rdkafkacpp.h:2370
RdKafka::ERR__READ_ONLY
@ ERR__READ_ONLY
Definition: rdkafkacpp.h:292
RdKafka::ERR__NO_OFFSET
@ ERR__NO_OFFSET
Definition: rdkafkacpp.h:270
RdKafka::MessageTimestamp
Message timestamp object.
Definition: rdkafkacpp.h:2100
RdKafka::ERR_UNKNOWN
@ ERR_UNKNOWN
Definition: rdkafkacpp.h:335
RdKafka::ERR_LISTENER_NOT_FOUND
@ ERR_LISTENER_NOT_FOUND
Definition: rdkafkacpp.h:492
RdKafka::CERT_ENC_DER
@ CERT_ENC_DER
Definition: rdkafkacpp.h:575
RdKafka::ERR_LOG_DIR_NOT_FOUND
@ ERR_LOG_DIR_NOT_FOUND
Definition: rdkafkacpp.h:462
RdKafka::ERR_NO_ERROR
@ ERR_NO_ERROR
Definition: rdkafkacpp.h:337
RdKafka::ERR__NOOP
@ ERR__NOOP
Definition: rdkafkacpp.h:324
RdKafka::ERR__UNKNOWN_TOPIC
@ ERR__UNKNOWN_TOPIC
Definition: rdkafkacpp.h:230
RdKafka::BrokerMetadata
Metadata: Broker information.
Definition: rdkafkacpp.h:3643
RdKafka::ERR_OFFSET_OUT_OF_RANGE
@ ERR_OFFSET_OUT_OF_RANGE
Definition: rdkafkacpp.h:339
RdKafka::ERR__MAX_POLL_EXCEEDED
@ ERR__MAX_POLL_EXCEEDED
Definition: rdkafkacpp.h:312
RdKafka::ERR_NOT_CONTROLLER
@ ERR_NOT_CONTROLLER
Definition: rdkafkacpp.h:425
RdKafka::PartitionMetadata::leader
virtual int32_t leader() const =0
RdKafka::ERR__LOG_TRUNCATION
@ ERR__LOG_TRUNCATION
Definition: rdkafkacpp.h:328
RdKafka::CERT_ENC_PEM
@ CERT_ENC_PEM
Definition: rdkafkacpp.h:576
RdKafka::CERT_ENC_PKCS12
@ CERT_ENC_PKCS12
Definition: rdkafkacpp.h:574
RdKafka::ERR_ILLEGAL_GENERATION
@ ERR_ILLEGAL_GENERATION
Definition: rdkafkacpp.h:387
RdKafka::ERR_INVALID_PRODUCER_EPOCH
@ ERR_INVALID_PRODUCER_EPOCH
Definition: rdkafkacpp.h:437
RdKafka::ERR_SECURITY_DISABLED
@ ERR_SECURITY_DISABLED
Definition: rdkafkacpp.h:456
RdKafka::Metadata::orig_broker_name
virtual std::string orig_broker_name() const =0
Broker (name) originating this metadata.
RdKafka::ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED
@ ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED
Definition: rdkafkacpp.h:454
RdKafka::ERR__AUTHENTICATION
@ ERR__AUTHENTICATION
Definition: rdkafkacpp.h:268
RdKafka::CERT_PRIVATE_KEY
@ CERT_PRIVATE_KEY
Definition: rdkafkacpp.h:564
RdKafka::Producer
Producer.
Definition: rdkafkacpp.h:3225
RdKafka::ERR_LEADER_NOT_AVAILABLE
@ ERR_LEADER_NOT_AVAILABLE
Definition: rdkafkacpp.h:347
RdKafka::ERR_GROUP_SUBSCRIBED_TO_TOPIC
@ ERR_GROUP_SUBSCRIBED_TO_TOPIC
Definition: rdkafkacpp.h:522
RdKafka::ERR_FEATURE_UPDATE_FAILED
@ ERR_FEATURE_UPDATE_FAILED
Definition: rdkafkacpp.h:544
RdKafka::ERR_FENCED_LEADER_EPOCH
@ ERR_FENCED_LEADER_EPOCH
Definition: rdkafkacpp.h:496
RdKafka::ERR__PARTITION_EOF
@ ERR__PARTITION_EOF
Definition: rdkafkacpp.h:224
RdKafka::Metadata
Metadata container.
Definition: rdkafkacpp.h:3723
RdKafka::CERT_PUBLIC_KEY
@ CERT_PUBLIC_KEY
Definition: rdkafkacpp.h:563
RdKafka::Metadata::BrokerMetadataIterator
BrokerMetadataVector::const_iterator BrokerMetadataIterator
Brokers iterator.
Definition: rdkafkacpp.h:3731
RdKafka::ERR_DELEGATION_TOKEN_AUTH_DISABLED
@ ERR_DELEGATION_TOKEN_AUTH_DISABLED
Definition: rdkafkacpp.h:470
RdKafka::ERR_UNSTABLE_OFFSET_COMMIT
@ ERR_UNSTABLE_OFFSET_COMMIT
Definition: rdkafkacpp.h:526
RdKafka::ERR_INVALID_UPDATE_VERSION
@ ERR_INVALID_UPDATE_VERSION
Definition: rdkafkacpp.h:542
RdKafka::ERR_INVALID_FETCH_SESSION_EPOCH
@ ERR_INVALID_FETCH_SESSION_EPOCH
Definition: rdkafkacpp.h:490
RdKafka::ERR_TOPIC_EXCEPTION
@ ERR_TOPIC_EXCEPTION
Definition: rdkafkacpp.h:377
RdKafka::ERR__KEY_DESERIALIZATION
@ ERR__KEY_DESERIALIZATION
Definition: rdkafkacpp.h:286
RdKafka::ERR__UNKNOWN_PROTOCOL
@ ERR__UNKNOWN_PROTOCOL
Definition: rdkafkacpp.h:264
RdKafka::ERR_NO_REASSIGNMENT_IN_PROGRESS
@ ERR_NO_REASSIGNMENT_IN_PROGRESS
Definition: rdkafkacpp.h:519
RdKafka::ERR_COORDINATOR_LOAD_IN_PROGRESS
@ ERR_COORDINATOR_LOAD_IN_PROGRESS
Definition: rdkafkacpp.h:365
RdKafka::SocketCb
Portability: SocketCb callback class
Definition: rdkafkacpp.h:1125
RdKafka::ERR_TOPIC_DELETION_DISABLED
@ ERR_TOPIC_DELETION_DISABLED
Definition: rdkafkacpp.h:494
RdKafka::ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT
@ ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT
Definition: rdkafkacpp.h:429
RdKafka::TopicMetadata::PartitionMetadataIterator
PartitionMetadataVector::const_iterator PartitionMetadataIterator
Partitions iterator.
Definition: rdkafkacpp.h:3705
RdKafka::PartitionerKeyPointerCb::partitioner_cb
virtual int32_t partitioner_cb(const Topic *topic, const void *key, size_t key_len, int32_t partition_cnt, void *msg_opaque)=0
Variant partitioner callback that gets key as pointer and length instead of as a const std::string *.
RdKafka::ERR__PARTIAL
@ ERR__PARTIAL
Definition: rdkafkacpp.h:290
RdKafka::ERR_REQUEST_TIMED_OUT
@ ERR_REQUEST_TIMED_OUT
Definition: rdkafkacpp.h:351
RdKafka::ERR_GROUP_MAX_SIZE_REACHED
@ ERR_GROUP_MAX_SIZE_REACHED
Definition: rdkafkacpp.h:510
RdKafka::ERR_CONCURRENT_TRANSACTIONS
@ ERR_CONCURRENT_TRANSACTIONS
Definition: rdkafkacpp.h:448
RdKafka::Headers::Header::value_string
const char * value_string() const
Definition: rdkafkacpp.h:2223
RdKafka::Headers
Headers object.
Definition: rdkafkacpp.h:2123
RdKafka::Topic::OFFSET_INVALID
static const int64_t OFFSET_INVALID
Definition: rdkafkacpp.h:2011
RdKafka::Event::EVENT_STATS
@ EVENT_STATS
Definition: rdkafkacpp.h:848
RdKafka::Headers::Header::Header
Header(const Header &other)
Copy constructor.
Definition: rdkafkacpp.h:2179
RdKafka::ERR_INVALID_REPLICATION_FACTOR
@ ERR_INVALID_REPLICATION_FACTOR
Definition: rdkafkacpp.h:419
RdKafka::ERR_STALE_BROKER_EPOCH
@ ERR_STALE_BROKER_EPOCH
Definition: rdkafkacpp.h:502
RdKafka::ERR_INVALID_CONFIG
@ ERR_INVALID_CONFIG
Definition: rdkafkacpp.h:423
RdKafka::ERR_INVALID_REQUIRED_ACKS
@ ERR_INVALID_REQUIRED_ACKS
Definition: rdkafkacpp.h:385
RdKafka::ERR__PURGE_INFLIGHT
@ ERR__PURGE_INFLIGHT
Definition: rdkafkacpp.h:304
RdKafka::PartitionMetadata::ISRSIterator
ISRSVector::const_iterator ISRSIterator
ISRs iterator.
Definition: rdkafkacpp.h:3672
RdKafka::CertificateType
CertificateType
SSL certificate types.
Definition: rdkafkacpp.h:562
RdKafka::PartitionMetadata::ReplicasVector
std::vector< int32_t > ReplicasVector
Replicas.
Definition: rdkafkacpp.h:3665
RdKafka::OpenCb
Portability: OpenCb callback class
Definition: rdkafkacpp.h:1151