librdkafka
The Apache Kafka C/C++ client library
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
rdkafkacpp.h
Go to the documentation of this file.
1 /*
2  * librdkafka - Apache Kafka C/C++ library
3  *
4  * Copyright (c) 2014 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  * this list of conditions and the following disclaimer in the documentation
14  * and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 #pragma once
30 
49 #include <string>
50 #include <list>
51 #include <vector>
52 #include <stdint.h>
53 
54 
/*
 * RD_EXPORT: linkage decoration placed on the public declarations below.
 *  - MSVC builds: any previous definition is dropped, then the macro becomes
 *    __declspec(dllexport) when LIBRDKAFKACPP_EXPORTS is defined and
 *    __declspec(dllimport) otherwise.
 *  - All other compilers: the macro expands to nothing.
 */
55 #ifdef _MSC_VER
56 #undef RD_EXPORT
57 #ifdef LIBRDKAFKACPP_EXPORTS
58 #define RD_EXPORT __declspec(dllexport)
59 #else
60 #define RD_EXPORT __declspec(dllimport)
61 #endif
62 #else
63 #define RD_EXPORT
64 #endif
65 
68 namespace RdKafka {
69 
70 
/*
 * Compile-time library version as a hexadecimal integer.
 * NOTE(review): presumably one byte each for major, minor, revision and
 * pre-release id (0x000900ff would then read 0.9.0, final) - confirm against
 * the upstream documentation, which is not part of this listing.
 */
90 #define RD_KAFKA_VERSION 0x000900ff
91 
/*
 * Returns the library version as an int.
 * NOTE(review): presumably the run-time counterpart of RD_KAFKA_VERSION,
 * using the same encoding - confirm.
 */
97 RD_EXPORT
98 int version ();
99 
/*
 * Returns the library version as a std::string (by value).
 * NOTE(review): exact string format is not visible in this listing.
 */
103 RD_EXPORT
104 std::string version_str();
105 
/*
 * Returns a std::string (by value) describing the available debug contexts.
 * NOTE(review): presumably a list of names usable with a "debug"
 * configuration property; separator and contents are not visible here.
 */
110 RD_EXPORT
111 std::string get_debug_contexts();
112 
/*
 * Takes a timeout in milliseconds and returns an int status.
 * NOTE(review): presumably blocks until all library objects are destroyed or
 * the timeout expires; the meaning of the return value is not visible in
 * this listing - confirm before relying on it.
 */
122 RD_EXPORT
123 int wait_destroyed(int timeout_ms);
124 
125 
/*
 * Error codes used throughout the C++ API.
 *
 * Two ranges, as marked by the section comments inside the enum:
 *  - ERR__BEGIN (-200) .. ERR__END (-100): errors internal to rdkafka
 *    (note the double underscore in the names).
 *  - ERR_UNKNOWN (-1) and the non-negative values: Kafka broker errors,
 *    with ERR_NO_ERROR = 0.
 *
 * Every enumerator has an explicit value, so the numeric codes do not depend
 * on declaration order.
 */
148 enum ErrorCode {
149  /* Internal errors to rdkafka: */
151  ERR__BEGIN = -200,
153  ERR__BAD_MSG = -199,
155  ERR__BAD_COMPRESSION = -198,
157  ERR__DESTROY = -197,
159  ERR__FAIL = -196,
161  ERR__TRANSPORT = -195,
163  ERR__CRIT_SYS_RESOURCE = -194,
165  ERR__RESOLVE = -193,
167  ERR__MSG_TIMED_OUT = -192,
170  ERR__PARTITION_EOF = -191,
172  ERR__UNKNOWN_PARTITION = -190,
174  ERR__FS = -189,
176  ERR__UNKNOWN_TOPIC = -188,
178  ERR__ALL_BROKERS_DOWN = -187,
180  ERR__INVALID_ARG = -186,
182  ERR__TIMED_OUT = -185,
184  ERR__QUEUE_FULL = -184,
186  ERR__ISR_INSUFF = -183,
188  ERR__NODE_UPDATE = -182,
190  ERR__SSL = -181,
192  ERR__WAIT_COORD = -180,
194  ERR__UNKNOWN_GROUP = -179,
196  ERR__IN_PROGRESS = -178,
198  ERR__PREV_IN_PROGRESS = -177,
200  ERR__EXISTING_SUBSCRIPTION = -176,
202  ERR__ASSIGN_PARTITIONS = -175,
204  ERR__REVOKE_PARTITIONS = -174,
206  ERR__CONFLICT = -173,
208  ERR__STATE = -172,
210  ERR__UNKNOWN_PROTOCOL = -171,
212  ERR__NOT_IMPLEMENTED = -170,
214  ERR__AUTHENTICATION = -169,
216  ERR__NO_OFFSET = -168,
218  ERR__END = -100,
219 
220  /* Kafka broker errors: */
222  ERR_UNKNOWN = -1,
224  ERR_NO_ERROR = 0,
226  ERR_OFFSET_OUT_OF_RANGE = 1,
228  ERR_INVALID_MSG = 2,
230  ERR_UNKNOWN_TOPIC_OR_PART = 3,
232  ERR_INVALID_MSG_SIZE = 4,
234  ERR_LEADER_NOT_AVAILABLE = 5,
236  ERR_NOT_LEADER_FOR_PARTITION = 6,
238  ERR_REQUEST_TIMED_OUT = 7,
240  ERR_BROKER_NOT_AVAILABLE = 8,
242  ERR_REPLICA_NOT_AVAILABLE = 9,
244  ERR_MSG_SIZE_TOO_LARGE = 10,
246  ERR_STALE_CTRL_EPOCH = 11,
248  ERR_OFFSET_METADATA_TOO_LARGE = 12,
250  ERR_NETWORK_EXCEPTION = 13,
252  ERR_GROUP_LOAD_IN_PROGRESS = 14,
254  ERR_GROUP_COORDINATOR_NOT_AVAILABLE = 15,
256  ERR_NOT_COORDINATOR_FOR_GROUP = 16,
258  ERR_TOPIC_EXCEPTION = 17,
260  ERR_RECORD_LIST_TOO_LARGE = 18,
262  ERR_NOT_ENOUGH_REPLICAS = 19,
264  ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20,
266  ERR_INVALID_REQUIRED_ACKS = 21,
268  ERR_ILLEGAL_GENERATION = 22,
270  ERR_INCONSISTENT_GROUP_PROTOCOL = 23,
272  ERR_INVALID_GROUP_ID = 24,
274  ERR_UNKNOWN_MEMBER_ID = 25,
276  ERR_INVALID_SESSION_TIMEOUT = 26,
278  ERR_REBALANCE_IN_PROGRESS = 27,
280  ERR_INVALID_COMMIT_OFFSET_SIZE = 28,
282  ERR_TOPIC_AUTHORIZATION_FAILED = 29,
284  ERR_GROUP_AUTHORIZATION_FAILED = 30,
286  ERR_CLUSTER_AUTHORIZATION_FAILED = 31
287 };
288 
289 
/*
 * Converts an RdKafka::ErrorCode to a std::string, returned by value.
 * NOTE(review): presumably a human-readable description of the error; the
 * exact wording is not visible in this listing.
 */
293 RD_EXPORT
294 std::string err2str(RdKafka::ErrorCode err);
295 
296 
302 /* Forward declarations */
/* The callback interfaces that follow mention these classes by reference or
 * pointer (e.g. Message&, const Topic*, KafkaConsumer*) before the classes
 * themselves are defined further down. */
303 class Producer;
304 class Message;
305 class Event;
306 class Topic;
307 class TopicPartition;
308 class Metadata;
309 class KafkaConsumer;
/*
 * Delivery Report callback class.
 *
 * Abstract interface: implement dr_cb() and register the object through the
 * Conf::set() overload that takes a DeliveryReportCb pointer.
 *
 * NOTE(review): no virtual destructor is declared, so an implementation must
 * not be deleted through a DeliveryReportCb pointer.
 */
341 class RD_EXPORT DeliveryReportCb {
342  public:
   /* Invoked with the Message the delivery report concerns. */
346  virtual void dr_cb (Message &message) = 0;
347 };
348 
349 
/*
 * Partitioner callback class.
 *
 * Abstract interface: implement partitioner_cb() and register the object
 * through the Conf::set() overload that takes a PartitionerCb pointer.
 *
 * NOTE(review): no virtual destructor is declared, so an implementation must
 * not be deleted through a PartitionerCb pointer.
 */
357 class RD_EXPORT PartitionerCb {
358  public:
   /*
    * Receives the topic, the message key as a std::string pointer, the
    * partition count and the per-message opaque pointer; returns an int32_t.
    * NOTE(review): presumably the return value is the chosen partition in
    * [0, partition_cnt) and key may be null for key-less messages - confirm.
    */
375  virtual int32_t partitioner_cb (const Topic *topic,
376  const std::string *key,
377  int32_t partition_cnt,
378  void *msg_opaque) = 0;
379 };
380 
/*
 * Variant partitioner with key pointer.
 *
 * Same role as PartitionerCb, but the callback gets the key as a pointer and
 * a length instead of as a const std::string *.
 *
 * The class header below (listing line 385) was missing from this extract and
 * has been restored; the class name is the one used by Conf::set() for its
 * partitioner_kp_cb parameter.
 * NOTE(review): RD_EXPORT on the restored header follows the sibling callback
 * classes - confirm against the upstream header.
 *
 * NOTE(review): no virtual destructor is declared, so an implementation must
 * not be deleted through a PartitionerKeyPointerCb pointer.
 */
385 class RD_EXPORT PartitionerKeyPointerCb {
386  public:
   /*
    * Receives the topic, the key as (pointer, length), the partition count
    * and the per-message opaque pointer; returns an int32_t.
    * NOTE(review): presumably the return value is the chosen partition in
    * [0, partition_cnt) - confirm.
    */
395  virtual int32_t partitioner_cb (const Topic *topic,
396  const void *key,
397  size_t key_len,
398  int32_t partition_cnt,
399  void *msg_opaque) = 0;
400 };
401 
402 
403 
/*
 * Event callback class.
 *
 * Abstract interface: implement event_cb() and register the object through
 * the Conf::set() overload that takes an EventCb pointer.
 *
 * NOTE(review): no virtual destructor is declared, so an implementation must
 * not be deleted through an EventCb pointer.
 */
412 class RD_EXPORT EventCb {
413  public:
   /* Invoked with the Event object describing what happened. */
419  virtual void event_cb (Event &event) = 0;
420 };
421 
422 
/*
 * Event object class as passed to the EventCb callback.
 *
 * Pure interface: every accessor is pure virtual and const.
 */
426 class RD_EXPORT Event {
427  public:
   /*
    * Event type.
    * NOTE(review): listing lines 430-432 (three enumerators that precede
    * EVENT_THROTTLE) are absent from this extract although the index refers
    * to definitions at those lines; they must be restored from the upstream
    * header before this enum is used, otherwise EVENT_THROTTLE gets the
    * wrong numeric value.
    */
429  enum Type {
433  EVENT_THROTTLE
434  };
435 
   /* EVENT_LOG severities (conforms to syslog(3) severities). */
437  enum Severity {
438  EVENT_SEVERITY_EMERG = 0,
439  EVENT_SEVERITY_ALERT = 1,
440  EVENT_SEVERITY_CRITICAL = 2,
441  EVENT_SEVERITY_ERROR = 3,
442  EVENT_SEVERITY_WARNING = 4,
443  EVENT_SEVERITY_NOTICE = 5,
444  EVENT_SEVERITY_INFO = 6,
445  EVENT_SEVERITY_DEBUG = 7
446  };
447 
   /*
    * Virtual destructor: Event is a polymorphic base, so destroying an
    * implementation through an Event pointer must reach the derived
    * destructor (the previous non-virtual destructor made that undefined
    * behaviour).
    */
448  virtual ~Event () { }
449 
450  /*
451  * Event Accessor methods
452  */
453 
   /* Kind of event (see Type). */
458  virtual Type type () const = 0;
459 
   /* Error code associated with the event. */
464  virtual ErrorCode err () const = 0;
465 
   /* Severity of the event (see Severity). */
470  virtual Severity severity () const = 0;
471 
   /* NOTE(review): presumably the log facility string - confirm. */
476  virtual std::string fac () const = 0;
477 
   /* NOTE(review): presumably the event's message text - confirm. */
482  virtual std::string str () const = 0;
483 
   /* NOTE(review): presumably the throttle time in milliseconds for
    * EVENT_THROTTLE events - confirm. */
488  virtual int throttle_time () const = 0;
489 
   /* Name of the broker the event relates to. */
494  virtual std::string broker_name () const = 0;
495 
   /* Id of the broker the event relates to. */
500  virtual int broker_id () const = 0;
501 };
502 
503 
504 
/*
 * Consume callback class.
 *
 * Abstract interface used by Consumer::consume_callback(), which takes a
 * ConsumeCb pointer together with an opaque pointer.
 *
 * NOTE(review): no virtual destructor is declared, so an implementation must
 * not be deleted through a ConsumeCb pointer.
 */
508 class RD_EXPORT ConsumeCb {
509  public:
   /* Invoked with a consumed message and an opaque pointer.
    * NOTE(review): presumably the opaque value handed to
    * consume_callback() - confirm. */
517  virtual void consume_cb (Message &message, void *opaque) = 0;
518 };
519 
520 
/*
 * KafkaConsumer: Rebalance callback class.
 *
 * Abstract interface: implement rebalance_cb() and register the object
 * through the Conf::set() overload that takes a RebalanceCb pointer.
 *
 * NOTE(review): no virtual destructor is declared, so an implementation must
 * not be deleted through a RebalanceCb pointer.
 */
524 class RD_EXPORT RebalanceCb {
525 public:
   /*
    * Receives the consumer, an error code and a mutable vector of
    * TopicPartition pointers.
    * NOTE(review): presumably err is ERR__ASSIGN_PARTITIONS or
    * ERR__REVOKE_PARTITIONS and the implementation is expected to call
    * consumer->assign() / unassign() accordingly - confirm against the
    * upstream documentation.
    */
570  virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer,
571  RdKafka::ErrorCode err,
572  std::vector<TopicPartition*>&partitions) = 0;
573 };
574 
575 
/*
 * Offset Commit callback class.
 *
 * Abstract interface: implement offset_commit_cb() and register the object
 * through the Conf::set() overload that takes an OffsetCommitCb pointer.
 *
 * NOTE(review): no virtual destructor is declared, so an implementation must
 * not be deleted through an OffsetCommitCb pointer.
 */
579 class RD_EXPORT OffsetCommitCb {
580 public:
   /* Receives the commit's error code and a mutable vector of
    * TopicPartition pointers.
    * NOTE(review): presumably the partitions carry the committed offsets
    * and per-partition errors - confirm. */
593  virtual void offset_commit_cb(RdKafka::ErrorCode err,
594  std::vector<TopicPartition*>&offsets) = 0;
595 };
596 
597 
598 
/*
 * Portability: SocketCb callback class.
 *
 * Abstract interface: implement socket_cb() and register the object through
 * the Conf::set() overload that takes a SocketCb pointer.
 *
 * NOTE(review): no virtual destructor is declared, so an implementation must
 * not be deleted through a SocketCb pointer.
 */
603 class RD_EXPORT SocketCb {
604  public:
   /* Parameters mirror socket(2): domain, type, protocol.
    * NOTE(review): presumably returns the new socket descriptor, or -1 on
    * error - confirm. */
618  virtual int socket_cb (int domain, int type, int protocol) = 0;
619 };
620 
621 
/*
 * Portability: OpenCb callback class.
 *
 * Abstract interface: implement open_cb() and register the object through
 * the Conf::set() overload that takes an OpenCb pointer.
 *
 * NOTE(review): no virtual destructor is declared, so an implementation must
 * not be deleted through an OpenCb pointer.
 */
626 class RD_EXPORT OpenCb {
627  public:
   /* Parameters mirror open(2): path, flags, mode.
    * NOTE(review): presumably returns the new file descriptor, or -1 on
    * error - confirm. */
639  virtual int open_cb (const std::string &path, int flags, int mode) = 0;
640 };
641 
642 
643 
644 
645 
/*
 * Configuration interface.
 *
 * Objects are obtained from the static create() factory. Properties and
 * callback objects are attached through the overloaded set() methods, each of
 * which returns a ConfResult and takes an errstr out-parameter.
 */
666 class RD_EXPORT Conf {
667  public:
   /*
    * Configuration object type.
    * NOTE(review): listing line 672 (an enumerator preceding CONF_TOPIC) is
    * absent from this extract although the index refers to a definition at
    * that line; restore it from the upstream header before using this enum.
    */
671  enum ConfType {
673  CONF_TOPIC
674  };
675 
   /* RdKafka::Conf::set() result code; CONF_OK (0) is the only
    * non-negative value. */
679  enum ConfResult {
680  CONF_UNKNOWN = -2,
681  CONF_INVALID = -1,
682  CONF_OK = 0
683  };
684 
685 
   /* Factory: creates a configuration object of the given type.
    * NOTE(review): ownership of the returned pointer is not visible here;
    * presumably the caller deletes it - confirm. */
689  static Conf *create (ConfType type);
690 
691  virtual ~Conf () { };
692 
   /* Sets property `name` to the string `value`.
    * NOTE(review): presumably errstr receives a description when the result
    * is not CONF_OK - confirm. Same convention assumed for all overloads. */
698  virtual Conf::ConfResult set (const std::string &name,
699  const std::string &value,
700  std::string &errstr) = 0;
701 
   /* Overload taking a DeliveryReportCb object. */
703  virtual Conf::ConfResult set (const std::string &name,
704  DeliveryReportCb *dr_cb,
705  std::string &errstr) = 0;
706 
   /* Overload taking an EventCb object. */
708  virtual Conf::ConfResult set (const std::string &name,
709  EventCb *event_cb,
710  std::string &errstr) = 0;
711 
   /* Overload taking another Conf object as the value.
    * NOTE(review): presumably a CONF_TOPIC object used as the default topic
    * configuration - confirm. */
719  virtual Conf::ConfResult set (const std::string &name,
720  const Conf *topic_conf,
721  std::string &errstr) = 0;
722 
   /* Overload taking a PartitionerCb object. */
724  virtual Conf::ConfResult set (const std::string &name,
725  PartitionerCb *partitioner_cb,
726  std::string &errstr) = 0;
727 
   /* Overload taking a PartitionerKeyPointerCb object. */
729  virtual Conf::ConfResult set (const std::string &name,
730  PartitionerKeyPointerCb *partitioner_kp_cb,
731  std::string &errstr) = 0;
732 
   /* Overload taking a SocketCb object. */
734  virtual Conf::ConfResult set (const std::string &name, SocketCb *socket_cb,
735  std::string &errstr) = 0;
736 
   /* Overload taking an OpenCb object. */
738  virtual Conf::ConfResult set (const std::string &name, OpenCb *open_cb,
739  std::string &errstr) = 0;
740 
   /* Overload taking a RebalanceCb object. */
742  virtual Conf::ConfResult set (const std::string &name,
743  RebalanceCb *rebalance_cb,
744  std::string &errstr) = 0;
745 
   /* Overload taking an OffsetCommitCb object. */
747  virtual Conf::ConfResult set (const std::string &name,
748  OffsetCommitCb *offset_commit_cb,
749  std::string &errstr) = 0;
750 
   /* Reads property `name` into the `value` out-parameter; const, so it
    * does not modify the configuration object. */
754  virtual Conf::ConfResult get(const std::string &name,
755  std::string &value) const = 0;
756 
   /* Returns a pointer to a list of strings describing the configuration.
    * NOTE(review): element layout (presumably alternating name/value) and
    * ownership of the returned list are not visible here - confirm. */
759  virtual std::list<std::string> *dump () = 0;
760 };
761 
/*
 * Base handle, super class for specific clients.
 *
 * KafkaConsumer, Consumer and Producer all derive from it virtually.
 */
774 class RD_EXPORT Handle {
775  public:
776  virtual ~Handle() {};
777 
   /* Name of this handle, returned by value. */
779  virtual const std::string name () const = 0;
780 
   /* Member id of this handle, returned by value.
    * NOTE(review): presumably the consumer-group member id, empty when not
    * applicable - confirm. */
789  virtual const std::string memberid () const = 0;
790 
791 
   /* Polls the handle with a timeout in milliseconds and returns an int.
    * NOTE(review): presumably serves queued callbacks and returns the
    * number of events served - confirm. */
811  virtual int poll (int timeout_ms) = 0;
812 
   /* Returns the current out-queue length as an int. */
819  virtual int outq_len () = 0;
820 
   /*
    * Requests metadata; the result is delivered through the `metadatap`
    * out-parameter and the call's status through the returned ErrorCode.
    * NOTE(review): presumably `only_rkt` restricts the request to one topic
    * when all_topics is false, and the caller owns *metadatap - confirm.
    */
836  virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt,
837  Metadata **metadatap, int timeout_ms) = 0;
838 };
839 
840 
/*
 * Topic+Partition.
 *
 * Abstract value-like object naming one partition of one topic, with an
 * offset and an error code attached. Instances come from the static
 * create() factory.
 */
859 class RD_EXPORT TopicPartition {
860 public:
   /* Factory: creates an object for `topic` / `partition`.
    * NOTE(review): ownership of the returned pointer is not visible here;
    * presumably the caller deletes it - confirm. */
866  static TopicPartition *create (const std::string &topic, int partition);
867 
   /* Pure virtual destructor; a definition must still exist in the library
    * for derived destructors to chain to. */
868  virtual ~TopicPartition() = 0;
869 
   /* Topic name, returned by const reference. */
871  virtual const std::string &topic () const = 0;
872 
   /* Partition id. Declared non-const, unlike topic(). */
874  virtual int partition () = 0;
875 
   /* Offset currently stored in this object (non-const accessor). */
877  virtual int64_t offset () = 0;
878 
   /* Replaces the stored offset. */
880  virtual void set_offset (int64_t offset) = 0;
881 
   /* Error code attached to this partition (non-const accessor). */
883  virtual ErrorCode err () = 0;
884 };
885 
886 
887 
/*
 * Topic handle.
 *
 * Created with the static create() factory from a Handle, a topic name and a
 * configuration object.
 */
892 class RD_EXPORT Topic {
893  public:
   /* Special partition value (-1).
    * NOTE(review): presumably "unassigned", i.e. let the partitioner
    * choose - confirm. */
900  static const int32_t PARTITION_UA = -1;
901 
   /* Special logical offsets (all negative).
    * NOTE(review): presumably start of partition, end of partition, and
    * "use the stored offset" respectively - confirm. */
903  static const int64_t OFFSET_BEGINNING = -2;
904  static const int64_t OFFSET_END = -1;
905  static const int64_t OFFSET_STORED = -1000;
   /* Factory: creates a topic handle for `topic_str` on `base`.
    * NOTE(review): presumably returns null and fills errstr on failure;
    * ownership of `conf` and of the result is not visible here - confirm. */
917  static Topic *create (Handle *base, const std::string &topic_str,
918  Conf *conf, std::string &errstr);
919 
   /* Pure virtual destructor; defined inside the library. */
920  virtual ~Topic () = 0;
921 
922 
   /* Topic name, returned by value. */
924  virtual const std::string name () const = 0;
925 
   /* Reports whether `partition` is available. */
931  virtual bool partition_available (int32_t partition) const = 0;
932 
   /* Stores `offset` for `partition`; the outcome is the returned
    * ErrorCode. */
944  virtual ErrorCode offset_store (int32_t partition, int64_t offset) = 0;
945 };
946 
947 
/*
 * Message object.
 *
 * Read-only view of one message: every accessor is pure virtual and const.
 * Used both for consumed messages (returned by the consume() methods) and
 * for delivery reports (passed to DeliveryReportCb::dr_cb()).
 */
969 class RD_EXPORT Message {
970  public:
   /* Error description as a string. */
978  virtual std::string errstr() const = 0;
979 
   /* Error code of this message. */
981  virtual ErrorCode err () const = 0;
982 
   /* Topic object the message belongs to.
    * NOTE(review): may presumably be null in some cases - confirm. */
987  virtual Topic *topic () const = 0;
988 
   /* Topic name, returned by value. */
990  virtual std::string topic_name () const = 0;
991 
   /* Partition the message belongs to. */
993  virtual int32_t partition () const = 0;
994 
   /* Pointer to the payload bytes; length is given by len(). */
996  virtual void *payload () const = 0 ;
997 
   /* Payload length. */
999  virtual size_t len () const = 0;
1000 
   /* Key as a std::string pointer.
    * NOTE(review): presumably null when the message has no key - confirm. */
1002  virtual const std::string *key () const = 0;
1003 
   /* Key as a raw pointer; length is given by key_len(). */
1005  virtual const void *key_pointer () const = 0 ;
1006 
   /* Key length. */
1008  virtual size_t key_len () const = 0;
1009 
   /* Offset of the message. */
1011  virtual int64_t offset () const = 0;
1012 
   /* Per-message opaque pointer.
    * NOTE(review): presumably the msg_opaque passed to
    * Producer::produce() - confirm. */
1014  virtual void *msg_opaque () const = 0;
1015 
   /* Pure virtual destructor; defined inside the library. */
1016  virtual ~Message () = 0;
1017 };
1018 
/*
 * Queue interface.
 *
 * Passed to the Consumer::start(), consume() and consume_callback()
 * overloads that take a Queue pointer.
 *
 * RD_EXPORT is required here like on every other class that exposes a static
 * factory: without it Queue::create() is neither exported from nor imported
 * into a Windows DLL build.
 */
1042 class RD_EXPORT Queue {
1043  public:
   /* Create Queue object.
    * NOTE(review): ownership of the returned pointer is not visible here;
    * presumably the caller deletes it - confirm. */
1047  static Queue *create (Handle *handle);
1048 
1049  virtual ~Queue () { }
1050 };
1051 
/*
 * High-level KafkaConsumer (for brokers 0.9 and later).
 *
 * Derives virtually from Handle. Instances come from the static create()
 * factory. All operations report their outcome as an ErrorCode, except
 * consume(), which returns a Message pointer.
 */
1070 class RD_EXPORT KafkaConsumer : public virtual Handle {
1071 public:
   /* Factory: creates a consumer from `conf`.
    * NOTE(review): presumably returns null and fills errstr on failure -
    * confirm. */
1083  static KafkaConsumer *create (Conf *conf, std::string &errstr);
1084 
   /* Pure virtual destructor; defined inside the library. */
1085  virtual ~KafkaConsumer () = 0;
1086 
1087 
   /* Fills `partitions` with the current assignment.
    * NOTE(review): ownership of the TopicPartition objects placed in the
    * vector is not visible here - confirm. */
1090  virtual ErrorCode assignment (std::vector<RdKafka::TopicPartition*> &partitions) = 0;
1091 
   /* Fills `topics` with the current subscription. */
1094  virtual ErrorCode subscription (std::vector<std::string> &topics) = 0;
1095 
   /* Subscribes to the given set of topics. */
1118  virtual ErrorCode subscribe (const std::vector<std::string> &topics) = 0;
1119 
   /* Drops the current subscription. */
1121  virtual ErrorCode unsubscribe () = 0;
1122 
   /* Sets the assignment to the given partitions. */
1129  virtual ErrorCode assign (const std::vector<TopicPartition*> &partitions) = 0;
1130 
   /* Drops the current assignment. */
1134  virtual ErrorCode unassign () = 0;
1135 
   /* Waits up to timeout_ms for a message and returns it.
    * NOTE(review): presumably always returns a non-null Message whose err()
    * must be checked, and which the caller deletes - confirm. */
1151  virtual Message *consume (int timeout_ms) = 0;
1152 
   /* Commits offsets, synchronous variant without arguments. */
1166  virtual ErrorCode commitSync () = 0;
1167 
   /* Commits offsets, asynchronous variant without arguments. */
1173  virtual ErrorCode commitAsync () = 0;
1174 
   /* Commits the offset of a single message, synchronous variant. */
1182  virtual ErrorCode commitSync (Message *message) = 0;
1183 
   /* Commits the offset of a single message, asynchronous variant. */
1191  virtual ErrorCode commitAsync (Message *message) = 0;
1192 
   /* Closes the consumer. */
1207  virtual ErrorCode close () = 0;
1208 };
1209 
1210 
/*
 * Simple Consumer (legacy).
 *
 * Derives virtually from Handle. Consumption is per topic+partition and is
 * begun with start() and ended with stop(); messages are then fetched either
 * directly, through a Queue, or through a ConsumeCb.
 */
1225 class RD_EXPORT Consumer : public virtual Handle {
1226  public:
   /* Factory: creates a consumer from `conf`.
    * NOTE(review): presumably returns null and fills errstr on failure -
    * confirm. */
1237  static Consumer *create (Conf *conf, std::string &errstr);
1238 
   /* Pure virtual destructor; defined inside the library. */
1239  virtual ~Consumer () = 0;
1240 
1241 
   /* Starts consuming `partition` of `topic` at `offset`.
    * NOTE(review): presumably `offset` may also be one of the
    * Topic::OFFSET_* constants or an OffsetTail() value - confirm. */
1261  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset) = 0;
1262 
   /* As above, additionally taking a Queue.
    * NOTE(review): presumably messages are then routed to `queue` -
    * confirm. */
1269  virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset,
1270  Queue *queue) = 0;
1271 
   /* Stops consuming `partition` of `topic`. */
1281  virtual ErrorCode stop (Topic *topic, int32_t partition) = 0;
1282 
   /* Waits up to timeout_ms for one message from topic+partition.
    * NOTE(review): presumably the caller deletes the returned Message -
    * confirm. */
1300  virtual Message *consume (Topic *topic, int32_t partition,
1301  int timeout_ms) = 0;
1302 
   /* Waits up to timeout_ms for one message from `queue`. */
1324  virtual Message *consume (Queue *queue, int timeout_ms) = 0;
1325 
   /* Callback-based consumption from topic+partition: takes a ConsumeCb and
    * an opaque pointer, returns an int.
    * NOTE(review): presumably the number of messages processed, or -1 on
    * error - confirm. */
1345  virtual int consume_callback (Topic *topic, int32_t partition,
1346  int timeout_ms,
1347  ConsumeCb *consume_cb,
1348  void *opaque) = 0;
1349 
   /* Callback-based consumption from `queue`. */
1356  virtual int consume_callback (Queue *queue, int timeout_ms,
1357  RdKafka::ConsumeCb *consume_cb,
1358  void *opaque) = 0;
1359 
   /* Static helper mapping `offset` to an int64_t.
    * NOTE(review): presumably yields a logical offset meaning "`offset`
    * messages before the end", for use with start() - confirm. */
1365  static int64_t OffsetTail(int64_t offset);
1366 };
1367 
/*
 * Producer.
 *
 * Derives virtually from Handle. Instances come from the static create()
 * factory; messages are submitted through one of the produce() overloads,
 * each of which reports its outcome as an ErrorCode.
 */
1381 class RD_EXPORT Producer : public virtual Handle {
1382  public:
   /* Factory: creates a producer from `conf`.
    * NOTE(review): presumably returns null and fills errstr on failure -
    * confirm. */
1393  static Producer *create (Conf *conf, std::string &errstr);
1394 
1395 
   /* Pure virtual destructor; defined inside the library. */
1396  virtual ~Producer () = 0;
1397 
   /* Bit flags for the `msgflags` parameter of produce().
    * NOTE(review): presumably RK_MSG_FREE hands the payload buffer to the
    * library to free, and RK_MSG_COPY makes the library copy it -
    * confirm. */
1403  static const int RK_MSG_FREE = 0x1;
1405  static const int RK_MSG_COPY = 0x2;
1411  /* For backwards compatibility: */
   /* The old un-prefixed names are only declared when MSG_COPY is not
    * already a macro (sys/msg.h defines one), to avoid a clash. */
1412 #ifndef MSG_COPY /* defined in sys/msg.h */
1413  static const int MSG_FREE = RK_MSG_FREE;
1414  static const int MSG_COPY = RK_MSG_COPY;
1415 #endif
1416 
   /* Produces one message: payload as (pointer, length), key as an optional
    * std::string pointer, plus a per-message opaque pointer.
    * NOTE(review): presumably msg_opaque is what Message::msg_opaque()
    * later returns in the delivery report - confirm. */
1462  virtual ErrorCode produce (Topic *topic, int32_t partition,
1463  int msgflags,
1464  void *payload, size_t len,
1465  const std::string *key,
1466  void *msg_opaque) = 0;
1467 
   /* Variant taking the key as (pointer, length). */
1472  virtual ErrorCode produce (Topic *topic, int32_t partition,
1473  int msgflags,
1474  void *payload, size_t len,
1475  const void *key, size_t key_len,
1476  void *msg_opaque) = 0;
1477 
1478 
   /* Variant taking payload and key as std::vector<char> pointers; it has
    * no msgflags parameter.
    * NOTE(review): presumably the data is copied - confirm. */
1483  virtual ErrorCode produce (Topic *topic, int32_t partition,
1484  const std::vector<char> *payload,
1485  const std::vector<char> *key,
1486  void *msg_opaque) = 0;
1487 };
1488 
/*
 * Metadata: Broker information.
 *
 * The class header below (listing line 1502) was missing from this extract
 * and has been restored; the name is the one used by the destructor and by
 * Metadata::BrokerMetadataVector.
 * NOTE(review): the restored header carries no RD_EXPORT, matching the
 * Metadata class in this listing - confirm against the upstream header.
 */
1502 class BrokerMetadata {
1503  public:
   /* Broker id. */
1505  virtual int32_t id() const = 0;
1506 
   /* Broker host name, returned by value. */
1508  virtual const std::string host() const = 0;
1509 
   /* Broker port. */
1511  virtual int port() const = 0;
1512 
   /* Pure virtual destructor; defined inside the library. */
1513  virtual ~BrokerMetadata() = 0;
1514 };
1515 
1516 
1517 
/*
 * Metadata: Partition information.
 *
 * The class header below (listing line 1521) was missing from this extract
 * and has been restored; the name is the one used by the destructor and by
 * TopicMetadata::PartitionMetadataVector.
 * NOTE(review): the restored header carries no RD_EXPORT, matching the
 * Metadata class in this listing - confirm against the upstream header.
 */
1521 class PartitionMetadata {
1522  public:
   /* Replicas. */
1524  typedef std::vector<int32_t> ReplicasVector;
   /* ISRs (In-Sync-Replicas). */
1526  typedef std::vector<int32_t> ISRSVector;
1527 
   /* Replicas iterator. */
1529  typedef ReplicasVector::const_iterator ReplicasIterator;
   /* ISRs iterator. */
1531  typedef ISRSVector::const_iterator ISRSIterator;
1532 
1533 
   /* Partition id. */
1535  virtual int32_t id() const = 0;
1536 
   /* Error code reported for this partition. */
1538  virtual ErrorCode err() const = 0;
1539 
   /* Id of the partition leader.
    * NOTE(review): presumably a broker id - confirm. */
1541  virtual int32_t leader() const = 0;
1542 
   /* Replica ids, as a pointer to a vector.
    * NOTE(review): presumably owned by this object - confirm. */
1544  virtual const std::vector<int32_t> *replicas() const = 0;
1545 
   /* In-sync replica ids, as a pointer to a vector. */
1549  virtual const std::vector<int32_t> *isrs() const = 0;
1550 
   /* Pure virtual destructor; defined inside the library. */
1551  virtual ~PartitionMetadata() = 0;
1552 };
1553 
1554 
1555 
/*
 * Metadata: Topic information.
 *
 * The class header below (listing line 1559) was missing from this extract
 * and has been restored; the name is the one used by the destructor and by
 * Metadata::TopicMetadataVector.
 * NOTE(review): the restored header carries no RD_EXPORT, matching the
 * Metadata class in this listing - confirm against the upstream header.
 */
1559 class TopicMetadata {
1560  public:
   /* Partitions. */
1562  typedef std::vector<const PartitionMetadata*> PartitionMetadataVector;
   /* Partitions iterator. */
1564  typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;
1565 
   /* Topic name, returned by value. */
1567  virtual const std::string topic() const = 0;
1568 
   /* Partition list, as a pointer to a vector.
    * NOTE(review): presumably owned by this object - confirm. */
1570  virtual const PartitionMetadataVector *partitions() const = 0;
1571 
   /* Error code reported for this topic. */
1573  virtual ErrorCode err() const = 0;
1574 
   /* Pure virtual destructor; defined inside the library. */
1575  virtual ~TopicMetadata() = 0;
1576 };
1577 
1578 
/*
 * Metadata container.
 *
 * Returned through the `metadatap` out-parameter of Handle::metadata().
 * Gives access to the broker list, the topic list and the broker the
 * metadata originated from.
 *
 * NOTE(review): unlike most classes in this header, this one is declared
 * without RD_EXPORT; it has no static members, only virtual ones.
 */
1582 class Metadata {
1583  public:
   /* Brokers. */
1585  typedef std::vector<const BrokerMetadata*> BrokerMetadataVector;
   /* Topics. */
1587  typedef std::vector<const TopicMetadata*> TopicMetadataVector;
1588 
   /* Brokers iterator. */
1590  typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
   /* Topics iterator. */
1592  typedef TopicMetadataVector::const_iterator TopicMetadataIterator;
1593 
1594 
   /* Broker list. */
1596  virtual const BrokerMetadataVector *brokers() const = 0;
1597 
   /* Topic list. */
1599  virtual const TopicMetadataVector *topics() const = 0;
1600 
   /* Broker (id) originating this metadata. */
1602  virtual int32_t orig_broker_id() const = 0;
1603 
   /* Broker (name) originating this metadata. */
1605  virtual const std::string orig_broker_name() const = 0;
1606 
   /* Pure virtual destructor; defined inside the library. */
1607  virtual ~Metadata() = 0;
1608 };
1609 
1612 }
1613 
virtual int32_t partitioner_cb(const Topic *topic, const void *key, size_t key_len, int32_t partition_cnt, void *msg_opaque)=0
Variant partitioner callback that gets key as pointer and length instead of as a const std::string *...
virtual ErrorCode err() const =0
virtual const std::string orig_broker_name() const =0
Broker (name) originating this metadata.
ISRSVector::const_iterator ISRSIterator
ISRs iterator.
Definition: rdkafkacpp.h:1531
High-level KafkaConsumer (for brokers 0.9 and later)
Definition: rdkafkacpp.h:1070
ConfType
Configuration object type.
Definition: rdkafkacpp.h:671
Partitioner callback class.
Definition: rdkafkacpp.h:357
virtual int port() const =0
static Queue * create(Handle *handle)
Create Queue object.
Type
Event type.
Definition: rdkafkacpp.h:429
virtual int32_t orig_broker_id() const =0
Broker (id) originating this metadata.
virtual const std::string host() const =0
std::vector< const PartitionMetadata * > PartitionMetadataVector
Partitions.
Definition: rdkafkacpp.h:1562
Message object.
Definition: rdkafkacpp.h:969
Severity
EVENT_LOG severities (conforms to syslog(3) severities)
Definition: rdkafkacpp.h:437
Event callback class.
Definition: rdkafkacpp.h:412
ReplicasVector::const_iterator ReplicasIterator
Replicas iterator.
Definition: rdkafkacpp.h:1529
KafkaConsumer: Rebalance callback class
Definition: rdkafkacpp.h:524
std::vector< int32_t > ReplicasVector
Replicas.
Definition: rdkafkacpp.h:1524
virtual const std::string topic() const =0
Variant partitioner with key pointer.
Definition: rdkafkacpp.h:385
virtual int32_t id() const =0
BrokerMetadataVector::const_iterator BrokerMetadataIterator
Brokers iterator.
Definition: rdkafkacpp.h:1590
Definition: rdkafkacpp.h:432
virtual const PartitionMetadataVector * partitions() const =0
virtual const BrokerMetadataVector * brokers() const =0
Broker list.
Topic handle.
Definition: rdkafkacpp.h:892
Metadata: Partition information.
Definition: rdkafkacpp.h:1521
Producer.
Definition: rdkafkacpp.h:1381
Metadata: Topic information.
Definition: rdkafkacpp.h:1559
Definition: rdkafkacpp.h:672
Definition: rdkafkacpp.h:430
std::vector< const BrokerMetadata * > BrokerMetadataVector
Brokers.
Definition: rdkafkacpp.h:1585
ConfResult
RdKafka::Conf::set() result code.
Definition: rdkafkacpp.h:679
Definition: rdkafkacpp.h:68
Queue interface.
Definition: rdkafkacpp.h:1042
std::vector< int32_t > ISRSVector
ISRs (In-Sync-Replicas)
Definition: rdkafkacpp.h:1526
virtual const TopicMetadataVector * topics() const =0
Topic list.
PartitionMetadataVector::const_iterator PartitionMetadataIterator
Partitions iterator.
Definition: rdkafkacpp.h:1564
Portability: OpenCb callback class
Definition: rdkafkacpp.h:626
Metadata: Broker information.
Definition: rdkafkacpp.h:1502
Definition: rdkafkacpp.h:431
virtual int32_t leader() const =0
Configuration interface.
Definition: rdkafkacpp.h:666
Offset Commit callback class.
Definition: rdkafkacpp.h:579
virtual const std::vector< int32_t > * replicas() const =0
Base handle, super class for specific clients.
Definition: rdkafkacpp.h:774
std::vector< const TopicMetadata * > TopicMetadataVector
Topics.
Definition: rdkafkacpp.h:1587
Topic+Partition.
Definition: rdkafkacpp.h:859
Consume callback class.
Definition: rdkafkacpp.h:508
virtual const std::vector< int32_t > * isrs() const =0
Simple Consumer (legacy)
Definition: rdkafkacpp.h:1225
TopicMetadataVector::const_iterator TopicMetadataIterator
Topics iterator.
Definition: rdkafkacpp.h:1592
Event object class as passed to the EventCb callback.
Definition: rdkafkacpp.h:426
Metadata container.
Definition: rdkafkacpp.h:1582
virtual ErrorCode err() const =0
Portability: SocketCb callback class
Definition: rdkafkacpp.h:603
Delivery Report callback class.
Definition: rdkafkacpp.h:341
virtual int32_t id() const =0