librdkafka
The Apache Kafka C/C++ client library
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
rdkafkacpp.h
Go to the documentation of this file.
1 /*
2  * librdkafka - Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2014 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  * this list of conditions and the following disclaimer in the documentation
14  * and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #ifndef _RDKAFKACPP_H_
30 #define _RDKAFKACPP_H_
31 
50 #include <string>
51 #include <list>
52 #include <vector>
53 #include <cstdlib>
54 #include <cstring>
55 #include <stdint.h>
56 #include <sys/types.h>
57 
58 #ifdef _WIN32
59 #ifndef ssize_t
60 #ifndef _BASETSD_H_
61 #include <basetsd.h>
62 #endif
63 #ifndef _SSIZE_T_DEFINED
64 #define _SSIZE_T_DEFINED
65 typedef SSIZE_T ssize_t;
66 #endif
67 #endif
68 #undef RD_EXPORT
69 #ifdef LIBRDKAFKA_STATICLIB
70 #define RD_EXPORT
71 #else
72 #ifdef LIBRDKAFKACPP_EXPORTS
73 #define RD_EXPORT __declspec(dllexport)
74 #else
75 #define RD_EXPORT __declspec(dllimport)
76 #endif
77 #endif
78 #else
79 #define RD_EXPORT
80 #endif
81 
/* C-linkage struct names, forward-declared only (incomplete types in this
 * header). They exist so that the c_ptr(), c_ptr_global() and c_ptr_topic()
 * accessors declared further down can return pointers to them. */
84 extern "C" {
85  /* Forward declarations */
86  struct rd_kafka_s;
87  struct rd_kafka_topic_s;
88  struct rd_kafka_message_s;
89  struct rd_kafka_conf_s;
90  struct rd_kafka_topic_conf_s;
91 }
92 
93 namespace RdKafka {
94 
/* Compile-time version constant, stored as a single hex integer.
 * NOTE(review): presumably one byte per version component
 * (0x01, 0x07, 0x00, 0xff) - confirm the encoding before comparing values. */
114 #define RD_KAFKA_VERSION 0x010700ff
115 
/* Version as an int. NOTE(review): presumably the run-time counterpart of
 * RD_KAFKA_VERSION - confirm. */
121 RD_EXPORT
122 int version ();
123 
/* Version as a std::string. */
127 RD_EXPORT
128 std::string version_str();
129 
/* Returns a std::string; NOTE(review): presumably the list of supported
 * debug contexts - the format is not visible in this header. */
134 RD_EXPORT
135 std::string get_debug_contexts();
136 
/* Takes a timeout in milliseconds and returns an int; the meaning of the
 * return value is not visible in this header. */
146 RD_EXPORT
147 int wait_destroyed(int timeout_ms);
148 
/* Allocation pair used by Headers::Header in this file: copy_value()
 * allocates with mem_malloc() and the Header destructor / assignment
 * operator release with mem_free(). */
159 RD_EXPORT
160 void *mem_malloc (size_t size);
161 
175 RD_EXPORT
176 void mem_free (void *ptr);
177 
/* Error codes used throughout this API.
 *  - ERR__BEGIN (-200) .. ERR__END (-100): errors internal to rdkafka
 *    (double-underscore names).
 *  - ERR_UNKNOWN (-1), ERR_NO_ERROR (0) and the positive values: Kafka
 *    broker errors.
 * The three #define lines inside the enumerator list map older names onto
 * the current enumerators (ERR_GROUP_* -> ERR_COORDINATOR_* and
 * ERR_NOT_COORDINATOR_FOR_GROUP -> ERR_NOT_COORDINATOR). Being macros they
 * are not scoped to the enclosing namespace. */
200 enum ErrorCode {
201  /* Internal errors to rdkafka: */
203  ERR__BEGIN = -200,
205  ERR__BAD_MSG = -199,
207  ERR__BAD_COMPRESSION = -198,
209  ERR__DESTROY = -197,
211  ERR__FAIL = -196,
213  ERR__TRANSPORT = -195,
215  ERR__CRIT_SYS_RESOURCE = -194,
217  ERR__RESOLVE = -193,
219  ERR__MSG_TIMED_OUT = -192,
224  ERR__PARTITION_EOF = -191,
226  ERR__UNKNOWN_PARTITION = -190,
228  ERR__FS = -189,
230  ERR__UNKNOWN_TOPIC = -188,
232  ERR__ALL_BROKERS_DOWN = -187,
234  ERR__INVALID_ARG = -186,
236  ERR__TIMED_OUT = -185,
238  ERR__QUEUE_FULL = -184,
240  ERR__ISR_INSUFF = -183,
242  ERR__NODE_UPDATE = -182,
244  ERR__SSL = -181,
246  ERR__WAIT_COORD = -180,
248  ERR__UNKNOWN_GROUP = -179,
250  ERR__IN_PROGRESS = -178,
252  ERR__PREV_IN_PROGRESS = -177,
254  ERR__EXISTING_SUBSCRIPTION = -176,
256  ERR__ASSIGN_PARTITIONS = -175,
258  ERR__REVOKE_PARTITIONS = -174,
260  ERR__CONFLICT = -173,
262  ERR__STATE = -172,
264  ERR__UNKNOWN_PROTOCOL = -171,
266  ERR__NOT_IMPLEMENTED = -170,
268  ERR__AUTHENTICATION = -169,
270  ERR__NO_OFFSET = -168,
272  ERR__OUTDATED = -167,
274  ERR__TIMED_OUT_QUEUE = -166,
276  ERR__UNSUPPORTED_FEATURE = -165,
278  ERR__WAIT_CACHE = -164,
280  ERR__INTR = -163,
282  ERR__KEY_SERIALIZATION = -162,
284  ERR__VALUE_SERIALIZATION = -161,
286  ERR__KEY_DESERIALIZATION = -160,
288  ERR__VALUE_DESERIALIZATION = -159,
290  ERR__PARTIAL = -158,
292  ERR__READ_ONLY = -157,
294  ERR__NOENT = -156,
296  ERR__UNDERFLOW = -155,
298  ERR__INVALID_TYPE = -154,
300  ERR__RETRY = -153,
302  ERR__PURGE_QUEUE = -152,
304  ERR__PURGE_INFLIGHT = -151,
306  ERR__FATAL = -150,
308  ERR__INCONSISTENT = -149,
310  ERR__GAPLESS_GUARANTEE = -148,
312  ERR__MAX_POLL_EXCEEDED = -147,
314  ERR__UNKNOWN_BROKER = -146,
316  ERR__NOT_CONFIGURED = -145,
318  ERR__FENCED = -144,
320  ERR__APPLICATION = -143,
322  ERR__ASSIGNMENT_LOST = -142,
324  ERR__NOOP = -141,
326  ERR__AUTO_OFFSET_RESET = -140,
327 
329  ERR__END = -100,
330 
331  /* Kafka broker errors: */
333  ERR_UNKNOWN = -1,
335  ERR_NO_ERROR = 0,
337  ERR_OFFSET_OUT_OF_RANGE = 1,
339  ERR_INVALID_MSG = 2,
341  ERR_UNKNOWN_TOPIC_OR_PART = 3,
343  ERR_INVALID_MSG_SIZE = 4,
345  ERR_LEADER_NOT_AVAILABLE = 5,
347  ERR_NOT_LEADER_FOR_PARTITION = 6,
349  ERR_REQUEST_TIMED_OUT = 7,
351  ERR_BROKER_NOT_AVAILABLE = 8,
353  ERR_REPLICA_NOT_AVAILABLE = 9,
355  ERR_MSG_SIZE_TOO_LARGE = 10,
357  ERR_STALE_CTRL_EPOCH = 11,
359  ERR_OFFSET_METADATA_TOO_LARGE = 12,
361  ERR_NETWORK_EXCEPTION = 13,
363  ERR_COORDINATOR_LOAD_IN_PROGRESS = 14,
365 #define ERR_GROUP_LOAD_IN_PROGRESS ERR_COORDINATOR_LOAD_IN_PROGRESS
366 
367  ERR_COORDINATOR_NOT_AVAILABLE = 15,
369 #define ERR_GROUP_COORDINATOR_NOT_AVAILABLE ERR_COORDINATOR_NOT_AVAILABLE
370 
371  ERR_NOT_COORDINATOR = 16,
373 #define ERR_NOT_COORDINATOR_FOR_GROUP ERR_NOT_COORDINATOR
374 
375  ERR_TOPIC_EXCEPTION = 17,
377  ERR_RECORD_LIST_TOO_LARGE = 18,
379  ERR_NOT_ENOUGH_REPLICAS = 19,
381  ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20,
383  ERR_INVALID_REQUIRED_ACKS = 21,
385  ERR_ILLEGAL_GENERATION = 22,
387  ERR_INCONSISTENT_GROUP_PROTOCOL = 23,
389  ERR_INVALID_GROUP_ID = 24,
391  ERR_UNKNOWN_MEMBER_ID = 25,
393  ERR_INVALID_SESSION_TIMEOUT = 26,
395  ERR_REBALANCE_IN_PROGRESS = 27,
397  ERR_INVALID_COMMIT_OFFSET_SIZE = 28,
399  ERR_TOPIC_AUTHORIZATION_FAILED = 29,
401  ERR_GROUP_AUTHORIZATION_FAILED = 30,
403  ERR_CLUSTER_AUTHORIZATION_FAILED = 31,
405  ERR_INVALID_TIMESTAMP = 32,
407  ERR_UNSUPPORTED_SASL_MECHANISM = 33,
409  ERR_ILLEGAL_SASL_STATE = 34,
411  ERR_UNSUPPORTED_VERSION = 35,
413  ERR_TOPIC_ALREADY_EXISTS = 36,
415  ERR_INVALID_PARTITIONS = 37,
417  ERR_INVALID_REPLICATION_FACTOR = 38,
419  ERR_INVALID_REPLICA_ASSIGNMENT = 39,
421  ERR_INVALID_CONFIG = 40,
423  ERR_NOT_CONTROLLER = 41,
425  ERR_INVALID_REQUEST = 42,
427  ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43,
429  ERR_POLICY_VIOLATION = 44,
431  ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45,
433  ERR_DUPLICATE_SEQUENCE_NUMBER = 46,
435  ERR_INVALID_PRODUCER_EPOCH = 47,
437  ERR_INVALID_TXN_STATE = 48,
440  ERR_INVALID_PRODUCER_ID_MAPPING = 49,
443  ERR_INVALID_TRANSACTION_TIMEOUT = 50,
446  ERR_CONCURRENT_TRANSACTIONS = 51,
450  ERR_TRANSACTION_COORDINATOR_FENCED = 52,
452  ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53,
454  ERR_SECURITY_DISABLED = 54,
456  ERR_OPERATION_NOT_ATTEMPTED = 55,
458  ERR_KAFKA_STORAGE_ERROR = 56,
460  ERR_LOG_DIR_NOT_FOUND = 57,
462  ERR_SASL_AUTHENTICATION_FAILED = 58,
464  ERR_UNKNOWN_PRODUCER_ID = 59,
466  ERR_REASSIGNMENT_IN_PROGRESS = 60,
468  ERR_DELEGATION_TOKEN_AUTH_DISABLED = 61,
470  ERR_DELEGATION_TOKEN_NOT_FOUND = 62,
472  ERR_DELEGATION_TOKEN_OWNER_MISMATCH = 63,
474  ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED = 64,
476  ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED = 65,
478  ERR_DELEGATION_TOKEN_EXPIRED = 66,
480  ERR_INVALID_PRINCIPAL_TYPE = 67,
482  ERR_NON_EMPTY_GROUP = 68,
484  ERR_GROUP_ID_NOT_FOUND = 69,
486  ERR_FETCH_SESSION_ID_NOT_FOUND = 70,
488  ERR_INVALID_FETCH_SESSION_EPOCH = 71,
490  ERR_LISTENER_NOT_FOUND = 72,
492  ERR_TOPIC_DELETION_DISABLED = 73,
494  ERR_FENCED_LEADER_EPOCH = 74,
496  ERR_UNKNOWN_LEADER_EPOCH = 75,
498  ERR_UNSUPPORTED_COMPRESSION_TYPE = 76,
500  ERR_STALE_BROKER_EPOCH = 77,
502  ERR_OFFSET_NOT_AVAILABLE = 78,
504  ERR_MEMBER_ID_REQUIRED = 79,
506  ERR_PREFERRED_LEADER_NOT_AVAILABLE = 80,
508  ERR_GROUP_MAX_SIZE_REACHED = 81,
511  ERR_FENCED_INSTANCE_ID = 82,
513  ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE = 83,
515  ERR_ELECTION_NOT_NEEDED = 84,
517  ERR_NO_REASSIGNMENT_IN_PROGRESS = 85,
520  ERR_GROUP_SUBSCRIBED_TO_TOPIC = 86,
522  ERR_INVALID_RECORD = 87,
524  ERR_UNSTABLE_OFFSET_COMMIT = 88,
526  ERR_THROTTLING_QUOTA_EXCEEDED = 89,
529  ERR_PRODUCER_FENCED = 90,
531  ERR_RESOURCE_NOT_FOUND = 91,
533  ERR_DUPLICATE_RESOURCE = 92,
535  ERR_UNACCEPTABLE_CREDENTIAL = 93,
538  ERR_INCONSISTENT_VOTER_SET = 94,
540  ERR_INVALID_UPDATE_VERSION = 95,
542  ERR_FEATURE_UPDATE_FAILED = 96,
544  ERR_PRINCIPAL_DESERIALIZATION_FAILURE = 97
545 };
546 
547 
/* Maps an ErrorCode to a std::string. NOTE(review): presumably a
 * human-readable description - the exact text is not visible here. */
551 RD_EXPORT
552 std::string err2str(RdKafka::ErrorCode err);
553 
554 
555 
/* Kind of certificate material; passed as the cert_type argument of
 * Conf::set_ssl_cert(). CERT__CNT is the last enumerator
 * (NOTE(review): presumably a count sentinel, not a real type). */
560 enum CertificateType {
561  CERT_PUBLIC_KEY,
562  CERT_PRIVATE_KEY,
563  CERT_CA,
564  CERT__CNT
565 };
566 
/* Encoding of certificate material; passed as the cert_enc argument of
 * Conf::set_ssl_cert(). CERT_ENC__CNT is the last enumerator
 * (NOTE(review): presumably a count sentinel, not a real encoding). */
571 enum CertificateEncoding {
572  CERT_ENC_PKCS12,
573  CERT_ENC_DER,
574  CERT_ENC_PEM,
575  CERT_ENC__CNT
576 };
577 
/* Class names declared up front because the callback interfaces below
 * refer to them (e.g. Message&, Event&, Handle*, KafkaConsumer*,
 * TopicPartition*) before the classes themselves are defined. */
583 /* Forward declarations */
584 class Handle;
585 class Producer;
586 class Message;
587 class Headers;
588 class Queue;
589 class Event;
590 class Topic;
591 class TopicPartition;
592 class Metadata;
593 class KafkaConsumer;
/* Abstract error object: an ErrorCode plus name/string accessors and three
 * boolean classification flags. Instances are obtained from the static
 * create() factory; the Error* returning methods of KafkaConsumer and
 * Producer in this file hand out pointers to this type.
 * NOTE(review): ownership of returned Error pointers is not visible in this
 * header - confirm whether the caller must delete them. */
613 class RD_EXPORT Error {
614  public:
615 
 /* Factory; note that errstr is taken as a pointer, not a reference. */
619  static Error *create (ErrorCode code, const std::string *errstr);
620 
621  virtual ~Error () { }
622 
623  /*
624  * Error accessor methods
625  */
626 
630  virtual ErrorCode code () const = 0;
631 
635  virtual std::string name () const = 0;
636 
640  virtual std::string str () const = 0;
641 
646  virtual bool is_fatal () const = 0;
647 
651  virtual bool is_retriable () const = 0;
652 
664  virtual bool txn_requires_abort () const = 0;
665 };
666 
/* Callback interface: subclass and implement dr_cb(), which receives a
 * Message by reference. An instance is registered through the
 * Conf::set(name, DeliveryReportCb*, errstr) overload. */
698 class RD_EXPORT DeliveryReportCb {
699  public:
703  virtual void dr_cb (Message &message) = 0;
704 
705  virtual ~DeliveryReportCb() { }
706 };
707 
708 
/* Callback interface for OAUTHBEARER token refresh: the implementation
 * receives the client Handle and a configuration string. Registered through
 * the Conf::set(name, OAuthBearerTokenRefreshCb*, errstr) overload.
 * NOTE(review): presumably the callback answers via
 * Handle::oauthbearer_set_token() / oauthbearer_set_token_failure() -
 * confirm. */
736 class RD_EXPORT OAuthBearerTokenRefreshCb {
737  public:
745  virtual void oauthbearer_token_refresh_cb (RdKafka::Handle* handle,
746  const std::string &oauthbearer_config) = 0;
747 
748  virtual ~OAuthBearerTokenRefreshCb() { }
749 };
750 
751 
/* Partitioner callback interface. partitioner_cb() receives the topic, the
 * key as a std::string pointer, the partition count and the per-message
 * opaque pointer, and returns an int32_t partition.
 * NOTE(review): key is a pointer and may presumably be NULL - confirm.
 * Registered through the Conf::set(name, PartitionerCb*, errstr) overload. */
759 class RD_EXPORT PartitionerCb {
760  public:
777  virtual int32_t partitioner_cb (const Topic *topic,
778  const std::string *key,
779  int32_t partition_cnt,
780  void *msg_opaque) = 0;
781 
782  virtual ~PartitionerCb() { }
783 };
784 
/* Partitioner callback variant that receives the key as a raw pointer plus
 * length instead of as a std::string pointer. Registered through the
 * Conf::set(name, PartitionerKeyPointerCb*, errstr) overload.
 * The class-head line (listing line 789) was missing from this listing and
 * is restored here; the class name is taken from the destructor below. */
789 class PartitionerKeyPointerCb {
790  public:
799  virtual int32_t partitioner_cb (const Topic *topic,
800  const void *key,
801  size_t key_len,
802  int32_t partition_cnt,
803  void *msg_opaque) = 0;
804 
805  virtual ~PartitionerKeyPointerCb() { }
806 };
807 
808 
809 
/* Callback interface for events: implement event_cb(), which receives an
 * Event by reference. Registered through the
 * Conf::set(name, EventCb*, errstr) overload. */
818 class RD_EXPORT EventCb {
819  public:
825  virtual void event_cb (Event &event) = 0;
826 
827  virtual ~EventCb() { }
828 };
829 
830 
/* Abstract event object delivered to EventCb::event_cb(). All accessors are
 * pure virtual; which of them carry meaningful data for a given Type is not
 * visible in this header. */
834 class RD_EXPORT Event {
835  public:
 /* NOTE(review): the listing numbers jump from 837 to 841, so enumerators
  * appear to be missing before EVENT_THROTTLE in this copy - verify against
  * the real header before relying on enumerator values. */
837  enum Type {
841  EVENT_THROTTLE
842  };
843 
 /* Severity levels, numbered 0 (EMERG) through 7 (DEBUG). */
845  enum Severity {
846  EVENT_SEVERITY_EMERG = 0,
847  EVENT_SEVERITY_ALERT = 1,
848  EVENT_SEVERITY_CRITICAL = 2,
849  EVENT_SEVERITY_ERROR = 3,
850  EVENT_SEVERITY_WARNING = 4,
851  EVENT_SEVERITY_NOTICE = 5,
852  EVENT_SEVERITY_INFO = 6,
853  EVENT_SEVERITY_DEBUG = 7
854  };
855 
856  virtual ~Event () { }
857 
858  /*
859  * Event Accessor methods
860  */
861 
866  virtual Type type () const = 0;
867 
872  virtual ErrorCode err () const = 0;
873 
878  virtual Severity severity () const = 0;
879 
884  virtual std::string fac () const = 0;
885 
894  virtual std::string str () const = 0;
895 
900  virtual int throttle_time () const = 0;
901 
906  virtual std::string broker_name () const = 0;
907 
912  virtual int broker_id () const = 0;
913 
914 
920  virtual bool fatal () const = 0;
921 };
922 
923 
924 
/* Callback interface for consumed messages: consume_cb() receives the
 * Message and an opaque pointer. Accepted by the
 * Conf::set(name, ConsumeCb*, errstr) overload and by both
 * Consumer::consume_callback() overloads, which also take the opaque
 * pointer. */
928 class RD_EXPORT ConsumeCb {
929  public:
937  virtual void consume_cb (Message &message, void *opaque) = 0;
938 
939  virtual ~ConsumeCb() { }
940 };
941 
942 
/* Rebalance callback interface for KafkaConsumer: rebalance_cb() receives
 * the consumer, an ErrorCode and a mutable vector of TopicPartition
 * pointers. Registered through the Conf::set(name, RebalanceCb*, errstr)
 * overload.
 * NOTE(review): err is presumably ERR__ASSIGN_PARTITIONS or
 * ERR__REVOKE_PARTITIONS - confirm. */
946 class RD_EXPORT RebalanceCb {
947 public:
1016  virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer,
1017  RdKafka::ErrorCode err,
1018  std::vector<TopicPartition*>&partitions) = 0;
1019 
1020  virtual ~RebalanceCb() { }
1021 };
1022 
1023 
/* Offset commit callback interface: offset_commit_cb() receives an
 * ErrorCode and a mutable vector of TopicPartition pointers. Accepted by
 * the Conf::set(name, OffsetCommitCb*, errstr) overload and by the
 * KafkaConsumer::commitSync() overloads that take an OffsetCommitCb*. */
1027 class RD_EXPORT OffsetCommitCb {
1028 public:
1044  virtual void offset_commit_cb(RdKafka::ErrorCode err,
1045  std::vector<TopicPartition*>&offsets) = 0;
1046 
1047  virtual ~OffsetCommitCb() { }
1048 };
1049 
1050 
1051 
/* SSL certificate verification callback interface. ssl_cert_verify_cb()
 * receives the broker name and id, a writable x509 error int, the chain
 * depth, the certificate bytes (buf/size) and a writable error string, and
 * returns a bool verdict.
 * NOTE(review): the certificate encoding in buf and the exact meaning of
 * the return value are not visible in this header - confirm.
 * Registered through the Conf::set(name, SslCertificateVerifyCb*, errstr)
 * overload. */
1057 class RD_EXPORT SslCertificateVerifyCb {
1058 public:
1095  virtual bool ssl_cert_verify_cb (const std::string &broker_name,
1096  int32_t broker_id,
1097  int *x509_error,
1098  int depth,
1099  const char *buf, size_t size,
1100  std::string &errstr) = 0;
1101 
1102  virtual ~SslCertificateVerifyCb() {}
1103 };
1104 
1105 
/* Socket creation callback interface: socket_cb() takes the same three
 * integers as the socket(2) call (domain, type, protocol) and returns an
 * int. NOTE(review): presumably a file descriptor - confirm.
 * Registered through the Conf::set(name, SocketCb*, errstr) overload. */
1110 class RD_EXPORT SocketCb {
1111  public:
1125  virtual int socket_cb (int domain, int type, int protocol) = 0;
1126 
1127  virtual ~SocketCb() { }
1128 };
1129 
1130 
/* File open callback interface: open_cb() takes a path, flags and mode and
 * returns an int. NOTE(review): presumably a file descriptor as from
 * open(2) - confirm.
 * Registered through the Conf::set(name, OpenCb*, errstr) overload. */
1135 class RD_EXPORT OpenCb {
1136  public:
1148  virtual int open_cb (const std::string &path, int flags, int mode) = 0;
1149 
1150  virtual ~OpenCb() { }
1151 };
1152 
1153 
/* Abstract configuration object, created with the static create() factory.
 *
 * set(): one overload per value kind. All take the property name first and
 * an errstr out-parameter last; they differ in the middle argument (a
 * string value, a callback object pointer, or a topic Conf pointer).
 * get(): the string overload looks a property up by name; the remaining
 * overloads take only a reference-to-pointer and are selected purely by the
 * callback type.
 * Both families return a ConfResult, where CONF_OK is 0 and the failure
 * values are negative. */
1174 class RD_EXPORT Conf {
1175  public:
 /* NOTE(review): the listing numbers jump from 1179 to 1181, so an
  * enumerator appears to be missing before CONF_TOPIC in this copy -
  * verify against the real header. */
1179  enum ConfType {
1181  CONF_TOPIC
1182  };
1183 
1187  enum ConfResult {
1188  CONF_UNKNOWN = -2,
1189  CONF_INVALID = -1,
1190  CONF_OK = 0
1191  };
1192 
1193 
1197  static Conf *create (ConfType type);
1198 
1199  virtual ~Conf () { }
1200 
1214  virtual Conf::ConfResult set (const std::string &name,
1215  const std::string &value,
1216  std::string &errstr) = 0;
1217 
1219  virtual Conf::ConfResult set (const std::string &name,
1220  DeliveryReportCb *dr_cb,
1221  std::string &errstr) = 0;
1222 
1224  virtual Conf::ConfResult set (const std::string &name,
1225  OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb,
1226  std::string &errstr) = 0;
1227 
1229  virtual Conf::ConfResult set (const std::string &name,
1230  EventCb *event_cb,
1231  std::string &errstr) = 0;
1232 
1240  virtual Conf::ConfResult set (const std::string &name,
1241  const Conf *topic_conf,
1242  std::string &errstr) = 0;
1243 
1245  virtual Conf::ConfResult set (const std::string &name,
1246  PartitionerCb *partitioner_cb,
1247  std::string &errstr) = 0;
1248 
1250  virtual Conf::ConfResult set (const std::string &name,
1251  PartitionerKeyPointerCb *partitioner_kp_cb,
1252  std::string &errstr) = 0;
1253 
1255  virtual Conf::ConfResult set (const std::string &name, SocketCb *socket_cb,
1256  std::string &errstr) = 0;
1257 
1259  virtual Conf::ConfResult set (const std::string &name, OpenCb *open_cb,
1260  std::string &errstr) = 0;
1261 
1263  virtual Conf::ConfResult set (const std::string &name,
1264  RebalanceCb *rebalance_cb,
1265  std::string &errstr) = 0;
1266 
1268  virtual Conf::ConfResult set (const std::string &name,
1269  OffsetCommitCb *offset_commit_cb,
1270  std::string &errstr) = 0;
1271 
1276  virtual Conf::ConfResult set(const std::string &name,
1277  SslCertificateVerifyCb *ssl_cert_verify_cb,
1278  std::string &errstr) = 0;
1279 
 /* Supplies certificate material from a memory buffer (buffer/size), typed
  * by CertificateType and CertificateEncoding. */
1309  virtual Conf::ConfResult set_ssl_cert (RdKafka::CertificateType cert_type,
1310  RdKafka::CertificateEncoding cert_enc,
1311  const void *buffer, size_t size,
1312  std::string &errstr) = 0;
1313 
1325  virtual Conf::ConfResult get(const std::string &name,
1326  std::string &value) const = 0;
1327 
1331  virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0;
1332 
1336  virtual Conf::ConfResult get(
1337  OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const = 0;
1338 
1342  virtual Conf::ConfResult get(EventCb *&event_cb) const = 0;
1343 
1347  virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0;
1348 
1352  virtual Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0;
1353 
1357  virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0;
1358 
1362  virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0;
1363 
1367  virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0;
1368 
1372  virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0;
1373 
1375  virtual Conf::ConfResult get(SslCertificateVerifyCb *&ssl_cert_verify_cb) const = 0;
1376 
 /* Returns a pointer to a list of strings.
  * NOTE(review): list layout and ownership of the returned pointer are not
  * visible in this header - confirm whether the caller must delete it. */
1379  virtual std::list<std::string> *dump () = 0;
1380 
1382  virtual Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb,
1383  std::string &errstr) = 0;
1384 
 /* Accessors for the underlying C configuration structs (forward-declared
  * at the top of this file), one for each struct type. */
1401  virtual struct rd_kafka_conf_s *c_ptr_global () = 0;
1402 
1420  virtual struct rd_kafka_topic_conf_s *c_ptr_topic () = 0;
1421 
1434  virtual Conf::ConfResult set_engine_callback_data (void *value,
1435  std::string &errstr) = 0;
1436 };
1437 
/* Abstract base interface shared by the client types in this file:
 * KafkaConsumer, Consumer and Producer all derive from it with
 * "public virtual Handle". Every method is pure virtual. */
1450 class RD_EXPORT Handle {
1451  public:
1452  virtual ~Handle() { }
1453 
1455  virtual const std::string name () const = 0;
1456 
1465  virtual const std::string memberid () const = 0;
1466 
1467 
 /* Takes a timeout in milliseconds and returns an int.
  * NOTE(review): presumably serves queued callbacks and returns the number
  * of events handled - confirm. */
1490  virtual int poll (int timeout_ms) = 0;
1491 
1498  virtual int outq_len () = 0;
1499 
 /* Result is delivered through the metadatap out-parameter.
  * NOTE(review): ownership of *metadatap is not visible here - confirm. */
1515  virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt,
1516  Metadata **metadatap, int timeout_ms) = 0;
1517 
1518 
1528  virtual ErrorCode pause (std::vector<TopicPartition*> &partitions) = 0;
1529 
1530 
1540  virtual ErrorCode resume (std::vector<TopicPartition*> &partitions) = 0;
1541 
1542 
 /* Two watermark lookups writing through low/high: the query_ variant takes
  * a timeout, the get_ variant does not. */
1551  virtual ErrorCode query_watermark_offsets (const std::string &topic,
1552  int32_t partition,
1553  int64_t *low, int64_t *high,
1554  int timeout_ms) = 0;
1555 
1573  virtual ErrorCode get_watermark_offsets (const std::string &topic,
1574  int32_t partition,
1575  int64_t *low, int64_t *high) = 0;
1576 
1577 
1599  virtual ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
1600  int timeout_ms) = 0;
1601 
1602 
1611  virtual Queue *get_partition_queue (const TopicPartition *partition) = 0;
1612 
1629  virtual ErrorCode set_log_queue (Queue *queue) = 0;
1630 
1642  virtual void yield () = 0;
1643 
1658  virtual const std::string clusterid (int timeout_ms) = 0;
1659 
 /* Accessor for the underlying C struct (forward-declared at the top of
  * this file). */
1676  virtual struct rd_kafka_s *c_ptr () = 0;
1677 
1693  virtual int32_t controllerid (int timeout_ms) = 0;
1694 
1695 
1717  virtual ErrorCode fatal_error (std::string &errstr) const = 0;
1718 
1758  virtual ErrorCode oauthbearer_set_token (const std::string &token_value,
1759  int64_t md_lifetime_ms,
1760  const std::string &md_principal_name,
1761  const std::list<std::string> &extensions,
1762  std::string &errstr) = 0;
1763 
1781  virtual ErrorCode oauthbearer_set_token_failure (const std::string &errstr) = 0;
1782 
 /* Per-handle counterparts of the namespace-level mem_malloc()/mem_free()
  * functions declared earlier in this file. */
1793  virtual void *mem_malloc (size_t size) = 0;
1794 
1808  virtual void mem_free (void *ptr) = 0;
1809 };
1810 
1811 
/* Abstract topic + partition object with a mutable offset and an error
 * code. Instances come from the static create() factories. The destructor
 * is pure virtual, so its definition must live outside this header listing. */
1830 class RD_EXPORT TopicPartition {
1831 public:
1837  static TopicPartition *create (const std::string &topic, int partition);
1838 
1845  static TopicPartition *create (const std::string &topic, int partition,
1846  int64_t offset);
1847 
1848  virtual ~TopicPartition() = 0;
1849 
 /* Static helper taking a whole vector of TopicPartition pointers.
  * NOTE(review): presumably deletes every element and clears the vector -
  * confirm. */
1854  static void destroy (std::vector<TopicPartition*> &partitions);
1855 
1857  virtual const std::string &topic () const = 0;
1858 
1860  virtual int partition () const = 0;
1861 
1863  virtual int64_t offset () const = 0;
1864 
1866  virtual void set_offset (int64_t offset) = 0;
1867 
1869  virtual ErrorCode err () const = 0;
1870 };
1871 
1872 
1873 
/* Abstract topic handle, created with the static create() factory from a
 * Handle, a topic name and a Conf. The static constants are declared here
 * only; their values are defined outside this header listing. */
1878 class RD_EXPORT Topic {
1879  public:
1886  static const int32_t PARTITION_UA;
1887 
1889  static const int64_t OFFSET_BEGINNING;
1890  static const int64_t OFFSET_END;
1891  static const int64_t OFFSET_STORED;
1892  static const int64_t OFFSET_INVALID;
 /* On failure the reason is presumably reported through errstr -
  * NOTE(review): confirm the return value on failure. */
1904  static Topic *create (Handle *base, const std::string &topic_str,
1905  const Conf *conf, std::string &errstr);
1906 
1907  virtual ~Topic () = 0;
1908 
1909 
1911  virtual const std::string name () const = 0;
1912 
1918  virtual bool partition_available (int32_t partition) const = 0;
1919 
1931  virtual ErrorCode offset_store (int32_t partition, int64_t offset) = 0;
1932 
 /* Accessor for the underlying C struct (forward-declared at the top of
  * this file). */
1949  virtual struct rd_kafka_topic_s *c_ptr () = 0;
1950 };
1951 
1952 
/* Timestamp value object returned by Message::timestamp().
 * NOTE(review): this copy of the listing is damaged - numbers jump from
 * 1975 to 1980 and from 1981 to 1984, so the opening of an enum, its
 * earlier enumerators and a member declaration appear to be missing.
 * As shown, the lone enumerator and the "};" below do not form valid C++.
 * Verify against the real header before use. */
1974 class RD_EXPORT MessageTimestamp {
1975 public:
1980  MSG_TIMESTAMP_LOG_APPEND_TIME
1981  };
1982 
1984  int64_t timestamp;
1985 };
1986 
1987 
/* Abstract collection of message headers, created with the static create()
 * factories. The nested Header class is a concrete value type that owns a
 * private copy of its value bytes. */
1997 class RD_EXPORT Headers {
1998 public:
1999  virtual ~Headers() = 0;
2000 
 /* Single header: a key string, an error code and an owned copy of the
  * value. The copy is allocated with mem_malloc() and released with
  * mem_free(), and is always followed by one extra NUL byte (see
  * copy_value()), so value_string() can be used as a C string. */
2009  class Header {
2010  public:
 /* Copies value_size bytes from value; err_ is set to ERR_NO_ERROR.
  * If value is NULL the stored pointer is NULL, but value_size_ still
  * holds the size that was passed in. */
2021  Header(const std::string &key,
2022  const void *value,
2023  size_t value_size):
2024  key_(key), err_(ERR_NO_ERROR), value_size_(value_size) {
2025  value_ = copy_value(value, value_size);
2026  }
2027 
 /* As above, but with an explicit error code: the value is copied only
  * when err is ERR_NO_ERROR, otherwise value_ stays NULL. */
2041  Header(const std::string &key,
2042  const void *value,
2043  size_t value_size,
2044  const RdKafka::ErrorCode err):
2045  key_(key), err_(err), value_(NULL), value_size_(value_size) {
2046  if (err == ERR_NO_ERROR)
2047  value_ = copy_value(value, value_size);
2048  }
2049 
 /* Deep copy: allocates a fresh buffer instead of sharing other's. */
2055  Header(const Header &other):
2056  key_(other.key_), err_(other.err_), value_size_(other.value_size_) {
2057  value_ = copy_value(other.value_, value_size_);
2058  }
2059 
 /* Deep-copy assignment. The self-assignment guard matters because the
  * old buffer is freed before the new one is copied. */
2065  Header& operator=(const Header &other)
2066  {
2067  if (&other == this) {
2068  return *this;
2069  }
2070 
2071  key_ = other.key_;
2072  err_ = other.err_;
2073  value_size_ = other.value_size_;
2074 
2075  if (value_ != NULL)
2076  mem_free(value_);
2077 
2078  value_ = copy_value(other.value_, value_size_);
2079 
2080  return *this;
2081  }
2082 
2083  ~Header() {
2084  if (value_ != NULL)
2085  mem_free(value_);
2086  }
2087 
 /* Returns the key by value (a copy). */
2089  std::string key() const {
2090  return key_;
2091  }
2092 
 /* Pointer into the internally owned buffer; NULL if no value was copied. */
2094  const void *value() const {
2095  return value_;
2096  }
2097 
 /* Same buffer as value(), typed as a C string. */
2100  const char *value_string() const {
2101  return static_cast<const char *>(value_);
2102  }
2103 
 /* Size as passed at construction; excludes the extra NUL byte. */
2105  size_t value_size() const {
2106  return value_size_;
2107  }
2108 
2110  RdKafka::ErrorCode err() const {
2111  return err_;
2112  }
2113 
2114  private:
 /* Returns a newly allocated copy of value with one extra NUL byte
  * appended, or NULL when value is NULL.
  * NOTE(review): the result of mem_malloc() is used by memcpy() without a
  * NULL check - confirm that mem_malloc() cannot return NULL. */
2115  char *copy_value(const void *value, size_t value_size) {
2116  if (!value)
2117  return NULL;
2118 
2119  char *dest = (char *)mem_malloc(value_size + 1);
2120  memcpy(dest, (const char *)value, value_size);
2121  dest[value_size] = '\0';
2122 
2123  return dest;
2124  }
2125 
2126  std::string key_;
2127  RdKafka::ErrorCode err_;
2128  char *value_;
2129  size_t value_size_;
2130  void *operator new(size_t); /* Prevent dynamic allocation */
2131  };
2132 
2138  static Headers *create();
2139 
2148  static Headers *create(const std::vector<Header> &headers);
2149 
 /* Three ways to add a header: raw bytes, a std::string value, or a
  * ready-made Header object. */
2159  virtual ErrorCode add(const std::string &key, const void *value,
2160  size_t value_size) = 0;
2161 
2172  virtual ErrorCode add(const std::string &key, const std::string &value) = 0;
2173 
2183  virtual ErrorCode add(const Header &header) = 0;
2184 
2192  virtual ErrorCode remove(const std::string &key) = 0;
2193 
 /* Lookups return Header objects by value (copies). */
2203  virtual std::vector<Header> get(const std::string &key) const = 0;
2204 
2215  virtual Header get_last(const std::string &key) const = 0;
2216 
2222  virtual std::vector<Header> get_all() const = 0;
2223 
2227  virtual size_t size() const = 0;
2228 };
2229 
2230 
/* Abstract message object. It is passed by reference to
 * DeliveryReportCb::dr_cb() and ConsumeCb::consume_cb(), and returned as a
 * pointer by the consume() methods in this file.
 * NOTE(review): ownership of Message pointers returned by consume() is not
 * visible in this header - confirm whether the caller must delete them. */
2242 class RD_EXPORT Message {
2243  public:
 /* Persistence status values, numbered 0..2. */
2246  enum Status {
2250  MSG_STATUS_NOT_PERSISTED = 0,
2251 
2255  MSG_STATUS_POSSIBLY_PERSISTED = 1,
2256 
2260  MSG_STATUS_PERSISTED = 2,
2261  };
2262 
2270  virtual std::string errstr() const = 0;
2271 
2273  virtual ErrorCode err () const = 0;
2274 
2279  virtual Topic *topic () const = 0;
2280 
2282  virtual std::string topic_name () const = 0;
2283 
2285  virtual int32_t partition () const = 0;
2286 
 /* Payload bytes and their length. */
2288  virtual void *payload () const = 0 ;
2289 
2291  virtual size_t len () const = 0;
2292 
 /* The key is available both as a std::string pointer and as a raw
  * pointer + length. */
2294  virtual const std::string *key () const = 0;
2295 
2297  virtual const void *key_pointer () const = 0 ;
2298 
2300  virtual size_t key_len () const = 0;
2301 
2303  virtual int64_t offset () const = 0;
2304 
2306  virtual MessageTimestamp timestamp () const = 0;
2307 
2309  virtual void *msg_opaque () const = 0;
2310 
2311  virtual ~Message () = 0;
2312 
2315  virtual int64_t latency () const = 0;
2316 
 /* Accessor for the underlying C struct (forward-declared at the top of
  * this file). */
2333  virtual struct rd_kafka_message_s *c_ptr () = 0;
2334 
2338  virtual Status status () const = 0;
2339 
 /* Two header accessors; the second also reports an ErrorCode through err. */
2344  virtual RdKafka::Headers *headers () = 0;
2345 
2352  virtual RdKafka::Headers *headers (RdKafka::ErrorCode *err) = 0;
2353 
2356  virtual int32_t broker_id () const = 0;
2357 };
2358 
/* Abstract queue, created for a Handle with the static create() factory.
 * Queues are also returned by Handle::get_partition_queue() and accepted by
 * Handle::set_log_queue() and the Queue-based Consumer methods. */
2382 class RD_EXPORT Queue {
2383  public:
2387  static Queue *create (Handle *handle);
2388 
2399  virtual ErrorCode forward (Queue *dst) = 0;
2400 
2401 
2413  virtual Message *consume (int timeout_ms) = 0;
2414 
2422  virtual int poll (int timeout_ms) = 0;
2423 
2424  virtual ~Queue () = 0;
2425 
 /* Takes a file descriptor plus a payload buffer and its size.
  * NOTE(review): presumably the payload is written to fd when the queue
  * becomes non-empty - confirm. */
2441  virtual void io_event_enable (int fd, const void *payload, size_t size) = 0;
2442 };
2443 
/* Opaque handle type with no accessors: only a pure virtual destructor.
 * Returned by KafkaConsumer::groupMetadata() and consumed by
 * Producer::send_offsets_to_transaction(). */
2457 class RD_EXPORT ConsumerGroupMetadata {
2458 public:
2459  virtual ~ConsumerGroupMetadata () = 0;
2460 };
2461 
/* Abstract consumer interface, created with the static create() factory.
 * Inherits Handle virtually. Covers subscription and assignment, message
 * consumption, offset commit/lookup, seeking and the incremental
 * assign/unassign calls that return Error pointers. */
2479 class RD_EXPORT KafkaConsumer : public virtual Handle {
2480 public:
2492  static KafkaConsumer *create (const Conf *conf, std::string &errstr);
2493 
2494  virtual ~KafkaConsumer () = 0;
2495 
2496 
 /* Both fill the caller-supplied vector passed by non-const reference. */
2499  virtual ErrorCode assignment (std::vector<RdKafka::TopicPartition*> &partitions) = 0;
2500 
2503  virtual ErrorCode subscription (std::vector<std::string> &topics) = 0;
2504 
2539  virtual ErrorCode subscribe (const std::vector<std::string> &topics) = 0;
2540 
2542  virtual ErrorCode unsubscribe () = 0;
2543 
2550  virtual ErrorCode assign (const std::vector<TopicPartition*> &partitions) = 0;
2551 
2555  virtual ErrorCode unassign () = 0;
2556 
2581  virtual Message *consume (int timeout_ms) = 0;
2582 
 /* Commit overloads: no argument, a single Message, an explicit offsets
  * vector, and (commitSync only) variants taking an OffsetCommitCb.
  * Note the asymmetry: commitSync takes the offsets vector by non-const
  * reference while commitAsync takes it by const reference. */
2596  virtual ErrorCode commitSync () = 0;
2597 
2603  virtual ErrorCode commitAsync () = 0;
2604 
2614  virtual ErrorCode commitSync (Message *message) = 0;
2615 
2625  virtual ErrorCode commitAsync (Message *message) = 0;
2626 
2636  virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;
2637 
2647  virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0;
2648 
2659  virtual ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) = 0;
2660 
2671  virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
2672  OffsetCommitCb *offset_commit_cb) = 0;
2673 
2674 
2675 
2676 
2685  virtual ErrorCode committed (std::vector<TopicPartition*> &partitions,
2686  int timeout_ms) = 0;
2687 
2696  virtual ErrorCode position (std::vector<TopicPartition*> &partitions) = 0;
2697 
2698 
2721  virtual ErrorCode close () = 0;
2722 
2723 
2741  virtual ErrorCode seek (const TopicPartition &partition, int timeout_ms) = 0;
2742 
2743 
2761  virtual ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) = 0;
2762 
2763 
 /* NOTE(review): ownership of the returned pointer is not visible in this
  * header - confirm whether the caller must delete it. */
2774  virtual ConsumerGroupMetadata *groupMetadata () = 0;
2775 
2776 
2791  virtual bool assignment_lost () = 0;
2792 
2808  virtual std::string rebalance_protocol () = 0;
2809 
2810 
 /* These two report failure through an Error pointer rather than an
  * ErrorCode. NOTE(review): presumably NULL means success - confirm. */
2826  virtual Error *incremental_assign (const std::vector<TopicPartition*> &partitions) = 0;
2827 
2828 
2844  virtual Error *incremental_unassign (const std::vector<TopicPartition*> &partitions) = 0;
2845 
2846 };
2847 
2848 
/* Abstract per-partition consumer interface, created with the static
 * create() factory. Inherits Handle virtually. Unlike KafkaConsumer, every
 * operation here names an explicit Topic and partition (or a Queue). */
2863 class RD_EXPORT Consumer : public virtual Handle {
2864  public:
2875  static Consumer *create (const Conf *conf, std::string &errstr);
2876 
2877  virtual ~Consumer () = 0;
2878 
2879 
 /* start() has a plain variant and one that also takes a Queue. */
2899  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset) = 0;
2900 
2907  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset,
2908  Queue *queue) = 0;
2909 
2919  virtual ErrorCode stop (Topic *topic, int32_t partition) = 0;
2920 
2935  virtual ErrorCode seek (Topic *topic, int32_t partition, int64_t offset,
2936  int timeout_ms) = 0;
2937 
 /* consume() and consume_callback() each come in a topic+partition form
  * and a Queue form. The callback forms pass opaque on to
  * ConsumeCb::consume_cb(). */
2955  virtual Message *consume (Topic *topic, int32_t partition,
2956  int timeout_ms) = 0;
2957 
2979  virtual Message *consume (Queue *queue, int timeout_ms) = 0;
2980 
3000  virtual int consume_callback (Topic *topic, int32_t partition,
3001  int timeout_ms,
3002  ConsumeCb *consume_cb,
3003  void *opaque) = 0;
3004 
3011  virtual int consume_callback (Queue *queue, int timeout_ms,
3012  RdKafka::ConsumeCb *consume_cb,
3013  void *opaque) = 0;
3014 
 /* Static helper mapping an int64_t to an int64_t.
  * NOTE(review): presumably converts a count-from-end into a logical
  * offset usable with start() - confirm. */
3024  static int64_t OffsetTail(int64_t offset);
3025 };
3026 
/* Abstract producer interface, created with the static create() factory.
 * Inherits Handle virtually. Provides the produce() overloads, flush/purge
 * and the transactional calls, which return Error pointers. */
3040 class RD_EXPORT Producer : public virtual Handle {
3041  public:
3052  static Producer *create (const Conf *conf, std::string &errstr);
3053 
3054 
3055  virtual ~Producer () = 0;
3056 
 /* Bit flags for the msgflags argument of produce() (values 0x1, 0x2, 0x4).
  * The legacy names MSG_FREE / MSG_COPY are only added when no MSG_COPY
  * macro is already defined, because sys/msg.h defines one. */
3062  enum {
3063  RK_MSG_FREE = 0x1,
3066  RK_MSG_COPY = 0x2,
3071  RK_MSG_BLOCK = 0x4
3088  /* For backwards compatibility: */
3089 #ifndef MSG_COPY /* defined in sys/msg.h */
3090  ,
3093  MSG_FREE = RK_MSG_FREE,
3094  MSG_COPY = RK_MSG_COPY
3095 #endif
3096 
3097  };
3098 
 /* produce() overloads, differing in how topic, key and payload are given:
  *  - Topic* with a std::string key
  *  - Topic* with a key pointer + length
  *  - topic name with key pointer + length and a timestamp
  *  - the same, plus Headers
  *  - Topic* with vector<char> payload and key (no msgflags argument)
  * Note that the topic_name overloads take the std::string by value. */
3155  virtual ErrorCode produce (Topic *topic, int32_t partition,
3156  int msgflags,
3157  void *payload, size_t len,
3158  const std::string *key,
3159  void *msg_opaque) = 0;
3160 
3165  virtual ErrorCode produce (Topic *topic, int32_t partition,
3166  int msgflags,
3167  void *payload, size_t len,
3168  const void *key, size_t key_len,
3169  void *msg_opaque) = 0;
3170 
3177  virtual ErrorCode produce (const std::string topic_name, int32_t partition,
3178  int msgflags,
3179  void *payload, size_t len,
3180  const void *key, size_t key_len,
3181  int64_t timestamp, void *msg_opaque) = 0;
3182 
3190  virtual ErrorCode produce (const std::string topic_name, int32_t partition,
3191  int msgflags,
3192  void *payload, size_t len,
3193  const void *key, size_t key_len,
3194  int64_t timestamp,
3195  RdKafka::Headers *headers,
3196  void *msg_opaque) = 0;
3197 
3198 
3203  virtual ErrorCode produce (Topic *topic, int32_t partition,
3204  const std::vector<char> *payload,
3205  const std::vector<char> *key,
3206  void *msg_opaque) = 0;
3207 
3208 
3221  virtual ErrorCode flush (int timeout_ms) = 0;
3222 
3223 
 /* purge_flags is a bitwise combination of the PURGE_* values declared
  * just below. */
3251  virtual ErrorCode purge (int purge_flags) = 0;
3252 
3256  enum {
3257  PURGE_QUEUE = 0x1,
3259  PURGE_INFLIGHT = 0x2,
3266  PURGE_NON_BLOCKING = 0x4 /* Don't wait for background queue
3267  * purging to finish. */
3268  };
3269 
 /* Transactional API. Each call returns an Error pointer.
  * NOTE(review): presumably NULL means success and the caller owns a
  * non-NULL result - confirm. */
3296  virtual Error *init_transactions (int timeout_ms) = 0;
3297 
3298 
3311  virtual Error *begin_transaction () = 0;
3312 
3359  virtual Error *send_offsets_to_transaction (
3360  const std::vector<TopicPartition*> &offsets,
3361  const ConsumerGroupMetadata *group_metadata,
3362  int timeout_ms) = 0;
3363 
3392  virtual Error *commit_transaction (int timeout_ms) = 0;
3393 
3424  virtual Error *abort_transaction (int timeout_ms) = 0;
3425 
3427 };
3428 
/* Abstract description of one broker: id, host and port. Pointers to this
 * type are the element type of Metadata::BrokerMetadataVector.
 * The class-head line (listing line 3442) was missing from this listing and
 * is restored here; the class name is taken from the destructor below and
 * the form (no RD_EXPORT) follows the sibling "class Metadata {". */
3442 class BrokerMetadata {
3443  public:
3445  virtual int32_t id() const = 0;
3446 
3448  virtual const std::string host() const = 0;
3449 
3451  virtual int port() const = 0;
3452 
3453  virtual ~BrokerMetadata() = 0;
3454 };
3455 
3456 
3457 
/* Abstract description of one partition: id, error code, leader and the
 * replica / ISR broker-id lists. Pointers to this type are the element type
 * of TopicMetadata::PartitionMetadataVector.
 * The class-head line (listing line 3461) was missing from this listing and
 * is restored here; the class name is taken from the destructor below and
 * the form (no RD_EXPORT) follows the sibling "class Metadata {". */
3461 class PartitionMetadata {
3462  public:
3464  typedef std::vector<int32_t> ReplicasVector;
3466  typedef std::vector<int32_t> ISRSVector;
3467 
3469  typedef ReplicasVector::const_iterator ReplicasIterator;
3471  typedef ISRSVector::const_iterator ISRSIterator;
3472 
3473 
3475  virtual int32_t id() const = 0;
3476 
3478  virtual ErrorCode err() const = 0;
3479 
3481  virtual int32_t leader() const = 0;
3482 
 /* Both lists are returned as pointers to vectors of int32_t broker ids. */
3484  virtual const std::vector<int32_t> *replicas() const = 0;
3485 
3489  virtual const std::vector<int32_t> *isrs() const = 0;
3490 
3491  virtual ~PartitionMetadata() = 0;
3492 };
3493 
3494 
3495 
/* Abstract description of one topic: its name, its partitions and an error
 * code. Pointers to this type are the element type of
 * Metadata::TopicMetadataVector.
 * The class-head line (listing line 3499) was missing from this listing and
 * is restored here; the class name is taken from the destructor below and
 * the form (no RD_EXPORT) follows the sibling "class Metadata {". */
3499 class TopicMetadata {
3500  public:
3502  typedef std::vector<const PartitionMetadata*> PartitionMetadataVector;
3504  typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;
3505 
3507  virtual const std::string topic() const = 0;
3508 
3510  virtual const PartitionMetadataVector *partitions() const = 0;
3511 
3513  virtual ErrorCode err() const = 0;
3514 
3515  virtual ~TopicMetadata() = 0;
3516 };
3517 
3518 
/* Abstract metadata container: the broker list, the topic list and the id
 * and name of the broker that supplied the data. An instance is handed to
 * the caller through the metadatap out-parameter of Handle::metadata().
 * NOTE(review): ownership of that instance is not visible in this header -
 * confirm whether the caller must delete it. */
3522 class Metadata {
3523  public:
3525  typedef std::vector<const BrokerMetadata*> BrokerMetadataVector;
3527  typedef std::vector<const TopicMetadata*> TopicMetadataVector;
3528 
3530  typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
3532  typedef TopicMetadataVector::const_iterator TopicMetadataIterator;
3533 
3534 
 /* Both lists are returned as pointers to vectors of const element
  * pointers. */
3540  virtual const BrokerMetadataVector *brokers() const = 0;
3541 
3547  virtual const TopicMetadataVector *topics() const = 0;
3548 
3550  virtual int32_t orig_broker_id() const = 0;
3551 
3553  virtual const std::string orig_broker_name() const = 0;
3554 
3555  virtual ~Metadata() = 0;
3556 };
3557 
3560 }
3561 
3562 
3563 #endif /* _RDKAFKACPP_H_ */
virtual int32_t partitioner_cb(const Topic *topic, const void *key, size_t key_len, int32_t partition_cnt, void *msg_opaque)=0
Variant partitioner callback that gets key as pointer and length instead of as a const std::string *.
virtual ErrorCode err() const =0
std::string key() const
Definition: rdkafkacpp.h:2089
virtual const std::string orig_broker_name() const =0
Broker (name) originating this metadata.
ISRSVector::const_iterator ISRSIterator
ISRs iterator.
Definition: rdkafkacpp.h:3471
Header(const std::string &key, const void *value, size_t value_size, const RdKafka::ErrorCode err)
Header object to encapsulate a single Header.
Definition: rdkafkacpp.h:2041
Header object.
Definition: rdkafkacpp.h:2009
High-level KafkaConsumer (for brokers 0.9 and later)
Definition: rdkafkacpp.h:2479
static const int64_t OFFSET_BEGINNING
Special offsets.
Definition: rdkafkacpp.h:1889
ConfType
Configuration object type.
Definition: rdkafkacpp.h:1179
Partitioner callback class.
Definition: rdkafkacpp.h:759
virtual int port() const =0
Type
Event type.
Definition: rdkafkacpp.h:837
const char * value_string() const
Definition: rdkafkacpp.h:2100
virtual int32_t orig_broker_id() const =0
Broker (id) originating this metadata.
size_t value_size() const
Definition: rdkafkacpp.h:2105
virtual const std::string host() const =0
SASL/OAUTHBEARER token refresh callback class.
Definition: rdkafkacpp.h:736
std::vector< const PartitionMetadata * > PartitionMetadataVector
Partitions.
Definition: rdkafkacpp.h:3502
static const int32_t PARTITION_UA
Unassigned partition.
Definition: rdkafkacpp.h:1886
Header(const Header &other)
Copy constructor.
Definition: rdkafkacpp.h:2055
Message object.
Definition: rdkafkacpp.h:2242
Severity
EVENT_LOG severities (conforms to syslog(3) severities)
Definition: rdkafkacpp.h:845
Event callback class.
Definition: rdkafkacpp.h:818
Headers object.
Definition: rdkafkacpp.h:1997
ReplicasVector::const_iterator ReplicasIterator
Replicas iterator.
Definition: rdkafkacpp.h:3469
Status
Message persistence status can be used by the application to find out if a produced message was persisted in the topic log.
Definition: rdkafkacpp.h:2246
KafkaConsumer: Rebalance callback class
Definition: rdkafkacpp.h:946
std::vector< int32_t > ReplicasVector
Replicas.
Definition: rdkafkacpp.h:3464
virtual const std::string topic() const =0
int64_t timestamp
Definition: rdkafkacpp.h:1984
Variant partitioner with key pointer.
Definition: rdkafkacpp.h:789
virtual int32_t id() const =0
BrokerMetadataVector::const_iterator BrokerMetadataIterator
Brokers iterator.
Definition: rdkafkacpp.h:3530
ConsumerGroupMetadata holds a consumer instance's group metadata state.
Definition: rdkafkacpp.h:2457
Definition: rdkafkacpp.h:840
static const int64_t OFFSET_INVALID
Definition: rdkafkacpp.h:1892
virtual const PartitionMetadataVector * partitions() const =0
virtual const BrokerMetadataVector * brokers() const =0
Broker list.
RdKafka::ErrorCode err() const
Definition: rdkafkacpp.h:2110
Topic handle.
Definition: rdkafkacpp.h:1878
Metadata: Partition information.
Definition: rdkafkacpp.h:3461
Producer.
Definition: rdkafkacpp.h:3040
Metadata: Topic information.
Definition: rdkafkacpp.h:3499
Definition: rdkafkacpp.h:1180
Definition: rdkafkacpp.h:838
std::vector< const BrokerMetadata * > BrokerMetadataVector
Brokers.
Definition: rdkafkacpp.h:3525
ConfResult
RdKafka::Conf::Set() result code.
Definition: rdkafkacpp.h:1187
Definition: rdkafkacpp.h:93
Queue interface.
Definition: rdkafkacpp.h:2382
MessageTimestampType type
Definition: rdkafkacpp.h:1983
Message timestamp object.
Definition: rdkafkacpp.h:1974
std::vector< int32_t > ISRSVector
ISRs (In-Sync-Replicas)
Definition: rdkafkacpp.h:3466
virtual const TopicMetadataVector * topics() const =0
Topic list.
static const int64_t OFFSET_END
Definition: rdkafkacpp.h:1890
PartitionMetadataVector::const_iterator PartitionMetadataIterator
Partitions iterator.
Definition: rdkafkacpp.h:3504
Portability: OpenCb callback class
Definition: rdkafkacpp.h:1135
SSL broker certificate verification class.
Definition: rdkafkacpp.h:1057
Metadata: Broker information.
Definition: rdkafkacpp.h:3442
Definition: rdkafkacpp.h:839
virtual int32_t leader() const =0
static const int64_t OFFSET_STORED
Definition: rdkafkacpp.h:1891
Header(const std::string &key, const void *value, size_t value_size)
Header object to encapsulate a single Header.
Definition: rdkafkacpp.h:2021
Configuration interface.
Definition: rdkafkacpp.h:1174
Offset Commit callback class.
Definition: rdkafkacpp.h:1027
virtual const std::vector< int32_t > * replicas() const =0
Base handle, super class for specific clients.
Definition: rdkafkacpp.h:1450
std::vector< const TopicMetadata * > TopicMetadataVector
Topics.
Definition: rdkafkacpp.h:3527
Topic+Partition.
Definition: rdkafkacpp.h:1830
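The Error class is used as a return value from APIs to propagate an error. The error consists of an error code which is to be used programmatically, an error string for showing to the user, and various error flags that can be used programmatically to decide how to handle the error; e.g., should the operation be retried, was it a fatal error, etc.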
Definition: rdkafkacpp.h:613
Consume callback class.
Definition: rdkafkacpp.h:928
virtual const std::vector< int32_t > * isrs() const =0
Simple Consumer (legacy)
Definition: rdkafkacpp.h:2863
MessageTimestampType
Definition: rdkafkacpp.h:1977
TopicMetadataVector::const_iterator TopicMetadataIterator
Topics iterator.
Definition: rdkafkacpp.h:3532
Event object class as passed to the EventCb callback.
Definition: rdkafkacpp.h:834
Metadata container.
Definition: rdkafkacpp.h:3522
virtual ErrorCode err() const =0
Header & operator=(const Header &other)
Assignment operator.
Definition: rdkafkacpp.h:2065
Portability: SocketCb callback class
Definition: rdkafkacpp.h:1110
Delivery Report callback class.
Definition: rdkafkacpp.h:698
const void * value() const
Definition: rdkafkacpp.h:2094
virtual int32_t id() const =0