29 #ifndef _RDKAFKACPP_H_
30 #define _RDKAFKACPP_H_
58 #ifdef LIBRDKAFKA_STATICLIB
61 #ifdef LIBRDKAFKACPP_EXPORTS
62 #define RD_EXPORT __declspec(dllexport)
64 #define RD_EXPORT __declspec(dllimport)
76 struct rd_kafka_topic_s;
77 struct rd_kafka_message_s;
102 #define RD_KAFKA_VERSION 0x000b06ff
116 std::string version_str();
123 std::string get_debug_contexts();
135 int wait_destroyed(
int timeout_ms);
167 ERR__BAD_COMPRESSION = -198,
173 ERR__TRANSPORT = -195,
175 ERR__CRIT_SYS_RESOURCE = -194,
179 ERR__MSG_TIMED_OUT = -192,
182 ERR__PARTITION_EOF = -191,
184 ERR__UNKNOWN_PARTITION = -190,
188 ERR__UNKNOWN_TOPIC = -188,
190 ERR__ALL_BROKERS_DOWN = -187,
192 ERR__INVALID_ARG = -186,
194 ERR__TIMED_OUT = -185,
196 ERR__QUEUE_FULL = -184,
198 ERR__ISR_INSUFF = -183,
200 ERR__NODE_UPDATE = -182,
204 ERR__WAIT_COORD = -180,
206 ERR__UNKNOWN_GROUP = -179,
208 ERR__IN_PROGRESS = -178,
210 ERR__PREV_IN_PROGRESS = -177,
212 ERR__EXISTING_SUBSCRIPTION = -176,
214 ERR__ASSIGN_PARTITIONS = -175,
216 ERR__REVOKE_PARTITIONS = -174,
218 ERR__CONFLICT = -173,
222 ERR__UNKNOWN_PROTOCOL = -171,
224 ERR__NOT_IMPLEMENTED = -170,
226 ERR__AUTHENTICATION = -169,
228 ERR__NO_OFFSET = -168,
230 ERR__OUTDATED = -167,
232 ERR__TIMED_OUT_QUEUE = -166,
234 ERR__UNSUPPORTED_FEATURE = -165,
236 ERR__WAIT_CACHE = -164,
240 ERR__KEY_SERIALIZATION = -162,
242 ERR__VALUE_SERIALIZATION = -161,
244 ERR__KEY_DESERIALIZATION = -160,
246 ERR__VALUE_DESERIALIZATION = -159,
250 ERR__READ_ONLY = -157,
254 ERR__UNDERFLOW = -155,
256 ERR__INVALID_TYPE = -154,
267 ERR_OFFSET_OUT_OF_RANGE = 1,
271 ERR_UNKNOWN_TOPIC_OR_PART = 3,
273 ERR_INVALID_MSG_SIZE = 4,
275 ERR_LEADER_NOT_AVAILABLE = 5,
277 ERR_NOT_LEADER_FOR_PARTITION = 6,
279 ERR_REQUEST_TIMED_OUT = 7,
281 ERR_BROKER_NOT_AVAILABLE = 8,
283 ERR_REPLICA_NOT_AVAILABLE = 9,
285 ERR_MSG_SIZE_TOO_LARGE = 10,
287 ERR_STALE_CTRL_EPOCH = 11,
289 ERR_OFFSET_METADATA_TOO_LARGE = 12,
291 ERR_NETWORK_EXCEPTION = 13,
293 ERR_GROUP_LOAD_IN_PROGRESS = 14,
295 ERR_GROUP_COORDINATOR_NOT_AVAILABLE = 15,
297 ERR_NOT_COORDINATOR_FOR_GROUP = 16,
299 ERR_TOPIC_EXCEPTION = 17,
301 ERR_RECORD_LIST_TOO_LARGE = 18,
303 ERR_NOT_ENOUGH_REPLICAS = 19,
305 ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20,
307 ERR_INVALID_REQUIRED_ACKS = 21,
309 ERR_ILLEGAL_GENERATION = 22,
311 ERR_INCONSISTENT_GROUP_PROTOCOL = 23,
313 ERR_INVALID_GROUP_ID = 24,
315 ERR_UNKNOWN_MEMBER_ID = 25,
317 ERR_INVALID_SESSION_TIMEOUT = 26,
319 ERR_REBALANCE_IN_PROGRESS = 27,
321 ERR_INVALID_COMMIT_OFFSET_SIZE = 28,
323 ERR_TOPIC_AUTHORIZATION_FAILED = 29,
325 ERR_GROUP_AUTHORIZATION_FAILED = 30,
327 ERR_CLUSTER_AUTHORIZATION_FAILED = 31,
329 ERR_INVALID_TIMESTAMP = 32,
331 ERR_UNSUPPORTED_SASL_MECHANISM = 33,
333 ERR_ILLEGAL_SASL_STATE = 34,
335 ERR_UNSUPPORTED_VERSION = 35,
337 ERR_TOPIC_ALREADY_EXISTS = 36,
339 ERR_INVALID_PARTITIONS = 37,
341 ERR_INVALID_REPLICATION_FACTOR = 38,
343 ERR_INVALID_REPLICA_ASSIGNMENT = 39,
345 ERR_INVALID_CONFIG = 40,
347 ERR_NOT_CONTROLLER = 41,
349 ERR_INVALID_REQUEST = 42,
351 ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43,
353 ERR_POLICY_VIOLATION = 44,
355 ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45,
357 ERR_DUPLICATE_SEQUENCE_NUMBER = 46,
359 ERR_INVALID_PRODUCER_EPOCH = 47,
361 ERR_INVALID_TXN_STATE = 48,
364 ERR_INVALID_PRODUCER_ID_MAPPING = 49,
367 ERR_INVALID_TRANSACTION_TIMEOUT = 50,
370 ERR_CONCURRENT_TRANSACTIONS = 51,
374 ERR_TRANSACTION_COORDINATOR_FENCED = 52,
376 ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53,
378 ERR_SECURITY_DISABLED = 54,
380 ERR_OPERATION_NOT_ATTEMPTED = 55
388 std::string err2str(RdKafka::ErrorCode err);
402 class TopicPartition;
441 virtual void dr_cb (
Message &message) = 0;
472 virtual int32_t partitioner_cb (
const Topic *topic,
473 const std::string *key,
474 int32_t partition_cnt,
475 void *msg_opaque) = 0;
497 int32_t partition_cnt,
498 void *msg_opaque) = 0;
520 virtual void event_cb (
Event &event) = 0;
541 EVENT_SEVERITY_EMERG = 0,
542 EVENT_SEVERITY_ALERT = 1,
543 EVENT_SEVERITY_CRITICAL = 2,
544 EVENT_SEVERITY_ERROR = 3,
545 EVENT_SEVERITY_WARNING = 4,
546 EVENT_SEVERITY_NOTICE = 5,
547 EVENT_SEVERITY_INFO = 6,
548 EVENT_SEVERITY_DEBUG = 7
551 virtual ~
Event () { }
561 virtual Type type ()
const = 0;
567 virtual ErrorCode err ()
const = 0;
573 virtual Severity severity ()
const = 0;
579 virtual std::string fac ()
const = 0;
589 virtual std::string str ()
const = 0;
595 virtual int throttle_time ()
const = 0;
601 virtual std::string broker_name ()
const = 0;
607 virtual int broker_id ()
const = 0;
624 virtual void consume_cb (
Message &message,
void *opaque) = 0;
685 RdKafka::ErrorCode err,
686 std::vector<TopicPartition*>&partitions) = 0;
712 virtual void offset_commit_cb(RdKafka::ErrorCode err,
713 std::vector<TopicPartition*>&offsets) = 0;
739 virtual int socket_cb (
int domain,
int type,
int protocol) = 0;
762 virtual int open_cb (
const std::string &path,
int flags,
int mode) = 0;
811 static Conf *create (ConfType type);
829 const std::string &value,
830 std::string &errstr) = 0;
834 DeliveryReportCb *dr_cb,
835 std::string &errstr) = 0;
840 std::string &errstr) = 0;
850 const Conf *topic_conf,
851 std::string &errstr) = 0;
855 PartitionerCb *partitioner_cb,
856 std::string &errstr) = 0;
860 PartitionerKeyPointerCb *partitioner_kp_cb,
861 std::string &errstr) = 0;
865 std::string &errstr) = 0;
869 std::string &errstr) = 0;
873 RebalanceCb *rebalance_cb,
874 std::string &errstr) = 0;
878 OffsetCommitCb *offset_commit_cb,
879 std::string &errstr) = 0;
893 std::string &value)
const = 0;
913 virtual Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb)
const = 0;
937 virtual std::list<std::string> *dump () = 0;
940 virtual Conf::ConfResult set (
const std::string &name, ConsumeCb *consume_cb,
941 std::string &errstr) = 0;
961 virtual const std::string name ()
const = 0;
971 virtual const std::string memberid ()
const = 0;
996 virtual int poll (
int timeout_ms) = 0;
1004 virtual int outq_len () = 0;
1021 virtual ErrorCode metadata (
bool all_topics,
const Topic *only_rkt,
1022 Metadata **metadatap,
int timeout_ms) = 0;
1034 virtual ErrorCode pause (std::vector<TopicPartition*> &partitions) = 0;
1046 virtual ErrorCode resume (std::vector<TopicPartition*> &partitions) = 0;
1057 virtual ErrorCode query_watermark_offsets (
const std::string &topic,
1059 int64_t *low, int64_t *high,
1060 int timeout_ms) = 0;
1079 virtual ErrorCode get_watermark_offsets (
const std::string &topic,
1081 int64_t *low, int64_t *high) = 0;
1103 virtual ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
1104 int timeout_ms) = 0;
1133 virtual ErrorCode set_log_queue (
Queue *queue) = 0;
1146 virtual void yield () = 0;
1162 virtual const std::string clusterid (
int timeout_ms) = 0;
1180 virtual struct rd_kafka_s *c_ptr () = 0;
1197 virtual int32_t controllerid (
int timeout_ms) = 0;
1227 static TopicPartition *create (
const std::string &topic,
int partition);
1228 static TopicPartition *create (
const std::string &topic,
int partition,
1237 static void destroy (std::vector<TopicPartition*> &partitions);
1240 virtual const std::string &topic ()
const = 0;
1243 virtual int partition ()
const = 0;
1246 virtual int64_t offset ()
const = 0;
1249 virtual void set_offset (int64_t offset) = 0;
1252 virtual ErrorCode err ()
const = 0;
1287 static Topic *create (
Handle *base,
const std::string &topic_str,
1288 Conf *conf, std::string &errstr);
1290 virtual ~
Topic () = 0;
1294 virtual const std::string name ()
const = 0;
1301 virtual bool partition_available (int32_t partition)
const = 0;
1313 virtual ErrorCode offset_store (int32_t partition, int64_t offset) = 0;
1331 virtual struct rd_kafka_topic_s *c_ptr () = 0;
1361 MSG_TIMESTAMP_LOG_APPEND_TIME
1390 virtual std::string errstr()
const = 0;
1393 virtual ErrorCode err ()
const = 0;
1399 virtual Topic *topic ()
const = 0;
1402 virtual std::string topic_name ()
const = 0;
1405 virtual int32_t partition ()
const = 0;
1408 virtual void *payload ()
const = 0 ;
1411 virtual size_t len ()
const = 0;
1414 virtual const std::string *key ()
const = 0;
1417 virtual const void *key_pointer ()
const = 0 ;
1420 virtual size_t key_len ()
const = 0;
1423 virtual int64_t offset ()
const = 0;
1429 virtual void *msg_opaque ()
const = 0;
1435 virtual int64_t latency ()
const = 0;
1453 virtual struct rd_kafka_message_s *c_ptr () = 0;
1496 virtual ErrorCode forward (
Queue *dst) = 0;
1510 virtual Message *consume (
int timeout_ms) = 0;
1519 virtual int poll (
int timeout_ms) = 0;
1521 virtual ~
Queue () = 0;
1538 virtual void io_event_enable (
int fd,
const void *payload,
size_t size) = 0;
1579 virtual ErrorCode assignment (std::vector<RdKafka::TopicPartition*> &partitions) = 0;
1583 virtual ErrorCode subscription (std::vector<std::string> &topics) = 0;
1609 virtual ErrorCode subscribe (
const std::vector<std::string> &topics) = 0;
1612 virtual ErrorCode unsubscribe () = 0;
1620 virtual ErrorCode assign (
const std::vector<TopicPartition*> &partitions) = 0;
1625 virtual ErrorCode unassign () = 0;
1651 virtual Message *consume (
int timeout_ms) = 0;
1666 virtual ErrorCode commitSync () = 0;
1673 virtual ErrorCode commitAsync () = 0;
1682 virtual ErrorCode commitSync (
Message *message) = 0;
1691 virtual ErrorCode commitAsync (
Message *message) = 0;
1698 virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;
1705 virtual ErrorCode commitAsync (
const std::vector<TopicPartition*> &offsets) = 0;
1717 virtual ErrorCode commitSync (
OffsetCommitCb *offset_commit_cb) = 0;
1729 virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
1743 virtual ErrorCode committed (std::vector<TopicPartition*> &partitions,
1744 int timeout_ms) = 0;
1754 virtual ErrorCode position (std::vector<TopicPartition*> &partitions) = 0;
1779 virtual ErrorCode close () = 0;
1799 virtual ErrorCode seek (
const TopicPartition &partition,
int timeout_ms) = 0;
1816 virtual ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) = 0;
1846 static Consumer *create (
Conf *conf, std::string &errstr);
1870 virtual ErrorCode start (
Topic *topic, int32_t partition, int64_t offset) = 0;
1878 virtual ErrorCode start (
Topic *topic, int32_t partition, int64_t offset,
1890 virtual ErrorCode stop (
Topic *topic, int32_t partition) = 0;
1906 virtual ErrorCode seek (
Topic *topic, int32_t partition, int64_t offset,
1907 int timeout_ms) = 0;
1926 virtual Message *consume (
Topic *topic, int32_t partition,
1927 int timeout_ms) = 0;
1950 virtual Message *consume (
Queue *queue,
int timeout_ms) = 0;
1971 virtual int consume_callback (
Topic *topic, int32_t partition,
1982 virtual int consume_callback (
Queue *queue,
int timeout_ms,
1995 static int64_t OffsetTail(int64_t offset);
2023 static Producer *create (
Conf *conf, std::string &errstr);
2062 MSG_FREE = RK_MSG_FREE,
2063 MSG_COPY = RK_MSG_COPY
2124 virtual ErrorCode produce (
Topic *topic, int32_t partition,
2126 void *payload,
size_t len,
2127 const std::string *key,
2128 void *msg_opaque) = 0;
2134 virtual ErrorCode produce (
Topic *topic, int32_t partition,
2136 void *payload,
size_t len,
2137 const void *key,
size_t key_len,
2138 void *msg_opaque) = 0;
2146 virtual ErrorCode produce (
const std::string topic_name, int32_t partition,
2148 void *payload,
size_t len,
2149 const void *key,
size_t key_len,
2151 void *msg_opaque) = 0;
2158 virtual ErrorCode produce (
Topic *topic, int32_t partition,
2159 const std::vector<char> *payload,
2160 const std::vector<char> *key,
2161 void *msg_opaque) = 0;
2175 virtual ErrorCode flush (
int timeout_ms) = 0;
2194 virtual int32_t
id()
const = 0;
2197 virtual const std::string
host()
const = 0;
2200 virtual int port()
const = 0;
2224 virtual int32_t
id()
const = 0;
2227 virtual ErrorCode
err()
const = 0;
2230 virtual int32_t
leader()
const = 0;
2233 virtual const std::vector<int32_t> *
replicas()
const = 0;
2238 virtual const std::vector<int32_t> *
isrs()
const = 0;
2256 virtual const std::string
topic()
const = 0;
2259 virtual const PartitionMetadataVector *
partitions()
const = 0;
2262 virtual ErrorCode
err()
const = 0;
2285 virtual const BrokerMetadataVector *
brokers()
const = 0;
2288 virtual const TopicMetadataVector *
topics()
const = 0;
virtual int32_t partitioner_cb(const Topic *topic, const void *key, size_t key_len, int32_t partition_cnt, void *msg_opaque)=0
Variant partitioner callback that gets key as pointer and length instead of as a const std::string *...
High-level KafkaConsumer (for brokers 0.9 and later)
Definition: rdkafkacpp.h:1559
Definition: rdkafkacpp.h:1360
static const int64_t OFFSET_BEGINNING
Special offsets.
Definition: rdkafkacpp.h:1272
ConfType
Configuration object type.
Definition: rdkafkacpp.h:793
Partitioner callback class.
Definition: rdkafkacpp.h:454
Type
Event type.
Definition: rdkafkacpp.h:532
static const int32_t PARTITION_UA
Unassigned partition.
Definition: rdkafkacpp.h:1269
Message object.
Definition: rdkafkacpp.h:1381
Severity
EVENT_LOG severities (conforms to syslog(3) severities)
Definition: rdkafkacpp.h:540
Event callback class.
Definition: rdkafkacpp.h:513
KafkaConsumer: Rebalance callback class
Definition: rdkafkacpp.h:633
int64_t timestamp
Definition: rdkafkacpp.h:1365
Variant partitioner with key pointer.
Definition: rdkafkacpp.h:484
Definition: rdkafkacpp.h:535
static const int64_t OFFSET_INVALID
Definition: rdkafkacpp.h:1275
Definition: rdkafkacpp.h:1359
Topic handle.
Definition: rdkafkacpp.h:1261
Producer.
Definition: rdkafkacpp.h:2011
Definition: rdkafkacpp.h:794
Definition: rdkafkacpp.h:533
ConfResult
RdKafka::Conf::set() result code.
Definition: rdkafkacpp.h:801
Definition: rdkafkacpp.h:80
Queue interface.
Definition: rdkafkacpp.h:1479
MessageTimestampType type
Definition: rdkafkacpp.h:1364
Message timestamp object.
Definition: rdkafkacpp.h:1356
static const int64_t OFFSET_END
Definition: rdkafkacpp.h:1273
Portability: OpenCb callback class
Definition: rdkafkacpp.h:749
Definition: rdkafkacpp.h:534
static const int64_t OFFSET_STORED
Definition: rdkafkacpp.h:1274
Configuration interface.
Definition: rdkafkacpp.h:788
Offset Commit callback class.
Definition: rdkafkacpp.h:695
Base handle, super class for specific clients.
Definition: rdkafkacpp.h:956
Topic+Partition.
Definition: rdkafkacpp.h:1219
Consume callback class.
Definition: rdkafkacpp.h:615
Simple Consumer (legacy)
Definition: rdkafkacpp.h:1834
MessageTimestampType
Definition: rdkafkacpp.h:1358
Event object class as passed to the EventCb callback.
Definition: rdkafkacpp.h:529
Portability: SocketCb callback class
Definition: rdkafkacpp.h:724
Delivery Report callback class.
Definition: rdkafkacpp.h:436