librdkafka
The Apache Kafka C/C++ client library
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
rdkafkacpp.h
Go to the documentation of this file.
1 /*
2  * librdkafka - Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2014 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  * this list of conditions and the following disclaimer in the documentation
14  * and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #ifndef _RDKAFKACPP_H_
30 #define _RDKAFKACPP_H_
31 
50 #include <string>
51 #include <list>
52 #include <vector>
53 #include <stdint.h>
54 
55 
/*
 * RD_EXPORT: decoration placed on every public class and free function below.
 *
 * With MSVC (_MSC_VER) any earlier definition is dropped first, then:
 *   - LIBRDKAFKA_STATICLIB defined   -> empty (static library, nothing to decorate)
 *   - LIBRDKAFKACPP_EXPORTS defined  -> __declspec(dllexport)
 *   - otherwise                      -> __declspec(dllimport)
 * With any other compiler the macro expands to nothing.
 */
56 #ifdef _MSC_VER
57 #undef RD_EXPORT
58 #ifdef LIBRDKAFKA_STATICLIB
59 #define RD_EXPORT
60 #else
61 #ifdef LIBRDKAFKACPP_EXPORTS
62 #define RD_EXPORT __declspec(dllexport)
63 #else
64 #define RD_EXPORT __declspec(dllimport)
65 #endif
66 #endif
67 #else
68 #define RD_EXPORT
69 #endif
70 
/*
 * Incomplete C struct types. Only their names are introduced here, so this
 * header can declare the c_ptr() accessors on Handle, Topic and Message
 * (which return pointers to these structs) without defining the structs.
 */
73 extern "C" {
74  /* Forward declarations */
75  struct rd_kafka_s;
76  struct rd_kafka_topic_s;
77  struct rd_kafka_message_s;
78 };
79 
80 namespace RdKafka {
81 
82 
/*
 * Compile-time library version as a hexadecimal integer constant.
 * Being a preprocessor macro it is not scoped by the enclosing namespace.
 * NOTE(review): the value looks like one byte per version component
 * (0x00, 0x0b, 0x04, 0xff) — confirm the encoding against the release docs.
 */
102 #define RD_KAFKA_VERSION 0x000b04ff
103 
/*
 * Free functions exported from the library (declarations only; the bodies are
 * not part of this header).
 */

/* Returns a version value as an int.
 * NOTE(review): presumably the runtime counterpart of RD_KAFKA_VERSION — confirm. */
109 RD_EXPORT
110 int version ();
111 
/* Returns a version value as a std::string. */
115 RD_EXPORT
116 std::string version_str();
117 
/* Returns a std::string.
 * NOTE(review): by its name, the supported debug contexts; the string format
 * is not visible here — confirm. */
122 RD_EXPORT
123 std::string get_debug_contexts();
124 
/* Takes a timeout in milliseconds and returns an int.
 * NOTE(review): the meaning of the return value is not visible here — confirm
 * before relying on it. */
134 RD_EXPORT
135 int wait_destroyed(int timeout_ms);
136 
137 
/*
 * Error codes used throughout the API.
 *
 * The numeric space is split in two:
 *   - ERR__BEGIN (-200) .. ERR__END (-100): errors internal to rdkafka; these
 *     names carry a double underscore (ERR__xxx).
 *   - ERR_UNKNOWN (-1) and upwards: Kafka broker errors, with ERR_NO_ERROR
 *     being 0.
 * Every enumerator has an explicit value, so adding or reordering entries
 * cannot silently renumber existing ones.
 */
160 enum ErrorCode {
161  /* Internal errors to rdkafka: */
 /* ERR__BEGIN marks the low end of the internal range. */
163  ERR__BEGIN = -200,
165  ERR__BAD_MSG = -199,
167  ERR__BAD_COMPRESSION = -198,
169  ERR__DESTROY = -197,
171  ERR__FAIL = -196,
173  ERR__TRANSPORT = -195,
175  ERR__CRIT_SYS_RESOURCE = -194,
177  ERR__RESOLVE = -193,
179  ERR__MSG_TIMED_OUT = -192,
182  ERR__PARTITION_EOF = -191,
184  ERR__UNKNOWN_PARTITION = -190,
186  ERR__FS = -189,
188  ERR__UNKNOWN_TOPIC = -188,
190  ERR__ALL_BROKERS_DOWN = -187,
192  ERR__INVALID_ARG = -186,
194  ERR__TIMED_OUT = -185,
196  ERR__QUEUE_FULL = -184,
198  ERR__ISR_INSUFF = -183,
200  ERR__NODE_UPDATE = -182,
202  ERR__SSL = -181,
204  ERR__WAIT_COORD = -180,
206  ERR__UNKNOWN_GROUP = -179,
208  ERR__IN_PROGRESS = -178,
210  ERR__PREV_IN_PROGRESS = -177,
212  ERR__EXISTING_SUBSCRIPTION = -176,
214  ERR__ASSIGN_PARTITIONS = -175,
216  ERR__REVOKE_PARTITIONS = -174,
218  ERR__CONFLICT = -173,
220  ERR__STATE = -172,
222  ERR__UNKNOWN_PROTOCOL = -171,
224  ERR__NOT_IMPLEMENTED = -170,
226  ERR__AUTHENTICATION = -169,
228  ERR__NO_OFFSET = -168,
230  ERR__OUTDATED = -167,
232  ERR__TIMED_OUT_QUEUE = -166,
234  ERR__UNSUPPORTED_FEATURE = -165,
236  ERR__WAIT_CACHE = -164,
238  ERR__INTR = -163,
240  ERR__KEY_SERIALIZATION = -162,
242  ERR__VALUE_SERIALIZATION = -161,
244  ERR__KEY_DESERIALIZATION = -160,
246  ERR__VALUE_DESERIALIZATION = -159,
248  ERR__PARTIAL = -158,
250  ERR__READ_ONLY = -157,
252  ERR__NOENT = -156,
254  ERR__UNDERFLOW = -155,
255 
 /* ERR__END marks the high end of the internal range; -154..-101 are unused here. */
257  ERR__END = -100,
258 
259  /* Kafka broker errors: */
261  ERR_UNKNOWN = -1,
263  ERR_NO_ERROR = 0,
265  ERR_OFFSET_OUT_OF_RANGE = 1,
267  ERR_INVALID_MSG = 2,
269  ERR_UNKNOWN_TOPIC_OR_PART = 3,
271  ERR_INVALID_MSG_SIZE = 4,
273  ERR_LEADER_NOT_AVAILABLE = 5,
275  ERR_NOT_LEADER_FOR_PARTITION = 6,
277  ERR_REQUEST_TIMED_OUT = 7,
279  ERR_BROKER_NOT_AVAILABLE = 8,
281  ERR_REPLICA_NOT_AVAILABLE = 9,
283  ERR_MSG_SIZE_TOO_LARGE = 10,
285  ERR_STALE_CTRL_EPOCH = 11,
287  ERR_OFFSET_METADATA_TOO_LARGE = 12,
289  ERR_NETWORK_EXCEPTION = 13,
291  ERR_GROUP_LOAD_IN_PROGRESS = 14,
293  ERR_GROUP_COORDINATOR_NOT_AVAILABLE = 15,
295  ERR_NOT_COORDINATOR_FOR_GROUP = 16,
297  ERR_TOPIC_EXCEPTION = 17,
299  ERR_RECORD_LIST_TOO_LARGE = 18,
301  ERR_NOT_ENOUGH_REPLICAS = 19,
303  ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20,
305  ERR_INVALID_REQUIRED_ACKS = 21,
307  ERR_ILLEGAL_GENERATION = 22,
309  ERR_INCONSISTENT_GROUP_PROTOCOL = 23,
311  ERR_INVALID_GROUP_ID = 24,
313  ERR_UNKNOWN_MEMBER_ID = 25,
315  ERR_INVALID_SESSION_TIMEOUT = 26,
317  ERR_REBALANCE_IN_PROGRESS = 27,
319  ERR_INVALID_COMMIT_OFFSET_SIZE = 28,
321  ERR_TOPIC_AUTHORIZATION_FAILED = 29,
323  ERR_GROUP_AUTHORIZATION_FAILED = 30,
325  ERR_CLUSTER_AUTHORIZATION_FAILED = 31,
327  ERR_INVALID_TIMESTAMP = 32,
329  ERR_UNSUPPORTED_SASL_MECHANISM = 33,
331  ERR_ILLEGAL_SASL_STATE = 34,
333  ERR_UNSUPPORTED_VERSION = 35,
335  ERR_TOPIC_ALREADY_EXISTS = 36,
337  ERR_INVALID_PARTITIONS = 37,
339  ERR_INVALID_REPLICATION_FACTOR = 38,
341  ERR_INVALID_REPLICA_ASSIGNMENT = 39,
343  ERR_INVALID_CONFIG = 40,
345  ERR_NOT_CONTROLLER = 41,
347  ERR_INVALID_REQUEST = 42,
349  ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43,
351  ERR_POLICY_VIOLATION = 44,
353  ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45,
355  ERR_DUPLICATE_SEQUENCE_NUMBER = 46,
357  ERR_INVALID_PRODUCER_EPOCH = 47,
359  ERR_INVALID_TXN_STATE = 48,
362  ERR_INVALID_PRODUCER_ID_MAPPING = 49,
365  ERR_INVALID_TRANSACTION_TIMEOUT = 50,
368  ERR_CONCURRENT_TRANSACTIONS = 51,
372  ERR_TRANSACTION_COORDINATOR_FENCED = 52,
374  ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53,
376  ERR_SECURITY_DISABLED = 54,
378  ERR_OPERATION_NOT_ATTEMPTED = 55
379 };
380 
381 
/*
 * Converts an ErrorCode to a std::string returned by value.
 * NOTE(review): presumably a human-readable description of the error; the
 * exact wording is produced by the implementation, not by this header.
 */
385 RD_EXPORT
386 std::string err2str(RdKafka::ErrorCode err);
387 
388 
/*
 * The callback interfaces that follow mention several of these classes by
 * pointer or reference (e.g. Message&, const Topic*, KafkaConsumer*) before
 * the classes themselves are defined, so their names are introduced up front.
 */
394 /* Forward declarations */
395 class Producer;
396 class Message;
397 class Queue;
398 class Event;
399 class Topic;
400 class TopicPartition;
401 class Metadata;
402 class KafkaConsumer;
/**
 * @brief Delivery Report callback class.
 *
 * Abstract interface: dr_cb() is pure virtual, so the class can only be used
 * through a derived class that implements it. Conf::set() has an overload
 * accepting a DeliveryReportCb pointer.
 */
434 class RD_EXPORT DeliveryReportCb {
435  public:
  /** Callback to implement; receives a Message by non-const reference. */
439  virtual void dr_cb (Message &message) = 0;
440 
  /* Virtual, so an implementation may be destroyed through a base pointer. */
441  virtual ~DeliveryReportCb() { }
442 };
443 
444 
/**
 * @brief Partitioner callback class.
 *
 * Abstract interface with a single pure virtual callback. The key is passed
 * as a pointer to std::string.
 */
452 class RD_EXPORT PartitionerCb {
453  public:
  /**
   * Callback to implement.
   *
   * @param topic          topic the call concerns (read-only)
   * @param key            key as a std::string pointer
   * @param partition_cnt  partition count supplied by the caller
   * @param msg_opaque     untyped pointer passed through to the callback
   * @return an int32_t — presumably the selected partition in
   *         [0, partition_cnt); TODO confirm, including whether key may be NULL
   */
470  virtual int32_t partitioner_cb (const Topic *topic,
471  const std::string *key,
472  int32_t partition_cnt,
473  void *msg_opaque) = 0;
474 
475  virtual ~PartitionerCb() { }
476 };
477 
/*
 * Variant partitioner with key pointer: the callback gets the key as a
 * pointer and a length instead of as a const std::string *.
 *
 * NOTE(review): the class header (listing line 482) is missing from this
 * extract. The destructor name below shows the class is
 * PartitionerKeyPointerCb; restore the header before compiling this text.
 */
483  public:
  /**
   * Callback to implement. Same shape as PartitionerCb::partitioner_cb except
   * that the key is given as raw bytes.
   *
   * @param key      start of the key bytes
   * @param key_len  number of key bytes
   */
492  virtual int32_t partitioner_cb (const Topic *topic,
493  const void *key,
494  size_t key_len,
495  int32_t partition_cnt,
496  void *msg_opaque) = 0;
497 
498  virtual ~PartitionerKeyPointerCb() { }
499 };
500 
501 
502 
/**
 * @brief Event callback class.
 *
 * Abstract interface; implementations receive Event objects through event_cb().
 */
511 class RD_EXPORT EventCb {
512  public:
  /** Callback to implement; the Event is passed by non-const reference. */
518  virtual void event_cb (Event &event) = 0;
519 
520  virtual ~EventCb() { }
521 };
522 
523 
/**
 * @brief Event object class as passed to the EventCb callback.
 *
 * Pure interface: every accessor is a const pure virtual, so the concrete
 * event type is supplied by the implementation.
 */
527 class RD_EXPORT Event {
528  public:
  /* Event type.
   * NOTE(review): listing lines 531-533 (further enumerators) are missing
   * from this extract; only the last enumerator is visible. */
530  enum Type {
534  EVENT_THROTTLE
535  };
536 
  /* EVENT_LOG severities (conforms to syslog(3) severities): 0 is the most
   * severe level listed, 7 the least. */
538  enum Severity {
539  EVENT_SEVERITY_EMERG = 0,
540  EVENT_SEVERITY_ALERT = 1,
541  EVENT_SEVERITY_CRITICAL = 2,
542  EVENT_SEVERITY_ERROR = 3,
543  EVENT_SEVERITY_WARNING = 4,
544  EVENT_SEVERITY_NOTICE = 5,
545  EVENT_SEVERITY_INFO = 6,
546  EVENT_SEVERITY_DEBUG = 7
547  };
548 
549  virtual ~Event () { }
550 
551  /*
552  * Event Accessor methods
553  */
554 
  /* Which kind of event this is (a Type value). */
559  virtual Type type () const = 0;
560 
  /* Error code attached to the event. */
565  virtual ErrorCode err () const = 0;
566 
  /* Severity value; the enum is described as applying to EVENT_LOG. */
571  virtual Severity severity () const = 0;
572 
  /* NOTE(review): "fac" presumably abbreviates the log facility — confirm. */
577  virtual std::string fac () const = 0;
578 
  /* String form of the event, returned by value. */
587  virtual std::string str () const = 0;
588 
  /* NOTE(review): unit is not visible here (presumably milliseconds) and the
   * value presumably only applies to EVENT_THROTTLE — confirm. */
593  virtual int throttle_time () const = 0;
594 
  /* Broker name and id associated with the event. */
599  virtual std::string broker_name () const = 0;
600 
605  virtual int broker_id () const = 0;
606 };
607 
608 
609 
/**
 * @brief Consume callback class.
 *
 * Abstract interface; Consumer::consume_callback() takes a pointer to an
 * implementation together with an opaque pointer.
 */
613 class RD_EXPORT ConsumeCb {
614  public:
  /**
   * Callback to implement.
   *
   * @param message  message handed to the callback
   * @param opaque   untyped pointer passed through to the callback
   */
622  virtual void consume_cb (Message &message, void *opaque) = 0;
623 
624  virtual ~ConsumeCb() { }
625 };
626 
627 
/**
 * @brief KafkaConsumer: Rebalance callback class.
 *
 * Abstract interface with a single pure virtual callback.
 */
631 class RD_EXPORT RebalanceCb {
632 public:
  /**
   * Callback to implement.
   *
   * @param consumer    the KafkaConsumer the call concerns
   * @param err         error code describing the rebalance event —
   *                    presumably ERR__ASSIGN_PARTITIONS or
   *                    ERR__REVOKE_PARTITIONS; TODO confirm
   * @param partitions  partition list, passed by non-const reference
   */
682  virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer,
683  RdKafka::ErrorCode err,
684  std::vector<TopicPartition*>&partitions) = 0;
685 
686  virtual ~RebalanceCb() { }
687 };
688 
689 
/**
 * @brief Offset Commit callback class.
 *
 * Abstract interface; KafkaConsumer::commitSync() has overloads that accept
 * a pointer to an implementation.
 */
693 class RD_EXPORT OffsetCommitCb {
694 public:
  /**
   * Callback to implement.
   *
   * @param err      error code for the commit
   * @param offsets  topic+partition list, passed by non-const reference
   */
710  virtual void offset_commit_cb(RdKafka::ErrorCode err,
711  std::vector<TopicPartition*>&offsets) = 0;
712 
713  virtual ~OffsetCommitCb() { }
714 };
715 
716 
717 
/**
 * @brief Portability: SocketCb callback class.
 *
 * Abstract interface with a single pure virtual callback.
 */
722 class RD_EXPORT SocketCb {
723  public:
  /**
   * Callback to implement.
   * NOTE(review): the three int parameters carry the same names as the
   * arguments of socket(2), and the int result is presumably a descriptor —
   * confirm the expected error return.
   */
737  virtual int socket_cb (int domain, int type, int protocol) = 0;
738 
739  virtual ~SocketCb() { }
740 };
741 
742 
/**
 * @brief Portability: OpenCb callback class.
 *
 * Abstract interface with a single pure virtual callback.
 */
747 class RD_EXPORT OpenCb {
748  public:
  /**
   * Callback to implement; receives a path plus integer flags and mode.
   * NOTE(review): parameters resemble open(2) and the int result is
   * presumably a descriptor — confirm the expected error return.
   */
760  virtual int open_cb (const std::string &path, int flags, int mode) = 0;
761 
762  virtual ~OpenCb() { }
763 };
764 
765 
/**
 * @brief Configuration interface.
 *
 * Instances come from the static create() factory; everything else is pure
 * virtual. Properties are written with the set() overloads (one for string
 * values, one per callback type, one for a nested Conf) and read back with
 * the matching get() overloads.
 */
786 class RD_EXPORT Conf {
787  public:
  /* Configuration object type.
   * NOTE(review): listing line 792 (an earlier enumerator) is missing from
   * this extract; only CONF_TOPIC is visible. */
791  enum ConfType {
793  CONF_TOPIC
794  };
795 
  /* Result code of set()/get(): CONF_OK is 0, both failure values are negative. */
799  enum ConfResult {
800  CONF_UNKNOWN = -2,
801  CONF_INVALID = -1,
802  CONF_OK = 0
803  };
804 
805 
  /* Factory returning a raw pointer to a new object of the requested type.
   * NOTE(review): ownership presumably passes to the caller — confirm. */
809  static Conf *create (ConfType type);
810 
811  virtual ~Conf () { }
812 
  /*
   * set() overloads. All share the same shape: property name, the value or
   * callback object, and an errstr output string passed by non-const
   * reference. NOTE(review): errstr is presumably filled in only when the
   * result is not CONF_OK — confirm.
   */

  /* String-valued property. */
826  virtual Conf::ConfResult set (const std::string &name,
827  const std::string &value,
828  std::string &errstr) = 0;
829 
  /* Delivery report callback. */
831  virtual Conf::ConfResult set (const std::string &name,
832  DeliveryReportCb *dr_cb,
833  std::string &errstr) = 0;
834 
  /* Event callback. */
836  virtual Conf::ConfResult set (const std::string &name,
837  EventCb *event_cb,
838  std::string &errstr) = 0;
839 
  /* Nested configuration object (parameter is named topic_conf). */
847  virtual Conf::ConfResult set (const std::string &name,
848  const Conf *topic_conf,
849  std::string &errstr) = 0;
850 
  /* Partitioner callback (std::string key variant). */
852  virtual Conf::ConfResult set (const std::string &name,
853  PartitionerCb *partitioner_cb,
854  std::string &errstr) = 0;
855 
  /* Partitioner callback (key pointer + length variant). */
857  virtual Conf::ConfResult set (const std::string &name,
858  PartitionerKeyPointerCb *partitioner_kp_cb,
859  std::string &errstr) = 0;
860 
  /* Socket callback. */
862  virtual Conf::ConfResult set (const std::string &name, SocketCb *socket_cb,
863  std::string &errstr) = 0;
864 
  /* Open callback. */
866  virtual Conf::ConfResult set (const std::string &name, OpenCb *open_cb,
867  std::string &errstr) = 0;
868 
  /* Rebalance callback. */
870  virtual Conf::ConfResult set (const std::string &name,
871  RebalanceCb *rebalance_cb,
872  std::string &errstr) = 0;
873 
  /* Offset commit callback. */
875  virtual Conf::ConfResult set (const std::string &name,
876  OffsetCommitCb *offset_commit_cb,
877  std::string &errstr) = 0;
878 
  /*
   * get() overloads. The string form looks a property up by name and writes
   * it to `value`; the callback forms take no name and write the stored
   * callback pointer into the reference-to-pointer argument.
   */
890  virtual Conf::ConfResult get(const std::string &name,
891  std::string &value) const = 0;
892 
896  virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0;
897 
901  virtual Conf::ConfResult get(EventCb *&event_cb) const = 0;
902 
906  virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0;
907 
911  virtual Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0;
912 
916  virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0;
917 
921  virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0;
922 
926  virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0;
927 
931  virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0;
932 
  /* Returns a pointer to a list of strings describing the configuration.
   * NOTE(review): list layout and ownership of the returned pointer are not
   * visible here — confirm. */
935  virtual std::list<std::string> *dump () = 0;
936 
  /* Consume callback.
   * NOTE(review): declared after dump() rather than with the other set()
   * overloads; no get() counterpart is declared. Presumably appended last to
   * keep the existing virtual-table order — confirm before moving it. */
938  virtual Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb,
939  std::string &errstr) = 0;
940 };
941 
/**
 * @brief Base handle, super class for specific clients.
 *
 * KafkaConsumer, Consumer and Producer all derive from it with virtual
 * inheritance. Every operation is pure virtual.
 */
954 class RD_EXPORT Handle {
955  public:
956  virtual ~Handle() { }
957 
  /* Name of this handle, returned by value. */
959  virtual const std::string name () const = 0;
960 
  /* Member id string, returned by value.
   * NOTE(review): presumably the consumer-group member id — confirm. */
969  virtual const std::string memberid () const = 0;
970 
971 
  /* Takes a timeout in milliseconds and returns an int.
   * NOTE(review): presumably serves queued callbacks and returns how many
   * events were handled — confirm. */
994  virtual int poll (int timeout_ms) = 0;
995 
  /* NOTE(review): presumably the current length of the outbound queue — confirm. */
1002  virtual int outq_len () = 0;
1003 
  /**
   * Metadata request.
   *
   * @param all_topics  selects between all topics and a narrower request
   * @param only_rkt    single topic to restrict the request to
   * @param metadatap   output: receives a Metadata pointer
   * @param timeout_ms  timeout in milliseconds
   * NOTE(review): interaction of all_topics/only_rkt and ownership of
   * *metadatap are not visible here — confirm.
   */
1019  virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt,
1020  Metadata **metadatap, int timeout_ms) = 0;
1021 
1022 
  /* pause()/resume(): operate on a list of topic+partitions passed by
   * non-const reference; TopicPartition exposes a per-element err(). */
1032  virtual ErrorCode pause (std::vector<TopicPartition*> &partitions) = 0;
1033 
1034 
1044  virtual ErrorCode resume (std::vector<TopicPartition*> &partitions) = 0;
1045 
1046 
  /* Watermark offsets for one topic+partition, written to *low and *high.
   * This variant takes a timeout; get_watermark_offsets() below does not. */
1055  virtual ErrorCode query_watermark_offsets (const std::string &topic,
1056  int32_t partition,
1057  int64_t *low, int64_t *high,
1058  int timeout_ms) = 0;
1059 
1077  virtual ErrorCode get_watermark_offsets (const std::string &topic,
1078  int32_t partition,
1079  int64_t *low, int64_t *high) = 0;
1080 
1081 
  /* NOTE(review): presumably an in/out list — timestamps in, offsets out,
   * both via the offset field of each element; confirm. */
1101  virtual ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
1102  int timeout_ms) = 0;
1103 
1104 
  /* Returns a Queue pointer for the given topic+partition. */
1113  virtual Queue *get_partition_queue (const TopicPartition *partition) = 0;
1114 
  /* Directs logs to the given queue (by name; behavior lives in the implementation). */
1131  virtual ErrorCode set_log_queue (Queue *queue) = 0;
1132 
1144  virtual void yield () = 0;
1145 
  /* Cluster id string, returned by value; takes a timeout in milliseconds. */
1160  virtual const std::string clusterid (int timeout_ms) = 0;
1161 
  /* Pointer to the C struct rd_kafka_s, forward-declared at the top of the header. */
1178  virtual struct rd_kafka_s *c_ptr () = 0;
1179 };
1180 
1181 
/**
 * @brief Topic+Partition.
 *
 * Holds a topic name, a partition number, an offset and an error code.
 * Objects are created with the static create() factories; the accessors are
 * pure virtual.
 */
1200 class RD_EXPORT TopicPartition {
1201 public:
  /* Factories returning raw pointers; the second form also sets an offset.
   * NOTE(review): the offset used by the two-argument form is not visible
   * here — confirm. */
1208  static TopicPartition *create (const std::string &topic, int partition);
1209  static TopicPartition *create (const std::string &topic, int partition,
1210  int64_t offset);
1211 
  /* Pure virtual destructor: the class stays abstract, but an out-of-line
   * definition must still exist for derived destructors to link. */
1212  virtual ~TopicPartition() = 0;
1213 
  /* Static helper taking a whole vector of TopicPartition pointers.
   * NOTE(review): presumably deletes every element and empties the vector — confirm. */
1218  static void destroy (std::vector<TopicPartition*> &partitions);
1219 
  /* Topic name, returned by const reference (unlike Topic::name(), which
   * returns by value). */
1221  virtual const std::string &topic () const = 0;
1222 
1224  virtual int partition () const = 0;
1225 
1227  virtual int64_t offset () const = 0;
1228 
1230  virtual void set_offset (int64_t offset) = 0;
1231 
  /* Error code recorded for this topic+partition. */
1233  virtual ErrorCode err () const = 0;
1234 };
1235 
1236 
1237 
/**
 * @brief Topic handle.
 *
 * Created through the static create() factory; the instance API is pure
 * virtual. Also carries the special partition/offset constants.
 */
1242 class RD_EXPORT Topic {
1243  public:
  /* Unassigned partition. Declared only; the value is defined out of line. */
1250  static const int32_t PARTITION_UA;
1251 
  /* Special offsets. Declared only; the values are defined out of line. */
1253  static const int64_t OFFSET_BEGINNING;
1254  static const int64_t OFFSET_END;
1255  static const int64_t OFFSET_STORED;
1256  static const int64_t OFFSET_INVALID;
  /**
   * Factory.
   *
   * @param base       client handle the topic belongs to
   * @param topic_str  topic name
   * @param conf       configuration object
   * @param errstr     output string for an error description
   * NOTE(review): presumably returns NULL on failure with errstr set, and
   * conf may presumably be NULL for defaults — confirm both.
   */
1268  static Topic *create (Handle *base, const std::string &topic_str,
1269  Conf *conf, std::string &errstr);
1270 
  /* Pure virtual destructor: still needs an out-of-line definition. */
1271  virtual ~Topic () = 0;
1272 
1273 
  /* Topic name, returned by value. */
1275  virtual const std::string name () const = 0;
1276 
  /* Boolean availability check for one partition. */
1282  virtual bool partition_available (int32_t partition) const = 0;
1283 
  /* Stores an offset for one partition of this topic. */
1294  virtual ErrorCode offset_store (int32_t partition, int64_t offset) = 0;
1295 
  /* Pointer to the C struct rd_kafka_topic_s, forward-declared at the top of the header. */
1312  virtual struct rd_kafka_topic_s *c_ptr () = 0;
1313 };
1314 
1315 
/**
 * @brief Message timestamp object.
 *
 * Plain value type returned by Message::timestamp().
 *
 * NOTE(review): this extract is missing listing lines 1339-1341 (the opening
 * of enum MessageTimestampType and its first enumerators) and line 1345 (the
 * member `MessageTimestampType type`). As printed, the enumerator and the
 * closing brace below have no enum header; restore those lines before
 * compiling this text.
 */
1337 class RD_EXPORT MessageTimestamp {
1338 public:
1342  MSG_TIMESTAMP_LOG_APPEND_TIME
1343  };
1344 
  /* Timestamp value. NOTE(review): unit and epoch are not visible here
   * (presumably milliseconds since the Unix epoch) — confirm. */
1346  int64_t timestamp;
1347 };
1348 
1349 
1350 
/**
 * @brief Message object.
 *
 * Pure interface with read accessors for everything attached to a message:
 * error state, topic, partition, payload, key, offset and timestamp. No
 * factory is declared here; instances are returned by the consume() methods
 * and passed to the callbacks.
 */
1362 class RD_EXPORT Message {
1363  public:
  /* Error string and error code of the message. */
1371  virtual std::string errstr() const = 0;
1372 
1374  virtual ErrorCode err () const = 0;
1375 
  /* Topic as an object pointer.
   * NOTE(review): may presumably be NULL when no Topic object exists;
   * topic_name() gives the name as a string — confirm. */
1380  virtual Topic *topic () const = 0;
1381 
1383  virtual std::string topic_name () const = 0;
1384 
1386  virtual int32_t partition () const = 0;
1387 
  /* Payload as an untyped pointer, with its length in bytes from len(). */
1389  virtual void *payload () const = 0 ;
1390 
1392  virtual size_t len () const = 0;
1393 
  /* The key is available both as a std::string pointer and as raw bytes
   * (key_pointer() with key_len()). */
1395  virtual const std::string *key () const = 0;
1396 
1398  virtual const void *key_pointer () const = 0 ;
1399 
1401  virtual size_t key_len () const = 0;
1402 
1404  virtual int64_t offset () const = 0;
1405 
  /* Returned by value as a MessageTimestamp. */
1407  virtual MessageTimestamp timestamp () const = 0;
1408 
  /* Untyped pointer; the produce() overloads take a parameter of the same name. */
1410  virtual void *msg_opaque () const = 0;
1411 
  /* Pure virtual destructor: still needs an out-of-line definition. */
1412  virtual ~Message () = 0;
1413 
  /* NOTE(review): unit is not visible here (presumably microseconds) — confirm.
   * Declared after the destructor, presumably to keep the virtual-table order. */
1416  virtual int64_t latency () const = 0;
1417 
  /* Pointer to the C struct rd_kafka_message_s, forward-declared at the top of the header. */
1434  virtual struct rd_kafka_message_s *c_ptr () = 0;
1435 };
1436 
/**
 * @brief Queue interface.
 *
 * Created from a Handle via the static create() factory. Queues are also
 * accepted by Consumer::start(), Consumer::consume() and
 * Handle::set_log_queue(), and returned by Handle::get_partition_queue().
 */
1460 class RD_EXPORT Queue {
1461  public:
  /* Factory returning a raw Queue pointer bound to the given handle. */
1465  static Queue *create (Handle *handle);
1466 
  /* Forwards this queue to dst.
   * NOTE(review): presumably a NULL dst cancels forwarding — confirm. */
1477  virtual ErrorCode forward (Queue *dst) = 0;
1478 
1479 
  /* Returns a Message pointer; takes a timeout in milliseconds.
   * NOTE(review): ownership of the returned Message presumably passes to the
   * caller — confirm. */
1491  virtual Message *consume (int timeout_ms) = 0;
1492 
  /* Same signature shape as Handle::poll(). */
1500  virtual int poll (int timeout_ms) = 0;
1501 
  /* Pure virtual destructor: still needs an out-of-line definition. */
1502  virtual ~Queue () = 0;
1503 
  /**
   * Enables IO event signalling on a file descriptor.
   *
   * @param fd       descriptor to signal on
   * @param payload  bytes associated with the signal
   * @param size     number of payload bytes
   * NOTE(review): presumably the payload is written to fd when the queue
   * becomes non-empty — confirm.
   */
1519  virtual void io_event_enable (int fd, const void *payload, size_t size) = 0;
1520 };
1521 
/**
 * @brief High-level KafkaConsumer (for brokers 0.9 and later).
 *
 * Derives virtually from Handle. Instances come from the static create()
 * factory; all operations are pure virtual and report status as ErrorCode
 * unless noted.
 */
1540 class RD_EXPORT KafkaConsumer : public virtual Handle {
1541 public:
  /* Factory. NOTE(review): presumably returns NULL on failure with errstr
   * set — confirm. */
1553  static KafkaConsumer *create (Conf *conf, std::string &errstr);
1554 
  /* Pure virtual destructor: still needs an out-of-line definition. */
1555  virtual ~KafkaConsumer () = 0;
1556 
1557 
  /* Output: fills `partitions` with the current assignment. */
1560  virtual ErrorCode assignment (std::vector<RdKafka::TopicPartition*> &partitions) = 0;
1561 
  /* Output: fills `topics` with the current subscription. */
1564  virtual ErrorCode subscription (std::vector<std::string> &topics) = 0;
1565 
  /* subscribe()/unsubscribe(): topic-name based subscription. */
1590  virtual ErrorCode subscribe (const std::vector<std::string> &topics) = 0;
1591 
1593  virtual ErrorCode unsubscribe () = 0;
1594 
  /* assign()/unassign(): explicit partition assignment. */
1601  virtual ErrorCode assign (const std::vector<TopicPartition*> &partitions) = 0;
1602 
1606  virtual ErrorCode unassign () = 0;
1607 
  /* Returns a Message pointer; takes a timeout in milliseconds.
   * NOTE(review): ownership presumably passes to the caller, and failures
   * are presumably reported via Message::err() — confirm. */
1632  virtual Message *consume (int timeout_ms) = 0;
1633 
  /*
   * Commit family. Sync and Async variants exist for: no arguments, a single
   * Message, and an explicit offset list. Note the asymmetry in the list
   * forms: commitSync() takes a non-const vector reference while
   * commitAsync() takes a const one.
   */
1647  virtual ErrorCode commitSync () = 0;
1648 
1654  virtual ErrorCode commitAsync () = 0;
1655 
1663  virtual ErrorCode commitSync (Message *message) = 0;
1664 
1672  virtual ErrorCode commitAsync (Message *message) = 0;
1673 
1679  virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;
1680 
1686  virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0;
1687 
  /* commitSync() forms that additionally take an OffsetCommitCb. */
1698  virtual ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) = 0;
1699 
1710  virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
1711  OffsetCommitCb *offset_commit_cb) = 0;
1712 
1713 
1714 
1715 
  /* committed()/position(): take the partition list by non-const reference.
   * NOTE(review): presumably each element's offset is filled in — confirm. */
1724  virtual ErrorCode committed (std::vector<TopicPartition*> &partitions,
1725  int timeout_ms) = 0;
1726 
1735  virtual ErrorCode position (std::vector<TopicPartition*> &partitions) = 0;
1736 
1737 
1760  virtual ErrorCode close () = 0;
1761 
1762 
  /* The target comes from `partition` (topic, partition and offset). */
1780  virtual ErrorCode seek (const TopicPartition &partition, int timeout_ms) = 0;
1781 
1782 
  /* List counterpart of Topic::offset_store(). */
1797  virtual ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) = 0;
1798 };
1799 
1800 
/**
 * @brief Simple Consumer (legacy).
 *
 * Derives virtually from Handle. Works on explicit Topic + partition pairs
 * (or on a Queue) rather than on subscriptions.
 */
1815 class RD_EXPORT Consumer : public virtual Handle {
1816  public:
  /* Factory. NOTE(review): presumably returns NULL on failure with errstr
   * set — confirm. */
1827  static Consumer *create (Conf *conf, std::string &errstr);
1828 
  /* Pure virtual destructor: still needs an out-of-line definition. */
1829  virtual ~Consumer () = 0;
1830 
1831 
  /* start(): begin consuming a topic+partition at `offset`; the second
   * overload additionally takes a Queue. stop() is the counterpart. */
1851  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset) = 0;
1852 
1859  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset,
1860  Queue *queue) = 0;
1861 
1871  virtual ErrorCode stop (Topic *topic, int32_t partition) = 0;
1872 
1887  virtual ErrorCode seek (Topic *topic, int32_t partition, int64_t offset,
1888  int timeout_ms) = 0;
1889 
  /* consume(): returns a Message pointer, either for one topic+partition or
   * from a Queue. NOTE(review): ownership presumably passes to the caller — confirm. */
1907  virtual Message *consume (Topic *topic, int32_t partition,
1908  int timeout_ms) = 0;
1909 
1931  virtual Message *consume (Queue *queue, int timeout_ms) = 0;
1932 
  /* consume_callback(): callback-driven variants taking a ConsumeCb and an
   * opaque pointer. NOTE(review): the int result is presumably the number of
   * messages processed — confirm. */
1952  virtual int consume_callback (Topic *topic, int32_t partition,
1953  int timeout_ms,
1954  ConsumeCb *consume_cb,
1955  void *opaque) = 0;
1956 
1963  virtual int consume_callback (Queue *queue, int timeout_ms,
1964  RdKafka::ConsumeCb *consume_cb,
1965  void *opaque) = 0;
1966 
  /* Static helper mapping an int64_t to an int64_t offset value.
   * NOTE(review): presumably encodes "offset messages before the end" as a
   * special offset usable with start() — confirm. */
1976  static int64_t OffsetTail(int64_t offset);
1977 };
1978 
/**
 * @brief Producer.
 *
 * Derives virtually from Handle. Instances come from the static create()
 * factory; messages are handed over with the produce() overloads.
 */
1992 class RD_EXPORT Producer : public virtual Handle {
1993  public:
  /* Factory. NOTE(review): presumably returns NULL on failure with errstr
   * set — confirm. */
2004  static Producer *create (Conf *conf, std::string &errstr);
2005 
2006 
  /* Pure virtual destructor: still needs an out-of-line definition. */
2007  virtual ~Producer () = 0;
2008 
  /*
   * Message flags for the `msgflags` argument of produce(). Each value is a
   * distinct bit (0x1, 0x2, 0x4).
   * NOTE(review): by their names FREE and COPY choose who owns the payload
   * buffer and BLOCK makes produce() wait — confirm.
   */
2014  enum {
2015  RK_MSG_FREE = 0x1,
2017  RK_MSG_COPY = 0x2,
2021  RK_MSG_BLOCK = 0x4
2038  /* For backwards compatibility: */
  /* The unprefixed aliases are only added when MSG_COPY is not already a
   * macro (sys/msg.h defines one). The comma sits inside the #ifndef so the
   * enumerator list stays well-formed either way. */
2039 #ifndef MSG_COPY /* defined in sys/msg.h */
2040  ,
2043  MSG_FREE = RK_MSG_FREE,
2044  MSG_COPY = RK_MSG_COPY
2045 #endif
2046 
2047  };
2048 
  /* produce(): Topic object, payload as pointer + length, key as std::string pointer. */
2105  virtual ErrorCode produce (Topic *topic, int32_t partition,
2106  int msgflags,
2107  void *payload, size_t len,
2108  const std::string *key,
2109  void *msg_opaque) = 0;
2110 
  /* produce(): as above, but the key is given as pointer + length. */
2115  virtual ErrorCode produce (Topic *topic, int32_t partition,
2116  int msgflags,
2117  void *payload, size_t len,
2118  const void *key, size_t key_len,
2119  void *msg_opaque) = 0;
2120 
  /* produce(): topic given by name (std::string taken by value) and with an
   * explicit timestamp. */
2127  virtual ErrorCode produce (const std::string topic_name, int32_t partition,
2128  int msgflags,
2129  void *payload, size_t len,
2130  const void *key, size_t key_len,
2131  int64_t timestamp,
2132  void *msg_opaque) = 0;
2133 
2134 
  /* produce(): payload and key as std::vector<char> pointers; this overload
   * has no msgflags parameter. */
2139  virtual ErrorCode produce (Topic *topic, int32_t partition,
2140  const std::vector<char> *payload,
2141  const std::vector<char> *key,
2142  void *msg_opaque) = 0;
2143 
2144 
  /* Takes a timeout in milliseconds.
   * NOTE(review): presumably waits for outstanding messages to be delivered
   * and returns ERR__TIMED_OUT otherwise — confirm. */
2156  virtual ErrorCode flush (int timeout_ms) = 0;
2157 };
2158 
/*
 * Metadata: Broker information.
 *
 * NOTE(review): the class header (listing line 2172) is missing from this
 * extract. The destructor name below shows the class is BrokerMetadata;
 * restore the header before compiling this text.
 */
2173  public:
  /* Broker id. */
2175  virtual int32_t id() const = 0;
2176 
  /* Broker host, returned by value. */
2178  virtual const std::string host() const = 0;
2179 
  /* Broker port. */
2181  virtual int port() const = 0;
2182 
  /* Pure virtual destructor: still needs an out-of-line definition. */
2183  virtual ~BrokerMetadata() = 0;
2184 };
2185 
2186 
2187 
/*
 * Metadata: Partition information.
 *
 * NOTE(review): the class header (listing line 2191) is missing from this
 * extract. The destructor name below shows the class is PartitionMetadata;
 * restore the header before compiling this text.
 */
2192  public:
  /* Replicas. */
2194  typedef std::vector<int32_t> ReplicasVector;
  /* ISRs (In-Sync-Replicas). */
2196  typedef std::vector<int32_t> ISRSVector;
2197 
  /* Replicas iterator. */
2199  typedef ReplicasVector::const_iterator ReplicasIterator;
  /* ISRs iterator. */
2201  typedef ISRSVector::const_iterator ISRSIterator;
2202 
2203 
  /* Partition id. */
2205  virtual int32_t id() const = 0;
2206 
  /* Error code recorded for this partition. */
2208  virtual ErrorCode err() const = 0;
2209 
  /* Leader as an int32_t (the replica lists below use the same element type). */
2211  virtual int32_t leader() const = 0;
2212 
  /* Replica list; the return type is spelled out but is the same type as
   * ReplicasVector. */
2214  virtual const std::vector<int32_t> *replicas() const = 0;
2215 
  /* In-sync replica list; same type as ISRSVector. */
2219  virtual const std::vector<int32_t> *isrs() const = 0;
2220 
  /* Pure virtual destructor: still needs an out-of-line definition. */
2221  virtual ~PartitionMetadata() = 0;
2222 };
2223 
2224 
2225 
/*
 * Metadata: Topic information.
 *
 * NOTE(review): the class header (listing line 2229) is missing from this
 * extract. The destructor name below shows the class is TopicMetadata;
 * restore the header before compiling this text.
 */
2230  public:
  /* Partitions. */
2232  typedef std::vector<const PartitionMetadata*> PartitionMetadataVector;
  /* Partitions iterator. */
2234  typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;
2235 
  /* Topic name, returned by value. */
2237  virtual const std::string topic() const = 0;
2238 
  /* Partition list as a pointer to a vector of const PartitionMetadata pointers. */
2240  virtual const PartitionMetadataVector *partitions() const = 0;
2241 
  /* Error code recorded for this topic. */
2243  virtual ErrorCode err() const = 0;
2244 
  /* Pure virtual destructor: still needs an out-of-line definition. */
2245  virtual ~TopicMetadata() = 0;
2246 };
2247 
2248 
/**
 * @brief Metadata container.
 *
 * Groups the broker list and topic list plus the identity of the broker the
 * metadata came from. Handle::metadata() hands one back through its
 * Metadata** output parameter. Unlike most classes in this header it is
 * declared without RD_EXPORT.
 */
2252 class Metadata {
2253  public:
  /* Brokers. */
2255  typedef std::vector<const BrokerMetadata*> BrokerMetadataVector;
  /* Topics. */
2257  typedef std::vector<const TopicMetadata*> TopicMetadataVector;
2258 
  /* Brokers iterator. */
2260  typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
  /* Topics iterator. */
2262  typedef TopicMetadataVector::const_iterator TopicMetadataIterator;
2263 
2264 
  /* Broker list. */
2266  virtual const BrokerMetadataVector *brokers() const = 0;
2267 
  /* Topic list. */
2269  virtual const TopicMetadataVector *topics() const = 0;
2270 
  /* Broker (id) originating this metadata. */
2272  virtual int32_t orig_broker_id() const = 0;
2273 
  /* Broker (name) originating this metadata. */
2275  virtual const std::string orig_broker_name() const = 0;
2276 
  /* Pure virtual destructor: still needs an out-of-line definition. */
2277  virtual ~Metadata() = 0;
2278 };
2279 
2282 }
2283 
2284 #endif /* _RDKAFKACPP_H_ */
virtual int32_t partitioner_cb(const Topic *topic, const void *key, size_t key_len, int32_t partition_cnt, void *msg_opaque)=0
Variant partitioner callback that gets key as pointer and length instead of as a const std::string *...
virtual ErrorCode err() const =0
virtual const std::string orig_broker_name() const =0
Broker (name) originating this metadata.
ISRSVector::const_iterator ISRSIterator
ISRs iterator.
Definition: rdkafkacpp.h:2201
High-level KafkaConsumer (for brokers 0.9 and later)
Definition: rdkafkacpp.h:1540
static const int64_t OFFSET_BEGINNING
Special offsets.
Definition: rdkafkacpp.h:1253
ConfType
Configuration object type.
Definition: rdkafkacpp.h:791
Partitioner callback class.
Definition: rdkafkacpp.h:452
virtual int port() const =0
Type
Event type.
Definition: rdkafkacpp.h:530
virtual int32_t orig_broker_id() const =0
Broker (id) originating this metadata.
virtual const std::string host() const =0
std::vector< const PartitionMetadata * > PartitionMetadataVector
Partitions.
Definition: rdkafkacpp.h:2232
static const int32_t PARTITION_UA
Unassigned partition.
Definition: rdkafkacpp.h:1250
Message object.
Definition: rdkafkacpp.h:1362
Severity
EVENT_LOG severities (conforms to syslog(3) severities)
Definition: rdkafkacpp.h:538
Event callback class.
Definition: rdkafkacpp.h:511
ReplicasVector::const_iterator ReplicasIterator
Replicas iterator.
Definition: rdkafkacpp.h:2199
KafkaConsumer: Rebalance callback class
Definition: rdkafkacpp.h:631
std::vector< int32_t > ReplicasVector
Replicas.
Definition: rdkafkacpp.h:2194
virtual const std::string topic() const =0
int64_t timestamp
Definition: rdkafkacpp.h:1346
Variant partitioner with key pointer.
Definition: rdkafkacpp.h:482
virtual int32_t id() const =0
BrokerMetadataVector::const_iterator BrokerMetadataIterator
Brokers iterator.
Definition: rdkafkacpp.h:2260
Definition: rdkafkacpp.h:533
static const int64_t OFFSET_INVALID
Definition: rdkafkacpp.h:1256
virtual const PartitionMetadataVector * partitions() const =0
virtual const BrokerMetadataVector * brokers() const =0
Broker list.
Topic handle.
Definition: rdkafkacpp.h:1242
Metadata: Partition information.
Definition: rdkafkacpp.h:2191
Producer.
Definition: rdkafkacpp.h:1992
Metadata: Topic information.
Definition: rdkafkacpp.h:2229
Definition: rdkafkacpp.h:792
Definition: rdkafkacpp.h:531
std::vector< const BrokerMetadata * > BrokerMetadataVector
Brokers.
Definition: rdkafkacpp.h:2255
ConfResult
RdKafka::Conf::set() result code.
Definition: rdkafkacpp.h:799
Definition: rdkafkacpp.h:80
Queue interface.
Definition: rdkafkacpp.h:1460
MessageTimestampType type
Definition: rdkafkacpp.h:1345
Message timestamp object.
Definition: rdkafkacpp.h:1337
std::vector< int32_t > ISRSVector
ISRs (In-Sync-Replicas)
Definition: rdkafkacpp.h:2196
virtual const TopicMetadataVector * topics() const =0
Topic list.
static const int64_t OFFSET_END
Definition: rdkafkacpp.h:1254
PartitionMetadataVector::const_iterator PartitionMetadataIterator
Partitions iterator.
Definition: rdkafkacpp.h:2234
Portability: OpenCb callback class
Definition: rdkafkacpp.h:747
Metadata: Broker information.
Definition: rdkafkacpp.h:2172
Definition: rdkafkacpp.h:532
virtual int32_t leader() const =0
static const int64_t OFFSET_STORED
Definition: rdkafkacpp.h:1255
Configuration interface.
Definition: rdkafkacpp.h:786
Offset Commit callback class.
Definition: rdkafkacpp.h:693
virtual const std::vector< int32_t > * replicas() const =0
Base handle, super class for specific clients.
Definition: rdkafkacpp.h:954
std::vector< const TopicMetadata * > TopicMetadataVector
Topics.
Definition: rdkafkacpp.h:2257
Topic+Partition.
Definition: rdkafkacpp.h:1200
Consume callback class.
Definition: rdkafkacpp.h:613
virtual const std::vector< int32_t > * isrs() const =0
Simple Consumer (legacy)
Definition: rdkafkacpp.h:1815
MessageTimestampType
Definition: rdkafkacpp.h:1339
TopicMetadataVector::const_iterator TopicMetadataIterator
Topics iterator.
Definition: rdkafkacpp.h:2262
Event object class as passed to the EventCb callback.
Definition: rdkafkacpp.h:527
Metadata container.
Definition: rdkafkacpp.h:2252
virtual ErrorCode err() const =0
Portability: SocketCb callback class
Definition: rdkafkacpp.h:722
Delivery Report callback class.
Definition: rdkafkacpp.h:434
virtual int32_t id() const =0