librdkafka
The Apache Kafka C/C++ client library
Loading...
Searching...
No Matches
rdkafkacpp.h
Go to the documentation of this file.
1/*
2 * librdkafka - Apache Kafka C/C++ library
3 *
4 * Copyright (c) 2014-2022, Magnus Edenhill
5 * 2023, Confluent Inc.
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright notice,
12 * this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright notice,
14 * this list of conditions and the following disclaimer in the documentation
15 * and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30#ifndef _RDKAFKACPP_H_
31#define _RDKAFKACPP_H_
32
51#include <string>
52#include <list>
53#include <vector>
54#include <cstdlib>
55#include <cstring>
56#include <stdint.h>
57#include <sys/types.h>
58
59#ifdef _WIN32
60#ifndef ssize_t
61#ifndef _BASETSD_H_
62#include <basetsd.h>
63#endif
64#ifndef _SSIZE_T_DEFINED
65#define _SSIZE_T_DEFINED
66typedef SSIZE_T ssize_t;
67#endif
68#endif
69#undef RD_EXPORT
70#ifdef LIBRDKAFKA_STATICLIB
71#define RD_EXPORT
72#else
73#ifdef LIBRDKAFKACPP_EXPORTS
74#define RD_EXPORT __declspec(dllexport)
75#else
76#define RD_EXPORT __declspec(dllimport)
77#endif
78#endif
79#else
80#define RD_EXPORT
81#endif
82
85extern "C" {
86/* Forward declarations */
87struct rd_kafka_s;
88struct rd_kafka_topic_s;
89struct rd_kafka_message_s;
90struct rd_kafka_conf_s;
91struct rd_kafka_topic_conf_s;
92}
93
94namespace RdKafka {
95
115#define RD_KAFKA_VERSION 0x020b00ff
116
122RD_EXPORT
124
128RD_EXPORT
129std::string version_str();
130
135RD_EXPORT
136std::string get_debug_contexts();
137
147RD_EXPORT
148int wait_destroyed(int timeout_ms);
149
160RD_EXPORT
161void *mem_malloc(size_t size);
162
176RD_EXPORT
177void mem_free(void *ptr);
178
202 /* Internal errors to rdkafka: */
212 ERR__FAIL = -196,
229 ERR__FS = -189,
245 ERR__SSL = -181,
281 ERR__INTR = -163,
325 ERR__NOOP = -141,
335
337 ERR__END = -100,
338
339 /* Kafka broker errors: */
373#define ERR_GROUP_LOAD_IN_PROGRESS ERR_COORDINATOR_LOAD_IN_PROGRESS
377#define ERR_GROUP_COORDINATOR_NOT_AVAILABLE ERR_COORDINATOR_NOT_AVAILABLE
381#define ERR_NOT_COORDINATOR_FOR_GROUP ERR_NOT_COORDINATOR
575
576
580RD_EXPORT
582
583
584
595
606
612/* Forward declarations */
613class Handle;
614class Producer;
615class Message;
616class Headers;
617class Queue;
618class Event;
619class Topic;
620class TopicPartition;
621class Metadata;
622class KafkaConsumer;
/**
 * @brief Error object used as a return value from APIs to propagate an error.
 *
 * Abstract interface: every accessor is pure virtual, so instances are
 * obtained through the static create() factory rather than constructed
 * directly.
 */
642class RD_EXPORT Error {
643 public:
  /**
   * @brief Create error object.
   *
   * @param code   Error code for the new object.
   * @param errstr Pointer to the error string.
   *               NOTE(review): whether NULL is accepted, and who is
   *               responsible for deleting the returned object, is not
   *               visible here - confirm.
   */
647 static Error *create(ErrorCode code, const std::string *errstr);
648
  /** @brief Virtual destructor so objects can be deleted through a
   *         base-class pointer. */
649 virtual ~Error() {
650 }
651
652 /*
653 * Error accessor methods
654 */
655
  /** @returns the ErrorCode of this error object. */
659 virtual ErrorCode code() const = 0;
660
  /** @returns a string; presumably the name of the error code - confirm. */
664 virtual std::string name() const = 0;
665
  /** @returns a string; presumably a human-readable description - confirm. */
669 virtual std::string str() const = 0;
670
  /** @returns a bool; presumably true when the error is fatal for the
   *           client instance - confirm. */
675 virtual bool is_fatal() const = 0;
676
  /** @returns a bool; presumably true when the failed operation may be
   *           retried - confirm. */
680 virtual bool is_retriable() const = 0;
681
  /** @returns a bool; presumably true when the current transaction has to
   *           be aborted before continuing - confirm. */
693 virtual bool txn_requires_abort() const = 0;
694};
695
/**
 * @brief Delivery Report callback class.
 *
 * Abstract: applications derive from it, implement dr_cb(), and register the
 * object through Conf::set() using the name "dr_cb".
 */
727class RD_EXPORT DeliveryReportCb {
728 public:
  /**
   * @brief Delivery report callback.
   *
   * @param message The message this report refers to, passed by reference.
   */
732 virtual void dr_cb(Message &message) = 0;
733
  /** @brief Virtual destructor so objects can be deleted through a
   *         base-class pointer. */
734 virtual ~DeliveryReportCb() {
735 }
736};
737
738
767 public:
776 RdKafka::Handle *handle,
777 const std::string &oauthbearer_config) = 0;
778
780 }
781};
782
783
/**
 * @brief Partitioner callback interface (std::string key variant).
 *
 * Abstract: applications derive from it and register the object through
 * Conf::set() using the name "partitioner_cb".
 */
791class RD_EXPORT PartitionerCb {
792 public:
  /**
   * @brief Partitioner callback.
   *
   * @param topic         Topic the callback is invoked for.
   * @param key           Message key as a string pointer.
   *                      NOTE(review): presumably NULL when the message has
   *                      no key - confirm.
   * @param partition_cnt Partition count supplied by the caller.
   * @param msg_opaque    Opaque pointer; presumably the per-message opaque
   *                      passed to produce() - confirm.
   *
   * @returns an int32_t; presumably the selected partition - confirm the
   *          valid range and the value used to signal failure.
   */
810 virtual int32_t partitioner_cb(const Topic *topic,
811 const std::string *key,
812 int32_t partition_cnt,
813 void *msg_opaque) = 0;
814
  /** @brief Virtual destructor so objects can be deleted through a
   *         base-class pointer. */
815 virtual ~PartitionerCb() {
816 }
817};
818
824 public:
833 virtual int32_t partitioner_cb(const Topic *topic,
834 const void *key,
835 size_t key_len,
836 int32_t partition_cnt,
837 void *msg_opaque) = 0;
838
839 virtual ~PartitionerKeyPointerCb() {
840 }
841};
842
843
844
/**
 * @brief Event callback class.
 *
 * Abstract: applications derive from it, implement event_cb(), and register
 * the object through Conf::set() using the name "event_cb".
 */
853class RD_EXPORT EventCb {
854 public:
  /**
   * @brief Event callback.
   *
   * @param event The Event object describing what happened, passed by
   *              reference.
   */
860 virtual void event_cb(Event &event) = 0;
861
  /** @brief Virtual destructor so objects can be deleted through a
   *         base-class pointer. */
862 virtual ~EventCb() {
863 }
864};
865
866
/**
 * @brief Event object class as passed to the EventCb callback.
 *
 * Abstract interface: all accessors are pure virtual.
 */
870class RD_EXPORT Event {
871 public:
  /**
   * @brief Event type.
   *
   * NOTE(review): source lines 874-876 are missing from this listing. Per
   * the member index they define EVENT_ERROR, EVENT_STATS and EVENT_LOG, in
   * that order, ahead of EVENT_THROTTLE - restore them before compiling.
   */
873 enum Type {
877 EVENT_THROTTLE
878 };
879
  /** @brief EVENT_LOG severities (conforms to syslog(3) severities). */
881 enum Severity {
882 EVENT_SEVERITY_EMERG = 0,
883 EVENT_SEVERITY_ALERT = 1,
884 EVENT_SEVERITY_CRITICAL = 2,
885 EVENT_SEVERITY_ERROR = 3,
886 EVENT_SEVERITY_WARNING = 4,
887 EVENT_SEVERITY_NOTICE = 5,
888 EVENT_SEVERITY_INFO = 6,
889 EVENT_SEVERITY_DEBUG = 7
890 };
891
  /** @brief Virtual destructor so objects can be deleted through a
   *         base-class pointer. */
892 virtual ~Event() {
893 }
894
895 /*
896 * Event Accessor methods
897 */
898
  /** @returns the Type of this event. */
903 virtual Type type() const = 0;
904
  /** @returns an ErrorCode; presumably meaningful for error events -
   *           confirm. */
909 virtual ErrorCode err() const = 0;
910
  /** @returns the log Severity; presumably meaningful only for EVENT_LOG -
   *           confirm. */
915 virtual Severity severity() const = 0;
916
  /** @returns a string; presumably the log facility - confirm. */
921 virtual std::string fac() const = 0;
922
  /** @returns a string; presumably the event's textual payload - confirm. */
931 virtual std::string str() const = 0;
932
  /** @returns an int; presumably the throttle time for EVENT_THROTTLE
   *           (unit not visible here) - confirm. */
937 virtual int throttle_time() const = 0;
938
  /** @returns a string; presumably the name of the broker the event relates
   *           to - confirm. */
943 virtual std::string broker_name() const = 0;
944
  /** @returns an int; presumably the id of the broker the event relates to -
   *           confirm. */
949 virtual int broker_id() const = 0;
950
951
  /** @returns a bool; presumably true when the event reports a fatal error -
   *           confirm. */
957 virtual bool fatal() const = 0;
958};
959
960
961
/**
 * @brief Consume callback class.
 *
 * Abstract: applications derive from it and either pass the object to the
 * RdKafka::Consumer::consume_callback() methods or register it through
 * Conf::set() using the name "consume_cb".
 */
965class RD_EXPORT ConsumeCb {
966 public:
  /**
   * @brief Consume callback, used with the
   *        RdKafka::Consumer::consume_callback() methods.
   *
   * @param message The consumed message, passed by reference.
   * @param opaque  Opaque pointer; presumably the \c opaque argument that
   *                was given to consume_callback() - confirm.
   */
974 virtual void consume_cb(Message &message, void *opaque) = 0;
975
  /** @brief Virtual destructor so objects can be deleted through a
   *         base-class pointer. */
976 virtual ~ConsumeCb() {
977 }
978};
979
980
984class RD_EXPORT RebalanceCb {
985 public:
1054 virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer,
1056 std::vector<TopicPartition *> &partitions) = 0;
1057
1058 virtual ~RebalanceCb() {
1059 }
1060};
1061
1062
1066class RD_EXPORT OffsetCommitCb {
1067 public:
1084 std::vector<TopicPartition *> &offsets) = 0;
1085
1086 virtual ~OffsetCommitCb() {
1087 }
1088};
1089
1090
1091
/**
 * @brief SSL certificate verification callback interface.
 *
 * Abstract: applications derive from it and register the object through
 * Conf::set() using the name "ssl_cert_verify_cb".
 */
1097class RD_EXPORT SslCertificateVerifyCb {
1098 public:
  /**
   * @brief Certificate verification callback.
   *
   * @param broker_name Name of the broker, as a string.
   * @param broker_id   Broker id.
   * @param x509_error  Pointer to an int. It is non-const, so the callback
   *                    can write through it; presumably an in/out X509
   *                    error code - confirm.
   * @param depth       An int; presumably the depth of the certificate in
   *                    the chain - confirm.
   * @param buf         Pointer to \p size bytes of certificate data.
   *                    NOTE(review): the encoding is not visible here -
   *                    confirm.
   * @param size        Size of \p buf in bytes.
   * @param errstr      Non-const string reference; presumably filled in by
   *                    the callback on verification failure - confirm.
   *
   * @returns a bool; presumably true when the certificate is accepted -
   *          confirm.
   */
1135 virtual bool ssl_cert_verify_cb(const std::string &broker_name,
1136 int32_t broker_id,
1137 int *x509_error,
1138 int depth,
1139 const char *buf,
1140 size_t size,
1141 std::string &errstr) = 0;
1142
  /** @brief Virtual destructor so objects can be deleted through a
   *         base-class pointer. */
1143 virtual ~SslCertificateVerifyCb() {
1144 }
1145};
1146
1147
/**
 * @brief Socket creation callback interface.
 *
 * Abstract: applications derive from it and register the object through
 * Conf::set() using the name "socket_cb".
 */
1152class RD_EXPORT SocketCb {
1153 public:
  /**
   * @brief Socket callback.
   *
   * The three parameters mirror the arguments of the socket(2) system call.
   *
   * @param domain   Socket domain.
   * @param type     Socket type.
   * @param protocol Socket protocol.
   *
   * @returns an int; presumably the new socket's file descriptor, or a
   *          negative value on failure - confirm.
   */
1167 virtual int socket_cb(int domain, int type, int protocol) = 0;
1168
  /** @brief Virtual destructor so objects can be deleted through a
   *         base-class pointer. */
1169 virtual ~SocketCb() {
1170 }
1171};
1172
1173
/**
 * @brief File open callback interface.
 *
 * Abstract: applications derive from it and register the object through
 * Conf::set() using the name "open_cb".
 */
1178class RD_EXPORT OpenCb {
1179 public:
  /**
   * @brief Open callback.
   *
   * The parameters mirror the arguments of the open(2) system call.
   *
   * @param path  Path of the file to open.
   * @param flags Open flags.
   * @param mode  Creation mode.
   *
   * @returns an int; presumably the opened file descriptor, or a negative
   *          value on failure - confirm.
   */
1191 virtual int open_cb(const std::string &path, int flags, int mode) = 0;
1192
  /** @brief Virtual destructor so objects can be deleted through a
   *         base-class pointer. */
1193 virtual ~OpenCb() {
1194 }
1195};
1196
1197
1217class RD_EXPORT Conf {
1218 public:
1224 CONF_TOPIC
1226
1231 CONF_UNKNOWN = -2,
1232 CONF_INVALID = -1,
1233 CONF_OK = 0
1235
1236
1240 static Conf *create(ConfType type);
1241
1242 virtual ~Conf() {
1243 }
1244
1258 virtual Conf::ConfResult set(const std::string &name,
1259 const std::string &value,
1260 std::string &errstr) = 0;
1261
1263 virtual Conf::ConfResult set(const std::string &name,
1264 DeliveryReportCb *dr_cb,
1265 std::string &errstr) = 0;
1266
1269 const std::string &name,
1270 OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb,
1271 std::string &errstr) = 0;
1272
1274 virtual Conf::ConfResult set(const std::string &name,
1275 EventCb *event_cb,
1276 std::string &errstr) = 0;
1277
1285 virtual Conf::ConfResult set(const std::string &name,
1286 const Conf *topic_conf,
1287 std::string &errstr) = 0;
1288
1290 virtual Conf::ConfResult set(const std::string &name,
1291 PartitionerCb *partitioner_cb,
1292 std::string &errstr) = 0;
1293
1295 virtual Conf::ConfResult set(const std::string &name,
1296 PartitionerKeyPointerCb *partitioner_kp_cb,
1297 std::string &errstr) = 0;
1298
1300 virtual Conf::ConfResult set(const std::string &name,
1301 SocketCb *socket_cb,
1302 std::string &errstr) = 0;
1303
1305 virtual Conf::ConfResult set(const std::string &name,
1306 OpenCb *open_cb,
1307 std::string &errstr) = 0;
1308
1310 virtual Conf::ConfResult set(const std::string &name,
1311 RebalanceCb *rebalance_cb,
1312 std::string &errstr) = 0;
1313
1315 virtual Conf::ConfResult set(const std::string &name,
1316 OffsetCommitCb *offset_commit_cb,
1317 std::string &errstr) = 0;
1318
1323 virtual Conf::ConfResult set(const std::string &name,
1324 SslCertificateVerifyCb *ssl_cert_verify_cb,
1325 std::string &errstr) = 0;
1326
1369 const void *buffer,
1370 size_t size,
1371 std::string &errstr) = 0;
1372
1385 virtual Conf::ConfResult get(const std::string &name,
1386 std::string &value) const = 0;
1387
1391 virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0;
1392
1397 OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const = 0;
1398
1402 virtual Conf::ConfResult get(EventCb *&event_cb) const = 0;
1403
1407 virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0;
1408
1413 PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0;
1414
1418 virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0;
1419
1423 virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0;
1424
1428 virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0;
1429
1433 virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0;
1434
1437 SslCertificateVerifyCb *&ssl_cert_verify_cb) const = 0;
1438
1441 virtual std::list<std::string> *dump() = 0;
1442
1444 virtual Conf::ConfResult set(const std::string &name,
1445 ConsumeCb *consume_cb,
1446 std::string &errstr) = 0;
1447
1464 virtual struct rd_kafka_conf_s *c_ptr_global() = 0;
1465
1483 virtual struct rd_kafka_topic_conf_s *c_ptr_topic() = 0;
1484
1498 std::string &errstr) = 0;
1499
1500
1524 std::string &errstr) = 0;
1525};
1526
1539class RD_EXPORT Handle {
1540 public:
1541 virtual ~Handle() {
1542 }
1543
1545 virtual std::string name() const = 0;
1546
1555 virtual std::string memberid() const = 0;
1556
1557
1582 virtual int poll(int timeout_ms) = 0;
1583
1590 virtual int outq_len() = 0;
1591
1607 virtual ErrorCode metadata(bool all_topics,
1608 const Topic *only_rkt,
1609 Metadata **metadatap,
1610 int timeout_ms) = 0;
1611
1612
1622 virtual ErrorCode pause(std::vector<TopicPartition *> &partitions) = 0;
1623
1624
1634 virtual ErrorCode resume(std::vector<TopicPartition *> &partitions) = 0;
1635
1636
1645 virtual ErrorCode query_watermark_offsets(const std::string &topic,
1646 int32_t partition,
1647 int64_t *low,
1648 int64_t *high,
1649 int timeout_ms) = 0;
1650
1668 virtual ErrorCode get_watermark_offsets(const std::string &topic,
1669 int32_t partition,
1670 int64_t *low,
1671 int64_t *high) = 0;
1672
1673
1695 virtual ErrorCode offsetsForTimes(std::vector<TopicPartition *> &offsets,
1696 int timeout_ms) = 0;
1697
1698
1707 virtual Queue *get_partition_queue(const TopicPartition *partition) = 0;
1708
1725 virtual ErrorCode set_log_queue(Queue *queue) = 0;
1726
1738 virtual void yield() = 0;
1739
1754 virtual std::string clusterid(int timeout_ms) = 0;
1755
1772 virtual struct rd_kafka_s *c_ptr() = 0;
1773
1789 virtual int32_t controllerid(int timeout_ms) = 0;
1790
1791
1813 virtual ErrorCode fatal_error(std::string &errstr) const = 0;
1814
1855 const std::string &token_value,
1856 int64_t md_lifetime_ms,
1857 const std::string &md_principal_name,
1858 const std::list<std::string> &extensions,
1859 std::string &errstr) = 0;
1860
1879 const std::string &errstr) = 0;
1880
1889
1890
1896 virtual Queue *get_sasl_queue() = 0;
1897
1902
1903
1904
1915 virtual void *mem_malloc(size_t size) = 0;
1916
1930 virtual void mem_free(void *ptr) = 0;
1931
1946 virtual Error *sasl_set_credentials(const std::string &username,
1947 const std::string &password) = 0;
1948};
1949
1950
/**
 * @brief Topic + partition holder, optionally carrying an offset, an error
 *        code, a leader epoch and a metadata byte vector.
 *
 * Abstract interface (pure virtual destructor and accessors); instances are
 * obtained from the static create() factories.
 */
1969class RD_EXPORT TopicPartition {
1970 public:
  /**
   * @brief Create a TopicPartition for \p topic and \p partition.
   *
   * NOTE(review): the initial offset for this overload, and who must free
   * the returned object, are not visible here - confirm.
   */
1976 static TopicPartition *create(const std::string &topic, int partition);
1977
  /**
   * @brief Create a TopicPartition for \p topic and \p partition with an
   *        explicit \p offset.
   */
1984 static TopicPartition *create(const std::string &topic,
1985 int partition,
1986 int64_t offset);
1987
  /** @brief Pure virtual destructor: keeps the class abstract while still
   *         allowing deletion through a base-class pointer. */
1988 virtual ~TopicPartition() = 0;
1989
  /**
   * @brief Static helper operating on a whole vector of TopicPartition
   *        pointers. The vector is taken by non-const reference.
   *
   * NOTE(review): presumably deletes every element and empties the vector -
   * confirm.
   */
1994 static void destroy(std::vector<TopicPartition *> &partitions);
1995
  /** @returns the topic name, as a reference to an internally held string. */
1997 virtual const std::string &topic() const = 0;
1998
  /** @returns the partition id. */
2000 virtual int partition() const = 0;
2001
  /** @returns the offset. */
2003 virtual int64_t offset() const = 0;
2004
  /** @brief Set the offset to \p offset. */
2006 virtual void set_offset(int64_t offset) = 0;
2007
  /** @returns an ErrorCode; presumably a per-partition error set by the
   *           operation this object was passed to - confirm. */
2009 virtual ErrorCode err() const = 0;
2010
  /** @returns the leader epoch. Note this getter is not declared const. */
2012 virtual int32_t get_leader_epoch() = 0;
2013
  /** @brief Set the leader epoch to \p leader_epoch. */
2015 virtual void set_leader_epoch(int32_t leader_epoch) = 0;
2016
  /** @returns the metadata bytes, by value (a copy). Not declared const. */
2018 virtual std::vector<unsigned char> get_metadata() = 0;
2019
  /** @brief Set the metadata bytes from \p metadata (taken by non-const
   *         reference). */
2021 virtual void set_metadata(std::vector<unsigned char> &metadata) = 0;
2022};
2023
2024
2025
/**
 * @brief Topic handle.
 *
 * Abstract interface (pure virtual destructor and accessors); instances are
 * obtained from the static create() factory.
 */
2030class RD_EXPORT Topic {
2031 public:
  /** @brief Special partition constant; presumably "unassigned", i.e. let
   *         the partitioner choose - confirm. Value is defined outside this
   *         header. */
2038 static const int32_t PARTITION_UA;
2039
  /** @brief Special offset constants; values are defined outside this
   *         header. */
2041 static const int64_t OFFSET_BEGINNING;
2042 static const int64_t OFFSET_END;
2043 static const int64_t OFFSET_STORED;
2044 static const int64_t OFFSET_INVALID;
  /**
   * @brief Create a Topic handle.
   *
   * @param base      Client handle the topic belongs to.
   * @param topic_str Topic name.
   * @param conf      Configuration object.
   *                  NOTE(review): presumably a CONF_TOPIC object and
   *                  possibly NULL for defaults - confirm.
   * @param errstr    Non-const string reference; presumably receives a
   *                  description when creation fails - confirm.
   *
   * @returns a Topic pointer; presumably NULL on failure - confirm.
   */
2056 static Topic *create(Handle *base,
2057 const std::string &topic_str,
2058 const Conf *conf,
2059 std::string &errstr);
2060
  /** @brief Pure virtual destructor: keeps the class abstract while still
   *         allowing deletion through a base-class pointer. */
2061 virtual ~Topic() = 0;
2062
2063
  /** @returns the topic name, by value. */
2065 virtual std::string name() const = 0;
2066
  /** @returns a bool; presumably true when \p partition currently has a
   *           leader and can be produced to - confirm. */
2072 virtual bool partition_available(int32_t partition) const = 0;
2073
  /** @brief Store \p offset for \p partition of this topic.
   *  @returns an ErrorCode describing the outcome. */
2090 virtual ErrorCode offset_store(int32_t partition, int64_t offset) = 0;
2091
  /** @returns a pointer to the C-level struct rd_kafka_topic_s (forward
   *           declared in the extern "C" section of this header). */
2108 virtual struct rd_kafka_topic_s *c_ptr() = 0;
2109};
2110
2111
2133class RD_EXPORT MessageTimestamp {
2134 public:
2141
2143 int64_t timestamp;
2144};
2145
2146
/**
 * @brief Collection of message headers.
 *
 * Abstract interface (pure virtual destructor and methods); instances are
 * obtained from the static create() factories. The nested Header class is a
 * concrete value type holding one key/value pair.
 */
2156class RD_EXPORT Headers {
2157 public:
  /** @brief Pure virtual destructor: keeps the class abstract while still
   *         allowing deletion through a base-class pointer. */
2158 virtual ~Headers() = 0;
2159
  /**
   * @brief A single header: a string key plus a privately owned copy of the
   *        value bytes, and an ErrorCode.
   *
   * The value buffer is allocated with the namespace-level mem_malloc() and
   * released with mem_free(); copy construction and assignment deep-copy it.
   * operator new is declared private, so a Header cannot be created with
   * a plain new-expression.
   */
2168 class Header {
2169 public:
  /**
   * @brief Construct an error-free header (err_ is set to ERR_NO_ERROR).
   *
   * @param key        Header key; copied.
   * @param value      Value bytes; \p value_size bytes are deep-copied.
   *                   If NULL, no buffer is allocated and value() returns
   *                   NULL, but value_size() still reports \p value_size.
   * @param value_size Number of bytes at \p value.
   */
2180 Header(const std::string &key, const void *value, size_t value_size) :
2181 key_(key), err_(ERR_NO_ERROR), value_size_(value_size) {
2182 value_ = copy_value(value, value_size);
2183 }
2184
  /**
   * @brief Construct a header that carries an explicit error code.
   *
   * The value is copied only when \p err is ERR_NO_ERROR; otherwise the
   * stored value stays NULL. value_size_ records \p value_size either way.
   */
2198 Header(const std::string &key,
2199 const void *value,
2200 size_t value_size,
2201 const RdKafka::ErrorCode err) :
2202 key_(key), err_(err), value_(NULL), value_size_(value_size) {
2203 if (err == ERR_NO_ERROR)
2204 value_ = copy_value(value, value_size);
2205 }
2206
  /** @brief Copy constructor: copies key, error code and size, and makes a
   *         separate copy of \p other's value buffer. */
2212 Header(const Header &other) :
2213 key_(other.key_), err_(other.err_), value_size_(other.value_size_) {
2214 value_ = copy_value(other.value_, value_size_);
2215 }
2216
  /**
   * @brief Copy assignment.
   *
   * Self-assignment returns immediately; without that guard the buffer
   * would be freed before being copied from. Otherwise the current buffer
   * is released and replaced with a fresh copy of \p other's value.
   */
2222 Header &operator=(const Header &other) {
2223 if (&other == this) {
2224 return *this;
2225 }
2226
2227 key_ = other.key_;
2228 err_ = other.err_;
2229 value_size_ = other.value_size_;
2230
2231 if (value_ != NULL)
2232 mem_free(value_);
2233
2234 value_ = copy_value(other.value_, value_size_);
2235
2236 return *this;
2237 }
2238
  /** @brief Releases the owned value buffer, if there is one. */
2239 ~Header() {
2240 if (value_ != NULL)
2241 mem_free(value_);
2242 }
2243
  /** @returns the header key, by value (a copy). */
2245 std::string key() const {
2246 return key_;
2247 }
2248
  /** @returns a pointer to the internally owned value buffer, or NULL when
   *           no value is stored. The Header frees this buffer on
   *           destruction and on assignment. */
2250 const void *value() const {
2251 return value_;
2252 }
2253
  /** @returns the same buffer as value(), typed as const char *. The byte
   *           after the value is always '\0' (see copy_value()), but the
   *           value itself may contain embedded NUL bytes. */
2256 const char *value_string() const {
2257 return static_cast<const char *>(value_);
2258 }
2259
  /** @returns the value size in bytes, not counting the extra terminator
   *           byte. */
2261 size_t value_size() const {
2262 return value_size_;
2263 }
2264
  /* NOTE(review): the declaration line for the accessor returning err_
   * (source line 2266) is missing from this listing - restore it before
   * compiling. */
2267 return err_;
2268 }
2269
2270 private:
  /**
   * @brief Make a separate copy of \p value_size bytes at \p value.
   *
   * Allocates value_size + 1 bytes with mem_malloc() and writes a '\0'
   * after the copied bytes, so the result can be read as a C string.
   *
   * @returns the new buffer, or NULL when \p value is NULL.
   *
   * NOTE(review): the mem_malloc() result is used without a NULL check -
   * confirm that mem_malloc() cannot return NULL.
   */
2271 char *copy_value(const void *value, size_t value_size) {
2272 if (!value)
2273 return NULL;
2274
2275 char *dest = (char *)mem_malloc(value_size + 1);
2276 memcpy(dest, (const char *)value, value_size);
2277 dest[value_size] = '\0';
2278
2279 return dest;
2280 }
2281
2282 std::string key_;
2283 RdKafka::ErrorCode err_;
  /* Owned buffer (mem_malloc/mem_free); NULL when no value is stored. */
2284 char *value_;
2285 size_t value_size_;
2286 void *operator new(size_t); /* Prevent dynamic allocation */
2287 };
2288
  /** @brief Create a Headers collection.
   *  NOTE(review): presumably empty; ownership of the returned pointer is
   *  not visible here - confirm. */
2294 static Headers *create();
2295
  /** @brief Create a Headers collection from the given vector of Header
   *         objects. */
2304 static Headers *create(const std::vector<Header> &headers);
2305
  /** @brief Add a header from \p key and \p value_size bytes at \p value.
   *  @returns an ErrorCode describing the outcome. */
2315 virtual ErrorCode add(const std::string &key,
2316 const void *value,
2317 size_t value_size) = 0;
2318
  /** @brief Add a header from \p key and a string \p value.
   *  @returns an ErrorCode describing the outcome. */
2329 virtual ErrorCode add(const std::string &key, const std::string &value) = 0;
2330
  /** @brief Add a header from an existing Header object.
   *  @returns an ErrorCode describing the outcome. */
2340 virtual ErrorCode add(const Header &header) = 0;
2341
  /** @brief Remove by \p key.
   *  NOTE(review): presumably removes every header with that key - confirm.
   *  @returns an ErrorCode describing the outcome. */
2349 virtual ErrorCode remove(const std::string &key) = 0;
2350
  /** @returns a vector of Header copies; presumably all headers whose key
   *           equals \p key - confirm. */
2360 virtual std::vector<Header> get(const std::string &key) const = 0;
2361
  /** @returns a Header by value; presumably the last header added under
   *           \p key, with err() signalling "not found" - confirm. */
2372 virtual Header get_last(const std::string &key) const = 0;
2373
  /** @returns a vector of Header copies; presumably every header in the
   *           collection - confirm. */
2379 virtual std::vector<Header> get_all() const = 0;
2380
  /** @returns a size_t; presumably the number of headers - confirm. */
2384 virtual size_t size() const = 0;
2385};
2386
2387
2399class RD_EXPORT Message {
2400 public:
2403 enum Status {
2407 MSG_STATUS_NOT_PERSISTED = 0,
2408
2412 MSG_STATUS_POSSIBLY_PERSISTED = 1,
2413
2417 MSG_STATUS_PERSISTED = 2,
2418 };
2419
2427 virtual std::string errstr() const = 0;
2428
2430 virtual ErrorCode err() const = 0;
2431
2436 virtual Topic *topic() const = 0;
2437
2439 virtual std::string topic_name() const = 0;
2440
2442 virtual int32_t partition() const = 0;
2443
2445 virtual void *payload() const = 0;
2446
2448 virtual size_t len() const = 0;
2449
2451 virtual const std::string *key() const = 0;
2452
2454 virtual const void *key_pointer() const = 0;
2455
2457 virtual size_t key_len() const = 0;
2458
2460 virtual int64_t offset() const = 0;
2461
2463 virtual MessageTimestamp timestamp() const = 0;
2464
2466 virtual void *msg_opaque() const = 0;
2467
2468 virtual ~Message() = 0;
2469
2472 virtual int64_t latency() const = 0;
2473
2490 virtual struct rd_kafka_message_s *c_ptr() = 0;
2491
2495 virtual Status status() const = 0;
2496
2502
2510
2513 virtual int32_t broker_id() const = 0;
2514
2517 virtual int32_t leader_epoch() const = 0;
2518
2538 virtual Error *offset_store() = 0;
2539};
2540
/**
 * @brief Queue interface.
 *
 * Abstract interface (pure virtual destructor and methods); instances are
 * obtained from the static create() factory or from the client handle
 * methods that return a Queue pointer.
 */
2564class RD_EXPORT Queue {
2565 public:
  /** @brief Create a new queue for the client \p handle.
   *  NOTE(review): ownership of the returned pointer is not visible here -
   *  confirm. */
2569 static Queue *create(Handle *handle);
2570
  /** @brief Forward this queue to \p dst.
   *  NOTE(review): presumably a NULL \p dst cancels forwarding - confirm.
   *  @returns an ErrorCode describing the outcome. */
2581 virtual ErrorCode forward(Queue *dst) = 0;
2582
2583
  /** @brief Consume from this queue, with a \p timeout_ms timeout.
   *  @returns a Message pointer.
   *  NOTE(review): presumably never NULL, with errors reported through
   *  Message::err(); ownership passes to the caller - confirm. */
2595 virtual Message *consume(int timeout_ms) = 0;
2596
  /** @brief Poll this queue, with a \p timeout_ms timeout.
   *  @returns an int; presumably the number of events served - confirm. */
2604 virtual int poll(int timeout_ms) = 0;
2605
  /** @brief Pure virtual destructor: keeps the class abstract while still
   *         allowing deletion through a base-class pointer. */
2606 virtual ~Queue() = 0;
2607
  /**
   * @brief Enable IO event notification on file descriptor \p fd.
   *
   * @param fd      File descriptor.
   * @param payload Pointer to \p size bytes; presumably written to \p fd
   *                when the queue becomes non-empty - confirm.
   * @param size    Size of \p payload in bytes.
   */
2623 virtual void io_event_enable(int fd, const void *payload, size_t size) = 0;
2624};
2625
/**
 * @brief ConsumerGroupMetadata holds a consumer instance's group metadata
 *        state.
 *
 * The interface is opaque: its only member is the destructor.
 */
2639class RD_EXPORT ConsumerGroupMetadata {
2640 public:
  /** @brief Pure virtual destructor: keeps the class abstract while still
   *         allowing deletion through a base-class pointer. */
2641 virtual ~ConsumerGroupMetadata() = 0;
2642};
2643
2661class RD_EXPORT KafkaConsumer : public virtual Handle {
2662 public:
2674 static KafkaConsumer *create(const Conf *conf, std::string &errstr);
2675
2676 virtual ~KafkaConsumer() = 0;
2677
2678
2682 std::vector<RdKafka::TopicPartition *> &partitions) = 0;
2683
2686 virtual ErrorCode subscription(std::vector<std::string> &topics) = 0;
2687
2722 virtual ErrorCode subscribe(const std::vector<std::string> &topics) = 0;
2723
2725 virtual ErrorCode unsubscribe() = 0;
2726
2733 virtual ErrorCode assign(const std::vector<TopicPartition *> &partitions) = 0;
2734
2738 virtual ErrorCode unassign() = 0;
2739
2764 virtual Message *consume(int timeout_ms) = 0;
2765
2779 virtual ErrorCode commitSync() = 0;
2780
2786 virtual ErrorCode commitAsync() = 0;
2787
2797 virtual ErrorCode commitSync(Message *message) = 0;
2798
2808 virtual ErrorCode commitAsync(Message *message) = 0;
2809
2819 virtual ErrorCode commitSync(std::vector<TopicPartition *> &offsets) = 0;
2820
2831 const std::vector<TopicPartition *> &offsets) = 0;
2832
2843 virtual ErrorCode commitSync(OffsetCommitCb *offset_commit_cb) = 0;
2844
2855 virtual ErrorCode commitSync(std::vector<TopicPartition *> &offsets,
2856 OffsetCommitCb *offset_commit_cb) = 0;
2857
2858
2859
2868 virtual ErrorCode committed(std::vector<TopicPartition *> &partitions,
2869 int timeout_ms) = 0;
2870
2879 virtual ErrorCode position(std::vector<TopicPartition *> &partitions) = 0;
2880
2881
2904 virtual ErrorCode close() = 0;
2905
2906
2924 virtual ErrorCode seek(const TopicPartition &partition, int timeout_ms) = 0;
2925
2926
2947 virtual ErrorCode offsets_store(std::vector<TopicPartition *> &offsets) = 0;
2948
2949
2961
2962
2977 virtual bool assignment_lost() = 0;
2978
2994 virtual std::string rebalance_protocol() = 0;
2995
2996
3013 const std::vector<TopicPartition *> &partitions) = 0;
3014
3015
3032 const std::vector<TopicPartition *> &partitions) = 0;
3033
3051 virtual Error *close(Queue *queue) = 0;
3052
3053
3058 virtual bool closed() = 0;
3059};
3060
3061
/**
 * @brief Simple Consumer (legacy).
 *
 * Abstract interface that virtually inherits from Handle; instances are
 * obtained from the static create() factory.
 */
3076class RD_EXPORT Consumer : public virtual Handle {
3077 public:
  /**
   * @brief Creates a new Kafka consumer handle.
   *
   * @param conf   Configuration object.
   * @param errstr Non-const string reference; presumably receives a
   *               description when creation fails - confirm.
   *
   * @returns a Consumer pointer; presumably NULL on failure - confirm.
   */
3088 static Consumer *create(const Conf *conf, std::string &errstr);
3089
  /** @brief Pure virtual destructor: keeps the class abstract while still
   *         allowing deletion through a base-class pointer. */
3090 virtual ~Consumer() = 0;
3091
3092
  /**
   * @brief Start consuming messages for \p topic and \p partition at
   *        offset \p offset.
   *
   * @returns an ErrorCode describing the outcome.
   */
3112 virtual ErrorCode start(Topic *topic, int32_t partition, int64_t offset) = 0;
3113
  /**
   * @brief Start consuming messages for \p topic and \p partition on queue
   *        \p queue.
   *
   * @returns an ErrorCode describing the outcome.
   */
3120 virtual ErrorCode start(Topic *topic,
3121 int32_t partition,
3122 int64_t offset,
3123 Queue *queue) = 0;
3124
  /**
   * @brief Stop consuming messages for \p topic and \p partition, purging
   *        all messages currently in the local queue.
   *
   * @returns an ErrorCode describing the outcome.
   */
3134 virtual ErrorCode stop(Topic *topic, int32_t partition) = 0;
3135
  /**
   * @brief Seek consumer for \p topic + \p partition to \p offset, which is
   *        either an absolute or logical offset.
   *
   * @param timeout_ms Timeout in milliseconds.
   *
   * @returns an ErrorCode describing the outcome.
   */
3150 virtual ErrorCode seek(Topic *topic,
3151 int32_t partition,
3152 int64_t offset,
3153 int timeout_ms) = 0;
3154
  /**
   * @brief Consume a single message from \p topic and \p partition.
   *
   * @param timeout_ms Timeout in milliseconds.
   *
   * @returns a Message pointer.
   *          NOTE(review): presumably never NULL, with errors reported
   *          through Message::err(); ownership passes to the caller -
   *          confirm.
   */
3172 virtual Message *consume(Topic *topic, int32_t partition, int timeout_ms) = 0;
3173
  /**
   * @brief Consume a single message from the specified queue.
   *
   * @param timeout_ms Timeout in milliseconds.
   */
3195 virtual Message *consume(Queue *queue, int timeout_ms) = 0;
3196
  /**
   * @brief Consumes messages from \p topic and \p partition, calling the
   *        provided callback for each consumed message.
   *
   * @param timeout_ms Timeout in milliseconds.
   * @param consume_cb Callback object whose consume_cb() is invoked.
   * @param opaque     Opaque pointer; presumably forwarded to the callback
   *                   unchanged - confirm.
   *
   * @returns an int; presumably the number of messages processed, or a
   *          negative value on error - confirm.
   */
3216 virtual int consume_callback(Topic *topic,
3217 int32_t partition,
3218 int timeout_ms,
3219 ConsumeCb *consume_cb,
3220 void *opaque) = 0;
3221
  /**
   * @brief Consumes messages from \p queue, calling the provided callback
   *        for each consumed message.
   */
3228 virtual int consume_callback(Queue *queue,
3229 int timeout_ms,
3230 RdKafka::ConsumeCb *consume_cb,
3231 void *opaque) = 0;
3232
  /**
   * @brief Converts an offset into the logical offset from the tail of a
   *        topic.
   */
3242 static int64_t OffsetTail(int64_t offset);
3243};
3244
3258class RD_EXPORT Producer : public virtual Handle {
3259 public:
3270 static Producer *create(const Conf *conf, std::string &errstr);
3271
3272
3273 virtual ~Producer() = 0;
3274
3280 enum {
3281 RK_MSG_FREE = 0x1,
3284 RK_MSG_COPY = 0x2,
3289 RK_MSG_BLOCK = 0x4
3306 /* For backwards compatibility: */
3307#ifndef MSG_COPY /* defined in sys/msg.h */
3308 ,
3311 MSG_FREE = RK_MSG_FREE,
3312 MSG_COPY = RK_MSG_COPY
3313#endif
3316
3373 virtual ErrorCode produce(Topic *topic,
3374 int32_t partition,
3375 int msgflags,
3376 void *payload,
3377 size_t len,
3378 const std::string *key,
3379 void *msg_opaque) = 0;
3380
3385 virtual ErrorCode produce(Topic *topic,
3386 int32_t partition,
3387 int msgflags,
3388 void *payload,
3389 size_t len,
3390 const void *key,
3391 size_t key_len,
3392 void *msg_opaque) = 0;
3393
3400 virtual ErrorCode produce(const std::string topic_name,
3401 int32_t partition,
3402 int msgflags,
3403 void *payload,
3404 size_t len,
3405 const void *key,
3406 size_t key_len,
3407 int64_t timestamp,
3408 void *msg_opaque) = 0;
3409
3417 virtual ErrorCode produce(const std::string topic_name,
3418 int32_t partition,
3419 int msgflags,
3420 void *payload,
3421 size_t len,
3422 const void *key,
3423 size_t key_len,
3424 int64_t timestamp,
3425 RdKafka::Headers *headers,
3426 void *msg_opaque) = 0;
3427
3428
3433 virtual ErrorCode produce(Topic *topic,
3434 int32_t partition,
3435 const std::vector<char> *payload,
3436 const std::vector<char> *key,
3437 void *msg_opaque) = 0;
3438
3439
3455 virtual ErrorCode flush(int timeout_ms) = 0;
3456
3457
3485 virtual ErrorCode purge(int purge_flags) = 0;
3486
3490 enum {
3491 PURGE_QUEUE = 0x1,
3493 PURGE_INFLIGHT = 0x2,
3500 PURGE_NON_BLOCKING = 0x4 /* Don't wait for background queue
3501 * purging to finish. */
3503
3530 virtual Error *init_transactions(int timeout_ms) = 0;
3531
3532
3545 virtual Error *begin_transaction() = 0;
3546
3594 const std::vector<TopicPartition *> &offsets,
3595 const ConsumerGroupMetadata *group_metadata,
3596 int timeout_ms) = 0;
3597
3626 virtual Error *commit_transaction(int timeout_ms) = 0;
3627
3658 virtual Error *abort_transaction(int timeout_ms) = 0;
3659
3661};
3662
3677 public:
3679 virtual int32_t id() const = 0;
3680
3682 virtual std::string host() const = 0;
3683
3685 virtual int port() const = 0;
3686
3687 virtual ~BrokerMetadata() = 0;
3688};
3689
3690
3691
3696 public:
3698 typedef std::vector<int32_t> ReplicasVector;
3700 typedef std::vector<int32_t> ISRSVector;
3701
3703 typedef ReplicasVector::const_iterator ReplicasIterator;
3705 typedef ISRSVector::const_iterator ISRSIterator;
3706
3707
3709 virtual int32_t id() const = 0;
3710
3712 virtual ErrorCode err() const = 0;
3713
3715 virtual int32_t leader() const = 0;
3716
3718 virtual const std::vector<int32_t> *replicas() const = 0;
3719
3723 virtual const std::vector<int32_t> *isrs() const = 0;
3724
3725 virtual ~PartitionMetadata() = 0;
3726};
3727
3728
3729
3734 public:
3736 typedef std::vector<const PartitionMetadata *> PartitionMetadataVector;
3738 typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;
3739
3741 virtual std::string topic() const = 0;
3742
3744 virtual const PartitionMetadataVector *partitions() const = 0;
3745
3747 virtual ErrorCode err() const = 0;
3748
3749 virtual ~TopicMetadata() = 0;
3750};
3751
3752
3757 public:
3759 typedef std::vector<const BrokerMetadata *> BrokerMetadataVector;
3761 typedef std::vector<const TopicMetadata *> TopicMetadataVector;
3762
3764 typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
3766 typedef TopicMetadataVector::const_iterator TopicMetadataIterator;
3767
3768
3774 virtual const BrokerMetadataVector *brokers() const = 0;
3775
3781 virtual const TopicMetadataVector *topics() const = 0;
3782
3784 virtual int32_t orig_broker_id() const = 0;
3785
3787 virtual std::string orig_broker_name() const = 0;
3788
3789 virtual ~Metadata() = 0;
3790};
3791
3794} // namespace RdKafka
3795
3796
3797#endif /* _RDKAFKACPP_H_ */
Metadata: Broker information.
Definition rdkafkacpp.h:3676
virtual int port() const =0
virtual int32_t id() const =0
virtual std::string host() const =0
Configuration interface.
Definition rdkafkacpp.h:1217
virtual Conf::ConfResult get(OpenCb *&open_cb) const =0
Query single configuration value.
ConfType
Configuration object type.
Definition rdkafkacpp.h:1222
@ CONF_GLOBAL
Definition rdkafkacpp.h:1223
virtual Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr)=0
Use with name = "event_cb".
virtual Conf::ConfResult set(const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr)=0
Use with name = "offset_commit_cb".
ConfResult
RdKafka::Conf::set() result code.
Definition rdkafkacpp.h:1230
virtual Conf::ConfResult set(const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb, std::string &errstr)=0
Use with name = "partitioner_key_pointer_cb".
virtual Conf::ConfResult get(EventCb *&event_cb) const =0
Query single configuration value.
virtual Conf::ConfResult set_engine_callback_data(void *value, std::string &errstr)=0
Set callback_data for ssl engine.
virtual struct rd_kafka_topic_conf_s * c_ptr_topic()=0
Returns the underlying librdkafka C rd_kafka_topic_conf_t handle.
virtual Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr)=0
Use with name = "dr_cb".
virtual Conf::ConfResult get(SocketCb *&socket_cb) const =0
Query single configuration value.
virtual Conf::ConfResult set(const std::string &name, const Conf *topic_conf, std::string &errstr)=0
Use with name = "default_topic_conf".
virtual Conf::ConfResult get(OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const =0
Query single configuration value.
virtual Conf::ConfResult set(const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr)=0
Use with name = "rebalance_cb".
virtual struct rd_kafka_conf_s * c_ptr_global()=0
Returns the underlying librdkafka C rd_kafka_conf_t handle.
virtual Conf::ConfResult set(const std::string &name, ConsumeCb *consume_cb, std::string &errstr)=0
Use with name = "consume_cb".
virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const =0
Query single configuration value.
virtual std::list< std::string > * dump()=0
Dump configuration names and values to list containing name,value tuples.
virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const =0
Query single configuration value.
virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const =0
Query single configuration value.
virtual Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const =0
Query single configuration value.
virtual Conf::ConfResult set_ssl_cert(RdKafka::CertificateType cert_type, RdKafka::CertificateEncoding cert_enc, const void *buffer, size_t size, std::string &errstr)=0
Set certificate/key cert_type from the cert_enc encoded memory at buffer of size bytes.
virtual Conf::ConfResult set(const std::string &name, SocketCb *socket_cb, std::string &errstr)=0
Use with name = "socket_cb".
virtual Conf::ConfResult set(const std::string &name, OpenCb *open_cb, std::string &errstr)=0
Use with name = "open_cb".
virtual Conf::ConfResult enable_sasl_queue(bool enable, std::string &errstr)=0
Enable/disable creation of a queue specific to SASL events and callbacks.
virtual Conf::ConfResult set(const std::string &name, SslCertificateVerifyCb *ssl_cert_verify_cb, std::string &errstr)=0
Use with name = "ssl_cert_verify_cb".
virtual Conf::ConfResult set(const std::string &name, OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb, std::string &errstr)=0
Use with name = "oauthbearer_token_refresh_cb".
virtual Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr)=0
Set configuration property name to value value.
virtual Conf::ConfResult get(SslCertificateVerifyCb *&ssl_cert_verify_cb) const =0
Query single configuration value.
virtual Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr)=0
Use with name = "partitioner_cb".
virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const =0
Query single configuration value.
static Conf * create(ConfType type)
Create configuration object.
virtual Conf::ConfResult get(const std::string &name, std::string &value) const =0
Query single configuration value.
Consume callback class.
Definition rdkafkacpp.h:965
virtual void consume_cb(Message &message, void *opaque)=0
The consume callback is used with RdKafka::Consumer::consume_callback() methods and will be called fo...
ConsumerGroupMetadata holds a consumer instance's group metadata state.
Definition rdkafkacpp.h:2639
Simple Consumer (legacy)
Definition rdkafkacpp.h:3076
virtual ErrorCode start(Topic *topic, int32_t partition, int64_t offset, Queue *queue)=0
Start consuming messages for topic and partition on queue queue.
virtual ErrorCode stop(Topic *topic, int32_t partition)=0
Stop consuming messages for topic and partition, purging all messages currently in the local queue.
virtual int consume_callback(Topic *topic, int32_t partition, int timeout_ms, ConsumeCb *consume_cb, void *opaque)=0
Consumes messages from topic and partition, calling the provided callback for each consumed message.
static Consumer * create(const Conf *conf, std::string &errstr)
Creates a new Kafka consumer handle.
static int64_t OffsetTail(int64_t offset)
Converts an offset into the logical offset from the tail of a topic.
virtual int consume_callback(Queue *queue, int timeout_ms, RdKafka::ConsumeCb *consume_cb, void *opaque)=0
Consumes messages from queue, calling the provided callback for each consumed message.
virtual Message * consume(Queue *queue, int timeout_ms)=0
Consume a single message from the specified queue.
virtual Message * consume(Topic *topic, int32_t partition, int timeout_ms)=0
Consume a single message from topic and partition.
virtual ErrorCode seek(Topic *topic, int32_t partition, int64_t offset, int timeout_ms)=0
Seek consumer for topic+partition to offset which is either an absolute or logical offset.
virtual ErrorCode start(Topic *topic, int32_t partition, int64_t offset)=0
Start consuming messages for topic and partition at offset offset which may either be a proper offset...
Delivery Report callback class.
Definition rdkafkacpp.h:727
virtual void dr_cb(Message &message)=0
Delivery report callback.
The Error class is used as a return value from APIs to propagate an error. The error consists of an e...
Definition rdkafkacpp.h:642
virtual ErrorCode code() const =0
virtual std::string name() const =0
virtual std::string str() const =0
virtual bool is_fatal() const =0
virtual bool is_retriable() const =0
static Error * create(ErrorCode code, const std::string *errstr)
Create error object.
virtual bool txn_requires_abort() const =0
Event callback class.
Definition rdkafkacpp.h:853
virtual void event_cb(Event &event)=0
Event callback.
Event object class as passed to the EventCb callback.
Definition rdkafkacpp.h:870
virtual int throttle_time() const =0
virtual std::string broker_name() const =0
virtual bool fatal() const =0
virtual std::string fac() const =0
virtual ErrorCode err() const =0
virtual Type type() const =0
virtual std::string str() const =0
Severity
EVENT_LOG severities (conforms to syslog(3) severities)
Definition rdkafkacpp.h:881
Type
Event type.
Definition rdkafkacpp.h:873
@ EVENT_ERROR
Definition rdkafkacpp.h:874
@ EVENT_STATS
Definition rdkafkacpp.h:875
@ EVENT_LOG
Definition rdkafkacpp.h:876
virtual Severity severity() const =0
virtual int broker_id() const =0
Base handle, super class for specific clients.
Definition rdkafkacpp.h:1539
virtual Error * sasl_background_callbacks_enable()=0
Enable SASL OAUTHBEARER refresh callbacks on the librdkafka background thread.
virtual void * mem_malloc(size_t size)=0
Allocate memory using the same allocator librdkafka uses.
virtual Queue * get_sasl_queue()=0
virtual ErrorCode offsetsForTimes(std::vector< TopicPartition * > &offsets, int timeout_ms)=0
Look up the offsets for the given partitions by timestamp.
virtual ErrorCode oauthbearer_set_token(const std::string &token_value, int64_t md_lifetime_ms, const std::string &md_principal_name, const std::list< std::string > &extensions, std::string &errstr)=0
Set SASL/OAUTHBEARER token and metadata.
virtual ErrorCode resume(std::vector< TopicPartition * > &partitions)=0
Resume producing or consumption for the provided list of partitions.
virtual std::string memberid() const =0
Returns the client's broker-assigned group member id.
virtual void mem_free(void *ptr)=0
Free pointer returned by librdkafka.
virtual int outq_len()=0
Returns the current out queue length.
virtual void yield()=0
Cancels the current callback dispatcher (Handle::poll(), KafkaConsumer::consume(),...
virtual ErrorCode pause(std::vector< TopicPartition * > &partitions)=0
Pause producing or consumption for the provided list of partitions.
virtual Error * sasl_set_credentials(const std::string &username, const std::string &password)=0
Sets SASL credentials used for SASL PLAIN and SCRAM mechanisms by this Kafka client.
virtual int32_t controllerid(int timeout_ms)=0
Returns the current ControllerId (controller broker id) as reported in broker metadata.
virtual struct rd_kafka_s * c_ptr()=0
Returns the underlying librdkafka C rd_kafka_t handle.
virtual int poll(int timeout_ms)=0
Polls the provided kafka handle for events.
virtual ErrorCode oauthbearer_set_token_failure(const std::string &errstr)=0
SASL/OAUTHBEARER token refresh failure indicator.
virtual ErrorCode query_watermark_offsets(const std::string &topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)=0
Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.
virtual ErrorCode get_watermark_offsets(const std::string &topic, int32_t partition, int64_t *low, int64_t *high)=0
Get last known low (oldest/beginning) and high (newest/end) offsets for partition.
virtual ErrorCode fatal_error(std::string &errstr) const =0
Returns the first fatal error set on this client instance, or ERR_NO_ERROR if no fatal error has occu...
virtual ErrorCode metadata(bool all_topics, const Topic *only_rkt, Metadata **metadatap, int timeout_ms)=0
Request Metadata from broker.
virtual ErrorCode set_log_queue(Queue *queue)=0
Forward librdkafka logs (and debug) to the specified queue for serving with one of the ....
virtual std::string name() const =0
virtual Queue * get_background_queue()=0
virtual Queue * get_partition_queue(const TopicPartition *partition)=0
Retrieve queue for a given partition.
virtual std::string clusterid(int timeout_ms)=0
Returns the ClusterId as reported in broker metadata.
Header object.
Definition rdkafkacpp.h:2168
size_t value_size() const
Definition rdkafkacpp.h:2261
Header(const std::string &key, const void *value, size_t value_size)
Header object to encapsulate a single Header.
Definition rdkafkacpp.h:2180
const char * value_string() const
Definition rdkafkacpp.h:2256
std::string key() const
Definition rdkafkacpp.h:2245
Header & operator=(const Header &other)
Assignment operator.
Definition rdkafkacpp.h:2222
Header(const std::string &key, const void *value, size_t value_size, const RdKafka::ErrorCode err)
Header object to encapsulate a single Header.
Definition rdkafkacpp.h:2198
Header(const Header &other)
Copy constructor.
Definition rdkafkacpp.h:2212
RdKafka::ErrorCode err() const
Definition rdkafkacpp.h:2266
const void * value() const
Definition rdkafkacpp.h:2250
Headers object.
Definition rdkafkacpp.h:2156
virtual std::vector< Header > get(const std::string &key) const =0
Gets all of the Headers of a given key.
virtual ErrorCode remove(const std::string &key)=0
Removes all the Headers of a given key.
virtual ErrorCode add(const std::string &key, const void *value, size_t value_size)=0
Adds a Header to the end of the list.
virtual ErrorCode add(const Header &header)=0
Adds a Header to the end of the list.
virtual std::vector< Header > get_all() const =0
Returns all Headers.
static Headers * create(const std::vector< Header > &headers)
Create a new instance of the Headers object from a std::vector.
virtual ErrorCode add(const std::string &key, const std::string &value)=0
Adds a Header to the end of the list.
virtual Header get_last(const std::string &key) const =0
Gets the last occurrence of a Header of a given key.
static Headers * create()
Create a new instance of the Headers object.
virtual size_t size() const =0
High-level KafkaConsumer (for brokers 0.9 and later)
Definition rdkafkacpp.h:2661
virtual Error * incremental_unassign(const std::vector< TopicPartition * > &partitions)=0
Incrementally remove partitions from the current assignment.
virtual Error * incremental_assign(const std::vector< TopicPartition * > &partitions)=0
Incrementally add partitions to the current assignment.
virtual ErrorCode commitSync()=0
Commit offsets for the current assignment.
virtual ErrorCode unassign()=0
Stop consumption and remove the current assignment.
virtual ErrorCode commitAsync(Message *message)=0
Commit offset for a single topic+partition based on message.
virtual ErrorCode subscription(std::vector< std::string > &topics)=0
Returns the current subscription as set by RdKafka::KafkaConsumer::subscribe()
virtual ErrorCode position(std::vector< TopicPartition * > &partitions)=0
Retrieve current positions (offsets) for topics+partitions.
virtual std::string rebalance_protocol()=0
The rebalance protocol currently in use. This will be "NONE" if the consumer has not (yet) joined a g...
virtual Message * consume(int timeout_ms)=0
Consume message or get error event, triggers callbacks.
virtual ErrorCode close()=0
Close and shut down the consumer.
virtual ErrorCode commitAsync(const std::vector< TopicPartition * > &offsets)=0
Commit offset for the provided list of partitions.
virtual ErrorCode commitSync(OffsetCommitCb *offset_commit_cb)=0
Commit offsets for the current assignment.
virtual bool assignment_lost()=0
Check whether the consumer considers the current assignment to have been lost involuntarily....
virtual ErrorCode commitAsync()=0
Asynchronous version of RdKafka::KafkaConsumer::commitSync()
virtual ErrorCode subscribe(const std::vector< std::string > &topics)=0
Update the subscription set to topics.
virtual bool closed()=0
virtual ErrorCode commitSync(std::vector< TopicPartition * > &offsets)=0
Commit offsets for the provided list of partitions.
virtual Error * close(Queue *queue)=0
Close and shut down the consumer.
virtual ErrorCode unsubscribe()=0
Unsubscribe from the current subscription set.
virtual ConsumerGroupMetadata * groupMetadata()=0
static KafkaConsumer * create(const Conf *conf, std::string &errstr)
Creates a KafkaConsumer.
virtual ErrorCode commitSync(std::vector< TopicPartition * > &offsets, OffsetCommitCb *offset_commit_cb)=0
Commit offsets for the provided list of partitions.
virtual ErrorCode committed(std::vector< TopicPartition * > &partitions, int timeout_ms)=0
Retrieve committed offsets for topics+partitions.
virtual ErrorCode seek(const TopicPartition &partition, int timeout_ms)=0
Seek consumer for topic+partition to offset which is either an absolute or logical offset.
virtual ErrorCode offsets_store(std::vector< TopicPartition * > &offsets)=0
Store offset offset for topic partition partition. The offset will be committed (written) to the offs...
virtual ErrorCode commitSync(Message *message)=0
Commit offset for a single topic+partition based on message.
virtual ErrorCode assign(const std::vector< TopicPartition * > &partitions)=0
Update the assignment set to partitions.
virtual ErrorCode assignment(std::vector< RdKafka::TopicPartition * > &partitions)=0
Returns the current partition assignment as set by RdKafka::KafkaConsumer::assign()
Message timestamp object.
Definition rdkafkacpp.h:2133
int64_t timestamp
Definition rdkafkacpp.h:2143
MessageTimestampType
Definition rdkafkacpp.h:2136
@ MSG_TIMESTAMP_CREATE_TIME
Definition rdkafkacpp.h:2138
@ MSG_TIMESTAMP_NOT_AVAILABLE
Definition rdkafkacpp.h:2137
MessageTimestampType type
Definition rdkafkacpp.h:2142
Message object.
Definition rdkafkacpp.h:2399
virtual std::string topic_name() const =0
virtual Status status() const =0
Returns the message's persistence status in the topic log.
virtual Topic * topic() const =0
virtual void * payload() const =0
virtual std::string errstr() const =0
Accessor functions.
virtual Error * offset_store()=0
Store offset +1 for the consumed message.
virtual int32_t broker_id() const =0
virtual int64_t offset() const =0
virtual RdKafka::Headers * headers()=0
virtual size_t len() const =0
virtual MessageTimestamp timestamp() const =0
virtual int64_t latency() const =0
virtual ErrorCode err() const =0
virtual int32_t leader_epoch() const =0
virtual void * msg_opaque() const =0
virtual RdKafka::Headers * headers(RdKafka::ErrorCode *err)=0
Status
Message persistence status can be used by the application to find out if a produced message was persi...
Definition rdkafkacpp.h:2403
virtual size_t key_len() const =0
virtual struct rd_kafka_message_s * c_ptr()=0
Returns the underlying librdkafka C rd_kafka_message_t handle.
virtual const void * key_pointer() const =0
virtual int32_t partition() const =0
virtual const std::string * key() const =0
Metadata container.
Definition rdkafkacpp.h:3756
virtual int32_t orig_broker_id() const =0
Broker (id) originating this metadata.
std::vector< const TopicMetadata * > TopicMetadataVector
Topics.
Definition rdkafkacpp.h:3761
std::vector< const BrokerMetadata * > BrokerMetadataVector
Brokers.
Definition rdkafkacpp.h:3759
virtual std::string orig_broker_name() const =0
Broker (name) originating this metadata.
virtual const TopicMetadataVector * topics() const =0
Topic list.
virtual const BrokerMetadataVector * brokers() const =0
Broker list.
BrokerMetadataVector::const_iterator BrokerMetadataIterator
Brokers iterator.
Definition rdkafkacpp.h:3764
TopicMetadataVector::const_iterator TopicMetadataIterator
Topics iterator.
Definition rdkafkacpp.h:3766
SASL/OAUTHBEARER token refresh callback class.
Definition rdkafkacpp.h:766
virtual void oauthbearer_token_refresh_cb(RdKafka::Handle *handle, const std::string &oauthbearer_config)=0
SASL/OAUTHBEARER token refresh callback.
Offset Commit callback class.
Definition rdkafkacpp.h:1066
virtual void offset_commit_cb(RdKafka::ErrorCode err, std::vector< TopicPartition * > &offsets)=0
Set offset commit callback for use with consumer groups.
Portability: OpenCb callback class
Definition rdkafkacpp.h:1178
virtual int open_cb(const std::string &path, int flags, int mode)=0
Open callback. The open callback is responsible for opening the file specified by path,...
Metadata: Partition information.
Definition rdkafkacpp.h:3695
std::vector< int32_t > ISRSVector
ISRs (In-Sync-Replicas)
Definition rdkafkacpp.h:3700
virtual const std::vector< int32_t > * replicas() const =0
ISRSVector::const_iterator ISRSIterator
ISRs iterator.
Definition rdkafkacpp.h:3705
virtual int32_t id() const =0
virtual const std::vector< int32_t > * isrs() const =0
ReplicasVector::const_iterator ReplicasIterator
Replicas iterator.
Definition rdkafkacpp.h:3703
virtual ErrorCode err() const =0
virtual int32_t leader() const =0
std::vector< int32_t > ReplicasVector
Replicas.
Definition rdkafkacpp.h:3698
Partitioner callback class.
Definition rdkafkacpp.h:791
virtual int32_t partitioner_cb(const Topic *topic, const std::string *key, int32_t partition_cnt, void *msg_opaque)=0
Partitioner callback.
Variant partitioner with key pointer.
Definition rdkafkacpp.h:823
virtual int32_t partitioner_cb(const Topic *topic, const void *key, size_t key_len, int32_t partition_cnt, void *msg_opaque)=0
Variant partitioner callback that gets key as pointer and length instead of as a const std::string *.
Producer.
Definition rdkafkacpp.h:3258
static Producer * create(const Conf *conf, std::string &errstr)
Creates a new Kafka producer handle.
virtual Error * init_transactions(int timeout_ms)=0
Initialize transactions for the producer instance.
virtual ErrorCode produce(const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, int64_t timestamp, void *msg_opaque)=0
produce() variant that takes topic as a string (no need for creating a Topic object),...
virtual Error * send_offsets_to_transaction(const std::vector< TopicPartition * > &offsets, const ConsumerGroupMetadata *group_metadata, int timeout_ms)=0
Sends a list of topic partition offsets to the consumer group coordinator for group_metadata,...
virtual ErrorCode produce(Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, void *msg_opaque)=0
Variant produce() that passes the key as a pointer and length instead of as a const std::string *.
virtual Error * begin_transaction()=0
init_transactions() must have been called successfully (once) before this function is called.
virtual ErrorCode purge(int purge_flags)=0
Purge messages currently handled by the producer instance.
virtual Error * commit_transaction(int timeout_ms)=0
Commit the current transaction as started with begin_transaction().
virtual ErrorCode produce(const std::string topic_name, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, int64_t timestamp, RdKafka::Headers *headers, void *msg_opaque)=0
produce() variant that allows for Header support on produce. Otherwise identical to produce() abo...
virtual ErrorCode produce(Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const std::string *key, void *msg_opaque)=0
Produce and send a single message to broker.
virtual ErrorCode produce(Topic *topic, int32_t partition, const std::vector< char > *payload, const std::vector< char > *key, void *msg_opaque)=0
Variant produce() that accepts vectors for key and payload. The vector data will be copied.
virtual Error * abort_transaction(int timeout_ms)=0
Aborts the ongoing transaction.
virtual ErrorCode flush(int timeout_ms)=0
Wait until all outstanding produce requests, et al., are completed. This should typically be done prio...
Queue interface.
Definition rdkafkacpp.h:2564
virtual Message * consume(int timeout_ms)=0
Consume message or get error event from the queue.
virtual ErrorCode forward(Queue *dst)=0
Forward/re-route queue to dst. If dst is NULL, the forwarding is removed.
virtual int poll(int timeout_ms)=0
Poll queue, serving any enqueued callbacks.
static Queue * create(Handle *handle)
Create Queue object.
virtual void io_event_enable(int fd, const void *payload, size_t size)=0
Enable IO event triggering for queue.
KafkaConsumer: Rebalance callback class
Definition rdkafkacpp.h:984
virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition * > &partitions)=0
Group rebalance callback for use with RdKafka::KafkaConsumer.
Portability: SocketCb callback class
Definition rdkafkacpp.h:1152
virtual int socket_cb(int domain, int type, int protocol)=0
Socket callback.
SSL broker certificate verification class.
Definition rdkafkacpp.h:1097
virtual bool ssl_cert_verify_cb(const std::string &broker_name, int32_t broker_id, int *x509_error, int depth, const char *buf, size_t size, std::string &errstr)=0
SSL broker certificate verification callback.
Metadata: Topic information.
Definition rdkafkacpp.h:3733
virtual ErrorCode err() const =0
virtual const PartitionMetadataVector * partitions() const =0
std::vector< const PartitionMetadata * > PartitionMetadataVector
Partitions.
Definition rdkafkacpp.h:3736
PartitionMetadataVector::const_iterator PartitionMetadataIterator
Partitions iterator.
Definition rdkafkacpp.h:3738
virtual std::string topic() const =0
Topic+Partition.
Definition rdkafkacpp.h:1969
static TopicPartition * create(const std::string &topic, int partition, int64_t offset)
Create topic+partition object for topic and partition with offset offset.
static void destroy(std::vector< TopicPartition * > &partitions)
Destroy/delete the TopicPartitions in partitions and clear the vector.
virtual void set_leader_epoch(int32_t leader_epoch)=0
Set partition leader epoch.
virtual void set_offset(int64_t offset)=0
Set offset.
virtual void set_metadata(std::vector< unsigned char > &metadata)=0
Set partition metadata.
virtual int partition() const =0
virtual std::vector< unsigned char > get_metadata()=0
Get partition metadata.
virtual const std::string & topic() const =0
static TopicPartition * create(const std::string &topic, int partition)
Create topic+partition object for topic and partition.
virtual int64_t offset() const =0
virtual ErrorCode err() const =0
virtual int32_t get_leader_epoch()=0
Get partition leader epoch, or -1 if not known or relevant.
Topic handle.
Definition rdkafkacpp.h:2030
static const int64_t OFFSET_INVALID
Definition rdkafkacpp.h:2044
virtual std::string name() const =0
virtual ErrorCode offset_store(int32_t partition, int64_t offset)=0
Store offset offset + 1 for topic partition partition. The offset will be committed (written) to the ...
virtual struct rd_kafka_topic_s * c_ptr()=0
Returns the underlying librdkafka C rd_kafka_topic_t handle.
static const int64_t OFFSET_BEGINNING
Special offsets.
Definition rdkafkacpp.h:2041
virtual bool partition_available(int32_t partition) const =0
static const int64_t OFFSET_STORED
Definition rdkafkacpp.h:2043
static const int32_t PARTITION_UA
Unassigned partition.
Definition rdkafkacpp.h:2038
static const int64_t OFFSET_END
Definition rdkafkacpp.h:2042
static Topic * create(Handle *base, const std::string &topic_str, const Conf *conf, std::string &errstr)
Creates a new topic handle for topic named topic_str.
RD_EXPORT std::string get_debug_contexts()
Returns a CSV list of the supported debug contexts for use with Conf::set("debug",...
RD_EXPORT std::string version_str()
Returns the librdkafka version as string.
CertificateEncoding
SSL certificate encoding.
Definition rdkafkacpp.h:600
@ CERT_ENC_PKCS12
Definition rdkafkacpp.h:601
@ CERT_ENC_DER
Definition rdkafkacpp.h:602
@ CERT_ENC_PEM
Definition rdkafkacpp.h:603
RD_EXPORT int wait_destroyed(int timeout_ms)
Wait for all rd_kafka_t objects to be destroyed.
RD_EXPORT std::string err2str(RdKafka::ErrorCode err)
Returns a human readable representation of a kafka error.
ErrorCode
Error codes.
Definition rdkafkacpp.h:201
@ ERR__APPLICATION
Definition rdkafkacpp.h:321
@ ERR_PRINCIPAL_DESERIALIZATION_FAILURE
Definition rdkafkacpp.h:552
@ ERR_NOT_LEADER_FOR_PARTITION
Definition rdkafkacpp.h:355
@ ERR_NON_EMPTY_GROUP
Definition rdkafkacpp.h:490
@ ERR__DESTROY
Definition rdkafkacpp.h:210
@ ERR_INVALID_REQUEST
Definition rdkafkacpp.h:433
@ ERR_GROUP_ID_NOT_FOUND
Definition rdkafkacpp.h:492
@ ERR_INVALID_SESSION_TIMEOUT
Definition rdkafkacpp.h:401
@ ERR__UNDERFLOW
Definition rdkafkacpp.h:297
@ ERR__ASSIGN_PARTITIONS
Definition rdkafkacpp.h:257
@ ERR__UNKNOWN_BROKER
Definition rdkafkacpp.h:315
@ ERR_CLUSTER_AUTHORIZATION_FAILED
Definition rdkafkacpp.h:411
@ ERR_UNKNOWN
Definition rdkafkacpp.h:341
@ ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE
Definition rdkafkacpp.h:521
@ ERR__REVOKE_PARTITIONS
Definition rdkafkacpp.h:259
@ ERR__FATAL
Definition rdkafkacpp.h:307
@ ERR_SASL_AUTHENTICATION_FAILED
Definition rdkafkacpp.h:470
@ ERR_FEATURE_UPDATE_FAILED
Definition rdkafkacpp.h:550
@ ERR__CRIT_SYS_RESOURCE
Definition rdkafkacpp.h:216
@ ERR__LOG_TRUNCATION
Definition rdkafkacpp.h:329
@ ERR__ASSIGNMENT_LOST
Definition rdkafkacpp.h:323
@ ERR_STALE_BROKER_EPOCH
Definition rdkafkacpp.h:508
@ ERR_INVALID_TXN_STATE
Definition rdkafkacpp.h:445
@ ERR__UNKNOWN_PARTITION
Definition rdkafkacpp.h:227
@ ERR__WAIT_CACHE
Definition rdkafkacpp.h:279
@ ERR_UNSTABLE_OFFSET_COMMIT
Definition rdkafkacpp.h:532
@ ERR__INVALID_DIFFERENT_RECORD
Definition rdkafkacpp.h:332
@ ERR__NODE_UPDATE
Definition rdkafkacpp.h:243
@ ERR_NO_ERROR
Definition rdkafkacpp.h:343
@ ERR_INCONSISTENT_GROUP_PROTOCOL
Definition rdkafkacpp.h:395
@ ERR__UNKNOWN_GROUP
Definition rdkafkacpp.h:249
@ ERR_TELEMETRY_TOO_LARGE
Definition rdkafkacpp.h:570
@ ERR_INVALID_PARTITIONS
Definition rdkafkacpp.h:423
@ ERR_DELEGATION_TOKEN_NOT_FOUND
Definition rdkafkacpp.h:478
@ ERR__NOOP
Definition rdkafkacpp.h:325
@ ERR_NO_REASSIGNMENT_IN_PROGRESS
Definition rdkafkacpp.h:525
@ ERR__NOT_IMPLEMENTED
Definition rdkafkacpp.h:267
@ ERR__MSG_TIMED_OUT
Definition rdkafkacpp.h:220
@ ERR_INVALID_REPLICATION_FACTOR
Definition rdkafkacpp.h:425
@ ERR__READ_ONLY
Definition rdkafkacpp.h:293
@ ERR_UNKNOWN_MEMBER_ID
Definition rdkafkacpp.h:399
@ ERR__DESTROY_BROKER
Definition rdkafkacpp.h:334
@ ERR_OUT_OF_ORDER_SEQUENCE_NUMBER
Definition rdkafkacpp.h:439
@ ERR_INVALID_GROUP_ID
Definition rdkafkacpp.h:397
@ ERR__RETRY
Definition rdkafkacpp.h:301
@ ERR__TIMED_OUT_QUEUE
Definition rdkafkacpp.h:275
@ ERR__NOT_CONFIGURED
Definition rdkafkacpp.h:317
@ ERR_DUPLICATE_RESOURCE
Definition rdkafkacpp.h:541
@ ERR_LEADER_NOT_AVAILABLE
Definition rdkafkacpp.h:353
@ ERR_INVALID_FETCH_SESSION_EPOCH
Definition rdkafkacpp.h:496
@ ERR_KAFKA_STORAGE_ERROR
Definition rdkafkacpp.h:466
@ ERR_ILLEGAL_GENERATION
Definition rdkafkacpp.h:393
@ ERR_TOPIC_ALREADY_EXISTS
Definition rdkafkacpp.h:421
@ ERR__AUTHENTICATION
Definition rdkafkacpp.h:269
@ ERR_TOPIC_EXCEPTION
Definition rdkafkacpp.h:383
@ ERR_UNKNOWN_SUBSCRIPTION_ID
Definition rdkafkacpp.h:567
@ ERR__RESOLVE
Definition rdkafkacpp.h:218
@ ERR_INVALID_REQUIRED_ACKS
Definition rdkafkacpp.h:391
@ ERR_FETCH_SESSION_ID_NOT_FOUND
Definition rdkafkacpp.h:494
@ ERR__SSL
Definition rdkafkacpp.h:245
@ ERR_COORDINATOR_NOT_AVAILABLE
Definition rdkafkacpp.h:375
@ ERR__CONFLICT
Definition rdkafkacpp.h:261
@ ERR_GROUP_SUBSCRIBED_TO_TOPIC
Definition rdkafkacpp.h:528
@ ERR__STATE
Definition rdkafkacpp.h:263
@ ERR__KEY_SERIALIZATION
Definition rdkafkacpp.h:283
@ ERR_INVALID_MSG_SIZE
Definition rdkafkacpp.h:351
@ ERR_DELEGATION_TOKEN_OWNER_MISMATCH
Definition rdkafkacpp.h:480
@ ERR_REASSIGNMENT_IN_PROGRESS
Definition rdkafkacpp.h:474
@ ERR__OUTDATED
Definition rdkafkacpp.h:273
@ ERR_OFFSET_NOT_AVAILABLE
Definition rdkafkacpp.h:510
@ ERR_THROTTLING_QUOTA_EXCEEDED
Definition rdkafkacpp.h:534
@ ERR_REQUEST_TIMED_OUT
Definition rdkafkacpp.h:357
@ ERR_INVALID_TRANSACTION_TIMEOUT
Definition rdkafkacpp.h:451
@ ERR_CONCURRENT_TRANSACTIONS
Definition rdkafkacpp.h:454
@ ERR_STALE_CTRL_EPOCH
Definition rdkafkacpp.h:365
@ ERR_UNSUPPORTED_VERSION
Definition rdkafkacpp.h:419
@ ERR__WAIT_COORD
Definition rdkafkacpp.h:247
@ ERR_ELECTION_NOT_NEEDED
Definition rdkafkacpp.h:523
@ ERR_INVALID_COMMIT_OFFSET_SIZE
Definition rdkafkacpp.h:405
@ ERR_DELEGATION_TOKEN_EXPIRED
Definition rdkafkacpp.h:486
@ ERR_TOPIC_AUTHORIZATION_FAILED
Definition rdkafkacpp.h:407
@ ERR_MSG_SIZE_TOO_LARGE
Definition rdkafkacpp.h:363
@ ERR_UNRELEASED_INSTANCE_ID
Definition rdkafkacpp.h:559
@ ERR_NETWORK_EXCEPTION
Definition rdkafkacpp.h:369
@ ERR_INVALID_PRINCIPAL_TYPE
Definition rdkafkacpp.h:488
@ ERR_UNSUPPORTED_ASSIGNOR
Definition rdkafkacpp.h:562
@ ERR__MAX_POLL_EXCEEDED
Definition rdkafkacpp.h:313
@ ERR_FENCED_LEADER_EPOCH
Definition rdkafkacpp.h:502
@ ERR__BEGIN
Definition rdkafkacpp.h:204
@ ERR_DELEGATION_TOKEN_AUTHORIZATION_FAILED
Definition rdkafkacpp.h:484
@ ERR_UNACCEPTABLE_CREDENTIAL
Definition rdkafkacpp.h:543
@ ERR_RESOURCE_NOT_FOUND
Definition rdkafkacpp.h:539
@ ERR_GROUP_AUTHORIZATION_FAILED
Definition rdkafkacpp.h:409
@ ERR_LOG_DIR_NOT_FOUND
Definition rdkafkacpp.h:468
@ ERR__AUTO_OFFSET_RESET
Definition rdkafkacpp.h:327
@ ERR__NO_OFFSET
Definition rdkafkacpp.h:271
@ ERR_REBOOTSTRAP_REQUIRED
Definition rdkafkacpp.h:573
@ ERR__PURGE_INFLIGHT
Definition rdkafkacpp.h:305
@ ERR_INVALID_RECORD
Definition rdkafkacpp.h:530
@ ERR_FENCED_INSTANCE_ID
Definition rdkafkacpp.h:519
@ ERR_ILLEGAL_SASL_STATE
Definition rdkafkacpp.h:417
@ ERR__FAIL
Definition rdkafkacpp.h:212
@ ERR_MEMBER_ID_REQUIRED
Definition rdkafkacpp.h:512
@ ERR__EXISTING_SUBSCRIPTION
Definition rdkafkacpp.h:255
@ ERR__PREV_IN_PROGRESS
Definition rdkafkacpp.h:253
@ ERR_PRODUCER_FENCED
Definition rdkafkacpp.h:537
@ ERR__TIMED_OUT
Definition rdkafkacpp.h:237
@ ERR_PREFERRED_LEADER_NOT_AVAILABLE
Definition rdkafkacpp.h:514
@ ERR__PARTITION_EOF
Definition rdkafkacpp.h:225
@ ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND
Definition rdkafkacpp.h:389
@ ERR__QUEUE_FULL
Definition rdkafkacpp.h:239
@ ERR_INVALID_MSG
Definition rdkafkacpp.h:347
@ ERR_TOPIC_DELETION_DISABLED
Definition rdkafkacpp.h:500
@ ERR__VALUE_DESERIALIZATION
Definition rdkafkacpp.h:289
@ ERR_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED
Definition rdkafkacpp.h:482
@ ERR__GAPLESS_GUARANTEE
Definition rdkafkacpp.h:311
@ ERR_NOT_COORDINATOR
Definition rdkafkacpp.h:379
@ ERR__UNSUPPORTED_FEATURE
Definition rdkafkacpp.h:277
@ ERR_STALE_MEMBER_EPOCH
Definition rdkafkacpp.h:564
@ ERR_INVALID_PRODUCER_EPOCH
Definition rdkafkacpp.h:443
@ ERR_DUPLICATE_SEQUENCE_NUMBER
Definition rdkafkacpp.h:441
@ ERR_OFFSET_METADATA_TOO_LARGE
Definition rdkafkacpp.h:367
@ ERR_NOT_ENOUGH_REPLICAS
Definition rdkafkacpp.h:387
@ ERR_UNSUPPORTED_SASL_MECHANISM
Definition rdkafkacpp.h:415
@ ERR__INCONSISTENT
Definition rdkafkacpp.h:309
@ ERR_UNKNOWN_PRODUCER_ID
Definition rdkafkacpp.h:472
@ ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED
Definition rdkafkacpp.h:460
@ ERR_UNSUPPORTED_COMPRESSION_TYPE
Definition rdkafkacpp.h:506
@ ERR_UNKNOWN_TOPIC_ID
Definition rdkafkacpp.h:554
@ ERR_UNKNOWN_LEADER_EPOCH
Definition rdkafkacpp.h:504
@ ERR_LISTENER_NOT_FOUND
Definition rdkafkacpp.h:498
@ ERR__PURGE_QUEUE
Definition rdkafkacpp.h:303
@ ERR_SECURITY_DISABLED
Definition rdkafkacpp.h:462
@ ERR__ISR_INSUFF
Definition rdkafkacpp.h:241
@ ERR__FENCED
Definition rdkafkacpp.h:319
@ ERR_INCONSISTENT_VOTER_SET
Definition rdkafkacpp.h:546
@ ERR__BAD_COMPRESSION
Definition rdkafkacpp.h:208
@ ERR__ALL_BROKERS_DOWN
Definition rdkafkacpp.h:233
@ ERR__VALUE_SERIALIZATION
Definition rdkafkacpp.h:285
@ ERR_OPERATION_NOT_ATTEMPTED
Definition rdkafkacpp.h:464
@ ERR__END
Definition rdkafkacpp.h:337
@ ERR__IN_PROGRESS
Definition rdkafkacpp.h:251
@ ERR_INVALID_CONFIG
Definition rdkafkacpp.h:429
@ ERR_REPLICA_NOT_AVAILABLE
Definition rdkafkacpp.h:361
@ ERR_INVALID_REPLICA_ASSIGNMENT
Definition rdkafkacpp.h:427
@ ERR_BROKER_NOT_AVAILABLE
Definition rdkafkacpp.h:359
@ ERR__UNKNOWN_PROTOCOL
Definition rdkafkacpp.h:265
@ ERR__BAD_MSG
Definition rdkafkacpp.h:206
@ ERR__NOENT
Definition rdkafkacpp.h:295
@ ERR_REBALANCE_IN_PROGRESS
Definition rdkafkacpp.h:403
@ ERR_COORDINATOR_LOAD_IN_PROGRESS
Definition rdkafkacpp.h:371
@ ERR_RECORD_LIST_TOO_LARGE
Definition rdkafkacpp.h:385
@ ERR_INVALID_PRODUCER_ID_MAPPING
Definition rdkafkacpp.h:448
@ ERR__PARTIAL
Definition rdkafkacpp.h:291
@ ERR_TRANSACTION_COORDINATOR_FENCED
Definition rdkafkacpp.h:458
@ ERR_UNKNOWN_TOPIC_OR_PART
Definition rdkafkacpp.h:349
@ ERR__UNKNOWN_TOPIC
Definition rdkafkacpp.h:231
@ ERR__INVALID_ARG
Definition rdkafkacpp.h:235
@ ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT
Definition rdkafkacpp.h:435
@ ERR_INVALID_TIMESTAMP
Definition rdkafkacpp.h:413
@ ERR__INVALID_TYPE
Definition rdkafkacpp.h:299
@ ERR_OFFSET_OUT_OF_RANGE
Definition rdkafkacpp.h:345
@ ERR_INVALID_UPDATE_VERSION
Definition rdkafkacpp.h:548
@ ERR_DELEGATION_TOKEN_AUTH_DISABLED
Definition rdkafkacpp.h:476
@ ERR_POLICY_VIOLATION
Definition rdkafkacpp.h:437
@ ERR_FENCED_MEMBER_EPOCH
Definition rdkafkacpp.h:556
@ ERR__KEY_DESERIALIZATION
Definition rdkafkacpp.h:287
@ ERR__FS
Definition rdkafkacpp.h:229
@ ERR_GROUP_MAX_SIZE_REACHED
Definition rdkafkacpp.h:516
@ ERR_NOT_CONTROLLER
Definition rdkafkacpp.h:431
@ ERR__INTR
Definition rdkafkacpp.h:281
@ ERR__TRANSPORT
Definition rdkafkacpp.h:214
RD_EXPORT void * mem_malloc(size_t size)
Allocate memory using the same allocator librdkafka uses.
RD_EXPORT int version()
Returns the librdkafka version as integer.
CertificateType
SSL certificate types.
Definition rdkafkacpp.h:589
@ CERT_PRIVATE_KEY
Definition rdkafkacpp.h:591
@ CERT_PUBLIC_KEY
Definition rdkafkacpp.h:590
@ CERT_CA
Definition rdkafkacpp.h:592
RD_EXPORT void mem_free(void *ptr)
Free pointer returned by librdkafka.