librdkafka
The Apache Kafka C/C++ client library
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
rdkafka.h
Go to the documentation of this file.
1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2013 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  * this list of conditions and the following disclaimer in the documentation
14  * and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
43 /* @cond NO_DOC */
44 #pragma once
45 
46 #include <stdio.h>
47 #include <inttypes.h>
48 #include <sys/types.h>
49 
50 #ifdef __cplusplus
51 extern "C" {
52 #if 0
53 } /* Restore indent */
54 #endif
55 #endif
56 
57 #ifdef _MSC_VER
58 #include <basetsd.h>
59 typedef SSIZE_T ssize_t;
60 #define RD_UNUSED
61 #define RD_DEPRECATED
62 #undef RD_EXPORT
63 #ifdef LIBRDKAFKA_EXPORTS
64 #define RD_EXPORT __declspec(dllexport)
65 #else
66 #define RD_EXPORT __declspec(dllimport)
67 #endif
68 
69 #else
70 #define RD_UNUSED __attribute__((unused))
71 #define RD_EXPORT
72 #define RD_DEPRECATED __attribute__((deprecated))
73 #endif
74 /* @endcond */
75 
76 
77 
99 #define RD_KAFKA_VERSION 0x000900ff
100 
109 RD_EXPORT
110 int rd_kafka_version(void);
111 
117 RD_EXPORT
118 const char *rd_kafka_version_str (void);
119 
138 typedef enum rd_kafka_type_t {
142 
143 
150 RD_EXPORT
151 const char *rd_kafka_get_debug_contexts(void);
152 
160 #define RD_KAFKA_DEBUG_CONTEXTS \
161  "all,generic,broker,topic,metadata,producer,queue,msg,protocol,cgrp,security,fetch"
162 
163 
164 /* @cond NO_DOC */
165 /* Private types to provide ABI compatibility */
166 typedef struct rd_kafka_s rd_kafka_t;
167 typedef struct rd_kafka_topic_s rd_kafka_topic_t;
168 typedef struct rd_kafka_conf_s rd_kafka_conf_t;
169 typedef struct rd_kafka_topic_conf_s rd_kafka_topic_conf_t;
170 typedef struct rd_kafka_queue_s rd_kafka_queue_t;
171 /* @endcond */
172 
173 
186 typedef enum {
187  /* Internal errors to rdkafka: */
257 
258  /* Kafka broker errors: */
325 
327 
328 
334 RD_EXPORT
335 const char *rd_kafka_err2str (rd_kafka_resp_err_t err);
336 
337 
353 RD_EXPORT
355 
356 
357 
358 
374 typedef struct rd_kafka_topic_partition_s {
375  char *topic;
376  int32_t partition;
377  int64_t offset;
378  void *metadata;
379  size_t metadata_size;
380  void *opaque;
382  void *_private;
385 
386 
387 
392 typedef struct rd_kafka_topic_partition_list_s {
393  int cnt;
394  int size;
397 
398 
413 RD_EXPORT
415 
416 
420 RD_EXPORT
421 void
423 
433 RD_EXPORT
436  const char *topic, int32_t partition);
437 
438 
447 RD_EXPORT
448 void
450  *rktparlist,
451  const char *topic,
452  int32_t start, int32_t stop);
453 
454 
455 
463 RD_EXPORT
466 
467 
480 // FIXME: This doesn't show up in docs for some reason
481 // "Compound rd_kafka_message_t is not documented."
482 
496 typedef struct rd_kafka_message_s {
498  rd_kafka_topic_t *rkt;
499  int32_t partition;
500  void *payload;
503  size_t len;
506  void *key;
508  size_t key_len;
510  int64_t offset;
520  void *_private;
525 
526 
530 RD_EXPORT
532 
533 
538 static __inline const char *
539 RD_UNUSED
541  if (!rkmessage->err)
542  return NULL;
543 
544  if (rkmessage->payload)
545  return (const char *)rkmessage->payload;
546 
547  return rd_kafka_err2str(rkmessage->err);
548 }
549 
565 typedef enum {
570 
571 
602 RD_EXPORT
603 rd_kafka_conf_t *rd_kafka_conf_new(void);
604 
605 
609 RD_EXPORT
610 void rd_kafka_conf_destroy(rd_kafka_conf_t *conf);
611 
612 
616 RD_EXPORT
617 rd_kafka_conf_t *rd_kafka_conf_dup(const rd_kafka_conf_t *conf);
618 
619 
629 RD_EXPORT
630 rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,
631  const char *name,
632  const char *value,
633  char *errstr, size_t errstr_size);
634 
635 
639 RD_EXPORT
640 void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf,
641  void (*dr_cb) (rd_kafka_t *rk,
642  void *payload, size_t len,
644  void *opaque, void *msg_opaque));
645 
660 RD_EXPORT
661 void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf,
662  void (*dr_msg_cb) (rd_kafka_t *rk,
663  const rd_kafka_message_t *
664  rkmessage,
665  void *opaque));
666 
667 
672 RD_EXPORT
673 void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf,
674  void (*consume_cb) (rd_kafka_message_t *
675  rkmessage,
676  void *opaque));
677 
729 RD_EXPORT
731  rd_kafka_conf_t *conf,
732  void (*rebalance_cb) (rd_kafka_t *rk,
735  void *opaque));
736 
737 
738 
749 RD_EXPORT
751  rd_kafka_conf_t *conf,
752  void (*offset_commit_cb) (rd_kafka_t *rk,
755  void *opaque));
756 
757 
766 RD_EXPORT
767 void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf,
768  void (*error_cb) (rd_kafka_t *rk, int err,
769  const char *reason,
770  void *opaque));
771 
785 RD_EXPORT
786 void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf,
787  void (*throttle_cb) (
788  rd_kafka_t *rk,
789  const char *broker_name,
790  int32_t broker_id,
791  int throttle_time_ms,
792  void *opaque));
793 
794 
805 RD_EXPORT
806 void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf,
807  void (*log_cb) (const rd_kafka_t *rk, int level,
808  const char *fac, const char *buf));
809 
810 
827 RD_EXPORT
828 void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf,
829  int (*stats_cb) (rd_kafka_t *rk,
830  char *json,
831  size_t json_len,
832  void *opaque));
833 
834 
835 
848 RD_EXPORT
849 void rd_kafka_conf_set_socket_cb(rd_kafka_conf_t *conf,
850  int (*socket_cb) (int domain, int type,
851  int protocol,
852  void *opaque));
853 
854 
855 #ifndef _MSC_VER
856 
868 RD_EXPORT
869 void rd_kafka_conf_set_open_cb (rd_kafka_conf_t *conf,
870  int (*open_cb) (const char *pathname,
871  int flags, mode_t mode,
872  void *opaque));
873 #endif
874 
878 RD_EXPORT
879 void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque);
880 
884 RD_EXPORT
885 void *rd_kafka_opaque(const rd_kafka_t *rk);
886 
887 
888 
894 RD_EXPORT
895 void rd_kafka_conf_set_default_topic_conf (rd_kafka_conf_t *conf,
896  rd_kafka_topic_conf_t *tconf);
897 
898 
899 
915 RD_EXPORT
916 rd_kafka_conf_res_t rd_kafka_conf_get (const rd_kafka_conf_t *conf,
917  const char *name,
918  char *dest, size_t *dest_size);
919 
920 
926 RD_EXPORT
927 rd_kafka_conf_res_t rd_kafka_topic_conf_get (const rd_kafka_topic_conf_t *conf,
928  const char *name,
929  char *dest, size_t *dest_size);
930 
931 
940 RD_EXPORT
941 const char **rd_kafka_conf_dump(rd_kafka_conf_t *conf, size_t *cntp);
942 
943 
952 RD_EXPORT
953 const char **rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t *conf,
954  size_t *cntp);
955 
960 RD_EXPORT
961 void rd_kafka_conf_dump_free(const char **arr, size_t cnt);
962 
967 RD_EXPORT
968 void rd_kafka_conf_properties_show(FILE *fp);
969 
987 RD_EXPORT
988 rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void);
989 
990 
994 RD_EXPORT
995 rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t
996  *conf);
997 
998 
1002 RD_EXPORT
1003 void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf);
1004 
1005 
1014 RD_EXPORT
1015 rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf,
1016  const char *name,
1017  const char *value,
1018  char *errstr, size_t errstr_size);
1019 
1024 RD_EXPORT
1025 void rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t *conf, void *opaque);
1026 
1027 
1042 RD_EXPORT
1043 void
1044 rd_kafka_topic_conf_set_partitioner_cb (rd_kafka_topic_conf_t *topic_conf,
1045  int32_t (*partitioner) (
1046  const rd_kafka_topic_t *rkt,
1047  const void *keydata,
1048  size_t keylen,
1049  int32_t partition_cnt,
1050  void *rkt_opaque,
1051  void *msg_opaque));
1052 
1060 RD_EXPORT
1061 int rd_kafka_topic_partition_available(const rd_kafka_topic_t *rkt,
1062  int32_t partition);
1063 
1064 
1065 /*******************************************************************
1066  * *
1067  * Partitioners provided by rdkafka *
1068  * *
1069  *******************************************************************/
1070 
1080 RD_EXPORT
1081 int32_t rd_kafka_msg_partitioner_random(const rd_kafka_topic_t *rkt,
1082  const void *key, size_t keylen,
1083  int32_t partition_cnt,
1084  void *opaque, void *msg_opaque);
1085 
1094 int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
1095  const void *key, size_t keylen,
1096  int32_t partition_cnt,
1097  void *opaque, void *msg_opaque);
1098 
1099 
1100 
1141 RD_EXPORT
1142 rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf,
1143  char *errstr, size_t errstr_size);
1144 
1145 
1151 RD_EXPORT
1152 void rd_kafka_destroy(rd_kafka_t *rk);
1153 
1154 
1155 
1159 RD_EXPORT
1160 const char *rd_kafka_name(const rd_kafka_t *rk);
1161 
1162 
1172 RD_EXPORT
1173 char *rd_kafka_memberid (const rd_kafka_t *rk);
1174 
1175 
1197 RD_EXPORT
1198 rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, const char *topic,
1199  rd_kafka_topic_conf_t *conf);
1200 
1201 
1202 
1206 RD_EXPORT
1207 void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt);
1208 
1209 
1213 RD_EXPORT
1214 const char *rd_kafka_topic_name(const rd_kafka_topic_t *rkt);
1215 
1216 
1220 RD_EXPORT
1221 void *rd_kafka_topic_opaque (const rd_kafka_topic_t *rkt);
1222 
1223 
1230 #define RD_KAFKA_PARTITION_UA ((int32_t)-1)
1231 
1232 
1251 RD_EXPORT
1252 int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms);
1253 
1254 
1265 RD_EXPORT
1266 void rd_kafka_yield (rd_kafka_t *rk);
1267 
1268 
1292 RD_EXPORT
1293 rd_kafka_queue_t *rd_kafka_queue_new(rd_kafka_t *rk);
1294 
1298 RD_EXPORT
1299 void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu);
1300 
1301 
1312 #define RD_KAFKA_OFFSET_BEGINNING -2
1314 #define RD_KAFKA_OFFSET_END -1
1316 #define RD_KAFKA_OFFSET_STORED -1000
1320 #define RD_KAFKA_OFFSET_TAIL_BASE -2000 /* internal: do not use */
1321 
1328 #define RD_KAFKA_OFFSET_TAIL(CNT) (RD_KAFKA_OFFSET_TAIL_BASE - (CNT))
1329 
1363 RD_EXPORT
1364 int rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition,
1365  int64_t offset);
1366 
1381 RD_EXPORT
1382 int rd_kafka_consume_start_queue(rd_kafka_topic_t *rkt, int32_t partition,
1383  int64_t offset, rd_kafka_queue_t *rkqu);
1384 
1398 RD_EXPORT
1399 int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition);
1400 
1401 
1402 
1417 RD_EXPORT
1418 rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *rkt,
1419  int32_t partition,
1420  int64_t offset,
1421  int timeout_ms);
1422 
1423 
1445 RD_EXPORT
1446 rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition,
1447  int timeout_ms);
1448 
1449 
1450 
1473 RD_EXPORT
1474 ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt, int32_t partition,
1475  int timeout_ms,
1476  rd_kafka_message_t **rkmessages,
1477  size_t rkmessages_size);
1478 
1479 
1480 
1501 RD_EXPORT
1502 int rd_kafka_consume_callback(rd_kafka_topic_t *rkt, int32_t partition,
1503  int timeout_ms,
1504  void (*consume_cb) (rd_kafka_message_t
1505  *rkmessage,
1506  void *opaque),
1507  void *opaque);
1508 
1509 
1526 RD_EXPORT
1527 rd_kafka_message_t *rd_kafka_consume_queue(rd_kafka_queue_t *rkqu,
1528  int timeout_ms);
1529 
1535 RD_EXPORT
1536 ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu,
1537  int timeout_ms,
1538  rd_kafka_message_t **rkmessages,
1539  size_t rkmessages_size);
1540 
1546 RD_EXPORT
1547 int rd_kafka_consume_callback_queue(rd_kafka_queue_t *rkqu,
1548  int timeout_ms,
1549  void (*consume_cb) (rd_kafka_message_t
1550  *rkmessage,
1551  void *opaque),
1552  void *opaque);
1553 
1554 
1580 RD_EXPORT
1581 rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt,
1582  int32_t partition, int64_t offset);
1605 RD_EXPORT rd_kafka_resp_err_t
1606 rd_kafka_subscribe (rd_kafka_t *rk,
1607  const rd_kafka_topic_partition_list_t *topics);
1608 
1609 
1613 RD_EXPORT
1614 rd_kafka_resp_err_t rd_kafka_unsubscribe (rd_kafka_t *rk);
1615 
1616 
1620 RD_EXPORT rd_kafka_resp_err_t
1621 rd_kafka_subscription (rd_kafka_t *rk,
1623 
1624 
1625 
1631 RD_EXPORT
1632 rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms);
1633 
1649 RD_EXPORT
1651 
1652 
1653 
1658 RD_EXPORT rd_kafka_resp_err_t
1659 rd_kafka_assign (rd_kafka_t *rk,
1660  const rd_kafka_topic_partition_list_t *partitions);
1661 
1665 RD_EXPORT rd_kafka_resp_err_t
1666 rd_kafka_assignment (rd_kafka_t *rk,
1667  rd_kafka_topic_partition_list_t **partitions);
1668 
1669 
1670 
1671 
1684 RD_EXPORT rd_kafka_resp_err_t
1685 rd_kafka_commit (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets,
1686  int async);
1687 
1688 
1692 RD_EXPORT rd_kafka_resp_err_t
1693 rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
1694  int async);
1695 
1696 
1712 #define RD_KAFKA_MSG_F_FREE 0x1
1713 #define RD_KAFKA_MSG_F_COPY 0x2
1767 RD_EXPORT
1768 int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partitition,
1769  int msgflags,
1770  void *payload, size_t len,
1771  const void *key, size_t keylen,
1772  void *msg_opaque);
1773 
1774 
1775 
1797 RD_EXPORT
1798 int rd_kafka_produce_batch(rd_kafka_topic_t *rkt, int32_t partition,
1799  int msgflags,
1800  rd_kafka_message_t *rkmessages, int message_cnt);
1801 
1802 
1803 
1804 
1819 typedef struct rd_kafka_metadata_broker {
1820  int32_t id;
1821  char *host;
1822  int port;
1828 typedef struct rd_kafka_metadata_partition {
1829  int32_t id;
1830  rd_kafka_resp_err_t err;
1831  int32_t leader;
1832  int replica_cnt;
1833  int32_t *replicas;
1834  int isr_cnt;
1835  int32_t *isrs;
1841 typedef struct rd_kafka_metadata_topic {
1842  char *topic;
1843  int partition_cnt;
1844  struct rd_kafka_metadata_partition *partitions;
1852 typedef struct rd_kafka_metadata {
1853  int broker_cnt;
1854  struct rd_kafka_metadata_broker *brokers;
1856  int topic_cnt;
1857  struct rd_kafka_metadata_topic *topics;
1859  int32_t orig_broker_id;
1860  char *orig_broker_name;
1880 RD_EXPORT
1882 rd_kafka_metadata (rd_kafka_t *rk, int all_topics,
1883  rd_kafka_topic_t *only_rkt,
1884  const struct rd_kafka_metadata **metadatap,
1885  int timeout_ms);
1886 
1890 RD_EXPORT
1891 void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata);
1892 
1893 
1936 RD_EXPORT
1937 int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist);
1938 
1939 
1940 
1941 
1954 RD_EXPORT RD_DEPRECATED
1955 void rd_kafka_set_logger(rd_kafka_t *rk,
1956  void (*func) (const rd_kafka_t *rk, int level,
1957  const char *fac, const char *buf));
1958 
1959 
1967 RD_EXPORT
1968 void rd_kafka_set_log_level(rd_kafka_t *rk, int level);
1969 
1970 
1974 RD_EXPORT
1975 void rd_kafka_log_print(const rd_kafka_t *rk, int level,
1976  const char *fac, const char *buf);
1977 
1978 
1982 RD_EXPORT
1983 void rd_kafka_log_syslog(const rd_kafka_t *rk, int level,
1984  const char *fac, const char *buf);
1985 
1986 
1999 RD_EXPORT
2000 int rd_kafka_outq_len(rd_kafka_t *rk);
2001 
2002 
2003 
2010 RD_EXPORT
2011 void rd_kafka_dump(FILE *fp, rd_kafka_t *rk);
2012 
2013 
2014 
2020 RD_EXPORT
2021 int rd_kafka_thread_cnt(void);
2022 
2023 
2033 RD_EXPORT
2034 int rd_kafka_wait_destroyed(int timeout_ms);
2035 
2036 
2054 RD_EXPORT
2056 
2059 #ifdef __cplusplus
2060 }
2061 #endif
void * _private
Definition: rdkafka.h:382
rd_kafka_resp_err_t
Error codes.
Definition: rdkafka.h:186
rd_kafka_topic_t * rkt
Definition: rdkafka.h:498
Definition: rdkafka.h:262
RD_EXPORT int rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset)
Start consuming messages for topic rkt and partition at offset offset which may either be an absolute...
Definition: rdkafka.h:248
rd_kafka_resp_err_t err
Definition: rdkafka.h:497
rd_kafka_conf_res_t
Configuration result type.
Definition: rdkafka.h:565
RD_EXPORT char * rd_kafka_memberid(const rd_kafka_t *rk)
Returns this client's broker-assigned group member id.
int cnt
Definition: rdkafka.h:393
RD_EXPORT int rd_kafka_thread_cnt(void)
Retrieve the current number of threads in use by librdkafka.
RD_EXPORT void rd_kafka_conf_set_consume_cb(rd_kafka_conf_t *conf, void(*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque))
Consumer: Set consume callback for use with rd_kafka_consumer_poll()
rd_kafka_topic_partition_t * elems
Definition: rdkafka.h:395
RD_EXPORT rd_kafka_resp_err_t rd_kafka_subscription(rd_kafka_t *rk, rd_kafka_topic_partition_list_t **topics)
Returns the current topic subscription.
RD_EXPORT void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf, void(*dr_msg_cb)(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque))
Producer: Set delivery report callback in provided conf object.
RD_EXPORT void rd_kafka_topic_partition_list_destroy(rd_kafka_topic_partition_list_t *rkparlist)
Free all resources used by the list and the list itself.
RD_EXPORT rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size)
Sets a single rd_kafka_topic_conf_t value by property name.
RD_EXPORT rd_kafka_topic_partition_list_t * rd_kafka_topic_partition_list_copy(const rd_kafka_topic_partition_list_t *src)
Make a copy of an existing list.
RD_EXPORT void rd_kafka_topic_conf_set_partitioner_cb(rd_kafka_topic_conf_t *topic_conf, int32_t(*partitioner)(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque))
Producer: Set partitioner callback in provided topic conf object.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_poll_set_consumer(rd_kafka_t *rk)
Redirect the main (rd_kafka_poll()) queue to the KafkaConsumer's queue (rd_kafka_consumer_poll()).
RD_EXPORT void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage)
Frees resources for rkmessage and hands ownership back to rdkafka.
RD_EXPORT rd_kafka_conf_res_t rd_kafka_topic_conf_get(const rd_kafka_topic_conf_t *conf, const char *name, char *dest, size_t *dest_size)
Retrieve topic configuration value for property name.
Definition: rdkafka.h:222
Definition: rdkafka.h:236
size_t key_len
Definition: rdkafka.h:508
Definition: rdkafka.h:216
Definition: rdkafka.h:567
RD_EXPORT rd_kafka_t * rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)
Creates a new Kafka handle and starts its operation according to the specified type (RD_KAFKA_CONSUME...
Partition information.
Definition: rdkafka.h:1831
Definition: rdkafka.h:205
RD_EXPORT const char * rd_kafka_version_str(void)
Returns the librdkafka version as string.
RD_EXPORT void * rd_kafka_opaque(const rd_kafka_t *rk)
Retrieves the opaque pointer previously set with rd_kafka_conf_set_opaque()
Definition: rdkafka.h:212
RD_EXPORT rd_kafka_topic_partition_list_t * rd_kafka_topic_partition_list_new(int size)
Create a new list/vector Topic+Partition container.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_commit_message(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, int async)
Commit message's offset on broker for the message's partition.
RD_EXPORT ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms, rd_kafka_message_t **rkmessages, size_t rkmessages_size)
Consume up to rkmessages_size from topic rkt and partition putting a pointer to each message in the a...
RD_EXPORT void rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t *conf, void *opaque)
Sets the application's opaque pointer that will be passed to all topic callbacks as the rkt_opaque ar...
Definition: rdkafka.h:232
RD_EXPORT void rd_kafka_log_print(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
Builtin (default) log sink: print to stderr.
int32_t rd_kafka_msg_partitioner_consistent(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *opaque, void *msg_opaque)
Consistent partitioner.
Definition: rdkafka.h:240
RD_EXPORT int rd_kafka_consume_callback_queue(rd_kafka_queue_t *rkqu, int timeout_ms, void(*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque), void *opaque)
Consume multiple messages from queue with callback.
RD_EXPORT rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size)
Sets a configuration property.
RD_EXPORT int rd_kafka_outq_len(rd_kafka_t *rk)
Returns the current out queue length.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset)
Store offset offset for topic rkt partition partition.
RD_EXPORT void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf, void(*error_cb)(rd_kafka_t *rk, int err, const char *reason, void *opaque))
Set error callback in provided conf object.
size_t len
Definition: rdkafka.h:503
RD_EXPORT rd_kafka_resp_err_t rd_kafka_commit(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, int async)
Commit offsets on broker for the provided list of partitions.
RD_EXPORT void rd_kafka_conf_set_default_topic_conf(rd_kafka_conf_t *conf, rd_kafka_topic_conf_t *tconf)
Definition: rdkafka.h:310
Definition: rdkafka.h:226
Definition: rdkafka.h:312
RD_EXPORT void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu)
Definition: rdkafka.h:244
RD_EXPORT void rd_kafka_destroy(rd_kafka_t *rk)
Destroy Kafka handle.
RD_EXPORT void rd_kafka_set_log_level(rd_kafka_t *rk, int level)
Specifies the maximum logging level produced by internal kafka logging and debugging.
RD_EXPORT int rd_kafka_consume_callback(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms, void(*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque), void *opaque)
Consumes messages from topic rkt and partition, calling the provided callback for each consumed messs...
RD_EXPORT rd_kafka_resp_err_t rd_kafka_metadata(rd_kafka_t *rk, int all_topics, rd_kafka_topic_t *only_rkt, const struct rd_kafka_metadata **metadatap, int timeout_ms)
Request Metadata from broker.
RD_EXPORT int rd_kafka_wait_destroyed(int timeout_ms)
Wait for all rd_kafka_t objects to be destroyed.
Definition: rdkafka.h:296
Definition: rdkafka.h:284
int64_t offset
Definition: rdkafka.h:510
Definition: rdkafka.h:288
RD_EXPORT int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist)
Adds one or more brokers to the kafka handle's list of initial bootstrap brokers. ...
Definition: rdkafka.h:270
Definition: rdkafka.h:201
Definition: rdkafka.h:260
RD_EXPORT void rd_kafka_log_syslog(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
Builtin log sink: print to syslog.
Definition: rdkafka.h:189
void * key
Definition: rdkafka.h:506
RD_EXPORT rd_kafka_conf_res_t rd_kafka_conf_get(const rd_kafka_conf_t *conf, const char *name, char *dest, size_t *dest_size)
Retrieve configuration value for property name.
RD_EXPORT rd_kafka_topic_conf_t * rd_kafka_topic_conf_new(void)
Create topic configuration object.
RD_EXPORT void rd_kafka_dump(FILE *fp, rd_kafka_t *rk)
Dumps rdkafka's internal state for handle rk to stream fp.
Definition: rdkafka.h:214
A growable list of Topic+Partitions.
Definition: rdkafka.h:392
Topic information.
Definition: rdkafka.h:1844
RD_EXPORT void rd_kafka_conf_set_throttle_cb(rd_kafka_conf_t *conf, void(*throttle_cb)(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque))
Set throttle callback.
RD_EXPORT void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf, void(*log_cb)(const rd_kafka_t *rk, int level, const char *fac, const char *buf))
Set logger callback.
Definition: rdkafka.h:203
int32_t partition
Definition: rdkafka.h:376
Definition: rdkafka.h:199
Definition: rdkafka.h:228
Definition: rdkafka.h:197
void * opaque
Definition: rdkafka.h:380
RD_EXPORT rd_kafka_resp_err_t rd_kafka_assign(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *partitions)
Atomic assignment of partitions to consume.
RD_EXPORT rd_kafka_conf_t * rd_kafka_conf_dup(const rd_kafka_conf_t *conf)
Creates a copy/duplicate of configuration object conf.
Definition: rdkafka.h:282
RD_EXPORT rd_kafka_conf_t * rd_kafka_conf_new(void)
Create configuration object.
RD_EXPORT int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition)
Stop consuming messages for topic rkt and partition, purging all messages currently in the local queu...
RD_EXPORT int rd_kafka_version(void)
Returns the librdkafka version as integer.
int size
Definition: rdkafka.h:394
RD_EXPORT rd_kafka_resp_err_t rd_kafka_subscribe(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)
Subscribe to topic set using balanced consumer groups.
RD_EXPORT void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque)
Sets the application's opaque pointer that will be passed to callbacks.
Definition: rdkafka.h:568
RD_EXPORT const char ** rd_kafka_conf_dump(rd_kafka_conf_t *conf, size_t *cntp)
Dump the configuration properties and values of conf to an array with "key", "value" pairs...
static __inline const char *RD_UNUSED rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage)
Returns the error string for an errored rd_kafka_message_t or NULL if there was no error...
Definition: rdkafka.h:540
char * topic
Definition: rdkafka.h:375
RD_EXPORT const char * rd_kafka_get_debug_contexts(void)
Retrieve supported debug contexts for use with the "debug" configuration property. (runtime)
RD_EXPORT void rd_kafka_conf_set_offset_commit_cb(rd_kafka_conf_t *conf, void(*offset_commit_cb)(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque))
Consumer: Set offset commit callback for use with consumer groups.
RD_EXPORT void rd_kafka_conf_set_rebalance_cb(rd_kafka_conf_t *conf, void(*rebalance_cb)(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque))
Consumer: Set rebalance callback for use with coordinated consumer group balancing.
RD_EXPORT int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms)
Polls the provided kafka handle for events.
RD_EXPORT int rd_kafka_topic_partition_available(const rd_kafka_topic_t *rkt, int32_t partition)
Check if partition is available (has a leader broker).
Definition: rdkafka.h:139
Definition: rdkafka.h:300
RD_EXPORT void rd_kafka_conf_set_socket_cb(rd_kafka_conf_t *conf, int(*socket_cb)(int domain, int type, int protocol, void *opaque))
Set socket callback.
Definition: rdkafka.h:140
RD_EXPORT rd_kafka_resp_err_t rd_kafka_errno2err(int errnox)
Converts the system errno value errnox to a rd_kafka_resp_err_t error code upon failure from the foll...
Definition: rdkafka.h:224
RD_EXPORT void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf, void(*dr_cb)(rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err, void *opaque, void *msg_opaque))
Definition: rdkafka.h:220
RD_EXPORT rd_kafka_topic_conf_t * rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t *conf)
Creates a copy/duplicate of topic configuration object conf.
Definition: rdkafka.h:264
RD_EXPORT RD_DEPRECATED void rd_kafka_set_logger(rd_kafka_t *rk, void(*func)(const rd_kafka_t *rk, int level, const char *fac, const char *buf))
Set logger function.
Definition: rdkafka.h:195
Metadata container.
Definition: rdkafka.h:1855
RD_EXPORT int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partitition, int msgflags, void *payload, size_t len, const void *key, size_t keylen, void *msg_opaque)
Produce and send a single message to broker.
Definition: rdkafka.h:254
RD_EXPORT rd_kafka_resp_err_t rd_kafka_unsubscribe(rd_kafka_t *rk)
Unsubscribe from the current subscriptions et.
RD_EXPORT void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata)
Release metadata memory.
RD_EXPORT int rd_kafka_produce_batch(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, rd_kafka_message_t *rkmessages, int message_cnt)
Produce multiple messages.
RD_EXPORT void rd_kafka_conf_set_open_cb(rd_kafka_conf_t *conf, int(*open_cb)(const char *pathname, int flags, mode_t mode, void *opaque))
Set open callback.
Definition: rdkafka.h:246
RD_EXPORT rd_kafka_resp_err_t rd_kafka_seek(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset, int timeout_ms)
Seek consumer for topic+partition to offset which is either an absolute or logical offset...
RD_EXPORT void rd_kafka_topic_partition_list_add_range(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t start, int32_t stop)
Add range of partitions from start to stop inclusive.
Definition: rdkafka.h:566
RD_EXPORT rd_kafka_topic_partition_t * rd_kafka_topic_partition_list_add(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition)
Add topic+partition to list.
Definition: rdkafka.h:218
Definition: rdkafka.h:242
RD_EXPORT ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu, int timeout_ms, rd_kafka_message_t **rkmessages, size_t rkmessages_size)
Consume batch of messages from queue.
RD_EXPORT void * rd_kafka_topic_opaque(const rd_kafka_topic_t *rkt)
Get the rkt_opaque pointer that was set in the topic configuration.
RD_EXPORT const char * rd_kafka_name(const rd_kafka_t *rk)
Returns Kafka handle name.
A Kafka message as returned by the rd_kafka_consume*() family of functions.
Definition: rdkafka.h:496
Definition: rdkafka.h:230
RD_EXPORT void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt)
Destroy topic handle previously created with rd_kafka_topic_new().
int32_t partition
Definition: rdkafka.h:499
Definition: rdkafka.h:256
RD_EXPORT rd_kafka_message_t * rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms)
Consume a single message from topic rkt and partition.
Definition: rdkafka.h:234
RD_EXPORT rd_kafka_message_t * rd_kafka_consumer_poll(rd_kafka_t *rk, int timeout_ms)
Poll the consumer for messages or events.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_assignment(rd_kafka_t *rk, rd_kafka_topic_partition_list_t **partitions)
Returns the current partition assignment.
Definition: rdkafka.h:210
RD_EXPORT const char * rd_kafka_err2str(rd_kafka_resp_err_t err)
Returns a human readable representation of a kafka error.
void * metadata
Definition: rdkafka.h:378
int64_t offset
Definition: rdkafka.h:377
Definition: rdkafka.h:193
RD_EXPORT void rd_kafka_conf_destroy(rd_kafka_conf_t *conf)
Destroys a conf object.
Definition: rdkafka.h:252
Broker information.
Definition: rdkafka.h:1822
Definition: rdkafka.h:306
rd_kafka_resp_err_t err
Definition: rdkafka.h:381
Definition: rdkafka.h:250
Topic+Partition place holder.
Definition: rdkafka.h:374
Definition: rdkafka.h:191
size_t metadata_size
Definition: rdkafka.h:379
RD_EXPORT const char ** rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t *conf, size_t *cntp)
Dump the topic configuration properties and values of conf to an array with "key", "value" pairs.
rd_kafka_type_t
rd_kafka_t handle type.
Definition: rdkafka.h:138
RD_EXPORT int32_t rd_kafka_msg_partitioner_random(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *opaque, void *msg_opaque)
Random partitioner.
RD_EXPORT void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf, int(*stats_cb)(rd_kafka_t *rk, char *json, size_t json_len, void *opaque))
Set statistics callback in provided conf object.
Definition: rdkafka.h:276
RD_EXPORT void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf)
Destroys a topic conf object.
void * payload
Definition: rdkafka.h:500
RD_EXPORT void rd_kafka_yield(rd_kafka_t *rk)
Cancels the current callback dispatcher (rd_kafka_poll(), rd_kafka_consume_callback(), etc).
Definition: rdkafka.h:266
RD_EXPORT rd_kafka_resp_err_t rd_kafka_consumer_close(rd_kafka_t *rk)
Close down the KafkaConsumer.
RD_EXPORT void rd_kafka_conf_properties_show(FILE *fp)
Prints a table to fp of all supported configuration properties, their default values as well as a des...
RD_EXPORT void rd_kafka_conf_dump_free(const char **arr, size_t cnt)
Frees a configuration dump returned from rd_kafka_conf_dump() or `rd_kafka_topic_conf_dump().
void * _private
Definition: rdkafka.h:520
Definition: rdkafka.h:208
RD_EXPORT const char * rd_kafka_topic_name(const rd_kafka_topic_t *rkt)
Returns the topic name.
RD_EXPORT rd_kafka_message_t * rd_kafka_consume_queue(rd_kafka_queue_t *rkqu, int timeout_ms)
Consume from queue.
RD_EXPORT int rd_kafka_consume_start_queue(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset, rd_kafka_queue_t *rkqu)
Same as rd_kafka_consume_start() but re-routes incoming messages to the provided queue rkqu (which mu...
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_new(rd_kafka_t *rk)
Create a new message queue.
RD_EXPORT rd_kafka_topic_t * rd_kafka_topic_new(rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)
Creates a new topic handle for topic named topic.