29 #ifndef _RDKAFKACPP_H_
30 #define _RDKAFKACPP_H_
58 #ifdef LIBRDKAFKA_STATICLIB
61 #ifdef LIBRDKAFKACPP_EXPORTS
62 #define RD_EXPORT __declspec(dllexport)
64 #define RD_EXPORT __declspec(dllimport)
76 struct rd_kafka_topic_s;
77 struct rd_kafka_message_s;
102 #define RD_KAFKA_VERSION 0x000b04ff
116 std::string version_str();
123 std::string get_debug_contexts();
135 int wait_destroyed(
int timeout_ms);
167 ERR__BAD_COMPRESSION = -198,
173 ERR__TRANSPORT = -195,
175 ERR__CRIT_SYS_RESOURCE = -194,
179 ERR__MSG_TIMED_OUT = -192,
182 ERR__PARTITION_EOF = -191,
184 ERR__UNKNOWN_PARTITION = -190,
188 ERR__UNKNOWN_TOPIC = -188,
190 ERR__ALL_BROKERS_DOWN = -187,
192 ERR__INVALID_ARG = -186,
194 ERR__TIMED_OUT = -185,
196 ERR__QUEUE_FULL = -184,
198 ERR__ISR_INSUFF = -183,
200 ERR__NODE_UPDATE = -182,
204 ERR__WAIT_COORD = -180,
206 ERR__UNKNOWN_GROUP = -179,
208 ERR__IN_PROGRESS = -178,
210 ERR__PREV_IN_PROGRESS = -177,
212 ERR__EXISTING_SUBSCRIPTION = -176,
214 ERR__ASSIGN_PARTITIONS = -175,
216 ERR__REVOKE_PARTITIONS = -174,
218 ERR__CONFLICT = -173,
222 ERR__UNKNOWN_PROTOCOL = -171,
224 ERR__NOT_IMPLEMENTED = -170,
226 ERR__AUTHENTICATION = -169,
228 ERR__NO_OFFSET = -168,
230 ERR__OUTDATED = -167,
232 ERR__TIMED_OUT_QUEUE = -166,
234 ERR__UNSUPPORTED_FEATURE = -165,
236 ERR__WAIT_CACHE = -164,
240 ERR__KEY_SERIALIZATION = -162,
242 ERR__VALUE_SERIALIZATION = -161,
244 ERR__KEY_DESERIALIZATION = -160,
246 ERR__VALUE_DESERIALIZATION = -159,
250 ERR__READ_ONLY = -157,
254 ERR__UNDERFLOW = -155,
265 ERR_OFFSET_OUT_OF_RANGE = 1,
269 ERR_UNKNOWN_TOPIC_OR_PART = 3,
271 ERR_INVALID_MSG_SIZE = 4,
273 ERR_LEADER_NOT_AVAILABLE = 5,
275 ERR_NOT_LEADER_FOR_PARTITION = 6,
277 ERR_REQUEST_TIMED_OUT = 7,
279 ERR_BROKER_NOT_AVAILABLE = 8,
281 ERR_REPLICA_NOT_AVAILABLE = 9,
283 ERR_MSG_SIZE_TOO_LARGE = 10,
285 ERR_STALE_CTRL_EPOCH = 11,
287 ERR_OFFSET_METADATA_TOO_LARGE = 12,
289 ERR_NETWORK_EXCEPTION = 13,
291 ERR_GROUP_LOAD_IN_PROGRESS = 14,
293 ERR_GROUP_COORDINATOR_NOT_AVAILABLE = 15,
295 ERR_NOT_COORDINATOR_FOR_GROUP = 16,
297 ERR_TOPIC_EXCEPTION = 17,
299 ERR_RECORD_LIST_TOO_LARGE = 18,
301 ERR_NOT_ENOUGH_REPLICAS = 19,
303 ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20,
305 ERR_INVALID_REQUIRED_ACKS = 21,
307 ERR_ILLEGAL_GENERATION = 22,
309 ERR_INCONSISTENT_GROUP_PROTOCOL = 23,
311 ERR_INVALID_GROUP_ID = 24,
313 ERR_UNKNOWN_MEMBER_ID = 25,
315 ERR_INVALID_SESSION_TIMEOUT = 26,
317 ERR_REBALANCE_IN_PROGRESS = 27,
319 ERR_INVALID_COMMIT_OFFSET_SIZE = 28,
321 ERR_TOPIC_AUTHORIZATION_FAILED = 29,
323 ERR_GROUP_AUTHORIZATION_FAILED = 30,
325 ERR_CLUSTER_AUTHORIZATION_FAILED = 31,
327 ERR_INVALID_TIMESTAMP = 32,
329 ERR_UNSUPPORTED_SASL_MECHANISM = 33,
331 ERR_ILLEGAL_SASL_STATE = 34,
333 ERR_UNSUPPORTED_VERSION = 35,
335 ERR_TOPIC_ALREADY_EXISTS = 36,
337 ERR_INVALID_PARTITIONS = 37,
339 ERR_INVALID_REPLICATION_FACTOR = 38,
341 ERR_INVALID_REPLICA_ASSIGNMENT = 39,
343 ERR_INVALID_CONFIG = 40,
345 ERR_NOT_CONTROLLER = 41,
347 ERR_INVALID_REQUEST = 42,
349 ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43,
351 ERR_POLICY_VIOLATION = 44,
353 ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45,
355 ERR_DUPLICATE_SEQUENCE_NUMBER = 46,
357 ERR_INVALID_PRODUCER_EPOCH = 47,
359 ERR_INVALID_TXN_STATE = 48,
362 ERR_INVALID_PRODUCER_ID_MAPPING = 49,
365 ERR_INVALID_TRANSACTION_TIMEOUT = 50,
368 ERR_CONCURRENT_TRANSACTIONS = 51,
372 ERR_TRANSACTION_COORDINATOR_FENCED = 52,
374 ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53,
376 ERR_SECURITY_DISABLED = 54,
378 ERR_OPERATION_NOT_ATTEMPTED = 55
386 std::string err2str(RdKafka::ErrorCode err);
400 class TopicPartition;
439 virtual void dr_cb (
Message &message) = 0;
470 virtual int32_t partitioner_cb (
const Topic *topic,
471 const std::string *key,
472 int32_t partition_cnt,
473 void *msg_opaque) = 0;
495 int32_t partition_cnt,
496 void *msg_opaque) = 0;
518 virtual void event_cb (
Event &event) = 0;
539 EVENT_SEVERITY_EMERG = 0,
540 EVENT_SEVERITY_ALERT = 1,
541 EVENT_SEVERITY_CRITICAL = 2,
542 EVENT_SEVERITY_ERROR = 3,
543 EVENT_SEVERITY_WARNING = 4,
544 EVENT_SEVERITY_NOTICE = 5,
545 EVENT_SEVERITY_INFO = 6,
546 EVENT_SEVERITY_DEBUG = 7
549 virtual ~
Event () { }
559 virtual Type type ()
const = 0;
565 virtual ErrorCode err ()
const = 0;
571 virtual Severity severity ()
const = 0;
577 virtual std::string fac ()
const = 0;
587 virtual std::string str ()
const = 0;
593 virtual int throttle_time ()
const = 0;
599 virtual std::string broker_name ()
const = 0;
605 virtual int broker_id ()
const = 0;
622 virtual void consume_cb (
Message &message,
void *opaque) = 0;
683 RdKafka::ErrorCode err,
684 std::vector<TopicPartition*>&partitions) = 0;
710 virtual void offset_commit_cb(RdKafka::ErrorCode err,
711 std::vector<TopicPartition*>&offsets) = 0;
737 virtual int socket_cb (
int domain,
int type,
int protocol) = 0;
760 virtual int open_cb (
const std::string &path,
int flags,
int mode) = 0;
809 static Conf *create (ConfType type);
827 const std::string &value,
828 std::string &errstr) = 0;
832 DeliveryReportCb *dr_cb,
833 std::string &errstr) = 0;
838 std::string &errstr) = 0;
848 const Conf *topic_conf,
849 std::string &errstr) = 0;
853 PartitionerCb *partitioner_cb,
854 std::string &errstr) = 0;
858 PartitionerKeyPointerCb *partitioner_kp_cb,
859 std::string &errstr) = 0;
863 std::string &errstr) = 0;
867 std::string &errstr) = 0;
871 RebalanceCb *rebalance_cb,
872 std::string &errstr) = 0;
876 OffsetCommitCb *offset_commit_cb,
877 std::string &errstr) = 0;
891 std::string &value)
const = 0;
911 virtual Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb)
const = 0;
935 virtual std::list<std::string> *dump () = 0;
938 virtual Conf::ConfResult set (
const std::string &name, ConsumeCb *consume_cb,
939 std::string &errstr) = 0;
959 virtual const std::string name ()
const = 0;
969 virtual const std::string memberid ()
const = 0;
994 virtual int poll (
int timeout_ms) = 0;
1002 virtual int outq_len () = 0;
1019 virtual ErrorCode metadata (
bool all_topics,
const Topic *only_rkt,
1020 Metadata **metadatap,
int timeout_ms) = 0;
1032 virtual ErrorCode pause (std::vector<TopicPartition*> &partitions) = 0;
1044 virtual ErrorCode resume (std::vector<TopicPartition*> &partitions) = 0;
1055 virtual ErrorCode query_watermark_offsets (
const std::string &topic,
1057 int64_t *low, int64_t *high,
1058 int timeout_ms) = 0;
1077 virtual ErrorCode get_watermark_offsets (
const std::string &topic,
1079 int64_t *low, int64_t *high) = 0;
1101 virtual ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
1102 int timeout_ms) = 0;
1131 virtual ErrorCode set_log_queue (
Queue *queue) = 0;
1144 virtual void yield () = 0;
1160 virtual const std::string clusterid (
int timeout_ms) = 0;
1178 virtual struct rd_kafka_s *c_ptr () = 0;
1208 static TopicPartition *create (
const std::string &topic,
int partition);
1209 static TopicPartition *create (
const std::string &topic,
int partition,
1218 static void destroy (std::vector<TopicPartition*> &partitions);
1221 virtual const std::string &topic ()
const = 0;
1224 virtual int partition ()
const = 0;
1227 virtual int64_t offset ()
const = 0;
1230 virtual void set_offset (int64_t offset) = 0;
1233 virtual ErrorCode err ()
const = 0;
1268 static Topic *create (
Handle *base,
const std::string &topic_str,
1269 Conf *conf, std::string &errstr);
1271 virtual ~
Topic () = 0;
1275 virtual const std::string name ()
const = 0;
1282 virtual bool partition_available (int32_t partition)
const = 0;
1294 virtual ErrorCode offset_store (int32_t partition, int64_t offset) = 0;
1312 virtual struct rd_kafka_topic_s *c_ptr () = 0;
1342 MSG_TIMESTAMP_LOG_APPEND_TIME
1371 virtual std::string errstr()
const = 0;
1374 virtual ErrorCode err ()
const = 0;
1380 virtual Topic *topic ()
const = 0;
1383 virtual std::string topic_name ()
const = 0;
1386 virtual int32_t partition ()
const = 0;
1389 virtual void *payload ()
const = 0 ;
1392 virtual size_t len ()
const = 0;
1395 virtual const std::string *key ()
const = 0;
1398 virtual const void *key_pointer ()
const = 0 ;
1401 virtual size_t key_len ()
const = 0;
1404 virtual int64_t offset ()
const = 0;
1410 virtual void *msg_opaque ()
const = 0;
1416 virtual int64_t latency ()
const = 0;
1434 virtual struct rd_kafka_message_s *c_ptr () = 0;
1477 virtual ErrorCode forward (
Queue *dst) = 0;
1491 virtual Message *consume (
int timeout_ms) = 0;
1500 virtual int poll (
int timeout_ms) = 0;
1502 virtual ~
Queue () = 0;
1519 virtual void io_event_enable (
int fd,
const void *payload,
size_t size) = 0;
1560 virtual ErrorCode assignment (std::vector<RdKafka::TopicPartition*> &partitions) = 0;
1564 virtual ErrorCode subscription (std::vector<std::string> &topics) = 0;
1590 virtual ErrorCode subscribe (
const std::vector<std::string> &topics) = 0;
1593 virtual ErrorCode unsubscribe () = 0;
1601 virtual ErrorCode assign (
const std::vector<TopicPartition*> &partitions) = 0;
1606 virtual ErrorCode unassign () = 0;
1632 virtual Message *consume (
int timeout_ms) = 0;
1647 virtual ErrorCode commitSync () = 0;
1654 virtual ErrorCode commitAsync () = 0;
1663 virtual ErrorCode commitSync (
Message *message) = 0;
1672 virtual ErrorCode commitAsync (
Message *message) = 0;
1679 virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;
1686 virtual ErrorCode commitAsync (
const std::vector<TopicPartition*> &offsets) = 0;
1698 virtual ErrorCode commitSync (
OffsetCommitCb *offset_commit_cb) = 0;
1710 virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
1724 virtual ErrorCode committed (std::vector<TopicPartition*> &partitions,
1725 int timeout_ms) = 0;
1735 virtual ErrorCode position (std::vector<TopicPartition*> &partitions) = 0;
1760 virtual ErrorCode close () = 0;
1780 virtual ErrorCode seek (
const TopicPartition &partition,
int timeout_ms) = 0;
1797 virtual ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) = 0;
1827 static Consumer *create (
Conf *conf, std::string &errstr);
1851 virtual ErrorCode start (
Topic *topic, int32_t partition, int64_t offset) = 0;
1859 virtual ErrorCode start (
Topic *topic, int32_t partition, int64_t offset,
1871 virtual ErrorCode stop (
Topic *topic, int32_t partition) = 0;
1887 virtual ErrorCode seek (
Topic *topic, int32_t partition, int64_t offset,
1888 int timeout_ms) = 0;
1907 virtual Message *consume (
Topic *topic, int32_t partition,
1908 int timeout_ms) = 0;
1931 virtual Message *consume (
Queue *queue,
int timeout_ms) = 0;
1952 virtual int consume_callback (
Topic *topic, int32_t partition,
1963 virtual int consume_callback (
Queue *queue,
int timeout_ms,
1976 static int64_t OffsetTail(int64_t offset);
2004 static Producer *create (
Conf *conf, std::string &errstr);
2043 MSG_FREE = RK_MSG_FREE,
2044 MSG_COPY = RK_MSG_COPY
2105 virtual ErrorCode produce (
Topic *topic, int32_t partition,
2107 void *payload,
size_t len,
2108 const std::string *key,
2109 void *msg_opaque) = 0;
2115 virtual ErrorCode produce (
Topic *topic, int32_t partition,
2117 void *payload,
size_t len,
2118 const void *key,
size_t key_len,
2119 void *msg_opaque) = 0;
2127 virtual ErrorCode produce (
const std::string topic_name, int32_t partition,
2129 void *payload,
size_t len,
2130 const void *key,
size_t key_len,
2132 void *msg_opaque) = 0;
2139 virtual ErrorCode produce (
Topic *topic, int32_t partition,
2140 const std::vector<char> *payload,
2141 const std::vector<char> *key,
2142 void *msg_opaque) = 0;
2156 virtual ErrorCode flush (
int timeout_ms) = 0;
2175 virtual int32_t
id()
const = 0;
2178 virtual const std::string
host()
const = 0;
2181 virtual int port()
const = 0;
2205 virtual int32_t
id()
const = 0;
2208 virtual ErrorCode
err()
const = 0;
2211 virtual int32_t
leader()
const = 0;
2214 virtual const std::vector<int32_t> *
replicas()
const = 0;
2219 virtual const std::vector<int32_t> *
isrs()
const = 0;
2237 virtual const std::string
topic()
const = 0;
2240 virtual const PartitionMetadataVector *
partitions()
const = 0;
2243 virtual ErrorCode
err()
const = 0;
2266 virtual const BrokerMetadataVector *
brokers()
const = 0;
2269 virtual const TopicMetadataVector *
topics()
const = 0;
virtual int32_t partitioner_cb(const Topic *topic, const void *key, size_t key_len, int32_t partition_cnt, void *msg_opaque)=0
Variant partitioner callback that gets key as pointer and length instead of as a const std::string *...
High-level KafkaConsumer (for brokers 0.9 and later)
Definition: rdkafkacpp.h:1540
Definition: rdkafkacpp.h:1341
static const int64_t OFFSET_BEGINNING
Special offsets.
Definition: rdkafkacpp.h:1253
ConfType
Configuration object type.
Definition: rdkafkacpp.h:791
Partitioner callback class.
Definition: rdkafkacpp.h:452
Type
Event type.
Definition: rdkafkacpp.h:530
static const int32_t PARTITION_UA
Unassigned partition.
Definition: rdkafkacpp.h:1250
Message object.
Definition: rdkafkacpp.h:1362
Severity
EVENT_LOG severities (conforms to syslog(3) severities)
Definition: rdkafkacpp.h:538
Event callback class.
Definition: rdkafkacpp.h:511
KafkaConsumer: Rebalance callback class
Definition: rdkafkacpp.h:631
int64_t timestamp
Definition: rdkafkacpp.h:1346
Variant partitioner with key pointer.
Definition: rdkafkacpp.h:482
Definition: rdkafkacpp.h:533
static const int64_t OFFSET_INVALID
Definition: rdkafkacpp.h:1256
Definition: rdkafkacpp.h:1340
Topic handle.
Definition: rdkafkacpp.h:1242
Producer.
Definition: rdkafkacpp.h:1992
Definition: rdkafkacpp.h:792
Definition: rdkafkacpp.h:531
ConfResult
RdKafka::Conf::set() result code.
Definition: rdkafkacpp.h:799
Definition: rdkafkacpp.h:80
Queue interface.
Definition: rdkafkacpp.h:1460
MessageTimestampType type
Definition: rdkafkacpp.h:1345
Message timestamp object.
Definition: rdkafkacpp.h:1337
static const int64_t OFFSET_END
Definition: rdkafkacpp.h:1254
Portability: OpenCb callback class
Definition: rdkafkacpp.h:747
Definition: rdkafkacpp.h:532
static const int64_t OFFSET_STORED
Definition: rdkafkacpp.h:1255
Configuration interface.
Definition: rdkafkacpp.h:786
Offset Commit callback class.
Definition: rdkafkacpp.h:693
Base handle, super class for specific clients.
Definition: rdkafkacpp.h:954
Topic+Partition.
Definition: rdkafkacpp.h:1200
Consume callback class.
Definition: rdkafkacpp.h:613
Simple Consumer (legacy)
Definition: rdkafkacpp.h:1815
MessageTimestampType
Definition: rdkafkacpp.h:1339
Event object class as passed to the EventCb callback.
Definition: rdkafkacpp.h:527
Portability: SocketCb callback class
Definition: rdkafkacpp.h:722
Delivery Report callback class.
Definition: rdkafkacpp.h:434