librdkafka
The Apache Kafka C/C++ client library
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
rdkafkacpp.h
Go to the documentation of this file.
1 /*
2  * librdkafka - Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2014 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  * this list of conditions and the following disclaimer in the documentation
14  * and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #ifndef _RDKAFKACPP_H_
30 #define _RDKAFKACPP_H_
31 
50 #include <string>
51 #include <list>
52 #include <vector>
53 #include <cstdlib>
54 #include <cstring>
55 #include <stdint.h>
56 #include <sys/types.h>
57 
58 #ifdef _WIN32
59 #ifndef ssize_t
60 #ifndef _BASETSD_H_
61 #include <basetsd.h>
62 #endif
63 #ifndef _SSIZE_T_DEFINED
64 #define _SSIZE_T_DEFINED
65 typedef SSIZE_T ssize_t;
66 #endif
67 #endif
68 #undef RD_EXPORT
69 #ifdef LIBRDKAFKA_STATICLIB
70 #define RD_EXPORT
71 #else
72 #ifdef LIBRDKAFKACPP_EXPORTS
73 #define RD_EXPORT __declspec(dllexport)
74 #else
75 #define RD_EXPORT __declspec(dllimport)
76 #endif
77 #endif
78 #else
79 #define RD_EXPORT
80 #endif
81 
84 extern "C" {
85 /* Forward declarations */
86 struct rd_kafka_s;
87 struct rd_kafka_topic_s;
88 struct rd_kafka_message_s;
89 struct rd_kafka_conf_s;
90 struct rd_kafka_topic_conf_s;
91 }
92 
93 namespace RdKafka {
94 
114 #define RD_KAFKA_VERSION 0x010900ff
115 
121 RD_EXPORT
122 int version();
123 
127 RD_EXPORT
128 std::string version_str();
129 
134 RD_EXPORT
135 std::string get_debug_contexts();
136 
146 RD_EXPORT
147 int wait_destroyed(int timeout_ms);
148 
159 RD_EXPORT
160 void *mem_malloc(size_t size);
161 
175 RD_EXPORT
176 void mem_free(void *ptr);
177 
200 enum ErrorCode {
201  /* Internal errors to rdkafka: */
203  ERR__BEGIN = -200,
205  ERR__BAD_MSG = -199,
207  ERR__BAD_COMPRESSION = -198,
209  ERR__DESTROY = -197,
211  ERR__FAIL = -196,
213  ERR__TRANSPORT = -195,
215  ERR__CRIT_SYS_RESOURCE = -194,
217  ERR__RESOLVE = -193,
219  ERR__MSG_TIMED_OUT = -192,
224  ERR__PARTITION_EOF = -191,
226  ERR__UNKNOWN_PARTITION = -190,
228  ERR__FS = -189,
230  ERR__UNKNOWN_TOPIC = -188,
232  ERR__ALL_BROKERS_DOWN = -187,
234  ERR__INVALID_ARG = -186,
236  ERR__TIMED_OUT = -185,
238  ERR__QUEUE_FULL = -184,
240  ERR__ISR_INSUFF = -183,
242  ERR__NODE_UPDATE = -182,
244  ERR__SSL = -181,
246  ERR__WAIT_COORD = -180,
248  ERR__UNKNOWN_GROUP = -179,
250  ERR__IN_PROGRESS = -178,
252  ERR__PREV_IN_PROGRESS = -177,
254  ERR__EXISTING_SUBSCRIPTION = -176,
256  ERR__ASSIGN_PARTITIONS = -175,
258  ERR__REVOKE_PARTITIONS = -174,
260  ERR__CONFLICT = -173,
262  ERR__STATE = -172,
264  ERR__UNKNOWN_PROTOCOL = -171,
266  ERR__NOT_IMPLEMENTED = -170,
268  ERR__AUTHENTICATION = -169,
270  ERR__NO_OFFSET = -168,
272  ERR__OUTDATED = -167,
274  ERR__TIMED_OUT_QUEUE = -166,
276  ERR__UNSUPPORTED_FEATURE = -165,
278  ERR__WAIT_CACHE = -164,
280  ERR__INTR = -163,
282  ERR__KEY_SERIALIZATION = -162,
284  ERR__VALUE_SERIALIZATION = -161,
286  ERR__KEY_DESERIALIZATION = -160,
288  ERR__VALUE_DESERIALIZATION = -159,
290  ERR__PARTIAL = -158,
292  ERR__READ_ONLY = -157,
294  ERR__NOENT = -156,
296  ERR__UNDERFLOW = -155,
298  ERR__INVALID_TYPE = -154,
300  ERR__RETRY = -153,
302  ERR__PURGE_QUEUE = -152,
304  ERR__PURGE_INFLIGHT = -151,
306  ERR__FATAL = -150,
308  ERR__INCONSISTENT = -149,
310  ERR__GAPLESS_GUARANTEE = -148,
312  ERR__MAX_POLL_EXCEEDED = -147,
314  ERR__UNKNOWN_BROKER = -146,
316  ERR__NOT_CONFIGURED = -145,
318  ERR__FENCED = -144,
320  ERR__APPLICATION = -143,
322  ERR__ASSIGNMENT_LOST = -142,
324  ERR__NOOP = -141,
326  ERR__AUTO_OFFSET_RESET = -140,
327 
329  ERR__END = -100,
330 
331  /* Kafka broker errors: */
333  ERR_UNKNOWN = -1,
335  ERR_NO_ERROR = 0,
337  ERR_OFFSET_OUT_OF_RANGE = 1,
339  ERR_INVALID_MSG = 2,
341  ERR_UNKNOWN_TOPIC_OR_PART = 3,
343  ERR_INVALID_MSG_SIZE = 4,
345  ERR_LEADER_NOT_AVAILABLE = 5,
347  ERR_NOT_LEADER_FOR_PARTITION = 6,
349  ERR_REQUEST_TIMED_OUT = 7,
351  ERR_BROKER_NOT_AVAILABLE = 8,
353  ERR_REPLICA_NOT_AVAILABLE = 9,
355  ERR_MSG_SIZE_TOO_LARGE = 10,
357  ERR_STALE_CTRL_EPOCH = 11,
359  ERR_OFFSET_METADATA_TOO_LARGE = 12,
361  ERR_NETWORK_EXCEPTION = 13,
363  ERR_COORDINATOR_LOAD_IN_PROGRESS = 14,
365 #define ERR_GROUP_LOAD_IN_PROGRESS ERR_COORDINATOR_LOAD_IN_PROGRESS
366 
367  ERR_COORDINATOR_NOT_AVAILABLE = 15,
369 #define ERR_GROUP_COORDINATOR_NOT_AVAILABLE ERR_COORDINATOR_NOT_AVAILABLE
370 
371  ERR_NOT_COORDINATOR = 16,
373 #define ERR_NOT_COORDINATOR_FOR_GROUP ERR_NOT_COORDINATOR
374 
375  ERR_TOPIC_EXCEPTION = 17,
377  ERR_RECORD_LIST_TOO_LARGE = 18,
379  ERR_NOT_ENOUGH_REPLICAS = 19,
381  ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20,
383  ERR_INVALID_REQUIRED_ACKS = 21,
385  ERR_ILLEGAL_GENERATION = 22,
387  ERR_INCONSISTENT_GROUP_PROTOCOL = 23,
389  ERR_INVALID_GROUP_ID = 24,
391  ERR_UNKNOWN_MEMBER_ID = 25,
393  ERR_INVALID_SESSION_TIMEOUT = 26,
395  ERR_REBALANCE_IN_PROGRESS = 27,
397  ERR_INVALID_COMMIT_OFFSET_SIZE = 28,
399  ERR_TOPIC_AUTHORIZATION_FAILED = 29,
401  ERR_GROUP_AUTHORIZATION_FAILED = 30,
403  ERR_CLUSTER_AUTHORIZATION_FAILED = 31,
405  ERR_INVALID_TIMESTAMP = 32,
407  ERR_UNSUPPORTED_SASL_MECHANISM = 33,
409  ERR_ILLEGAL_SASL_STATE = 34,
411  ERR_UNSUPPORTED_VERSION = 35,
413  ERR_TOPIC_ALREADY_EXISTS = 36,
415  ERR_INVALID_PARTITIONS = 37,
417  ERR_INVALID_REPLICATION_FACTOR = 38,
419  ERR_INVALID_REPLICA_ASSIGNMENT = 39,
421  ERR_INVALID_CONFIG = 40,
423  ERR_NOT_CONTROLLER = 41,
425  ERR_INVALID_REQUEST = 42,
427  ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43,
429  ERR_POLICY_VIOLATION = 44,
431  ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45,
433  ERR_DUPLICATE_SEQUENCE_NUMBER = 46,
435  ERR_INVALID_PRODUCER_EPOCH = 47,
437  ERR_INVALID_TXN_STATE = 48,
440  ERR_INVALID_PRODUCER_ID_MAPPING = 49,
443  ERR_INVALID_TRANSACTION_TIMEOUT = 50,
446  ERR_CONCURRENT_TRANSACTIONS = 51,
450  ERR_TRANSACTION_COORDINATOR_FENCED = 52,
452  ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53,
454  ERR_SECURITY_DISABLED = 54,
456  ERR_OPERATION_NOT_ATTEMPTED = 55,
458  ERR_KAFKA_STORAGE_ERROR = 56,
460  ERR_LOG_DIR_NOT_FOUND = 57,
462  ERR_SASL_AUTHENTICATION_FAILED = 58,
464  ERR_UNKNOWN_PRODUCER_ID = 59,
466  ERR_REASSIGNMENT_IN_PROGRESS = 60,
468  ERR_DELEGATION_TOKEN_AUTH_DISABLED = 61,
470  ERR_DELEGATION_TOKEN_NOT_FOUND = 62,
472  ERR_DELEGATION_TOKEN_OWNER_MISMATCH = 63,
474  ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED = 64,
476  ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED = 65,
478  ERR_DELEGATION_TOKEN_EXPIRED = 66,
480  ERR_INVALID_PRINCIPAL_TYPE = 67,
482  ERR_NON_EMPTY_GROUP = 68,
484  ERR_GROUP_ID_NOT_FOUND = 69,
486  ERR_FETCH_SESSION_ID_NOT_FOUND = 70,
488  ERR_INVALID_FETCH_SESSION_EPOCH = 71,
490  ERR_LISTENER_NOT_FOUND = 72,
492  ERR_TOPIC_DELETION_DISABLED = 73,
494  ERR_FENCED_LEADER_EPOCH = 74,
496  ERR_UNKNOWN_LEADER_EPOCH = 75,
498  ERR_UNSUPPORTED_COMPRESSION_TYPE = 76,
500  ERR_STALE_BROKER_EPOCH = 77,
502  ERR_OFFSET_NOT_AVAILABLE = 78,
504  ERR_MEMBER_ID_REQUIRED = 79,
506  ERR_PREFERRED_LEADER_NOT_AVAILABLE = 80,
508  ERR_GROUP_MAX_SIZE_REACHED = 81,
511  ERR_FENCED_INSTANCE_ID = 82,
513  ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE = 83,
515  ERR_ELECTION_NOT_NEEDED = 84,
517  ERR_NO_REASSIGNMENT_IN_PROGRESS = 85,
520  ERR_GROUP_SUBSCRIBED_TO_TOPIC = 86,
522  ERR_INVALID_RECORD = 87,
524  ERR_UNSTABLE_OFFSET_COMMIT = 88,
526  ERR_THROTTLING_QUOTA_EXCEEDED = 89,
529  ERR_PRODUCER_FENCED = 90,
531  ERR_RESOURCE_NOT_FOUND = 91,
533  ERR_DUPLICATE_RESOURCE = 92,
535  ERR_UNACCEPTABLE_CREDENTIAL = 93,
538  ERR_INCONSISTENT_VOTER_SET = 94,
540  ERR_INVALID_UPDATE_VERSION = 95,
542  ERR_FEATURE_UPDATE_FAILED = 96,
544  ERR_PRINCIPAL_DESERIALIZATION_FAILURE = 97
545 };
546 
547 
551 RD_EXPORT
552 std::string err2str(RdKafka::ErrorCode err);
553 
554 
555 
560 enum CertificateType {
561  CERT_PUBLIC_KEY,
562  CERT_PRIVATE_KEY,
563  CERT_CA,
564  CERT__CNT
565 };
566 
571 enum CertificateEncoding {
572  CERT_ENC_PKCS12,
573  CERT_ENC_DER,
574  CERT_ENC_PEM,
575  CERT_ENC__CNT
576 };
577 
583 /* Forward declarations */
584 class Handle;
585 class Producer;
586 class Message;
587 class Headers;
588 class Queue;
589 class Event;
590 class Topic;
591 class TopicPartition;
592 class Metadata;
593 class KafkaConsumer;
613 class RD_EXPORT Error {
614  public:
618  static Error *create(ErrorCode code, const std::string *errstr);
619 
620  virtual ~Error() {
621  }
622 
623  /*
624  * Error accessor methods
625  */
626 
630  virtual ErrorCode code() const = 0;
631 
635  virtual std::string name() const = 0;
636 
640  virtual std::string str() const = 0;
641 
646  virtual bool is_fatal() const = 0;
647 
651  virtual bool is_retriable() const = 0;
652 
664  virtual bool txn_requires_abort() const = 0;
665 };
666 
698 class RD_EXPORT DeliveryReportCb {
699  public:
703  virtual void dr_cb(Message &message) = 0;
704 
705  virtual ~DeliveryReportCb() {
706  }
707 };
708 
709 
737 class RD_EXPORT OAuthBearerTokenRefreshCb {
738  public:
746  virtual void oauthbearer_token_refresh_cb(
747  RdKafka::Handle *handle,
748  const std::string &oauthbearer_config) = 0;
749 
750  virtual ~OAuthBearerTokenRefreshCb() {
751  }
752 };
753 
754 
762 class RD_EXPORT PartitionerCb {
763  public:
781  virtual int32_t partitioner_cb(const Topic *topic,
782  const std::string *key,
783  int32_t partition_cnt,
784  void *msg_opaque) = 0;
785 
786  virtual ~PartitionerCb() {
787  }
788 };
789 
795  public:
804  virtual int32_t partitioner_cb(const Topic *topic,
805  const void *key,
806  size_t key_len,
807  int32_t partition_cnt,
808  void *msg_opaque) = 0;
809 
810  virtual ~PartitionerKeyPointerCb() {
811  }
812 };
813 
814 
815 
824 class RD_EXPORT EventCb {
825  public:
831  virtual void event_cb(Event &event) = 0;
832 
833  virtual ~EventCb() {
834  }
835 };
836 
837 
841 class RD_EXPORT Event {
842  public:
844  enum Type {
848  EVENT_THROTTLE
849  };
850 
852  enum Severity {
853  EVENT_SEVERITY_EMERG = 0,
854  EVENT_SEVERITY_ALERT = 1,
855  EVENT_SEVERITY_CRITICAL = 2,
856  EVENT_SEVERITY_ERROR = 3,
857  EVENT_SEVERITY_WARNING = 4,
858  EVENT_SEVERITY_NOTICE = 5,
859  EVENT_SEVERITY_INFO = 6,
860  EVENT_SEVERITY_DEBUG = 7
861  };
862 
863  virtual ~Event() {
864  }
865 
866  /*
867  * Event Accessor methods
868  */
869 
874  virtual Type type() const = 0;
875 
880  virtual ErrorCode err() const = 0;
881 
886  virtual Severity severity() const = 0;
887 
892  virtual std::string fac() const = 0;
893 
902  virtual std::string str() const = 0;
903 
908  virtual int throttle_time() const = 0;
909 
914  virtual std::string broker_name() const = 0;
915 
920  virtual int broker_id() const = 0;
921 
922 
928  virtual bool fatal() const = 0;
929 };
930 
931 
932 
936 class RD_EXPORT ConsumeCb {
937  public:
945  virtual void consume_cb(Message &message, void *opaque) = 0;
946 
947  virtual ~ConsumeCb() {
948  }
949 };
950 
951 
955 class RD_EXPORT RebalanceCb {
956  public:
1025  virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer,
1026  RdKafka::ErrorCode err,
1027  std::vector<TopicPartition *> &partitions) = 0;
1028 
1029  virtual ~RebalanceCb() {
1030  }
1031 };
1032 
1033 
1037 class RD_EXPORT OffsetCommitCb {
1038  public:
1054  virtual void offset_commit_cb(RdKafka::ErrorCode err,
1055  std::vector<TopicPartition *> &offsets) = 0;
1056 
1057  virtual ~OffsetCommitCb() {
1058  }
1059 };
1060 
1061 
1062 
1068 class RD_EXPORT SslCertificateVerifyCb {
1069  public:
1106  virtual bool ssl_cert_verify_cb(const std::string &broker_name,
1107  int32_t broker_id,
1108  int *x509_error,
1109  int depth,
1110  const char *buf,
1111  size_t size,
1112  std::string &errstr) = 0;
1113 
1114  virtual ~SslCertificateVerifyCb() {
1115  }
1116 };
1117 
1118 
1123 class RD_EXPORT SocketCb {
1124  public:
1138  virtual int socket_cb(int domain, int type, int protocol) = 0;
1139 
1140  virtual ~SocketCb() {
1141  }
1142 };
1143 
1144 
1149 class RD_EXPORT OpenCb {
1150  public:
1162  virtual int open_cb(const std::string &path, int flags, int mode) = 0;
1163 
1164  virtual ~OpenCb() {
1165  }
1166 };
1167 
1168 
1188 class RD_EXPORT Conf {
1189  public:
1193  enum ConfType {
1195  CONF_TOPIC
1196  };
1197 
1201  enum ConfResult {
1202  CONF_UNKNOWN = -2,
1203  CONF_INVALID = -1,
1204  CONF_OK = 0
1205  };
1206 
1207 
1211  static Conf *create(ConfType type);
1212 
1213  virtual ~Conf() {
1214  }
1215 
1229  virtual Conf::ConfResult set(const std::string &name,
1230  const std::string &value,
1231  std::string &errstr) = 0;
1232 
1234  virtual Conf::ConfResult set(const std::string &name,
1235  DeliveryReportCb *dr_cb,
1236  std::string &errstr) = 0;
1237 
1239  virtual Conf::ConfResult set(
1240  const std::string &name,
1241  OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb,
1242  std::string &errstr) = 0;
1243 
1245  virtual Conf::ConfResult set(const std::string &name,
1246  EventCb *event_cb,
1247  std::string &errstr) = 0;
1248 
1256  virtual Conf::ConfResult set(const std::string &name,
1257  const Conf *topic_conf,
1258  std::string &errstr) = 0;
1259 
1261  virtual Conf::ConfResult set(const std::string &name,
1262  PartitionerCb *partitioner_cb,
1263  std::string &errstr) = 0;
1264 
1266  virtual Conf::ConfResult set(const std::string &name,
1267  PartitionerKeyPointerCb *partitioner_kp_cb,
1268  std::string &errstr) = 0;
1269 
1271  virtual Conf::ConfResult set(const std::string &name,
1272  SocketCb *socket_cb,
1273  std::string &errstr) = 0;
1274 
1276  virtual Conf::ConfResult set(const std::string &name,
1277  OpenCb *open_cb,
1278  std::string &errstr) = 0;
1279 
1281  virtual Conf::ConfResult set(const std::string &name,
1282  RebalanceCb *rebalance_cb,
1283  std::string &errstr) = 0;
1284 
1286  virtual Conf::ConfResult set(const std::string &name,
1287  OffsetCommitCb *offset_commit_cb,
1288  std::string &errstr) = 0;
1289 
1294  virtual Conf::ConfResult set(const std::string &name,
1295  SslCertificateVerifyCb *ssl_cert_verify_cb,
1296  std::string &errstr) = 0;
1297 
1330  virtual Conf::ConfResult set_ssl_cert(RdKafka::CertificateType cert_type,
1331  RdKafka::CertificateEncoding cert_enc,
1332  const void *buffer,
1333  size_t size,
1334  std::string &errstr) = 0;
1335 
1348  virtual Conf::ConfResult get(const std::string &name,
1349  std::string &value) const = 0;
1350 
1354  virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0;
1355 
1359  virtual Conf::ConfResult get(
1360  OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const = 0;
1361 
1365  virtual Conf::ConfResult get(EventCb *&event_cb) const = 0;
1366 
1370  virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0;
1371 
1375  virtual Conf::ConfResult get(
1376  PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0;
1377 
1381  virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0;
1382 
1386  virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0;
1387 
1391  virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0;
1392 
1396  virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0;
1397 
1399  virtual Conf::ConfResult get(
1400  SslCertificateVerifyCb *&ssl_cert_verify_cb) const = 0;
1401 
1404  virtual std::list<std::string> *dump() = 0;
1405 
1407  virtual Conf::ConfResult set(const std::string &name,
1408  ConsumeCb *consume_cb,
1409  std::string &errstr) = 0;
1410 
1427  virtual struct rd_kafka_conf_s *c_ptr_global() = 0;
1428 
1446  virtual struct rd_kafka_topic_conf_s *c_ptr_topic() = 0;
1447 
1460  virtual Conf::ConfResult set_engine_callback_data(void *value,
1461  std::string &errstr) = 0;
1462 
1463 
1486  virtual Conf::ConfResult enable_sasl_queue(bool enable,
1487  std::string &errstr) = 0;
1488 };
1489 
1502 class RD_EXPORT Handle {
1503  public:
1504  virtual ~Handle() {
1505  }
1506 
1508  virtual const std::string name() const = 0;
1509 
1518  virtual const std::string memberid() const = 0;
1519 
1520 
1545  virtual int poll(int timeout_ms) = 0;
1546 
1553  virtual int outq_len() = 0;
1554 
1570  virtual ErrorCode metadata(bool all_topics,
1571  const Topic *only_rkt,
1572  Metadata **metadatap,
1573  int timeout_ms) = 0;
1574 
1575 
1585  virtual ErrorCode pause(std::vector<TopicPartition *> &partitions) = 0;
1586 
1587 
1597  virtual ErrorCode resume(std::vector<TopicPartition *> &partitions) = 0;
1598 
1599 
1608  virtual ErrorCode query_watermark_offsets(const std::string &topic,
1609  int32_t partition,
1610  int64_t *low,
1611  int64_t *high,
1612  int timeout_ms) = 0;
1613 
1631  virtual ErrorCode get_watermark_offsets(const std::string &topic,
1632  int32_t partition,
1633  int64_t *low,
1634  int64_t *high) = 0;
1635 
1636 
1658  virtual ErrorCode offsetsForTimes(std::vector<TopicPartition *> &offsets,
1659  int timeout_ms) = 0;
1660 
1661 
1670  virtual Queue *get_partition_queue(const TopicPartition *partition) = 0;
1671 
1688  virtual ErrorCode set_log_queue(Queue *queue) = 0;
1689 
1701  virtual void yield() = 0;
1702 
1717  virtual const std::string clusterid(int timeout_ms) = 0;
1718 
1735  virtual struct rd_kafka_s *c_ptr() = 0;
1736 
1752  virtual int32_t controllerid(int timeout_ms) = 0;
1753 
1754 
1776  virtual ErrorCode fatal_error(std::string &errstr) const = 0;
1777 
1817  virtual ErrorCode oauthbearer_set_token(
1818  const std::string &token_value,
1819  int64_t md_lifetime_ms,
1820  const std::string &md_principal_name,
1821  const std::list<std::string> &extensions,
1822  std::string &errstr) = 0;
1823 
1841  virtual ErrorCode oauthbearer_set_token_failure(
1842  const std::string &errstr) = 0;
1843 
1851  virtual Error *sasl_background_callbacks_enable() = 0;
1852 
1853 
1859  virtual Queue *get_sasl_queue() = 0;
1860 
1864  virtual Queue *get_background_queue() = 0;
1865 
1866 
1867 
1878  virtual void *mem_malloc(size_t size) = 0;
1879 
1893  virtual void mem_free(void *ptr) = 0;
1894 };
1895 
1896 
1915 class RD_EXPORT TopicPartition {
1916  public:
1922  static TopicPartition *create(const std::string &topic, int partition);
1923 
1930  static TopicPartition *create(const std::string &topic,
1931  int partition,
1932  int64_t offset);
1933 
1934  virtual ~TopicPartition() = 0;
1935 
1940  static void destroy(std::vector<TopicPartition *> &partitions);
1941 
1943  virtual const std::string &topic() const = 0;
1944 
1946  virtual int partition() const = 0;
1947 
1949  virtual int64_t offset() const = 0;
1950 
1952  virtual void set_offset(int64_t offset) = 0;
1953 
1955  virtual ErrorCode err() const = 0;
1956 };
1957 
1958 
1959 
1964 class RD_EXPORT Topic {
1965  public:
1972  static const int32_t PARTITION_UA;
1973 
1975  static const int64_t OFFSET_BEGINNING;
1976  static const int64_t OFFSET_END;
1977  static const int64_t OFFSET_STORED;
1978  static const int64_t OFFSET_INVALID;
1990  static Topic *create(Handle *base,
1991  const std::string &topic_str,
1992  const Conf *conf,
1993  std::string &errstr);
1994 
1995  virtual ~Topic() = 0;
1996 
1997 
1999  virtual const std::string name() const = 0;
2000 
2006  virtual bool partition_available(int32_t partition) const = 0;
2007 
2019  virtual ErrorCode offset_store(int32_t partition, int64_t offset) = 0;
2020 
2037  virtual struct rd_kafka_topic_s *c_ptr() = 0;
2038 };
2039 
2040 
2062 class RD_EXPORT MessageTimestamp {
2063  public:
2068  MSG_TIMESTAMP_LOG_APPEND_TIME
2069  };
2070 
2072  int64_t timestamp;
2073 };
2074 
2075 
2085 class RD_EXPORT Headers {
2086  public:
2087  virtual ~Headers() = 0;
2088 
2097  class Header {
2098  public:
2109  Header(const std::string &key, const void *value, size_t value_size) :
2110  key_(key), err_(ERR_NO_ERROR), value_size_(value_size) {
2111  value_ = copy_value(value, value_size);
2112  }
2113 
2127  Header(const std::string &key,
2128  const void *value,
2129  size_t value_size,
2130  const RdKafka::ErrorCode err) :
2131  key_(key), err_(err), value_(NULL), value_size_(value_size) {
2132  if (err == ERR_NO_ERROR)
2133  value_ = copy_value(value, value_size);
2134  }
2135 
2141  Header(const Header &other) :
2142  key_(other.key_), err_(other.err_), value_size_(other.value_size_) {
2143  value_ = copy_value(other.value_, value_size_);
2144  }
2145 
2151  Header &operator=(const Header &other) {
2152  if (&other == this) {
2153  return *this;
2154  }
2155 
2156  key_ = other.key_;
2157  err_ = other.err_;
2158  value_size_ = other.value_size_;
2159 
2160  if (value_ != NULL)
2161  mem_free(value_);
2162 
2163  value_ = copy_value(other.value_, value_size_);
2164 
2165  return *this;
2166  }
2167 
2168  ~Header() {
2169  if (value_ != NULL)
2170  mem_free(value_);
2171  }
2172 
2174  std::string key() const {
2175  return key_;
2176  }
2177 
2179  const void *value() const {
2180  return value_;
2181  }
2182 
2185  const char *value_string() const {
2186  return static_cast<const char *>(value_);
2187  }
2188 
2190  size_t value_size() const {
2191  return value_size_;
2192  }
2193 
2195  RdKafka::ErrorCode err() const {
2196  return err_;
2197  }
2198 
2199  private:
2200  char *copy_value(const void *value, size_t value_size) {
2201  if (!value)
2202  return NULL;
2203 
2204  char *dest = (char *)mem_malloc(value_size + 1);
2205  memcpy(dest, (const char *)value, value_size);
2206  dest[value_size] = '\0';
2207 
2208  return dest;
2209  }
2210 
2211  std::string key_;
2212  RdKafka::ErrorCode err_;
2213  char *value_;
2214  size_t value_size_;
2215  void *operator new(size_t); /* Prevent dynamic allocation */
2216  };
2217 
2223  static Headers *create();
2224 
2233  static Headers *create(const std::vector<Header> &headers);
2234 
2244  virtual ErrorCode add(const std::string &key,
2245  const void *value,
2246  size_t value_size) = 0;
2247 
2258  virtual ErrorCode add(const std::string &key, const std::string &value) = 0;
2259 
2269  virtual ErrorCode add(const Header &header) = 0;
2270 
2278  virtual ErrorCode remove(const std::string &key) = 0;
2279 
2289  virtual std::vector<Header> get(const std::string &key) const = 0;
2290 
2301  virtual Header get_last(const std::string &key) const = 0;
2302 
2308  virtual std::vector<Header> get_all() const = 0;
2309 
2313  virtual size_t size() const = 0;
2314 };
2315 
2316 
2328 class RD_EXPORT Message {
2329  public:
2332  enum Status {
2336  MSG_STATUS_NOT_PERSISTED = 0,
2337 
2341  MSG_STATUS_POSSIBLY_PERSISTED = 1,
2342 
2346  MSG_STATUS_PERSISTED = 2,
2347  };
2348 
2356  virtual std::string errstr() const = 0;
2357 
2359  virtual ErrorCode err() const = 0;
2360 
2365  virtual Topic *topic() const = 0;
2366 
2368  virtual std::string topic_name() const = 0;
2369 
2371  virtual int32_t partition() const = 0;
2372 
2374  virtual void *payload() const = 0;
2375 
2377  virtual size_t len() const = 0;
2378 
2380  virtual const std::string *key() const = 0;
2381 
2383  virtual const void *key_pointer() const = 0;
2384 
2386  virtual size_t key_len() const = 0;
2387 
2389  virtual int64_t offset() const = 0;
2390 
2392  virtual MessageTimestamp timestamp() const = 0;
2393 
2395  virtual void *msg_opaque() const = 0;
2396 
2397  virtual ~Message() = 0;
2398 
2401  virtual int64_t latency() const = 0;
2402 
2419  virtual struct rd_kafka_message_s *c_ptr() = 0;
2420 
2424  virtual Status status() const = 0;
2425 
2430  virtual RdKafka::Headers *headers() = 0;
2431 
2438  virtual RdKafka::Headers *headers(RdKafka::ErrorCode *err) = 0;
2439 
2442  virtual int32_t broker_id() const = 0;
2443 };
2444 
2468 class RD_EXPORT Queue {
2469  public:
2473  static Queue *create(Handle *handle);
2474 
2485  virtual ErrorCode forward(Queue *dst) = 0;
2486 
2487 
2499  virtual Message *consume(int timeout_ms) = 0;
2500 
2508  virtual int poll(int timeout_ms) = 0;
2509 
2510  virtual ~Queue() = 0;
2511 
2527  virtual void io_event_enable(int fd, const void *payload, size_t size) = 0;
2528 };
2529 
2543 class RD_EXPORT ConsumerGroupMetadata {
2544  public:
2545  virtual ~ConsumerGroupMetadata() = 0;
2546 };
2547 
2565 class RD_EXPORT KafkaConsumer : public virtual Handle {
2566  public:
2578  static KafkaConsumer *create(const Conf *conf, std::string &errstr);
2579 
2580  virtual ~KafkaConsumer() = 0;
2581 
2582 
2585  virtual ErrorCode assignment(
2586  std::vector<RdKafka::TopicPartition *> &partitions) = 0;
2587 
2590  virtual ErrorCode subscription(std::vector<std::string> &topics) = 0;
2591 
2626  virtual ErrorCode subscribe(const std::vector<std::string> &topics) = 0;
2627 
2629  virtual ErrorCode unsubscribe() = 0;
2630 
2637  virtual ErrorCode assign(const std::vector<TopicPartition *> &partitions) = 0;
2638 
2642  virtual ErrorCode unassign() = 0;
2643 
2668  virtual Message *consume(int timeout_ms) = 0;
2669 
2683  virtual ErrorCode commitSync() = 0;
2684 
2690  virtual ErrorCode commitAsync() = 0;
2691 
2701  virtual ErrorCode commitSync(Message *message) = 0;
2702 
2712  virtual ErrorCode commitAsync(Message *message) = 0;
2713 
2723  virtual ErrorCode commitSync(std::vector<TopicPartition *> &offsets) = 0;
2724 
2734  virtual ErrorCode commitAsync(
2735  const std::vector<TopicPartition *> &offsets) = 0;
2736 
2747  virtual ErrorCode commitSync(OffsetCommitCb *offset_commit_cb) = 0;
2748 
2759  virtual ErrorCode commitSync(std::vector<TopicPartition *> &offsets,
2760  OffsetCommitCb *offset_commit_cb) = 0;
2761 
2762 
2763 
2772  virtual ErrorCode committed(std::vector<TopicPartition *> &partitions,
2773  int timeout_ms) = 0;
2774 
2783  virtual ErrorCode position(std::vector<TopicPartition *> &partitions) = 0;
2784 
2785 
2808  virtual ErrorCode close() = 0;
2809 
2810 
2828  virtual ErrorCode seek(const TopicPartition &partition, int timeout_ms) = 0;
2829 
2830 
2848  virtual ErrorCode offsets_store(std::vector<TopicPartition *> &offsets) = 0;
2849 
2850 
2861  virtual ConsumerGroupMetadata *groupMetadata() = 0;
2862 
2863 
2878  virtual bool assignment_lost() = 0;
2879 
2895  virtual std::string rebalance_protocol() = 0;
2896 
2897 
2913  virtual Error *incremental_assign(
2914  const std::vector<TopicPartition *> &partitions) = 0;
2915 
2916 
2932  virtual Error *incremental_unassign(
2933  const std::vector<TopicPartition *> &partitions) = 0;
2934 
2952  virtual Error *close(Queue *queue) = 0;
2953 
2954 
2959  virtual bool closed() = 0;
2960 };
2961 
2962 
2977 class RD_EXPORT Consumer : public virtual Handle {
2978  public:
2989  static Consumer *create(const Conf *conf, std::string &errstr);
2990 
2991  virtual ~Consumer() = 0;
2992 
2993 
3013  virtual ErrorCode start(Topic *topic, int32_t partition, int64_t offset) = 0;
3014 
3021  virtual ErrorCode start(Topic *topic,
3022  int32_t partition,
3023  int64_t offset,
3024  Queue *queue) = 0;
3025 
3035  virtual ErrorCode stop(Topic *topic, int32_t partition) = 0;
3036 
3051  virtual ErrorCode seek(Topic *topic,
3052  int32_t partition,
3053  int64_t offset,
3054  int timeout_ms) = 0;
3055 
3073  virtual Message *consume(Topic *topic, int32_t partition, int timeout_ms) = 0;
3074 
3096  virtual Message *consume(Queue *queue, int timeout_ms) = 0;
3097 
3117  virtual int consume_callback(Topic *topic,
3118  int32_t partition,
3119  int timeout_ms,
3120  ConsumeCb *consume_cb,
3121  void *opaque) = 0;
3122 
3129  virtual int consume_callback(Queue *queue,
3130  int timeout_ms,
3131  RdKafka::ConsumeCb *consume_cb,
3132  void *opaque) = 0;
3133 
3143  static int64_t OffsetTail(int64_t offset);
3144 };
3145 
3159 class RD_EXPORT Producer : public virtual Handle {
3160  public:
3171  static Producer *create(const Conf *conf, std::string &errstr);
3172 
3173 
3174  virtual ~Producer() = 0;
3175 
3181  enum {
3182  RK_MSG_FREE = 0x1,
3185  RK_MSG_COPY = 0x2,
3190  RK_MSG_BLOCK = 0x4
3207  /* For backwards compatibility: */
3208 #ifndef MSG_COPY /* defined in sys/msg.h */
3209  ,
3212  MSG_FREE = RK_MSG_FREE,
3213  MSG_COPY = RK_MSG_COPY
3214 #endif
3215 
3216  };
3217 
3274  virtual ErrorCode produce(Topic *topic,
3275  int32_t partition,
3276  int msgflags,
3277  void *payload,
3278  size_t len,
3279  const std::string *key,
3280  void *msg_opaque) = 0;
3281 
3286  virtual ErrorCode produce(Topic *topic,
3287  int32_t partition,
3288  int msgflags,
3289  void *payload,
3290  size_t len,
3291  const void *key,
3292  size_t key_len,
3293  void *msg_opaque) = 0;
3294 
3301  virtual ErrorCode produce(const std::string topic_name,
3302  int32_t partition,
3303  int msgflags,
3304  void *payload,
3305  size_t len,
3306  const void *key,
3307  size_t key_len,
3308  int64_t timestamp,
3309  void *msg_opaque) = 0;
3310 
3318  virtual ErrorCode produce(const std::string topic_name,
3319  int32_t partition,
3320  int msgflags,
3321  void *payload,
3322  size_t len,
3323  const void *key,
3324  size_t key_len,
3325  int64_t timestamp,
3326  RdKafka::Headers *headers,
3327  void *msg_opaque) = 0;
3328 
3329 
3334  virtual ErrorCode produce(Topic *topic,
3335  int32_t partition,
3336  const std::vector<char> *payload,
3337  const std::vector<char> *key,
3338  void *msg_opaque) = 0;
3339 
3340 
3356  virtual ErrorCode flush(int timeout_ms) = 0;
3357 
3358 
3386  virtual ErrorCode purge(int purge_flags) = 0;
3387 
3391  enum {
3392  PURGE_QUEUE = 0x1,
3394  PURGE_INFLIGHT = 0x2,
3401  PURGE_NON_BLOCKING = 0x4 /* Don't wait for background queue
3402  * purging to finish. */
3403  };
3404 
3431  virtual Error *init_transactions(int timeout_ms) = 0;
3432 
3433 
3446  virtual Error *begin_transaction() = 0;
3447 
3494  virtual Error *send_offsets_to_transaction(
3495  const std::vector<TopicPartition *> &offsets,
3496  const ConsumerGroupMetadata *group_metadata,
3497  int timeout_ms) = 0;
3498 
3527  virtual Error *commit_transaction(int timeout_ms) = 0;
3528 
3559  virtual Error *abort_transaction(int timeout_ms) = 0;
3560 
3562 };
3563 
3578  public:
3580  virtual int32_t id() const = 0;
3581 
3583  virtual const std::string host() const = 0;
3584 
3586  virtual int port() const = 0;
3587 
3588  virtual ~BrokerMetadata() = 0;
3589 };
3590 
3591 
3592 
3597  public:
3599  typedef std::vector<int32_t> ReplicasVector;
3601  typedef std::vector<int32_t> ISRSVector;
3602 
3604  typedef ReplicasVector::const_iterator ReplicasIterator;
3606  typedef ISRSVector::const_iterator ISRSIterator;
3607 
3608 
3610  virtual int32_t id() const = 0;
3611 
3613  virtual ErrorCode err() const = 0;
3614 
3616  virtual int32_t leader() const = 0;
3617 
3619  virtual const std::vector<int32_t> *replicas() const = 0;
3620 
3624  virtual const std::vector<int32_t> *isrs() const = 0;
3625 
3626  virtual ~PartitionMetadata() = 0;
3627 };
3628 
3629 
3630 
3635  public:
3637  typedef std::vector<const PartitionMetadata *> PartitionMetadataVector;
3639  typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;
3640 
3642  virtual const std::string topic() const = 0;
3643 
3645  virtual const PartitionMetadataVector *partitions() const = 0;
3646 
3648  virtual ErrorCode err() const = 0;
3649 
3650  virtual ~TopicMetadata() = 0;
3651 };
3652 
3653 
3657 class Metadata {
3658  public:
3660  typedef std::vector<const BrokerMetadata *> BrokerMetadataVector;
3662  typedef std::vector<const TopicMetadata *> TopicMetadataVector;
3663 
3665  typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
3667  typedef TopicMetadataVector::const_iterator TopicMetadataIterator;
3668 
3669 
3675  virtual const BrokerMetadataVector *brokers() const = 0;
3676 
3682  virtual const TopicMetadataVector *topics() const = 0;
3683 
3685  virtual int32_t orig_broker_id() const = 0;
3686 
3688  virtual const std::string orig_broker_name() const = 0;
3689 
3690  virtual ~Metadata() = 0;
3691 };
3692 
3695 } // namespace RdKafka
3696 
3697 
3698 #endif /* _RDKAFKACPP_H_ */
virtual int32_t partitioner_cb(const Topic *topic, const void *key, size_t key_len, int32_t partition_cnt, void *msg_opaque)=0
Variant partitioner callback that gets key as pointer and length instead of as a const std::string *...
virtual ErrorCode err() const =0
std::string key() const
Definition: rdkafkacpp.h:2174
virtual const std::string orig_broker_name() const =0
Broker (name) originating this metadata.
ISRSVector::const_iterator ISRSIterator
ISRs iterator.
Definition: rdkafkacpp.h:3606
Header(const std::string &key, const void *value, size_t value_size, const RdKafka::ErrorCode err)
Header object to encapsulate a single Header.
Definition: rdkafkacpp.h:2127
Header object.
Definition: rdkafkacpp.h:2097
High-level KafkaConsumer (for brokers 0.9 and later)
Definition: rdkafkacpp.h:2565
static const int64_t OFFSET_BEGINNING
Special offsets.
Definition: rdkafkacpp.h:1975
ConfType
Configuration object type.
Definition: rdkafkacpp.h:1193
Partitioner callback class.
Definition: rdkafkacpp.h:762
virtual int port() const =0
Type
Event type.
Definition: rdkafkacpp.h:844
const char * value_string() const
Definition: rdkafkacpp.h:2185
virtual int32_t orig_broker_id() const =0
Broker (id) originating this metadata.
size_t value_size() const
Definition: rdkafkacpp.h:2190
virtual const std::string host() const =0
SASL/OAUTHBEARER token refresh callback class.
Definition: rdkafkacpp.h:737
static const int32_t PARTITION_UA
Unassigned partition.
Definition: rdkafkacpp.h:1972
Header(const Header &other)
Copy constructor.
Definition: rdkafkacpp.h:2141
Message object.
Definition: rdkafkacpp.h:2328
Severity
EVENT_LOG severities (conforms to syslog(3) severities)
Definition: rdkafkacpp.h:852
Event callback class.
Definition: rdkafkacpp.h:824
Headers object.
Definition: rdkafkacpp.h:2085
ReplicasVector::const_iterator ReplicasIterator
Replicas iterator.
Definition: rdkafkacpp.h:3604
Status
Message persistence status can be used by the application to find out if a produced message was persi...
Definition: rdkafkacpp.h:2332
KafkaConsumer: Rebalance callback class
Definition: rdkafkacpp.h:955
std::vector< int32_t > ReplicasVector
Replicas.
Definition: rdkafkacpp.h:3599
virtual const std::string topic() const =0
int64_t timestamp
Definition: rdkafkacpp.h:2072
Variant partitioner with key pointer.
Definition: rdkafkacpp.h:794
virtual int32_t id() const =0
BrokerMetadataVector::const_iterator BrokerMetadataIterator
Brokers iterator.
Definition: rdkafkacpp.h:3665
ConsumerGroupMetadata holds a consumer instance's group metadata state.
Definition: rdkafkacpp.h:2543
std::vector< const PartitionMetadata * > PartitionMetadataVector
Partitions.
Definition: rdkafkacpp.h:3637
Definition: rdkafkacpp.h:847
static const int64_t OFFSET_INVALID
Definition: rdkafkacpp.h:1978
virtual const PartitionMetadataVector * partitions() const =0
std::vector< const TopicMetadata * > TopicMetadataVector
Topics.
Definition: rdkafkacpp.h:3662
virtual const BrokerMetadataVector * brokers() const =0
Broker list.
RdKafka::ErrorCode err() const
Definition: rdkafkacpp.h:2195
Topic handle.
Definition: rdkafkacpp.h:1964
Metadata: Partition information.
Definition: rdkafkacpp.h:3596
Producer.
Definition: rdkafkacpp.h:3159
Metadata: Topic information.
Definition: rdkafkacpp.h:3634
Definition: rdkafkacpp.h:1194
Definition: rdkafkacpp.h:845
ConfResult
RdKafka::Conf::Set() result code.
Definition: rdkafkacpp.h:1201
Definition: rdkafkacpp.h:93
Queue interface.
Definition: rdkafkacpp.h:2468
MessageTimestampType type
Definition: rdkafkacpp.h:2071
Message timestamp object.
Definition: rdkafkacpp.h:2062
std::vector< int32_t > ISRSVector
ISRs (In-Sync-Replicas)
Definition: rdkafkacpp.h:3601
virtual const TopicMetadataVector * topics() const =0
Topic list.
static const int64_t OFFSET_END
Definition: rdkafkacpp.h:1976
PartitionMetadataVector::const_iterator PartitionMetadataIterator
Partitions iterator.
Definition: rdkafkacpp.h:3639
Portability: OpenCb callback class
Definition: rdkafkacpp.h:1149
SSL broker certificate verification class.
Definition: rdkafkacpp.h:1068
Metadata: Broker information.
Definition: rdkafkacpp.h:3577
Definition: rdkafkacpp.h:846
virtual int32_t leader() const =0
static const int64_t OFFSET_STORED
Definition: rdkafkacpp.h:1977
Header(const std::string &key, const void *value, size_t value_size)
Header object to encapsulate a single Header.
Definition: rdkafkacpp.h:2109
Configuration interface.
Definition: rdkafkacpp.h:1188
Offset Commit callback class.
Definition: rdkafkacpp.h:1037
virtual const std::vector< int32_t > * replicas() const =0
Base handle, super class for specific clients.
Definition: rdkafkacpp.h:1502
Topic+Partition.
Definition: rdkafkacpp.h:1915
The Error class is used as a return value from APIs to propagate an error. The error consists of an e...
Definition: rdkafkacpp.h:613
Consume callback class.
Definition: rdkafkacpp.h:936
virtual const std::vector< int32_t > * isrs() const =0
std::vector< const BrokerMetadata * > BrokerMetadataVector
Brokers.
Definition: rdkafkacpp.h:3660
Simple Consumer (legacy)
Definition: rdkafkacpp.h:2977
MessageTimestampType
Definition: rdkafkacpp.h:2065
TopicMetadataVector::const_iterator TopicMetadataIterator
Topics iterator.
Definition: rdkafkacpp.h:3667
Event object class as passed to the EventCb callback.
Definition: rdkafkacpp.h:841
Metadata container.
Definition: rdkafkacpp.h:3657
virtual ErrorCode err() const =0
Header & operator=(const Header &other)
Assignment operator.
Definition: rdkafkacpp.h:2151
Portability: SocketCb callback class
Definition: rdkafkacpp.h:1123
Delivery Report callback class.
Definition: rdkafkacpp.h:698
const void * value() const
Definition: rdkafkacpp.h:2179
virtual int32_t id() const =0