librdkafka
The Apache Kafka C/C++ client library
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
rdkafkacpp.h
Go to the documentation of this file.
1 /*
2  * librdkafka - Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2014 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  * this list of conditions and the following disclaimer in the documentation
14  * and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #ifndef _RDKAFKACPP_H_
30 #define _RDKAFKACPP_H_
31 
50 #include <string>
51 #include <list>
52 #include <vector>
53 #include <stdint.h>
54 
55 
/* Select the decoration placed on the exported declarations below.
 * With _MSC_VER defined: any previous RD_EXPORT is dropped, then it becomes
 *   - empty                  when LIBRDKAFKA_STATICLIB is defined,
 *   - __declspec(dllexport)  when LIBRDKAFKACPP_EXPORTS is defined,
 *   - __declspec(dllimport)  otherwise.
 * Without _MSC_VER it is always empty. */
56 #ifdef _MSC_VER
57 #undef RD_EXPORT
58 #ifdef LIBRDKAFKA_STATICLIB
59 #define RD_EXPORT
60 #else
61 #ifdef LIBRDKAFKACPP_EXPORTS
62 #define RD_EXPORT __declspec(dllexport)
63 #else
64 #define RD_EXPORT __declspec(dllimport)
65 #endif
66 #endif
67 #else
68 #define RD_EXPORT
69 #endif
70 
/* Incomplete C-linkage struct types. They are only named here so that the
 * c_ptr() accessors declared further down can return pointers to them;
 * their definitions are not part of this header. */
73 extern "C" {
74  /* Forward declarations */
75  struct rd_kafka_s;
76  struct rd_kafka_topic_s;
77  struct rd_kafka_message_s;
78 }
79 
80 namespace RdKafka {
81 
82 
/* Compile-time library version as a single hex integer.
 * NOTE(review): presumably one byte each for major/minor/revision/pre-release
 * id (0x000b06ff -> 0.11.6, final) - confirm against the upstream documentation. */
102 #define RD_KAFKA_VERSION 0x000b06ff
103 
/* Library version as an int.
 * NOTE(review): presumably the runtime counterpart of RD_KAFKA_VERSION - confirm. */
109 RD_EXPORT
110 int version ();
111 
/* Library version as a std::string, returned by value. */
115 RD_EXPORT
116 std::string version_str();
117 
/* Returns a std::string; going by the name, the supported debug contexts.
 * NOTE(review): the string format is not visible in this listing. */
122 RD_EXPORT
123 std::string get_debug_contexts();
124 
/* Takes a timeout in milliseconds and returns an int.
 * NOTE(review): the meaning of the return value is not visible here. */
134 RD_EXPORT
135 int wait_destroyed(int timeout_ms);
136 
137 
/* Error codes used throughout the API (returned by most methods and accepted
 * by err2str()).
 *
 * Two ranges, as marked by the section comments inside the enum:
 *   - ERR__BEGIN (-200) .. ERR__END (-100): errors internal to the client
 *     library; these names carry a double underscore (ERR__*).
 *   - ERR_UNKNOWN (-1) and upwards: Kafka broker errors (ERR_*), with
 *     ERR_NO_ERROR == 0 meaning success.
 * Every enumerator has an explicit value, so the numbering does not depend on
 * declaration order. */
160 enum ErrorCode {
161  /* Internal errors to rdkafka: */
163  ERR__BEGIN = -200,
165  ERR__BAD_MSG = -199,
167  ERR__BAD_COMPRESSION = -198,
169  ERR__DESTROY = -197,
171  ERR__FAIL = -196,
173  ERR__TRANSPORT = -195,
175  ERR__CRIT_SYS_RESOURCE = -194,
177  ERR__RESOLVE = -193,
179  ERR__MSG_TIMED_OUT = -192,
182  ERR__PARTITION_EOF = -191,
184  ERR__UNKNOWN_PARTITION = -190,
186  ERR__FS = -189,
188  ERR__UNKNOWN_TOPIC = -188,
190  ERR__ALL_BROKERS_DOWN = -187,
192  ERR__INVALID_ARG = -186,
194  ERR__TIMED_OUT = -185,
196  ERR__QUEUE_FULL = -184,
198  ERR__ISR_INSUFF = -183,
200  ERR__NODE_UPDATE = -182,
202  ERR__SSL = -181,
204  ERR__WAIT_COORD = -180,
206  ERR__UNKNOWN_GROUP = -179,
208  ERR__IN_PROGRESS = -178,
210  ERR__PREV_IN_PROGRESS = -177,
212  ERR__EXISTING_SUBSCRIPTION = -176,
214  ERR__ASSIGN_PARTITIONS = -175,
216  ERR__REVOKE_PARTITIONS = -174,
218  ERR__CONFLICT = -173,
220  ERR__STATE = -172,
222  ERR__UNKNOWN_PROTOCOL = -171,
224  ERR__NOT_IMPLEMENTED = -170,
226  ERR__AUTHENTICATION = -169,
228  ERR__NO_OFFSET = -168,
230  ERR__OUTDATED = -167,
232  ERR__TIMED_OUT_QUEUE = -166,
234  ERR__UNSUPPORTED_FEATURE = -165,
236  ERR__WAIT_CACHE = -164,
238  ERR__INTR = -163,
240  ERR__KEY_SERIALIZATION = -162,
242  ERR__VALUE_SERIALIZATION = -161,
244  ERR__KEY_DESERIALIZATION = -160,
246  ERR__VALUE_DESERIALIZATION = -159,
248  ERR__PARTIAL = -158,
250  ERR__READ_ONLY = -157,
252  ERR__NOENT = -156,
254  ERR__UNDERFLOW = -155,
256  ERR__INVALID_TYPE = -154,
257 
     /* Upper bound of the internal range; values -153..-101 are unassigned here. */
259  ERR__END = -100,
260 
261  /* Kafka broker errors: */
263  ERR_UNKNOWN = -1,
265  ERR_NO_ERROR = 0,
267  ERR_OFFSET_OUT_OF_RANGE = 1,
269  ERR_INVALID_MSG = 2,
271  ERR_UNKNOWN_TOPIC_OR_PART = 3,
273  ERR_INVALID_MSG_SIZE = 4,
275  ERR_LEADER_NOT_AVAILABLE = 5,
277  ERR_NOT_LEADER_FOR_PARTITION = 6,
279  ERR_REQUEST_TIMED_OUT = 7,
281  ERR_BROKER_NOT_AVAILABLE = 8,
283  ERR_REPLICA_NOT_AVAILABLE = 9,
285  ERR_MSG_SIZE_TOO_LARGE = 10,
287  ERR_STALE_CTRL_EPOCH = 11,
289  ERR_OFFSET_METADATA_TOO_LARGE = 12,
291  ERR_NETWORK_EXCEPTION = 13,
293  ERR_GROUP_LOAD_IN_PROGRESS = 14,
295  ERR_GROUP_COORDINATOR_NOT_AVAILABLE = 15,
297  ERR_NOT_COORDINATOR_FOR_GROUP = 16,
299  ERR_TOPIC_EXCEPTION = 17,
301  ERR_RECORD_LIST_TOO_LARGE = 18,
303  ERR_NOT_ENOUGH_REPLICAS = 19,
305  ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20,
307  ERR_INVALID_REQUIRED_ACKS = 21,
309  ERR_ILLEGAL_GENERATION = 22,
311  ERR_INCONSISTENT_GROUP_PROTOCOL = 23,
313  ERR_INVALID_GROUP_ID = 24,
315  ERR_UNKNOWN_MEMBER_ID = 25,
317  ERR_INVALID_SESSION_TIMEOUT = 26,
319  ERR_REBALANCE_IN_PROGRESS = 27,
321  ERR_INVALID_COMMIT_OFFSET_SIZE = 28,
323  ERR_TOPIC_AUTHORIZATION_FAILED = 29,
325  ERR_GROUP_AUTHORIZATION_FAILED = 30,
327  ERR_CLUSTER_AUTHORIZATION_FAILED = 31,
329  ERR_INVALID_TIMESTAMP = 32,
331  ERR_UNSUPPORTED_SASL_MECHANISM = 33,
333  ERR_ILLEGAL_SASL_STATE = 34,
335  ERR_UNSUPPORTED_VERSION = 35,
337  ERR_TOPIC_ALREADY_EXISTS = 36,
339  ERR_INVALID_PARTITIONS = 37,
341  ERR_INVALID_REPLICATION_FACTOR = 38,
343  ERR_INVALID_REPLICA_ASSIGNMENT = 39,
345  ERR_INVALID_CONFIG = 40,
347  ERR_NOT_CONTROLLER = 41,
349  ERR_INVALID_REQUEST = 42,
351  ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43,
353  ERR_POLICY_VIOLATION = 44,
355  ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45,
357  ERR_DUPLICATE_SEQUENCE_NUMBER = 46,
359  ERR_INVALID_PRODUCER_EPOCH = 47,
361  ERR_INVALID_TXN_STATE = 48,
364  ERR_INVALID_PRODUCER_ID_MAPPING = 49,
367  ERR_INVALID_TRANSACTION_TIMEOUT = 50,
370  ERR_CONCURRENT_TRANSACTIONS = 51,
374  ERR_TRANSACTION_COORDINATOR_FENCED = 52,
376  ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53,
378  ERR_SECURITY_DISABLED = 54,
380  ERR_OPERATION_NOT_ATTEMPTED = 55
381 };
382 
383 
/* Converts an ErrorCode into a std::string, returned by value.
 * NOTE(review): presumably a human-readable description - the exact text is
 * defined by the implementation, which is not visible here. */
387 RD_EXPORT
388 std::string err2str(RdKafka::ErrorCode err);
389 
390 
396 /* Forward declarations */
/* Needed because the callback interfaces that follow take these types by
 * pointer or reference (e.g. Message&, const Topic*, KafkaConsumer*) before
 * the classes themselves are defined later in this header. */
397 class Producer;
398 class Message;
399 class Queue;
400 class Event;
401 class Topic;
402 class TopicPartition;
403 class Metadata;
404 class KafkaConsumer;
/** Delivery Report callback class: abstract interface with one hook, dr_cb(). */
436 class RD_EXPORT DeliveryReportCb {
437  public:
     /** Pure virtual hook; receives the Message by non-const reference. */
441  virtual void dr_cb (Message &message) = 0;
442 
     /** Virtual so implementations can be destroyed through a base pointer. */
443  virtual ~DeliveryReportCb() { }
444 };
445 
446 
/** Partitioner callback class: abstract interface with one hook, partitioner_cb(). */
454 class RD_EXPORT PartitionerCb {
455  public:
     /**
      * Pure virtual hook returning an int32_t.
      * Receives the topic, the key as a `const std::string *`, the partition
      * count and the per-message opaque pointer.
      * NOTE(review): presumably the return value is the chosen partition and
      * `key` may be null for key-less messages - confirm against upstream docs.
      */
472  virtual int32_t partitioner_cb (const Topic *topic,
473  const std::string *key,
474  int32_t partition_cnt,
475  void *msg_opaque) = 0;
476 
     /** Virtual so implementations can be destroyed through a base pointer. */
477  virtual ~PartitionerCb() { }
478 };
479 
/**
 * Variant partitioner with key pointer: the callback gets the key as pointer
 * and length instead of as a `const std::string *`.
 *
 * The class header on listing line 484 was missing from this listing; it is
 * restored here using the name given by the destructor and the RD_EXPORT
 * decoration used by the sibling callback classes.
 */
484 class RD_EXPORT PartitionerKeyPointerCb {
485  public:
     /**
      * Pure virtual hook returning an int32_t.
      * Receives the topic, the raw key bytes (`key`, `key_len`), the partition
      * count and the per-message opaque pointer.
      * NOTE(review): presumably the return value is the chosen partition -
      * confirm against upstream docs.
      */
494  virtual int32_t partitioner_cb (const Topic *topic,
495  const void *key,
496  size_t key_len,
497  int32_t partition_cnt,
498  void *msg_opaque) = 0;
499 
     /** Virtual so implementations can be destroyed through a base pointer. */
500  virtual ~PartitionerKeyPointerCb() { }
501 };
502 
503 
504 
/** Event callback class: abstract interface with one hook, event_cb(). */
513 class RD_EXPORT EventCb {
514  public:
     /** Pure virtual hook; receives the Event by non-const reference. */
520  virtual void event_cb (Event &event) = 0;
521 
     /** Virtual so implementations can be destroyed through a base pointer. */
522  virtual ~EventCb() { }
523 };
524 
525 
/**
 * Event object class as passed to the EventCb callback.
 *
 * Abstract interface: every accessor below is pure virtual.
 */
529 class RD_EXPORT Event {
530  public:
     /**
      * Event type.
      * NOTE(review): the enumerators on listing lines 533-535 were missing
      * from this listing (leaving EVENT_THROTTLE as the only, zero-valued
      * enumerator); the names are restored from the upstream header - confirm.
      */
532  enum Type {
533  EVENT_ERROR,
534  EVENT_STATS,
535  EVENT_LOG,
536  EVENT_THROTTLE
537  };
538 
     /** EVENT_LOG severities (conforms to syslog(3) severities). */
540  enum Severity {
541  EVENT_SEVERITY_EMERG = 0,
542  EVENT_SEVERITY_ALERT = 1,
543  EVENT_SEVERITY_CRITICAL = 2,
544  EVENT_SEVERITY_ERROR = 3,
545  EVENT_SEVERITY_WARNING = 4,
546  EVENT_SEVERITY_NOTICE = 5,
547  EVENT_SEVERITY_INFO = 6,
548  EVENT_SEVERITY_DEBUG = 7
549  };
550 
551  virtual ~Event () { }
552 
553  /*
554  * Event Accessor methods
555  */
556 
     /** The event's Type. */
561  virtual Type type () const = 0;
562 
     /** The event's ErrorCode. */
567  virtual ErrorCode err () const = 0;
568 
     /** The event's Severity. */
573  virtual Severity severity () const = 0;
574 
     /** NOTE(review): presumably the log facility string - confirm. */
579  virtual std::string fac () const = 0;
580 
     /** The event's string payload, returned by value. */
589  virtual std::string str () const = 0;
590 
     /** NOTE(review): presumably the throttle time in milliseconds - confirm units. */
595  virtual int throttle_time () const = 0;
596 
     /** Broker name associated with the event, returned by value. */
601  virtual std::string broker_name () const = 0;
602 
     /** Broker id associated with the event. */
607  virtual int broker_id () const = 0;
608 };
609 
610 
611 
/** Consume callback class: abstract interface with one hook, consume_cb(). */
615 class RD_EXPORT ConsumeCb {
616  public:
     /** Pure virtual hook; receives the Message by reference plus an opaque pointer. */
624  virtual void consume_cb (Message &message, void *opaque) = 0;
625 
     /** Virtual so implementations can be destroyed through a base pointer. */
626  virtual ~ConsumeCb() { }
627 };
628 
629 
/** KafkaConsumer: Rebalance callback class (abstract; one hook, rebalance_cb()). */
633 class RD_EXPORT RebalanceCb {
634 public:
     /**
      * Pure virtual hook.
      * Receives the consumer, an ErrorCode and a mutable vector of
      * TopicPartition pointers.
      * NOTE(review): presumably `err` is ERR__ASSIGN_PARTITIONS or
      * ERR__REVOKE_PARTITIONS - confirm against upstream docs.
      */
684  virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer,
685  RdKafka::ErrorCode err,
686  std::vector<TopicPartition*>&partitions) = 0;
687 
     /** Virtual so implementations can be destroyed through a base pointer. */
688  virtual ~RebalanceCb() { }
689 };
690 
691 
/** Offset Commit callback class (abstract; one hook, offset_commit_cb()). */
695 class RD_EXPORT OffsetCommitCb {
696 public:
     /** Pure virtual hook; receives an ErrorCode and a mutable vector of TopicPartition pointers. */
712  virtual void offset_commit_cb(RdKafka::ErrorCode err,
713  std::vector<TopicPartition*>&offsets) = 0;
714 
     /** Virtual so implementations can be destroyed through a base pointer. */
715  virtual ~OffsetCommitCb() { }
716 };
717 
718 
719 
/** Portability: SocketCb callback class (abstract; one hook, socket_cb()). */
724 class RD_EXPORT SocketCb {
725  public:
     /**
      * Pure virtual hook taking (domain, type, protocol) and returning an int.
      * NOTE(review): the parameters mirror socket(2); presumably the return
      * value is the new file descriptor - confirm against upstream docs.
      */
739  virtual int socket_cb (int domain, int type, int protocol) = 0;
740 
     /** Virtual so implementations can be destroyed through a base pointer. */
741  virtual ~SocketCb() { }
742 };
743 
744 
/** Portability: OpenCb callback class (abstract; one hook, open_cb()). */
749 class RD_EXPORT OpenCb {
750  public:
     /**
      * Pure virtual hook taking (path, flags, mode) and returning an int.
      * NOTE(review): the parameters mirror open(2); presumably the return
      * value is the opened file descriptor - confirm against upstream docs.
      */
762  virtual int open_cb (const std::string &path, int flags, int mode) = 0;
763 
     /** Virtual so implementations can be destroyed through a base pointer. */
764  virtual ~OpenCb() { }
765 };
766 
767 
788 class RD_EXPORT Conf {
789  public:
793  enum ConfType {
795  CONF_TOPIC
796  };
797 
801  enum ConfResult {
802  CONF_UNKNOWN = -2,
803  CONF_INVALID = -1,
804  CONF_OK = 0
805  };
806 
807 
811  static Conf *create (ConfType type);
812 
813  virtual ~Conf () { }
814 
828  virtual Conf::ConfResult set (const std::string &name,
829  const std::string &value,
830  std::string &errstr) = 0;
831 
833  virtual Conf::ConfResult set (const std::string &name,
834  DeliveryReportCb *dr_cb,
835  std::string &errstr) = 0;
836 
838  virtual Conf::ConfResult set (const std::string &name,
839  EventCb *event_cb,
840  std::string &errstr) = 0;
841 
849  virtual Conf::ConfResult set (const std::string &name,
850  const Conf *topic_conf,
851  std::string &errstr) = 0;
852 
854  virtual Conf::ConfResult set (const std::string &name,
855  PartitionerCb *partitioner_cb,
856  std::string &errstr) = 0;
857 
859  virtual Conf::ConfResult set (const std::string &name,
860  PartitionerKeyPointerCb *partitioner_kp_cb,
861  std::string &errstr) = 0;
862 
864  virtual Conf::ConfResult set (const std::string &name, SocketCb *socket_cb,
865  std::string &errstr) = 0;
866 
868  virtual Conf::ConfResult set (const std::string &name, OpenCb *open_cb,
869  std::string &errstr) = 0;
870 
872  virtual Conf::ConfResult set (const std::string &name,
873  RebalanceCb *rebalance_cb,
874  std::string &errstr) = 0;
875 
877  virtual Conf::ConfResult set (const std::string &name,
878  OffsetCommitCb *offset_commit_cb,
879  std::string &errstr) = 0;
880 
892  virtual Conf::ConfResult get(const std::string &name,
893  std::string &value) const = 0;
894 
898  virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0;
899 
903  virtual Conf::ConfResult get(EventCb *&event_cb) const = 0;
904 
908  virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0;
909 
913  virtual Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0;
914 
918  virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0;
919 
923  virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0;
924 
928  virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0;
929 
933  virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0;
934 
937  virtual std::list<std::string> *dump () = 0;
938 
940  virtual Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb,
941  std::string &errstr) = 0;
942 };
943 
/**
 * Base handle, super class for specific clients.
 *
 * Abstract interface (all operations pure virtual); KafkaConsumer, Consumer
 * and Producer below derive from it with virtual inheritance.
 */
956 class RD_EXPORT Handle {
957  public:
958  virtual ~Handle() { }
959 
     /** Handle name, returned by value. */
961  virtual const std::string name () const = 0;
962 
     /** Member id, returned by value. NOTE(review): presumably the consumer-group member id - confirm. */
971  virtual const std::string memberid () const = 0;
972 
973 
     /** Polls with a millisecond timeout; returns an int (meaning not visible here). */
996  virtual int poll (int timeout_ms) = 0;
997 
     /** NOTE(review): presumably the current out-queue length - confirm. */
1004  virtual int outq_len () = 0;
1005 
     /**
      * Metadata request. The result is delivered through the `metadatap`
      * out-parameter (pointer to a Metadata pointer); returns an ErrorCode.
      */
1021  virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt,
1022  Metadata **metadatap, int timeout_ms) = 0;
1023 
1024 
     /** Pause / resume operate on a caller-supplied, mutable list of partitions. */
1034  virtual ErrorCode pause (std::vector<TopicPartition*> &partitions) = 0;
1035 
1036 
1046  virtual ErrorCode resume (std::vector<TopicPartition*> &partitions) = 0;
1047 
1048 
     /** Watermark lookup with a timeout; results are written through `low` and `high`. */
1057  virtual ErrorCode query_watermark_offsets (const std::string &topic,
1058  int32_t partition,
1059  int64_t *low, int64_t *high,
1060  int timeout_ms) = 0;
1061 
     /** Same out-parameters as query_watermark_offsets() but takes no timeout. */
1079  virtual ErrorCode get_watermark_offsets (const std::string &topic,
1080  int32_t partition,
1081  int64_t *low, int64_t *high) = 0;
1082 
1083 
     /** Takes a mutable list of TopicPartition pointers and a millisecond timeout. */
1103  virtual ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
1104  int timeout_ms) = 0;
1105 
1106 
     /** Returns a Queue pointer for the given partition. */
1115  virtual Queue *get_partition_queue (const TopicPartition *partition) = 0;
1116 
     /** Takes the Queue to use for logs; returns an ErrorCode. */
1133  virtual ErrorCode set_log_queue (Queue *queue) = 0;
1134 
1146  virtual void yield () = 0;
1147 
     /** Cluster id, returned by value; takes a millisecond timeout. */
1162  virtual const std::string clusterid (int timeout_ms) = 0;
1163 
     /** Pointer to the underlying C struct rd_kafka_s (forward-declared at the top of the header). */
1180  virtual struct rd_kafka_s *c_ptr () = 0;
1181 
     /** Controller id as int32_t; takes a millisecond timeout. */
1197  virtual int32_t controllerid (int timeout_ms) = 0;
1198 };
1199 
1200 
/**
 * Topic+Partition.
 *
 * Abstract value-like object created through the static create() factories.
 */
1219 class RD_EXPORT TopicPartition {
1220 public:
     /** Factories: by topic name and partition, optionally with an initial offset. */
1227  static TopicPartition *create (const std::string &topic, int partition);
1228  static TopicPartition *create (const std::string &topic, int partition,
1229  int64_t offset);
1230 
     /** Pure virtual destructor: keeps the class abstract; its definition lives outside this listing. */
1231  virtual ~TopicPartition() = 0;
1232 
     /** Takes a mutable vector of TopicPartition pointers. NOTE(review): presumably deletes every element - confirm. */
1237  static void destroy (std::vector<TopicPartition*> &partitions);
1238 
     /** Topic name, returned by const reference. */
1240  virtual const std::string &topic () const = 0;
1241 
     /** Partition id. */
1243  virtual int partition () const = 0;
1244 
     /** Current offset. */
1246  virtual int64_t offset () const = 0;
1247 
     /** Sets the offset. */
1249  virtual void set_offset (int64_t offset) = 0;
1250 
     /** Error code associated with this topic+partition. */
1252  virtual ErrorCode err () const = 0;
1253 };
1254 
1255 
1256 
/**
 * Topic handle.
 *
 * Abstract class created through the static create() factory.
 */
1261 class RD_EXPORT Topic {
1262  public:
     /** Unassigned partition. Declared here; the value is defined outside this listing. */
1269  static const int32_t PARTITION_UA;
1270 
     /** Special offsets. Declared here; the values are defined outside this listing. */
1272  static const int64_t OFFSET_BEGINNING;
1273  static const int64_t OFFSET_END;
1274  static const int64_t OFFSET_STORED;
1275  static const int64_t OFFSET_INVALID;
     /**
      * Factory: takes the owning Handle, the topic name, a Conf pointer and an
      * `errstr` out-parameter; returns a Topic pointer.
      * NOTE(review): presumably returns null and fills errstr on failure - confirm.
      */
1287  static Topic *create (Handle *base, const std::string &topic_str,
1288  Conf *conf, std::string &errstr);
1289 
     /** Pure virtual destructor: keeps the class abstract; its definition lives outside this listing. */
1290  virtual ~Topic () = 0;
1291 
1292 
     /** Topic name, returned by value. */
1294  virtual const std::string name () const = 0;
1295 
     /** Whether the given partition is available. */
1301  virtual bool partition_available (int32_t partition) const = 0;
1302 
     /** Takes a partition and an offset; returns an ErrorCode. */
1313  virtual ErrorCode offset_store (int32_t partition, int64_t offset) = 0;
1314 
     /** Pointer to the underlying C struct rd_kafka_topic_s (forward-declared at the top of the header). */
1331  virtual struct rd_kafka_topic_s *c_ptr () = 0;
1332 };
1333 
1334 
/**
 * Message timestamp object: a timestamp value together with its type.
 *
 * The enum opener (listing line 1358), two enumerators (1359-1360) and the
 * `type` member (1364) were missing from this listing, which left a dangling
 * enumerator and `};`. They are restored here.
 */
1356 class RD_EXPORT MessageTimestamp {
1357 public:
     /**
      * Kind of timestamp carried by the message.
      * NOTE(review): the first two enumerator names are restored from the
      * upstream header - confirm.
      */
1358  enum MessageTimestampType {
1359  MSG_TIMESTAMP_NOT_AVAILABLE,
1360  MSG_TIMESTAMP_CREATE_TIME,
1361  MSG_TIMESTAMP_LOG_APPEND_TIME
1362  };
1363 
     /** Timestamp type. */
1364  MessageTimestampType type;
     /** Timestamp value. NOTE(review): presumably milliseconds since the epoch (UTC) - confirm. */
1365  int64_t timestamp;
1366 };
1367 
1368 
1369 
/**
 * Message object.
 *
 * Abstract, read-only view of a message: all accessors are const and pure virtual.
 */
1381 class RD_EXPORT Message {
1382  public:
     /** Error as a string, returned by value. */
1390  virtual std::string errstr() const = 0;
1391 
     /** Error code of the message. */
1393  virtual ErrorCode err () const = 0;
1394 
     /** Topic object pointer. NOTE(review): may presumably be null in some cases - confirm. */
1399  virtual Topic *topic () const = 0;
1400 
     /** Topic name, returned by value. */
1402  virtual std::string topic_name () const = 0;
1403 
     /** Partition id. */
1405  virtual int32_t partition () const = 0;
1406 
     /** Payload pointer; its size is given by len(). */
1408  virtual void *payload () const = 0 ;
1409 
     /** Payload length. */
1411  virtual size_t len () const = 0;
1412 
     /** Key as a pointer to std::string. */
1414  virtual const std::string *key () const = 0;
1415 
     /** Key as a raw pointer; its size is given by key_len(). */
1417  virtual const void *key_pointer () const = 0 ;
1418 
     /** Key length. */
1420  virtual size_t key_len () const = 0;
1421 
     /** Message offset. */
1423  virtual int64_t offset () const = 0;
1424 
     /** Message timestamp (type + value), returned by value. */
1426  virtual MessageTimestamp timestamp () const = 0;
1427 
     /** The per-message opaque pointer. */
1429  virtual void *msg_opaque () const = 0;
1430 
     /** Pure virtual destructor: keeps the class abstract; its definition lives outside this listing. */
1431  virtual ~Message () = 0;
1432 
     /** Latency as int64_t. NOTE(review): units are not visible here - confirm. */
1435  virtual int64_t latency () const = 0;
1436 
     /** Pointer to the underlying C struct rd_kafka_message_s (forward-declared at the top of the header). */
1453  virtual struct rd_kafka_message_s *c_ptr () = 0;
1454 };
1455 
/**
 * Queue interface.
 *
 * Abstract class created through the static create() factory.
 */
1479 class RD_EXPORT Queue {
1480  public:
     /** Factory: returns a Queue pointer for the given Handle. */
1484  static Queue *create (Handle *handle);
1485 
     /** Takes a destination Queue; returns an ErrorCode. */
1496  virtual ErrorCode forward (Queue *dst) = 0;
1497 
1498 
     /** Returns a Message pointer; takes a millisecond timeout. */
1510  virtual Message *consume (int timeout_ms) = 0;
1511 
     /** Polls with a millisecond timeout; returns an int (meaning not visible here). */
1519  virtual int poll (int timeout_ms) = 0;
1520 
     /** Pure virtual destructor: keeps the class abstract; its definition lives outside this listing. */
1521  virtual ~Queue () = 0;
1522 
     /**
      * Takes a file descriptor plus a payload buffer and its size.
      * NOTE(review): presumably the payload is written to `fd` when the queue
      * becomes non-empty - confirm against upstream docs.
      */
1538  virtual void io_event_enable (int fd, const void *payload, size_t size) = 0;
1539 };
1540 
/**
 * High-level KafkaConsumer (for brokers 0.9 and later).
 *
 * Abstract class derived virtually from Handle and created through the static
 * create() factory. Nearly every operation reports its outcome as an ErrorCode.
 */
1559 class RD_EXPORT KafkaConsumer : public virtual Handle {
1560 public:
     /**
      * Factory: takes a Conf pointer and an `errstr` out-parameter.
      * NOTE(review): presumably returns null and fills errstr on failure - confirm.
      */
1572  static KafkaConsumer *create (Conf *conf, std::string &errstr);
1573 
     /** Pure virtual destructor: keeps the class abstract; its definition lives outside this listing. */
1574  virtual ~KafkaConsumer () = 0;
1575 
1576 
     /** Out-parameter getters: fill the caller's vector with the current assignment / subscription. */
1579  virtual ErrorCode assignment (std::vector<RdKafka::TopicPartition*> &partitions) = 0;
1580 
1583  virtual ErrorCode subscription (std::vector<std::string> &topics) = 0;
1584 
     /** Subscription management by topic name. */
1609  virtual ErrorCode subscribe (const std::vector<std::string> &topics) = 0;
1610 
1612  virtual ErrorCode unsubscribe () = 0;
1613 
     /** Explicit partition assignment management. */
1620  virtual ErrorCode assign (const std::vector<TopicPartition*> &partitions) = 0;
1621 
1625  virtual ErrorCode unassign () = 0;
1626 
     /** Returns a Message pointer; takes a millisecond timeout. */
1651  virtual Message *consume (int timeout_ms) = 0;
1652 
     /*
      * Commit overloads, in Sync and Async flavours: no argument, a single
      * Message, or an explicit list of offsets. Two further commitSync()
      * overloads additionally take an OffsetCommitCb.
      * Note the asymmetry in the declarations: commitSync(offsets) takes a
      * non-const vector reference while commitAsync(offsets) takes a const one.
      */
1666  virtual ErrorCode commitSync () = 0;
1667 
1673  virtual ErrorCode commitAsync () = 0;
1674 
1682  virtual ErrorCode commitSync (Message *message) = 0;
1683 
1691  virtual ErrorCode commitAsync (Message *message) = 0;
1692 
1698  virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;
1699 
1705  virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0;
1706 
1717  virtual ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) = 0;
1718 
1729  virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
1730  OffsetCommitCb *offset_commit_cb) = 0;
1731 
1732 
1733 
1734 
     /** Takes a mutable list of partitions and a millisecond timeout. */
1743  virtual ErrorCode committed (std::vector<TopicPartition*> &partitions,
1744  int timeout_ms) = 0;
1745 
     /** Takes a mutable list of partitions. */
1754  virtual ErrorCode position (std::vector<TopicPartition*> &partitions) = 0;
1755 
1756 
     /** Closes the consumer; returns an ErrorCode. */
1779  virtual ErrorCode close () = 0;
1780 
1781 
     /** Seek using the topic, partition and offset carried by `partition`; takes a millisecond timeout. */
1799  virtual ErrorCode seek (const TopicPartition &partition, int timeout_ms) = 0;
1800 
1801 
     /** Takes a mutable list of TopicPartition pointers; returns an ErrorCode. */
1816  virtual ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) = 0;
1817 };
1818 
1819 
/**
 * Simple Consumer (legacy).
 *
 * Abstract class derived virtually from Handle and created through the static
 * create() factory. Operations are addressed per Topic + partition, or per Queue.
 */
1834 class RD_EXPORT Consumer : public virtual Handle {
1835  public:
     /**
      * Factory: takes a Conf pointer and an `errstr` out-parameter.
      * NOTE(review): presumably returns null and fills errstr on failure - confirm.
      */
1846  static Consumer *create (Conf *conf, std::string &errstr);
1847 
     /** Pure virtual destructor: keeps the class abstract; its definition lives outside this listing. */
1848  virtual ~Consumer () = 0;
1849 
1850 
     /** Start for a topic/partition at `offset`; the second overload additionally takes a Queue. */
1870  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset) = 0;
1871 
1878  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset,
1879  Queue *queue) = 0;
1880 
     /** Stop for a topic/partition. */
1890  virtual ErrorCode stop (Topic *topic, int32_t partition) = 0;
1891 
     /** Seek a topic/partition to `offset`; takes a millisecond timeout. */
1906  virtual ErrorCode seek (Topic *topic, int32_t partition, int64_t offset,
1907  int timeout_ms) = 0;
1908 
     /** Return a Message pointer from a topic/partition or from a Queue; both take a millisecond timeout. */
1926  virtual Message *consume (Topic *topic, int32_t partition,
1927  int timeout_ms) = 0;
1928 
1950  virtual Message *consume (Queue *queue, int timeout_ms) = 0;
1951 
     /**
      * Callback-driven variants: take a ConsumeCb and an opaque pointer, and
      * return an int (meaning not visible here).
      */
1971  virtual int consume_callback (Topic *topic, int32_t partition,
1972  int timeout_ms,
1973  ConsumeCb *consume_cb,
1974  void *opaque) = 0;
1975 
1982  virtual int consume_callback (Queue *queue, int timeout_ms,
1983  RdKafka::ConsumeCb *consume_cb,
1984  void *opaque) = 0;
1985 
     /** Static helper mapping an int64_t offset to an int64_t. NOTE(review): presumably a "tail" logical offset - confirm. */
1995  static int64_t OffsetTail(int64_t offset);
1996 };
1997 
/**
 * Producer.
 *
 * Abstract class derived virtually from Handle and created through the static
 * create() factory.
 */
2011 class RD_EXPORT Producer : public virtual Handle {
2012  public:
     /**
      * Factory: takes a Conf pointer and an `errstr` out-parameter.
      * NOTE(review): presumably returns null and fills errstr on failure - confirm.
      */
2023  static Producer *create (Conf *conf, std::string &errstr);
2024 
2025 
     /** Pure virtual destructor: keeps the class abstract; its definition lives outside this listing. */
2026  virtual ~Producer () = 0;
2027 
     /**
      * Bit flags for the `msgflags` argument of produce() (distinct bits 0x1, 0x2, 0x4).
      * The unprefixed MSG_FREE / MSG_COPY aliases are only declared when
      * MSG_COPY is not already a macro (the in-source note names sys/msg.h as
      * the header that defines it), which is why the separating comma sits
      * inside the #ifndef.
      */
2033  enum {
2034  RK_MSG_FREE = 0x1,
2036  RK_MSG_COPY = 0x2,
2040  RK_MSG_BLOCK = 0x4
2057  /* For backwards compatibility: */
2058 #ifndef MSG_COPY /* defined in sys/msg.h */
2059  ,
2062  MSG_FREE = RK_MSG_FREE,
2063  MSG_COPY = RK_MSG_COPY
2064 #endif
2065 
2066  };
2067 
     /** produce(): Topic object, payload pointer + length, key as `const std::string *`. */
2124  virtual ErrorCode produce (Topic *topic, int32_t partition,
2125  int msgflags,
2126  void *payload, size_t len,
2127  const std::string *key,
2128  void *msg_opaque) = 0;
2129 
     /** produce(): as above, but the key is given as raw pointer + length. */
2134  virtual ErrorCode produce (Topic *topic, int32_t partition,
2135  int msgflags,
2136  void *payload, size_t len,
2137  const void *key, size_t key_len,
2138  void *msg_opaque) = 0;
2139 
     /** produce(): topic given by name (std::string passed by value), with an explicit timestamp. */
2146  virtual ErrorCode produce (const std::string topic_name, int32_t partition,
2147  int msgflags,
2148  void *payload, size_t len,
2149  const void *key, size_t key_len,
2150  int64_t timestamp,
2151  void *msg_opaque) = 0;
2152 
2153 
     /** produce(): payload and key given as `const std::vector<char> *`; this overload takes no msgflags. */
2158  virtual ErrorCode produce (Topic *topic, int32_t partition,
2159  const std::vector<char> *payload,
2160  const std::vector<char> *key,
2161  void *msg_opaque) = 0;
2162 
2163 
     /** Flush with a millisecond timeout; returns an ErrorCode. */
2175  virtual ErrorCode flush (int timeout_ms) = 0;
2176 };
2177 
/**
 * Metadata: Broker information.
 *
 * The class header on listing line 2191 was missing from this listing; it is
 * restored here using the name given by the destructor, and without RD_EXPORT
 * to match the sibling Metadata class.
 */
2191 class BrokerMetadata {
2192  public:
     /** Broker id. */
2194  virtual int32_t id() const = 0;
2195 
     /** Broker host, returned by value. */
2197  virtual const std::string host() const = 0;
2198 
     /** Broker port. */
2200  virtual int port() const = 0;
2201 
     /** Pure virtual destructor: keeps the class abstract; its definition lives outside this listing. */
2202  virtual ~BrokerMetadata() = 0;
2203 };
2204 
2205 
2206 
/**
 * Metadata: Partition information.
 *
 * The class header on listing line 2210 was missing from this listing; it is
 * restored here using the name given by the destructor, and without RD_EXPORT
 * to match the sibling Metadata class.
 */
2210 class PartitionMetadata {
2211  public:
     /** Replicas. */
2213  typedef std::vector<int32_t> ReplicasVector;
     /** ISRs (In-Sync-Replicas). */
2215  typedef std::vector<int32_t> ISRSVector;
2216 
     /** Replicas iterator. */
2218  typedef ReplicasVector::const_iterator ReplicasIterator;
     /** ISRs iterator. */
2220  typedef ISRSVector::const_iterator ISRSIterator;
2221 
2222 
     /** Partition id. */
2224  virtual int32_t id() const = 0;
2225 
     /** Error code reported for this partition. */
2227  virtual ErrorCode err() const = 0;
2228 
     /** Leader broker id. */
2230  virtual int32_t leader() const = 0;
2231 
     /** Replica ids; same element type as ReplicasVector. */
2233  virtual const std::vector<int32_t> *replicas() const = 0;
2234 
     /** In-sync replica ids; same element type as ISRSVector. */
2238  virtual const std::vector<int32_t> *isrs() const = 0;
2239 
     /** Pure virtual destructor: keeps the class abstract; its definition lives outside this listing. */
2240  virtual ~PartitionMetadata() = 0;
2241 };
2242 
2243 
2244 
/**
 * Metadata: Topic information.
 *
 * The class header on listing line 2248 was missing from this listing; it is
 * restored here using the name given by the destructor, and without RD_EXPORT
 * to match the sibling Metadata class.
 */
2248 class TopicMetadata {
2249  public:
     /** Partitions. */
2251  typedef std::vector<const PartitionMetadata*> PartitionMetadataVector;
     /** Partitions iterator. */
2253  typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;
2254 
     /** Topic name, returned by value. */
2256  virtual const std::string topic() const = 0;
2257 
     /** Pointer to the partition list. */
2259  virtual const PartitionMetadataVector *partitions() const = 0;
2260 
     /** Error code reported for this topic. */
2262  virtual ErrorCode err() const = 0;
2263 
     /** Pure virtual destructor: keeps the class abstract; its definition lives outside this listing. */
2264  virtual ~TopicMetadata() = 0;
2265 };
2266 
2267 
/**
 * Metadata container.
 *
 * Abstract, read-only view over broker and topic metadata. Unlike most
 * classes in this header it is declared without RD_EXPORT.
 */
2271 class Metadata {
2272  public:
     /** Brokers. */
2274  typedef std::vector<const BrokerMetadata*> BrokerMetadataVector;
     /** Topics. */
2276  typedef std::vector<const TopicMetadata*> TopicMetadataVector;
2277 
     /** Brokers iterator. */
2279  typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
     /** Topics iterator. */
2281  typedef TopicMetadataVector::const_iterator TopicMetadataIterator;
2282 
2283 
     /** Broker list. */
2285  virtual const BrokerMetadataVector *brokers() const = 0;
2286 
     /** Topic list. */
2288  virtual const TopicMetadataVector *topics() const = 0;
2289 
     /** Broker (id) originating this metadata. */
2291  virtual int32_t orig_broker_id() const = 0;
2292 
     /** Broker (name) originating this metadata. */
2294  virtual const std::string orig_broker_name() const = 0;
2295 
     /** Pure virtual destructor: keeps the class abstract; its definition lives outside this listing. */
2296  virtual ~Metadata() = 0;
2297 };
2298 
2301 }
2302 
2303 #endif /* _RDKAFKACPP_H_ */
virtual int32_t partitioner_cb(const Topic *topic, const void *key, size_t key_len, int32_t partition_cnt, void *msg_opaque)=0
Variant partitioner callback that gets key as pointer and length instead of as a const std::string *...
virtual ErrorCode err() const =0
virtual const std::string orig_broker_name() const =0
Broker (name) originating this metadata.
ISRSVector::const_iterator ISRSIterator
ISRs iterator.
Definition: rdkafkacpp.h:2220
High-level KafkaConsumer (for brokers 0.9 and later)
Definition: rdkafkacpp.h:1559
static const int64_t OFFSET_BEGINNING
Special offsets.
Definition: rdkafkacpp.h:1272
ConfType
Configuration object type.
Definition: rdkafkacpp.h:793
Partitioner callback class.
Definition: rdkafkacpp.h:454
virtual int port() const =0
Type
Event type.
Definition: rdkafkacpp.h:532
virtual int32_t orig_broker_id() const =0
Broker (id) originating this metadata.
virtual const std::string host() const =0
std::vector< const PartitionMetadata * > PartitionMetadataVector
Partitions.
Definition: rdkafkacpp.h:2251
static const int32_t PARTITION_UA
Unassigned partition.
Definition: rdkafkacpp.h:1269
Message object.
Definition: rdkafkacpp.h:1381
Severity
EVENT_LOG severities (conforms to syslog(3) severities)
Definition: rdkafkacpp.h:540
Event callback class.
Definition: rdkafkacpp.h:513
ReplicasVector::const_iterator ReplicasIterator
Replicas iterator.
Definition: rdkafkacpp.h:2218
KafkaConsumer: Rebalance callback class
Definition: rdkafkacpp.h:633
std::vector< int32_t > ReplicasVector
Replicas.
Definition: rdkafkacpp.h:2213
virtual const std::string topic() const =0
int64_t timestamp
Definition: rdkafkacpp.h:1365
Variant partitioner with key pointer.
Definition: rdkafkacpp.h:484
virtual int32_t id() const =0
BrokerMetadataVector::const_iterator BrokerMetadataIterator
Brokers iterator.
Definition: rdkafkacpp.h:2279
Definition: rdkafkacpp.h:535
static const int64_t OFFSET_INVALID
Definition: rdkafkacpp.h:1275
virtual const PartitionMetadataVector * partitions() const =0
virtual const BrokerMetadataVector * brokers() const =0
Broker list.
Topic handle.
Definition: rdkafkacpp.h:1261
Metadata: Partition information.
Definition: rdkafkacpp.h:2210
Producer.
Definition: rdkafkacpp.h:2011
Metadata: Topic information.
Definition: rdkafkacpp.h:2248
Definition: rdkafkacpp.h:794
Definition: rdkafkacpp.h:533
std::vector< const BrokerMetadata * > BrokerMetadataVector
Brokers.
Definition: rdkafkacpp.h:2274
ConfResult
RdKafka::Conf::Set() result code.
Definition: rdkafkacpp.h:801
Definition: rdkafkacpp.h:80
Queue interface.
Definition: rdkafkacpp.h:1479
MessageTimestampType type
Definition: rdkafkacpp.h:1364
Message timestamp object.
Definition: rdkafkacpp.h:1356
std::vector< int32_t > ISRSVector
ISRs (In-Sync-Replicas)
Definition: rdkafkacpp.h:2215
virtual const TopicMetadataVector * topics() const =0
Topic list.
static const int64_t OFFSET_END
Definition: rdkafkacpp.h:1273
PartitionMetadataVector::const_iterator PartitionMetadataIterator
Partitions iterator.
Definition: rdkafkacpp.h:2253
Portability: OpenCb callback class
Definition: rdkafkacpp.h:749
Metadata: Broker information.
Definition: rdkafkacpp.h:2191
Definition: rdkafkacpp.h:534
virtual int32_t leader() const =0
static const int64_t OFFSET_STORED
Definition: rdkafkacpp.h:1274
Configuration interface.
Definition: rdkafkacpp.h:788
Offset Commit callback class.
Definition: rdkafkacpp.h:695
virtual const std::vector< int32_t > * replicas() const =0
Base handle, super class for specific clients.
Definition: rdkafkacpp.h:956
std::vector< const TopicMetadata * > TopicMetadataVector
Topics.
Definition: rdkafkacpp.h:2276
Topic+Partition.
Definition: rdkafkacpp.h:1219
Consume callback class.
Definition: rdkafkacpp.h:615
virtual const std::vector< int32_t > * isrs() const =0
Simple Consumer (legacy)
Definition: rdkafkacpp.h:1834
MessageTimestampType
Definition: rdkafkacpp.h:1358
TopicMetadataVector::const_iterator TopicMetadataIterator
Topics iterator.
Definition: rdkafkacpp.h:2281
Event object class as passed to the EventCb callback.
Definition: rdkafkacpp.h:529
Metadata container.
Definition: rdkafkacpp.h:2271
virtual ErrorCode err() const =0
Portability: SocketCb callback class
Definition: rdkafkacpp.h:724
Delivery Report callback class.
Definition: rdkafkacpp.h:436
virtual int32_t id() const =0