librdkafka
The Apache Kafka C/C++ client library
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
rdkafka.h
Go to the documentation of this file.
1 /*
2  * librdkafka - Apache Kafka C library
3  *
4  * Copyright (c) 2012-2013 Magnus Edenhill
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright notice,
11  * this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright notice,
13  * this list of conditions and the following disclaimer in the documentation
14  * and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
43 /* @cond NO_DOC */
44 #ifndef _RDKAFKA_H_
45 #define _RDKAFKA_H_
46 
47 #include <stdio.h>
48 #include <inttypes.h>
49 #include <sys/types.h>
50 
51 #ifdef __cplusplus
52 extern "C" {
53 #if 0
54 } /* Restore indent */
55 #endif
56 #endif
57 
/*
 * Platform compatibility section: depending on whether _MSC_VER is defined,
 * pulls in the socket header and defines the RD_UNUSED, RD_INLINE,
 * RD_DEPRECATED and RD_EXPORT helper macros plus a default value for
 * LIBRDKAFKA_TYPECHECKS (0 in the _MSC_VER branch, 1 otherwise).
 */
58 #ifdef _MSC_VER
59 #include <basetsd.h>
/* NOTE(review): the Windows SDK macro is spelled WIN32_LEAN_AND_MEAN; the
 * name guarded and defined here is different - confirm whether the spelling
 * is intentional before relying on it to trim <Winsock2.h> includes. */
60 #ifndef WIN32_MEAN_AND_LEAN
61 #define WIN32_MEAN_AND_LEAN
62 #endif
63 #include <Winsock2.h> /* for sockaddr, .. */
64 typedef SSIZE_T ssize_t;
65 #define RD_UNUSED
66 #define RD_INLINE __inline
67 #define RD_DEPRECATED __declspec(deprecated)
/* RD_EXPORT: empty for LIBRDKAFKA_STATICLIB, dllexport when
 * LIBRDKAFKA_EXPORTS is defined, dllimport otherwise. */
68 #undef RD_EXPORT
69 #ifdef LIBRDKAFKA_STATICLIB
70 #define RD_EXPORT
71 #else
72 #ifdef LIBRDKAFKA_EXPORTS
73 #define RD_EXPORT __declspec(dllexport)
74 #else
75 #define RD_EXPORT __declspec(dllimport)
76 #endif
/* NOTE(review): this default sits inside the #else of LIBRDKAFKA_STATICLIB,
 * so with LIBRDKAFKA_STATICLIB defined the macro is left undefined in this
 * branch (an undefined identifier evaluates as 0 in #if) - confirm that is
 * intended rather than an accident of nesting. */
77 #ifndef LIBRDKAFKA_TYPECHECKS
78 #define LIBRDKAFKA_TYPECHECKS 0
79 #endif
80 #endif
81 
82 #else
83 #include <sys/socket.h> /* for sockaddr, .. */
84 
/* Non-_MSC_VER branch: attribute-based helpers; RD_EXPORT expands to nothing. */
85 #define RD_UNUSED __attribute__((unused))
86 #define RD_INLINE inline
87 #define RD_EXPORT
88 #define RD_DEPRECATED __attribute__((deprecated))
89 
/* Type checks default to enabled here unless the includer set the macro. */
90 #ifndef LIBRDKAFKA_TYPECHECKS
91 #define LIBRDKAFKA_TYPECHECKS 1
92 #endif
93 #endif
94 
95 
/*
 * Compile-time argument type checking helpers.
 *
 * Each _LRK_TYPECHECK* macro evaluates to RET. When LIBRDKAFKA_TYPECHECKS is
 * non-zero the expansion additionally declares, inside an `if (0)` block that
 * never runs, one variable per (TYPE, ARG) pair initialised from ARG, so the
 * compiler checks that ARG is assignable to TYPE. The `({ ... })` form is the
 * GNU C statement-expression extension.
 */
101 #if LIBRDKAFKA_TYPECHECKS
/* One (TYPE, ARG) pair. */
102 #define _LRK_TYPECHECK(RET,TYPE,ARG) \
103  ({ if (0) { TYPE __t RD_UNUSED = (ARG); } RET; })
104 
/* Two (TYPE, ARG) pairs. */
105 #define _LRK_TYPECHECK2(RET,TYPE,ARG,TYPE2,ARG2) \
106  ({ \
107  if (0) { \
108  TYPE __t RD_UNUSED = (ARG); \
109  TYPE2 __t2 RD_UNUSED = (ARG2); \
110  } \
111  RET; })
112 
/* Three (TYPE, ARG) pairs. */
113 #define _LRK_TYPECHECK3(RET,TYPE,ARG,TYPE2,ARG2,TYPE3,ARG3) \
114  ({ \
115  if (0) { \
116  TYPE __t RD_UNUSED = (ARG); \
117  TYPE2 __t2 RD_UNUSED = (ARG2); \
118  TYPE3 __t3 RD_UNUSED = (ARG3); \
119  } \
120  RET; })
121 #else
/* Checks disabled: the macros reduce to (RET) and the arguments are ignored. */
122 #define _LRK_TYPECHECK(RET,TYPE,ARG) (RET)
123 #define _LRK_TYPECHECK2(RET,TYPE,ARG,TYPE2,ARG2) (RET)
124 #define _LRK_TYPECHECK3(RET,TYPE,ARG,TYPE2,ARG2,TYPE3,ARG3) (RET)
125 #endif
126 
127 /* @endcond */
128 
129 
151 #define RD_KAFKA_VERSION 0x000b04ff
152 
161 RD_EXPORT
162 int rd_kafka_version(void);
163 
169 RD_EXPORT
170 const char *rd_kafka_version_str (void);
171 
190 typedef enum rd_kafka_type_t {
194 
195 
206 
207 
208 
215 RD_EXPORT
216 const char *rd_kafka_get_debug_contexts(void);
217 
225 #define RD_KAFKA_DEBUG_CONTEXTS \
226  "all,generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer"
227 
228 
229 /* @cond NO_DOC */
230 /* Private types to provide ABI compatibility */
231 typedef struct rd_kafka_s rd_kafka_t;
232 typedef struct rd_kafka_topic_s rd_kafka_topic_t;
233 typedef struct rd_kafka_conf_s rd_kafka_conf_t;
234 typedef struct rd_kafka_topic_conf_s rd_kafka_topic_conf_t;
235 typedef struct rd_kafka_queue_s rd_kafka_queue_t;
236 /* @endcond */
237 
238 
251 typedef enum {
252  /* Internal errors to rdkafka: */
346 
349 
350  /* Kafka broker errors: */
470 
471  RD_KAFKA_RESP_ERR_END_ALL,
473 
474 
482  const char *name;
483  const char *desc;
484 };
485 
486 
490 RD_EXPORT
491 void rd_kafka_get_err_descs (const struct rd_kafka_err_desc **errdescs,
492  size_t *cntp);
493 
494 
495 
496 
502 RD_EXPORT
503 const char *rd_kafka_err2str (rd_kafka_resp_err_t err);
504 
505 
506 
512 RD_EXPORT
513 const char *rd_kafka_err2name (rd_kafka_resp_err_t err);
514 
515 
541 RD_EXPORT
543 
544 
569 RD_EXPORT RD_DEPRECATED
571 
572 
585 RD_EXPORT RD_DEPRECATED
586 int rd_kafka_errno (void);
587 
588 
589 
605 typedef struct rd_kafka_topic_partition_s {
606  char *topic;
607  int32_t partition;
608  int64_t offset;
609  void *metadata;
610  size_t metadata_size;
611  void *opaque;
613  void *_private;
616 
617 
622 RD_EXPORT
624 
625 
630 typedef struct rd_kafka_topic_partition_list_s {
631  int cnt;
632  int size;
635 
636 
651 RD_EXPORT
653 
654 
658 RD_EXPORT
659 void
661 
671 RD_EXPORT
674  const char *topic, int32_t partition);
675 
676 
685 RD_EXPORT
686 void
688  *rktparlist,
689  const char *topic,
690  int32_t start, int32_t stop);
691 
692 
693 
705 RD_EXPORT
706 int
708  const char *topic, int32_t partition);
709 
710 
718 RD_EXPORT
719 int
722  int idx);
723 
724 
732 RD_EXPORT
735 
736 
737 
738 
746 RD_EXPORT
749  const char *topic, int32_t partition, int64_t offset);
750 
751 
752 
758 RD_EXPORT
761  const char *topic, int32_t partition);
762 
763 
771 RD_EXPORT void
773  int (*cmp) (const void *a, const void *b,
774  void *opaque),
775  void *opaque);
776 
777 
795 typedef enum rd_kafka_vtype_t {
809 
810 
/*
 * RD_KAFKA_V_* argument builders.
 *
 * Each macro expands to a comma-separated sequence: an RD_KAFKA_VTYPE_* tag
 * (wrapped in an _LRK_TYPECHECK* type check) followed by the macro's
 * argument(s) cast to the type named in that check.
 *
 * Every argument is parenthesized before the cast so that a compound
 * expression such as `cond ? a : b` or `x + y` is cast as a whole; without
 * the parentheses the cast would bind only to the first operand and the
 * value passed on could have a different type than the one advertised.
 */

/* Alias for RD_KAFKA_VTYPE_END. */
819 #define RD_KAFKA_V_END RD_KAFKA_VTYPE_END
820 
/* Tag RD_KAFKA_VTYPE_TOPIC + (const char *) topic. */
824 #define RD_KAFKA_V_TOPIC(topic) \
825  _LRK_TYPECHECK(RD_KAFKA_VTYPE_TOPIC, const char *, topic), \
826  (const char *)(topic)
827 
/* Tag RD_KAFKA_VTYPE_RKT + (rd_kafka_topic_t *) rkt. */
830 #define RD_KAFKA_V_RKT(rkt) \
831  _LRK_TYPECHECK(RD_KAFKA_VTYPE_RKT, rd_kafka_topic_t *, rkt), \
832  (rd_kafka_topic_t *)(rkt)
833 
/* Tag RD_KAFKA_VTYPE_PARTITION + (int32_t) partition. */
836 #define RD_KAFKA_V_PARTITION(partition) \
837  _LRK_TYPECHECK(RD_KAFKA_VTYPE_PARTITION, int32_t, partition), \
838  (int32_t)(partition)
839 
/* Tag RD_KAFKA_VTYPE_VALUE + (void *) VALUE, (size_t) LEN. */
842 #define RD_KAFKA_V_VALUE(VALUE,LEN) \
843  _LRK_TYPECHECK2(RD_KAFKA_VTYPE_VALUE, void *, VALUE, size_t, LEN), \
844  (void *)(VALUE), (size_t)(LEN)
845 
/* Tag RD_KAFKA_VTYPE_KEY + (void *) KEY, (size_t) LEN.
 * The type check accepts `const void *`; the emitted cast is `void *`. */
848 #define RD_KAFKA_V_KEY(KEY,LEN) \
849  _LRK_TYPECHECK2(RD_KAFKA_VTYPE_KEY, const void *, KEY, size_t, LEN), \
850  (void *)(KEY), (size_t)(LEN)
851 
/* Tag RD_KAFKA_VTYPE_OPAQUE + (void *) opaque. */
855 #define RD_KAFKA_V_OPAQUE(opaque) \
856  _LRK_TYPECHECK(RD_KAFKA_VTYPE_OPAQUE, void *, opaque), \
857  (void *)(opaque)
858 
/* Tag RD_KAFKA_VTYPE_MSGFLAGS + (int) msgflags. */
862 #define RD_KAFKA_V_MSGFLAGS(msgflags) \
863  _LRK_TYPECHECK(RD_KAFKA_VTYPE_MSGFLAGS, int, msgflags), \
864  (int)(msgflags)
865 
/* Tag RD_KAFKA_VTYPE_TIMESTAMP + (int64_t) timestamp. */
868 #define RD_KAFKA_V_TIMESTAMP(timestamp) \
869  _LRK_TYPECHECK(RD_KAFKA_VTYPE_TIMESTAMP, int64_t, timestamp), \
870  (int64_t)(timestamp)
871 
/* Tag RD_KAFKA_VTYPE_HEADER + (const char *) NAME, (const void *) VALUE,
 * (ssize_t) LEN. */
877 #define RD_KAFKA_V_HEADER(NAME,VALUE,LEN) \
878  _LRK_TYPECHECK3(RD_KAFKA_VTYPE_HEADER, const char *, NAME, \
879  const void *, VALUE, ssize_t, LEN), \
880  (const char *)(NAME), (const void *)(VALUE), (ssize_t)(LEN)
881 
/* Tag RD_KAFKA_VTYPE_HEADERS + (rd_kafka_headers_t *) HDRS. */
891 #define RD_KAFKA_V_HEADERS(HDRS) \
892  _LRK_TYPECHECK(RD_KAFKA_VTYPE_HEADERS, rd_kafka_headers_t *, HDRS), \
893  (rd_kafka_headers_t *)(HDRS)
894 
895 
918 typedef struct rd_kafka_headers_s rd_kafka_headers_t;
919 
927 RD_EXPORT rd_kafka_headers_t *rd_kafka_headers_new (size_t initial_count);
928 
933 RD_EXPORT void rd_kafka_headers_destroy (rd_kafka_headers_t *hdrs);
934 
938 RD_EXPORT rd_kafka_headers_t *
939 rd_kafka_headers_copy (const rd_kafka_headers_t *src);
940 
957 RD_EXPORT rd_kafka_resp_err_t
958 rd_kafka_header_add (rd_kafka_headers_t *hdrs,
959  const char *name, ssize_t name_size,
960  const void *value, ssize_t value_size);
961 
969 RD_EXPORT rd_kafka_resp_err_t
970 rd_kafka_header_remove (rd_kafka_headers_t *hdrs, const char *name);
971 
972 
989 RD_EXPORT rd_kafka_resp_err_t
990 rd_kafka_header_get_last (const rd_kafka_headers_t *hdrs,
991  const char *name, const void **valuep, size_t *sizep);
992 
1006 RD_EXPORT rd_kafka_resp_err_t
1007 rd_kafka_header_get (const rd_kafka_headers_t *hdrs, size_t idx,
1008  const char *name, const void **valuep, size_t *sizep);
1009 
1010 
1018 RD_EXPORT rd_kafka_resp_err_t
1019 rd_kafka_header_get_all (const rd_kafka_headers_t *hdrs, size_t idx,
1020  const char **namep,
1021  const void **valuep, size_t *sizep);
1022 
1023 
1024 
1037 // FIXME: This doesn't show up in docs for some reason
1038 // "Compound rd_kafka_message_t is not documented."
1039 
1053 typedef struct rd_kafka_message_s {
1055  rd_kafka_topic_t *rkt;
1056  int32_t partition;
1057  void *payload;
1061  size_t len;
1064  void *key;
1066  size_t key_len;
1068  int64_t offset;
1078  void *_private;
1083 
1084 
1088 RD_EXPORT
1090 
1091 
1092 
1093 
1100 static RD_INLINE const char *
1101 RD_UNUSED
1103  if (!rkmessage->err)
1104  return NULL;
1105 
1106  if (rkmessage->payload)
1107  return (const char *)rkmessage->payload;
1108 
1109  return rd_kafka_err2str(rkmessage->err);
1110 }
1111 
1112 
1113 
1125 RD_EXPORT
1126 int64_t rd_kafka_message_timestamp (const rd_kafka_message_t *rkmessage,
1127  rd_kafka_timestamp_type_t *tstype);
1128 
1129 
1130 
1137 RD_EXPORT
1138 int64_t rd_kafka_message_latency (const rd_kafka_message_t *rkmessage);
1139 
1140 
1157 RD_EXPORT rd_kafka_resp_err_t
1159  rd_kafka_headers_t **hdrsp);
1160 
1172 RD_EXPORT rd_kafka_resp_err_t
1174  rd_kafka_headers_t **hdrsp);
1175 
1176 
1188 RD_EXPORT
1190  rd_kafka_headers_t *hdrs);
1191 
1192 
1198 RD_EXPORT size_t rd_kafka_header_cnt (const rd_kafka_headers_t *hdrs);
1199 
1200 
1216 typedef enum {
1221 
1222 
1253 RD_EXPORT
1254 rd_kafka_conf_t *rd_kafka_conf_new(void);
1255 
1256 
1260 RD_EXPORT
1261 void rd_kafka_conf_destroy(rd_kafka_conf_t *conf);
1262 
1263 
1270 RD_EXPORT
1271 rd_kafka_conf_t *rd_kafka_conf_dup(const rd_kafka_conf_t *conf);
1272 
1273 
1278 RD_EXPORT
1279 rd_kafka_conf_t *rd_kafka_conf_dup_filter (const rd_kafka_conf_t *conf,
1280  size_t filter_cnt,
1281  const char **filter);
1282 
1283 
1284 
1301 RD_EXPORT
1302 rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,
1303  const char *name,
1304  const char *value,
1305  char *errstr, size_t errstr_size);
1306 
1307 
1313 RD_EXPORT
1314 void rd_kafka_conf_set_events(rd_kafka_conf_t *conf, int events);
1315 
1316 
1320 RD_EXPORT
1321 void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf,
1322  void (*dr_cb) (rd_kafka_t *rk,
1323  void *payload, size_t len,
1324  rd_kafka_resp_err_t err,
1325  void *opaque, void *msg_opaque));
1326 
1341 RD_EXPORT
1342 void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf,
1343  void (*dr_msg_cb) (rd_kafka_t *rk,
1344  const rd_kafka_message_t *
1345  rkmessage,
1346  void *opaque));
1347 
1348 
1353 RD_EXPORT
1354 void rd_kafka_conf_set_consume_cb (rd_kafka_conf_t *conf,
1355  void (*consume_cb) (rd_kafka_message_t *
1356  rkmessage,
1357  void *opaque));
1358 
1418 RD_EXPORT
1420  rd_kafka_conf_t *conf,
1421  void (*rebalance_cb) (rd_kafka_t *rk,
1422  rd_kafka_resp_err_t err,
1423  rd_kafka_topic_partition_list_t *partitions,
1424  void *opaque));
1425 
1426 
1427 
1442 RD_EXPORT
1444  rd_kafka_conf_t *conf,
1445  void (*offset_commit_cb) (rd_kafka_t *rk,
1446  rd_kafka_resp_err_t err,
1448  void *opaque));
1449 
1450 
1459 RD_EXPORT
1460 void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf,
1461  void (*error_cb) (rd_kafka_t *rk, int err,
1462  const char *reason,
1463  void *opaque));
1464 
1479 RD_EXPORT
1480 void rd_kafka_conf_set_throttle_cb (rd_kafka_conf_t *conf,
1481  void (*throttle_cb) (
1482  rd_kafka_t *rk,
1483  const char *broker_name,
1484  int32_t broker_id,
1485  int throttle_time_ms,
1486  void *opaque));
1487 
1488 
1505 RD_EXPORT
1506 void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf,
1507  void (*log_cb) (const rd_kafka_t *rk, int level,
1508  const char *fac, const char *buf));
1509 
1510 
1527 RD_EXPORT
1528 void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf,
1529  int (*stats_cb) (rd_kafka_t *rk,
1530  char *json,
1531  size_t json_len,
1532  void *opaque));
1533 
1534 
1535 
1550 RD_EXPORT
1551 void rd_kafka_conf_set_socket_cb(rd_kafka_conf_t *conf,
1552  int (*socket_cb) (int domain, int type,
1553  int protocol,
1554  void *opaque));
1555 
1556 
1557 
1570 RD_EXPORT void
1571 rd_kafka_conf_set_connect_cb (rd_kafka_conf_t *conf,
1572  int (*connect_cb) (int sockfd,
1573  const struct sockaddr *addr,
1574  int addrlen,
1575  const char *id,
1576  void *opaque));
1577 
1585 RD_EXPORT void
1586 rd_kafka_conf_set_closesocket_cb (rd_kafka_conf_t *conf,
1587  int (*closesocket_cb) (int sockfd,
1588  void *opaque));
1589 
1590 
1591 
1592 #ifndef _MSC_VER
1593 
1607 RD_EXPORT
1608 void rd_kafka_conf_set_open_cb (rd_kafka_conf_t *conf,
1609  int (*open_cb) (const char *pathname,
1610  int flags, mode_t mode,
1611  void *opaque));
1612 #endif
1613 
1617 RD_EXPORT
1618 void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque);
1619 
1623 RD_EXPORT
1624 void *rd_kafka_opaque(const rd_kafka_t *rk);
1625 
1626 
1627 
1633 RD_EXPORT
1634 void rd_kafka_conf_set_default_topic_conf (rd_kafka_conf_t *conf,
1635  rd_kafka_topic_conf_t *tconf);
1636 
1637 
1638 
1658 RD_EXPORT
1659 rd_kafka_conf_res_t rd_kafka_conf_get (const rd_kafka_conf_t *conf,
1660  const char *name,
1661  char *dest, size_t *dest_size);
1662 
1663 
1669 RD_EXPORT
1670 rd_kafka_conf_res_t rd_kafka_topic_conf_get (const rd_kafka_topic_conf_t *conf,
1671  const char *name,
1672  char *dest, size_t *dest_size);
1673 
1674 
1683 RD_EXPORT
1684 const char **rd_kafka_conf_dump(rd_kafka_conf_t *conf, size_t *cntp);
1685 
1686 
1695 RD_EXPORT
1696 const char **rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t *conf,
1697  size_t *cntp);
1698 
1703 RD_EXPORT
1704 void rd_kafka_conf_dump_free(const char **arr, size_t cnt);
1705 
1710 RD_EXPORT
1711 void rd_kafka_conf_properties_show(FILE *fp);
1712 
1730 RD_EXPORT
1731 rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void);
1732 
1733 
1737 RD_EXPORT
1738 rd_kafka_topic_conf_t *rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t
1739  *conf);
1740 
1745 RD_EXPORT
1746 rd_kafka_topic_conf_t *rd_kafka_default_topic_conf_dup (rd_kafka_t *rk);
1747 
1748 
1752 RD_EXPORT
1753 void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf);
1754 
1755 
1764 RD_EXPORT
1765 rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf,
1766  const char *name,
1767  const char *value,
1768  char *errstr, size_t errstr_size);
1769 
1774 RD_EXPORT
1775 void rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t *conf, void *opaque);
1776 
1777 
1792 RD_EXPORT
1793 void
1794 rd_kafka_topic_conf_set_partitioner_cb (rd_kafka_topic_conf_t *topic_conf,
1795  int32_t (*partitioner) (
1796  const rd_kafka_topic_t *rkt,
1797  const void *keydata,
1798  size_t keylen,
1799  int32_t partition_cnt,
1800  void *rkt_opaque,
1801  void *msg_opaque));
1802 
1803 
1829 RD_EXPORT void
1830 rd_kafka_topic_conf_set_msg_order_cmp (rd_kafka_topic_conf_t *topic_conf,
1831  int (*msg_order_cmp) (
1832  const rd_kafka_message_t *a,
1833  const rd_kafka_message_t *b));
1834 
1835 
1843 RD_EXPORT
1844 int rd_kafka_topic_partition_available(const rd_kafka_topic_t *rkt,
1845  int32_t partition);
1846 
1847 
1848 /*******************************************************************
1849  * *
1850  * Partitioners provided by rdkafka *
1851  * *
1852  *******************************************************************/
1853 
1862 RD_EXPORT
1863 int32_t rd_kafka_msg_partitioner_random(const rd_kafka_topic_t *rkt,
1864  const void *key, size_t keylen,
1865  int32_t partition_cnt,
1866  void *opaque, void *msg_opaque);
1867 
1876 RD_EXPORT
1877 int32_t rd_kafka_msg_partitioner_consistent (const rd_kafka_topic_t *rkt,
1878  const void *key, size_t keylen,
1879  int32_t partition_cnt,
1880  void *opaque, void *msg_opaque);
1881 
1892 RD_EXPORT
1893 int32_t rd_kafka_msg_partitioner_consistent_random (const rd_kafka_topic_t *rkt,
1894  const void *key, size_t keylen,
1895  int32_t partition_cnt,
1896  void *opaque, void *msg_opaque);
1897 
1898 
1907 RD_EXPORT
1908 int32_t rd_kafka_msg_partitioner_murmur2 (const rd_kafka_topic_t *rkt,
1909  const void *key, size_t keylen,
1910  int32_t partition_cnt,
1911  void *rkt_opaque,
1912  void *msg_opaque);
1913 
1923 RD_EXPORT
1924 int32_t rd_kafka_msg_partitioner_murmur2_random (const rd_kafka_topic_t *rkt,
1925  const void *key, size_t keylen,
1926  int32_t partition_cnt,
1927  void *rkt_opaque,
1928  void *msg_opaque);
1929 
1930 
1971 RD_EXPORT
1972 rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf,
1973  char *errstr, size_t errstr_size);
1974 
1975 
1981 RD_EXPORT
1982 void rd_kafka_destroy(rd_kafka_t *rk);
1983 
1984 
1985 
1989 RD_EXPORT
1990 const char *rd_kafka_name(const rd_kafka_t *rk);
1991 
1992 
1996 RD_EXPORT
1997 rd_kafka_type_t rd_kafka_type(const rd_kafka_t *rk);
1998 
1999 
2010 RD_EXPORT
2011 char *rd_kafka_memberid (const rd_kafka_t *rk);
2012 
2013 
2014 
2032 RD_EXPORT
2033 char *rd_kafka_clusterid (rd_kafka_t *rk, int timeout_ms);
2034 
2035 
2057 RD_EXPORT
2058 rd_kafka_topic_t *rd_kafka_topic_new(rd_kafka_t *rk, const char *topic,
2059  rd_kafka_topic_conf_t *conf);
2060 
2061 
2062 
2071 RD_EXPORT
2072 void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt);
2073 
2074 
2078 RD_EXPORT
2079 const char *rd_kafka_topic_name(const rd_kafka_topic_t *rkt);
2080 
2081 
2085 RD_EXPORT
2086 void *rd_kafka_topic_opaque (const rd_kafka_topic_t *rkt);
2087 
2088 
2095 #define RD_KAFKA_PARTITION_UA ((int32_t)-1)
2096 
2097 
2119 RD_EXPORT
2120 int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms);
2121 
2122 
2133 RD_EXPORT
2134 void rd_kafka_yield (rd_kafka_t *rk);
2135 
2136 
2137 
2138 
2146 RD_EXPORT rd_kafka_resp_err_t
2147 rd_kafka_pause_partitions (rd_kafka_t *rk,
2148  rd_kafka_topic_partition_list_t *partitions);
2149 
2150 
2151 
2159 RD_EXPORT rd_kafka_resp_err_t
2160 rd_kafka_resume_partitions (rd_kafka_t *rk,
2161  rd_kafka_topic_partition_list_t *partitions);
2162 
2163 
2164 
2165 
2174 RD_EXPORT rd_kafka_resp_err_t
2175 rd_kafka_query_watermark_offsets (rd_kafka_t *rk,
2176  const char *topic, int32_t partition,
2177  int64_t *low, int64_t *high, int timeout_ms);
2178 
2179 
2196 RD_EXPORT rd_kafka_resp_err_t
2197 rd_kafka_get_watermark_offsets (rd_kafka_t *rk,
2198  const char *topic, int32_t partition,
2199  int64_t *low, int64_t *high);
2200 
2201 
2202 
2227 RD_EXPORT rd_kafka_resp_err_t
2228 rd_kafka_offsets_for_times (rd_kafka_t *rk,
2230  int timeout_ms);
2231 
2232 
2246 RD_EXPORT
2247 void rd_kafka_mem_free (rd_kafka_t *rk, void *ptr);
2248 
2249 
2273 RD_EXPORT
2274 rd_kafka_queue_t *rd_kafka_queue_new(rd_kafka_t *rk);
2275 
2279 RD_EXPORT
2280 void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu);
2281 
2282 
2289 RD_EXPORT
2290 rd_kafka_queue_t *rd_kafka_queue_get_main (rd_kafka_t *rk);
2291 
2292 
2302 RD_EXPORT
2303 rd_kafka_queue_t *rd_kafka_queue_get_consumer (rd_kafka_t *rk);
2304 
2315 RD_EXPORT
2316 rd_kafka_queue_t *rd_kafka_queue_get_partition (rd_kafka_t *rk,
2317  const char *topic,
2318  int32_t partition);
2319 
2330 RD_EXPORT
2331 void rd_kafka_queue_forward (rd_kafka_queue_t *src, rd_kafka_queue_t *dst);
2332 
2349 RD_EXPORT
2351  rd_kafka_queue_t *rkqu);
2352 
2353 
2357 RD_EXPORT
2358 size_t rd_kafka_queue_length (rd_kafka_queue_t *rkqu);
2359 
2360 
2376 RD_EXPORT
2377 void rd_kafka_queue_io_event_enable (rd_kafka_queue_t *rkqu, int fd,
2378  const void *payload, size_t size);
2379 
/*
 * Named negative offset constants. All values are below zero, so they are
 * presumably sentinels that cannot collide with a real message offset -
 * TODO confirm against the consumer API documentation.
 */
2390 #define RD_KAFKA_OFFSET_BEGINNING -2
2392 #define RD_KAFKA_OFFSET_END -1
2394 #define RD_KAFKA_OFFSET_STORED -1000
2396 #define RD_KAFKA_OFFSET_INVALID -1001
2400 #define RD_KAFKA_OFFSET_TAIL_BASE -2000 /* internal: do not use */
2401 
/* Expands to RD_KAFKA_OFFSET_TAIL_BASE - (CNT), i.e. -2000 - CNT. */
2408 #define RD_KAFKA_OFFSET_TAIL(CNT) (RD_KAFKA_OFFSET_TAIL_BASE - (CNT))
2409 
2443 RD_EXPORT
2444 int rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition,
2445  int64_t offset);
2446 
2461 RD_EXPORT
2462 int rd_kafka_consume_start_queue(rd_kafka_topic_t *rkt, int32_t partition,
2463  int64_t offset, rd_kafka_queue_t *rkqu);
2464 
2478 RD_EXPORT
2479 int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition);
2480 
2481 
2482 
2497 RD_EXPORT
2498 rd_kafka_resp_err_t rd_kafka_seek (rd_kafka_topic_t *rkt,
2499  int32_t partition,
2500  int64_t offset,
2501  int timeout_ms);
2502 
2503 
2528 RD_EXPORT
2529 rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition,
2530  int timeout_ms);
2531 
2532 
2533 
2559 RD_EXPORT
2560 ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt, int32_t partition,
2561  int timeout_ms,
2562  rd_kafka_message_t **rkmessages,
2563  size_t rkmessages_size);
2564 
2565 
2566 
2590 RD_EXPORT
2591 int rd_kafka_consume_callback(rd_kafka_topic_t *rkt, int32_t partition,
2592  int timeout_ms,
2593  void (*consume_cb) (rd_kafka_message_t
2594  *rkmessage,
2595  void *opaque),
2596  void *opaque);
2597 
2598 
2615 RD_EXPORT
2616 rd_kafka_message_t *rd_kafka_consume_queue(rd_kafka_queue_t *rkqu,
2617  int timeout_ms);
2618 
2624 RD_EXPORT
2625 ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu,
2626  int timeout_ms,
2627  rd_kafka_message_t **rkmessages,
2628  size_t rkmessages_size);
2629 
2635 RD_EXPORT
2636 int rd_kafka_consume_callback_queue(rd_kafka_queue_t *rkqu,
2637  int timeout_ms,
2638  void (*consume_cb) (rd_kafka_message_t
2639  *rkmessage,
2640  void *opaque),
2641  void *opaque);
2642 
2643 
2669 RD_EXPORT
2670 rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt,
2671  int32_t partition, int64_t offset);
2672 
2673 
2690 RD_EXPORT rd_kafka_resp_err_t
2691 rd_kafka_offsets_store(rd_kafka_t *rk,
2719 RD_EXPORT rd_kafka_resp_err_t
2720 rd_kafka_subscribe (rd_kafka_t *rk,
2721  const rd_kafka_topic_partition_list_t *topics);
2722 
2723 
2727 RD_EXPORT
2728 rd_kafka_resp_err_t rd_kafka_unsubscribe (rd_kafka_t *rk);
2729 
2730 
2740 RD_EXPORT rd_kafka_resp_err_t
2741 rd_kafka_subscription (rd_kafka_t *rk,
2743 
2744 
2745 
2767 RD_EXPORT
2768 rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms);
2769 
2785 RD_EXPORT
2787 
2788 
2789 
2803 RD_EXPORT rd_kafka_resp_err_t
2804 rd_kafka_assign (rd_kafka_t *rk,
2805  const rd_kafka_topic_partition_list_t *partitions);
2806 
2816 RD_EXPORT rd_kafka_resp_err_t
2817 rd_kafka_assignment (rd_kafka_t *rk,
2818  rd_kafka_topic_partition_list_t **partitions);
2819 
2820 
2821 
2822 
2837 RD_EXPORT rd_kafka_resp_err_t
2838 rd_kafka_commit (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets,
2839  int async);
2840 
2841 
2847 RD_EXPORT rd_kafka_resp_err_t
2848 rd_kafka_commit_message (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
2849  int async);
2850 
2851 
2873 RD_EXPORT rd_kafka_resp_err_t
2874 rd_kafka_commit_queue (rd_kafka_t *rk,
2875  const rd_kafka_topic_partition_list_t *offsets,
2876  rd_kafka_queue_t *rkqu,
2877  void (*cb) (rd_kafka_t *rk,
2878  rd_kafka_resp_err_t err,
2880  void *opaque),
2881  void *opaque);
2882 
2883 
2896 RD_EXPORT rd_kafka_resp_err_t
2897 rd_kafka_committed (rd_kafka_t *rk,
2898  rd_kafka_topic_partition_list_t *partitions,
2899  int timeout_ms);
2900 
2901 
2902 
2915 RD_EXPORT rd_kafka_resp_err_t
2916 rd_kafka_position (rd_kafka_t *rk,
2917  rd_kafka_topic_partition_list_t *partitions);
2918 
2919 
/*
 * Message flag constants. Each value is a distinct single bit, so they can
 * be combined with bitwise OR; presumably they are passed as the `msgflags`
 * argument of the produce calls - confirm against the producer API docs.
 */
2935 #define RD_KAFKA_MSG_F_FREE 0x1
2936 #define RD_KAFKA_MSG_F_COPY 0x2
2937 #define RD_KAFKA_MSG_F_BLOCK 0x4
2948 #define RD_KAFKA_MSG_F_PARTITION 0x8
3021 RD_EXPORT
3022 int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition,
3023  int msgflags,
3024  void *payload, size_t len,
3025  const void *key, size_t keylen,
3026  void *msg_opaque);
3027 
3028 
3041 RD_EXPORT
3042 rd_kafka_resp_err_t rd_kafka_producev (rd_kafka_t *rk, ...);
3043 
3044 
3069 RD_EXPORT
3070 int rd_kafka_produce_batch(rd_kafka_topic_t *rkt, int32_t partition,
3071  int msgflags,
3072  rd_kafka_message_t *rkmessages, int message_cnt);
3073 
3074 
3075 
3076 
3088 RD_EXPORT
3089 rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms);
3090 
3091 
3106 typedef struct rd_kafka_metadata_broker {
3107  int32_t id;
3108  char *host;
3109  int port;
3111 
3115 typedef struct rd_kafka_metadata_partition {
3116  int32_t id;
3117  rd_kafka_resp_err_t err;
3118  int32_t leader;
3119  int replica_cnt;
3120  int32_t *replicas;
3121  int isr_cnt;
3122  int32_t *isrs;
3124 
3128 typedef struct rd_kafka_metadata_topic {
3129  char *topic;
3130  int partition_cnt;
3131  struct rd_kafka_metadata_partition *partitions;
3139 typedef struct rd_kafka_metadata {
3140  int broker_cnt;
3141  struct rd_kafka_metadata_broker *brokers;
3143  int topic_cnt;
3144  struct rd_kafka_metadata_topic *topics;
3146  int32_t orig_broker_id;
3147  char *orig_broker_name;
3149 
3150 
3167 RD_EXPORT
3169 rd_kafka_metadata (rd_kafka_t *rk, int all_topics,
3170  rd_kafka_topic_t *only_rkt,
3171  const struct rd_kafka_metadata **metadatap,
3172  int timeout_ms);
3173 
3177 RD_EXPORT
3178 void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata);
3179 
3180 
3201  char *member_id;
3202  char *client_id;
3203  char *client_host;
3204  void *member_metadata;
3206  int member_metadata_size;
3207  void *member_assignment;
3210 };
3211 
3216  struct rd_kafka_metadata_broker broker;
3217  char *group;
3219  char *state;
3221  char *protocol;
3224 };
3225 
3234 };
3264 RD_EXPORT
3266 rd_kafka_list_groups (rd_kafka_t *rk, const char *group,
3267  const struct rd_kafka_group_list **grplistp,
3268  int timeout_ms);
3269 
3273 RD_EXPORT
3274 void rd_kafka_group_list_destroy (const struct rd_kafka_group_list *grplist);
3275 
3276 
3317 RD_EXPORT
3318 int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist);
3319 
3320 
3321 
3322 
3335 RD_EXPORT RD_DEPRECATED
3336 void rd_kafka_set_logger(rd_kafka_t *rk,
3337  void (*func) (const rd_kafka_t *rk, int level,
3338  const char *fac, const char *buf));
3339 
3340 
3348 RD_EXPORT
3349 void rd_kafka_set_log_level(rd_kafka_t *rk, int level);
3350 
3351 
3355 RD_EXPORT
3356 void rd_kafka_log_print(const rd_kafka_t *rk, int level,
3357  const char *fac, const char *buf);
3358 
3359 
3363 RD_EXPORT
3364 void rd_kafka_log_syslog(const rd_kafka_t *rk, int level,
3365  const char *fac, const char *buf);
3366 
3367 
3380 RD_EXPORT
3381 int rd_kafka_outq_len(rd_kafka_t *rk);
3382 
3383 
3384 
3391 RD_EXPORT
3392 void rd_kafka_dump(FILE *fp, rd_kafka_t *rk);
3393 
3394 
3395 
3401 RD_EXPORT
3402 int rd_kafka_thread_cnt(void);
3403 
3404 
3413 RD_EXPORT
3414 int rd_kafka_wait_destroyed(int timeout_ms);
3415 
3416 
3422 RD_EXPORT
3423 int rd_kafka_unittest (void);
3424 
3425 
3443 RD_EXPORT
3445 
3446 
/*
 * Event type identifiers. rd_kafka_event_type_t is a plain int; apart from
 * RD_KAFKA_EVENT_NONE (0) every constant is a distinct single bit, so several
 * types can be OR-ed into one int mask.
 */
3462 typedef int rd_kafka_event_type_t;
3463 #define RD_KAFKA_EVENT_NONE 0x0
3464 #define RD_KAFKA_EVENT_DR 0x1
3465 #define RD_KAFKA_EVENT_FETCH 0x2
3466 #define RD_KAFKA_EVENT_LOG 0x4
3467 #define RD_KAFKA_EVENT_ERROR 0x8
3468 #define RD_KAFKA_EVENT_REBALANCE 0x10
3469 #define RD_KAFKA_EVENT_OFFSET_COMMIT 0x20
3470 #define RD_KAFKA_EVENT_STATS 0x40
/* Opaque event handle: an alias for the incomplete type struct rd_kafka_op_s. */
3473 typedef struct rd_kafka_op_s rd_kafka_event_t;
3474 
3475 
3482 RD_EXPORT
3483 rd_kafka_event_type_t rd_kafka_event_type (const rd_kafka_event_t *rkev);
3491 RD_EXPORT
3492 const char *rd_kafka_event_name (const rd_kafka_event_t *rkev);
3493 
3494 
3504 RD_EXPORT
3505 void rd_kafka_event_destroy (rd_kafka_event_t *rkev);
3506 
3507 
3523 RD_EXPORT
3524 const rd_kafka_message_t *rd_kafka_event_message_next (rd_kafka_event_t *rkev);
3525 
3526 
3540 RD_EXPORT
3541 size_t rd_kafka_event_message_array (rd_kafka_event_t *rkev,
3542  const rd_kafka_message_t **rkmessages,
3543  size_t size);
3544 
3545 
3553 RD_EXPORT
3554 size_t rd_kafka_event_message_count (rd_kafka_event_t *rkev);
3555 
3556 
3563 RD_EXPORT
3564 rd_kafka_resp_err_t rd_kafka_event_error (rd_kafka_event_t *rkev);
3565 
3566 
3575 RD_EXPORT
3576 const char *rd_kafka_event_error_string (rd_kafka_event_t *rkev);
3577 
3578 
3579 
3586 RD_EXPORT
3587 void *rd_kafka_event_opaque (rd_kafka_event_t *rkev);
3588 
3589 
3598 RD_EXPORT
3599 int rd_kafka_event_log (rd_kafka_event_t *rkev,
3600  const char **fac, const char **str, int *level);
3601 
3602 
3614 RD_EXPORT
3615 const char *rd_kafka_event_stats (rd_kafka_event_t *rkev);
3616 
3617 
3628 rd_kafka_event_topic_partition_list (rd_kafka_event_t *rkev);
3629 
3630 
3640 RD_EXPORT rd_kafka_topic_partition_t *
3641 rd_kafka_event_topic_partition (rd_kafka_event_t *rkev);
3642 
3643 
3651 RD_EXPORT
3652 rd_kafka_event_t *rd_kafka_queue_poll (rd_kafka_queue_t *rkqu, int timeout_ms);
3653 
3662 RD_EXPORT
3663 int rd_kafka_queue_poll_callback (rd_kafka_queue_t *rkqu, int timeout_ms);
3664 
3665 
3707 typedef rd_kafka_resp_err_t
3708 (rd_kafka_plugin_f_conf_init_t) (rd_kafka_conf_t *conf,
3709  void **plug_opaquep,
3710  char *errstr, size_t errstr_size);
3711 
3791 typedef rd_kafka_conf_res_t
3792 (rd_kafka_interceptor_f_on_conf_set_t) (rd_kafka_conf_t *conf,
3793  const char *name, const char *val,
3794  char *errstr, size_t errstr_size,
3795  void *ic_opaque);
3796 
3797 
3814 typedef rd_kafka_resp_err_t
3815 (rd_kafka_interceptor_f_on_conf_dup_t) (rd_kafka_conf_t *new_conf,
3816  const rd_kafka_conf_t *old_conf,
3817  size_t filter_cnt,
3818  const char **filter,
3819  void *ic_opaque);
3820 
3821 
3828 typedef rd_kafka_resp_err_t
3830 
3831 
3849 typedef rd_kafka_resp_err_t
3850 (rd_kafka_interceptor_f_on_new_t) (rd_kafka_t *rk, const rd_kafka_conf_t *conf,
3851  void *ic_opaque,
3852  char *errstr, size_t errstr_size);
3853 
3854 
3862 typedef rd_kafka_resp_err_t
3863 (rd_kafka_interceptor_f_on_destroy_t) (rd_kafka_t *rk, void *ic_opaque);
3865 
3866 
3867 
3888 typedef rd_kafka_resp_err_t
3889 (rd_kafka_interceptor_f_on_send_t) (rd_kafka_t *rk,
3890  rd_kafka_message_t *rkmessage,
3891  void *ic_opaque);
3892 
3915 typedef rd_kafka_resp_err_t
3917  rd_kafka_message_t *rkmessage,
3918  void *ic_opaque);
3919 
3920 
3937 typedef rd_kafka_resp_err_t
3938 (rd_kafka_interceptor_f_on_consume_t) (rd_kafka_t *rk,
3939  rd_kafka_message_t *rkmessage,
3940  void *ic_opaque);
3941 
3962 typedef rd_kafka_resp_err_t
3964  rd_kafka_t *rk,
3965  const rd_kafka_topic_partition_list_t *offsets,
3966  rd_kafka_resp_err_t err, void *ic_opaque);
3967 
3968 
3990 typedef rd_kafka_resp_err_t
3992  rd_kafka_t *rk,
3993  int sockfd,
3994  const char *brokername,
3995  int32_t brokerid,
3996  int16_t ApiKey,
3997  int16_t ApiVersion,
3998  int32_t CorrId,
3999  size_t size,
4000  void *ic_opaque);
4001 
4002 
4003 
4016 RD_EXPORT rd_kafka_resp_err_t
4018  rd_kafka_conf_t *conf, const char *ic_name,
4020  void *ic_opaque);
4021 
4022 
4035 RD_EXPORT rd_kafka_resp_err_t
4037  rd_kafka_conf_t *conf, const char *ic_name,
4039  void *ic_opaque);
4040 
4054 RD_EXPORT rd_kafka_resp_err_t
4056  rd_kafka_conf_t *conf, const char *ic_name,
4058  void *ic_opaque);
4059 
4060 
4082 RD_EXPORT rd_kafka_resp_err_t
4084  rd_kafka_conf_t *conf, const char *ic_name,
4086  void *ic_opaque);
4087 
4088 
4089 
4102 RD_EXPORT rd_kafka_resp_err_t
4104  rd_kafka_t *rk, const char *ic_name,
4106  void *ic_opaque);
4107 
4108 
4121 RD_EXPORT rd_kafka_resp_err_t
4123  rd_kafka_t *rk, const char *ic_name,
4125  void *ic_opaque);
4126 
4139 RD_EXPORT rd_kafka_resp_err_t
4141  rd_kafka_t *rk, const char *ic_name,
4142  rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement,
4143  void *ic_opaque);
4144 
4145 
4158 RD_EXPORT rd_kafka_resp_err_t
4160  rd_kafka_t *rk, const char *ic_name,
4162  void *ic_opaque);
4163 
4164 
4177 RD_EXPORT rd_kafka_resp_err_t
4179  rd_kafka_t *rk, const char *ic_name,
4181  void *ic_opaque);
4182 
4183 
4196 RD_EXPORT rd_kafka_resp_err_t
4198  rd_kafka_t *rk, const char *ic_name,
4200  void *ic_opaque);
4201 
4202 
4203 
4204 
4208 #ifdef __cplusplus
4209 }
4210 #endif
4211 #endif /* _RDKAFKA_H_ */
void * _private
Definition: rdkafka.h:613
rd_kafka_resp_err_t
Error codes.
Definition: rdkafka.h:251
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_send_t)(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque)
on_send() is called from rd_kafka_produce*() (et.al) prior to the partitioner being called...
Definition: rdkafka.h:3903
rd_kafka_topic_t * rkt
Definition: rdkafka.h:1055
RD_EXPORT rd_kafka_resp_err_t rd_kafka_conf_interceptor_add_on_conf_dup(rd_kafka_conf_t *conf, const char *ic_name, rd_kafka_interceptor_f_on_conf_dup_t *on_conf_dup, void *ic_opaque)
Append an on_conf_dup() interceptor.
Definition: rdkafka.h:354
RD_EXPORT int rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset)
Start consuming messages for topic rkt and partition at offset offset which may either be an absolute...
Definition: rdkafka.h:313
rd_kafka_resp_err_t err
Definition: rdkafka.h:1054
rd_kafka_conf_res_t
Configuration result type.
Definition: rdkafka.h:1216
int member_cnt
Definition: rdkafka.h:3237
RD_EXPORT char * rd_kafka_memberid(const rd_kafka_t *rk)
Returns this client's broker-assigned group member id.
RD_EXPORT void rd_kafka_queue_forward(rd_kafka_queue_t *src, rd_kafka_queue_t *dst)
Forward/re-route queue src to dst. If dst is NULL the forwarding is removed.
int cnt
Definition: rdkafka.h:631
Definition: rdkafka.h:331
RD_EXPORT int rd_kafka_thread_cnt(void)
Retrieve the current number of threads in use by librdkafka.
RD_EXPORT void rd_kafka_conf_set_consume_cb(rd_kafka_conf_t *conf, void(*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque))
Consumer: Set consume callback for use with rd_kafka_consumer_poll()
rd_kafka_topic_partition_t * elems
Definition: rdkafka.h:633
RD_EXPORT rd_kafka_resp_err_t rd_kafka_subscription(rd_kafka_t *rk, rd_kafka_topic_partition_list_t **topics)
Returns the current topic subscription.
RD_EXPORT int32_t rd_kafka_msg_partitioner_consistent(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *opaque, void *msg_opaque)
Consistent partitioner.
RD_EXPORT void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf, void(*dr_msg_cb)(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque))
Producer: Set delivery report callback in provided conf object.
RD_EXPORT const char * rd_kafka_event_name(const rd_kafka_event_t *rkev)
Definition: rdkafka.h:804
RD_EXPORT void rd_kafka_topic_partition_list_destroy(rd_kafka_topic_partition_list_t *rkparlist)
Free all resources used by the list and the list itself.
RD_EXPORT int rd_kafka_unittest(void)
Run librdkafka's built-in unit-tests.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_resume_partitions(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions)
Resume producing or consumption for the provided list of partitions.
RD_EXPORT rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size)
Sets a single rd_kafka_topic_conf_t value by property name.
RD_EXPORT rd_kafka_topic_partition_list_t * rd_kafka_topic_partition_list_copy(const rd_kafka_topic_partition_list_t *src)
Make a copy of an existing list.
RD_EXPORT size_t rd_kafka_header_cnt(const rd_kafka_headers_t *hdrs)
Returns the number of header key/value pairs.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_header_add(rd_kafka_headers_t *hdrs, const char *name, ssize_t name_size, const void *value, ssize_t value_size)
Add header with name name and value val (copied) of size size (not including null-terminator).
RD_EXPORT void rd_kafka_topic_conf_set_partitioner_cb(rd_kafka_topic_conf_t *topic_conf, int32_t(*partitioner)(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque))
Producer: Set partitioner callback in provided topic conf object.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_poll_set_consumer(rd_kafka_t *rk)
Redirect the main (rd_kafka_poll()) queue to the KafkaConsumer's queue (rd_kafka_consumer_poll()).
RD_EXPORT void rd_kafka_message_destroy(rd_kafka_message_t *rkmessage)
Frees resources for rkmessage and hands ownership back to rdkafka.
RD_EXPORT rd_kafka_conf_res_t rd_kafka_topic_conf_get(const rd_kafka_topic_conf_t *conf, const char *name, char *dest, size_t *dest_size)
Retrieve topic configuration value for property name.
Definition: rdkafka.h:287
Definition: rdkafka.h:343
Definition: rdkafka.h:424
Definition: rdkafka.h:797
RD_EXPORT rd_kafka_resp_err_t rd_kafka_position(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions)
Retrieve current positions (offsets) for topics+partitions.
Definition: rdkafka.h:301
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_get_consumer(rd_kafka_t *rk)
char * state
Definition: rdkafka.h:3233
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_request_sent_t)(rd_kafka_t *rk, int sockfd, const char *brokername, int32_t brokerid, int16_t ApiKey, int16_t ApiVersion, int32_t CorrId, size_t size, void *ic_opaque)
on_request_sent() is called when a request has been fully written to a broker TCP connection's socket...
Definition: rdkafka.h:4005
size_t key_len
Definition: rdkafka.h:1066
RD_EXPORT rd_kafka_resp_err_t rd_kafka_offsets_store(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *offsets)
Store offsets for next auto-commit for one or more partitions.
Definition: rdkafka.h:281
RD_EXPORT rd_kafka_event_type_t rd_kafka_event_type(const rd_kafka_event_t *rkev)
RD_EXPORT rd_kafka_conf_t * rd_kafka_conf_dup_filter(const rd_kafka_conf_t *conf, size_t filter_cnt, const char **filter)
Same as rd_kafka_conf_dup() but with an array of property name prefixes to filter out (ignore) when c...
Definition: rdkafka.h:1218
int member_assignment_size
Definition: rdkafka.h:3223
RD_EXPORT rd_kafka_t * rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)
Creates a new Kafka handle and starts its operation according to the specified type (RD_KAFKA_CONSUME...
RD_EXPORT void rd_kafka_get_err_descs(const struct rd_kafka_err_desc **errdescs, size_t *cntp)
Returns the full list of error codes.
Group information.
Definition: rdkafka.h:3229
char * group
Definition: rdkafka.h:3231
Partition information.
Definition: rdkafka.h:3129
Definition: rdkafka.h:270
RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_commit(rd_kafka_t *rk, const char *ic_name, rd_kafka_interceptor_f_on_commit_t *on_commit, void *ic_opaque)
Append an on_commit() interceptor.
Definition: rdkafka.h:450
RD_EXPORT int rd_kafka_event_log(rd_kafka_event_t *rkev, const char **fac, const char **str, int *level)
Extract log message from the event.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_offsets_for_times(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *offsets, int timeout_ms)
Look up the offsets for the given partitions by timestamp.
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_conf_dup_t)(rd_kafka_conf_t *new_conf, const rd_kafka_conf_t *old_conf, size_t filter_cnt, const char **filter, void *ic_opaque)
on_conf_dup() is called from rd_kafka_conf_dup() in the order the interceptors were added and is used...
Definition: rdkafka.h:3829
RD_EXPORT const char * rd_kafka_version_str(void)
Returns the librdkafka version as string.
RD_EXPORT void * rd_kafka_opaque(const rd_kafka_t *rk)
Retrieves the opaque pointer previously set with rd_kafka_conf_set_opaque()
char * client_id
Definition: rdkafka.h:3216
const char * name
Definition: rdkafka.h:482
Definition: rdkafka.h:277
RD_EXPORT rd_kafka_topic_partition_list_t * rd_kafka_topic_partition_list_new(int size)
Create a new list/vector Topic+Partition container.
char * client_host
Definition: rdkafka.h:3217
struct rd_kafka_group_info * groups
Definition: rdkafka.h:3246
RD_EXPORT rd_kafka_resp_err_t rd_kafka_commit_message(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, int async)
Commit message's offset on broker for the message's partition.
RD_EXPORT ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms, rd_kafka_message_t **rkmessages, size_t rkmessages_size)
Consume up to rkmessages_size from topic rkt and partition putting a pointer to each message in the a...
Definition: rdkafka.h:802
RD_EXPORT void rd_kafka_topic_conf_set_opaque(rd_kafka_topic_conf_t *conf, void *opaque)
Sets the application's opaque pointer that will be passed to all topic callbacks as the rkt_opaque ar...
Definition: rdkafka.h:297
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_new_t)(rd_kafka_t *rk, const rd_kafka_conf_t *conf, void *ic_opaque, char *errstr, size_t errstr_size)
on_new() is called from rd_kafka_new() prior to returning the newly created client instance to the app...
Definition: rdkafka.h:3864
RD_EXPORT void rd_kafka_log_print(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
Builtin (default) log sink: print to stderr.
char * protocol_type
Definition: rdkafka.h:3234
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_acknowledgement_t)(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque)
on_acknowledgement() is called to inform interceptors that a message was successfully delivered or per...
Definition: rdkafka.h:3930
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_commit_t)(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, rd_kafka_resp_err_t err, void *ic_opaque)
on_commit() is called on completed or failed offset commit. It is called from internal librdkafka thr...
Definition: rdkafka.h:3977
Definition: rdkafka.h:305
rd_kafka_resp_err_t( rd_kafka_plugin_f_conf_init_t)(rd_kafka_conf_t *conf, void **plug_opaquep, char *errstr, size_t errstr_size)
Plugin's configuration initializer method called each time the library is referenced from configurati...
Definition: rdkafka.h:3722
RD_EXPORT int rd_kafka_consume_callback_queue(rd_kafka_queue_t *rkqu, int timeout_ms, void(*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque), void *opaque)
Consume multiple messages from queue with callback.
RD_EXPORT rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size)
Sets a configuration property.
RD_EXPORT int rd_kafka_outq_len(rd_kafka_t *rk)
Returns the current out queue length.
int group_cnt
Definition: rdkafka.h:3247
Definition: rdkafka.h:323
RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_consume(rd_kafka_t *rk, const char *ic_name, rd_kafka_interceptor_f_on_consume_t *on_consume, void *ic_opaque)
Append an on_consume() interceptor.
rd_kafka_vtype_t
Var-arg tag types.
Definition: rdkafka.h:795
RD_EXPORT const rd_kafka_message_t * rd_kafka_event_message_next(rd_kafka_event_t *rkev)
RD_EXPORT rd_kafka_resp_err_t rd_kafka_offset_store(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset)
Store offset offset for topic rkt partition partition.
RD_EXPORT void rd_kafka_conf_set_error_cb(rd_kafka_conf_t *conf, void(*error_cb)(rd_kafka_t *rk, int err, const char *reason, void *opaque))
Set error callback in provided conf object.
rd_kafka_resp_err_t err
Definition: rdkafka.h:3232
size_t len
Definition: rdkafka.h:1061
RD_EXPORT rd_kafka_resp_err_t rd_kafka_commit(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, int async)
Commit offsets on broker for the provided list of partitions.
RD_EXPORT void rd_kafka_conf_set_default_topic_conf(rd_kafka_conf_t *conf, rd_kafka_topic_conf_t *tconf)
Definition: rdkafka.h:402
RD_EXPORT rd_kafka_type_t rd_kafka_type(const rd_kafka_t *rk)
Returns Kafka handle type.
RD_EXPORT int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t keylen, void *msg_opaque)
Produce and send a single message to broker.
rd_kafka_conf_res_t( rd_kafka_interceptor_f_on_conf_set_t)(rd_kafka_conf_t *conf, const char *name, const char *val, char *errstr, size_t errstr_size, void *ic_opaque)
on_conf_set() is called from rd_kafka_*_conf_set() in the order the interceptors were added...
Definition: rdkafka.h:3806
Definition: rdkafka.h:291
RD_EXPORT const char * rd_kafka_event_error_string(rd_kafka_event_t *rkev)
Definition: rdkafka.h:404
Definition: rdkafka.h:442
RD_EXPORT rd_kafka_topic_partition_t * rd_kafka_event_topic_partition(rd_kafka_event_t *rkev)
struct rd_kafka_metadata_broker broker
Definition: rdkafka.h:3230
RD_EXPORT void rd_kafka_queue_destroy(rd_kafka_queue_t *rkqu)
RD_EXPORT size_t rd_kafka_queue_length(rd_kafka_queue_t *rkqu)
Definition: rdkafka.h:309
RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_acknowledgement(rd_kafka_t *rk, const char *ic_name, rd_kafka_interceptor_f_on_acknowledgement_t *on_acknowledgement, void *ic_opaque)
Append an on_acknowledgement() interceptor.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_header_get(const rd_kafka_headers_t *hdrs, size_t idx, const char *name, const void **valuep, size_t *sizep)
Iterator for headers matching name.
rd_kafka_resp_err_t code
Definition: rdkafka.h:481
RD_EXPORT void rd_kafka_destroy(rd_kafka_t *rk)
Destroy Kafka handle.
RD_EXPORT void rd_kafka_set_log_level(rd_kafka_t *rk, int level)
Specifies the maximum logging level produced by internal kafka logging and debugging.
RD_EXPORT int rd_kafka_consume_callback(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms, void(*consume_cb)(rd_kafka_message_t *rkmessage, void *opaque), void *opaque)
Consumes messages from topic rkt and partition, calling the provided callback for each consumed message...
Definition: rdkafka.h:803
RD_EXPORT rd_kafka_resp_err_t rd_kafka_metadata(rd_kafka_t *rk, int all_topics, rd_kafka_topic_t *only_rkt, const struct rd_kafka_metadata **metadatap, int timeout_ms)
Request Metadata from broker.
RD_EXPORT int rd_kafka_wait_destroyed(int timeout_ms)
Wait for all rd_kafka_t objects to be destroyed.
Definition: rdkafka.h:388
RD_EXPORT rd_kafka_resp_err_t rd_kafka_commit_queue(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, rd_kafka_queue_t *rkqu, void(*cb)(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque), void *opaque)
Commit offsets on broker for the provided list of partitions.
Definition: rdkafka.h:376
RD_EXPORT rd_kafka_resp_err_t rd_kafka_message_headers(const rd_kafka_message_t *rkmessage, rd_kafka_headers_t **hdrsp)
Get the message header list.
int64_t offset
Definition: rdkafka.h:1068
RD_EXPORT rd_kafka_headers_t * rd_kafka_headers_new(size_t initial_count)
Create a new headers list.
Definition: rdkafka.h:380
RD_EXPORT int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist)
Adds one or more brokers to the kafka handle's list of initial bootstrap brokers. ...
RD_EXPORT RD_DEPRECATED int rd_kafka_errno(void)
Returns the thread-local system errno.
RD_EXPORT void rd_kafka_topic_conf_set_msg_order_cmp(rd_kafka_topic_conf_t *topic_conf, int(*msg_order_cmp)(const rd_kafka_message_t *a, const rd_kafka_message_t *b))
Producer: Set message queueing order comparator callback.
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_destroy_t)(rd_kafka_t *rk, void *ic_opaque)
on_destroy() is called from rd_kafka_destroy() or (rd_kafka_new() if rd_kafka_new() fails during init...
Definition: rdkafka.h:3877
Definition: rdkafka.h:362
Definition: rdkafka.h:266
Definition: rdkafka.h:352
RD_EXPORT void rd_kafka_log_syslog(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
Builtin log sink: print to syslog.
Definition: rdkafka.h:254
Group member information.
Definition: rdkafka.h:3214
void * key
Definition: rdkafka.h:1064
RD_EXPORT rd_kafka_conf_res_t rd_kafka_conf_get(const rd_kafka_conf_t *conf, const char *name, char *dest, size_t *dest_size)
Retrieve configuration value for property name.
RD_EXPORT rd_kafka_topic_conf_t * rd_kafka_topic_conf_new(void)
Create topic configuration object.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_get_watermark_offsets(rd_kafka_t *rk, const char *topic, int32_t partition, int64_t *low, int64_t *high)
Get last known low (oldest/beginning) and high (newest/end) offsets for partition.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_request_sent(rd_kafka_t *rk, const char *ic_name, rd_kafka_interceptor_f_on_request_sent_t *on_request_sent, void *ic_opaque)
Append an on_request_sent() interceptor.
RD_EXPORT void rd_kafka_dump(FILE *fp, rd_kafka_t *rk)
Dumps rdkafka's internal state for handle rk to stream fp.
Definition: rdkafka.h:799
Definition: rdkafka.h:279
RD_EXPORT void rd_kafka_conf_set_closesocket_cb(rd_kafka_conf_t *conf, int(*closesocket_cb)(int sockfd, void *opaque))
Set close socket callback.
A growable list of Topic+Partitions.
Definition: rdkafka.h:630
int rd_kafka_event_type_t
Event types.
Definition: rdkafka.h:3476
Topic information.
Definition: rdkafka.h:3142
RD_EXPORT void rd_kafka_conf_set_throttle_cb(rd_kafka_conf_t *conf, void(*throttle_cb)(rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void *opaque))
Set throttle callback.
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_get_main(rd_kafka_t *rk)
RD_EXPORT void rd_kafka_conf_set_log_cb(rd_kafka_conf_t *conf, void(*log_cb)(const rd_kafka_t *rk, int level, const char *fac, const char *buf))
Set logger callback.
Definition: rdkafka.h:268
RD_EXPORT rd_kafka_resp_err_t rd_kafka_list_groups(rd_kafka_t *rk, const char *group, const struct rd_kafka_group_list **grplistp, int timeout_ms)
List and describe client groups in cluster.
Definition: rdkafka.h:798
int32_t partition
Definition: rdkafka.h:607
Definition: rdkafka.h:264
Definition: rdkafka.h:293
Definition: rdkafka.h:262
void * opaque
Definition: rdkafka.h:611
const char * desc
Definition: rdkafka.h:483
RD_EXPORT rd_kafka_resp_err_t rd_kafka_assign(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *partitions)
Atomic assignment of partitions to consume.
RD_EXPORT rd_kafka_conf_t * rd_kafka_conf_dup(const rd_kafka_conf_t *conf)
Creates a copy/duplicate of configuration object conf.
RD_EXPORT int64_t rd_kafka_message_timestamp(const rd_kafka_message_t *rkmessage, rd_kafka_timestamp_type_t *tstype)
Returns the message timestamp for a consumed message.
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_get_partition(rd_kafka_t *rk, const char *topic, int32_t partition)
RD_EXPORT int rd_kafka_topic_partition_list_del_by_idx(rd_kafka_topic_partition_list_t *rktparlist, int idx)
Delete partition from list by elems[] index.
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_conf_destroy_t)(void *ic_opaque)
on_conf_destroy() is called from rd_kafka_*_conf_destroy() in the order the interceptors were added...
Definition: rdkafka.h:3843
RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_send(rd_kafka_t *rk, const char *ic_name, rd_kafka_interceptor_f_on_send_t *on_send, void *ic_opaque)
Append an on_send() interceptor.
Definition: rdkafka.h:374
Definition: rdkafka.h:329
RD_EXPORT void rd_kafka_conf_set_connect_cb(rd_kafka_conf_t *conf, int(*connect_cb)(int sockfd, const struct sockaddr *addr, int addrlen, const char *id, void *opaque))
Set connect callback.
RD_EXPORT void rd_kafka_mem_free(rd_kafka_t *rk, void *ptr)
Free pointer returned by librdkafka.
static RD_INLINE const char *RD_UNUSED rd_kafka_message_errstr(const rd_kafka_message_t *rkmessage)
Returns the error string for an errored rd_kafka_message_t or NULL if there was no error...
Definition: rdkafka.h:1102
RD_EXPORT rd_kafka_resp_err_t rd_kafka_conf_interceptor_add_on_new(rd_kafka_conf_t *conf, const char *ic_name, rd_kafka_interceptor_f_on_new_t *on_new, void *ic_opaque)
Append an on_new() interceptor.
RD_EXPORT char * rd_kafka_clusterid(rd_kafka_t *rk, int timeout_ms)
Returns the ClusterId as reported in broker metadata.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_pause_partitions(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions)
Pause producing or consumption for the provided list of partitions.
RD_EXPORT void rd_kafka_group_list_destroy(const struct rd_kafka_group_list *grplist)
Release list memory.
Definition: rdkafka.h:434
RD_EXPORT void rd_kafka_event_destroy(rd_kafka_event_t *rkev)
Destroy an event.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_committed(rd_kafka_t *rk, rd_kafka_topic_partition_list_t *partitions, int timeout_ms)
Retrieve committed offsets for topics+partitions.
Definition: rdkafka.h:204
RD_EXPORT rd_kafka_conf_t * rd_kafka_conf_new(void)
Create configuration object.
RD_EXPORT rd_kafka_event_t * rd_kafka_queue_poll(rd_kafka_queue_t *rkqu, int timeout_ms)
Poll a queue for an event for max timeout_ms.
RD_EXPORT int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition)
Stop consuming messages for topic rkt and partition, purging all messages currently in the local queu...
RD_EXPORT const char * rd_kafka_err2name(rd_kafka_resp_err_t err)
Returns the error code name (enum name).
RD_EXPORT rd_kafka_resp_err_t rd_kafka_conf_interceptor_add_on_conf_destroy(rd_kafka_conf_t *conf, const char *ic_name, rd_kafka_interceptor_f_on_conf_destroy_t *on_conf_destroy, void *ic_opaque)
Append an on_conf_destroy() interceptor.
RD_EXPORT int rd_kafka_version(void)
Returns the librdkafka version as integer.
Definition: rdkafka.h:422
void * member_assignment
Definition: rdkafka.h:3221
RD_EXPORT int rd_kafka_topic_partition_list_del(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition)
Delete partition from list.
int size
Definition: rdkafka.h:632
RD_EXPORT rd_kafka_resp_err_t rd_kafka_subscribe(rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)
Subscribe to topic set using balanced consumer groups.
RD_EXPORT void rd_kafka_topic_partition_destroy(rd_kafka_topic_partition_t *rktpar)
Destroy a rd_kafka_topic_partition_t.
RD_EXPORT void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque)
Sets the application's opaque pointer that will be passed to callbacks.
Definition: rdkafka.h:1219
RD_EXPORT const char ** rd_kafka_conf_dump(rd_kafka_conf_t *conf, size_t *cntp)
Dump the configuration properties and values of conf to an array with "key", "value" pairs...
RD_EXPORT rd_kafka_resp_err_t rd_kafka_event_error(rd_kafka_event_t *rkev)
Definition: rdkafka.h:321
char * topic
Definition: rdkafka.h:606
RD_EXPORT const char * rd_kafka_get_debug_contexts(void)
Retrieve supported debug contexts for use with the "debug" configuration property. (runtime)
Definition: rdkafka.h:327
RD_EXPORT void rd_kafka_conf_set_offset_commit_cb(rd_kafka_conf_t *conf, void(*offset_commit_cb)(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *offsets, void *opaque))
Consumer: Set offset commit callback for use with consumer groups.
RD_EXPORT void rd_kafka_conf_set_rebalance_cb(rd_kafka_conf_t *conf, void(*rebalance_cb)(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque))
Consumer: Set rebalance callback for use with coordinated consumer group balancing.
Definition: rdkafka.h:796
RD_EXPORT int rd_kafka_queue_poll_callback(rd_kafka_queue_t *rkqu, int timeout_ms)
Poll a queue for events served through callbacks for max timeout_ms.
rd_kafka_resp_err_t( rd_kafka_interceptor_f_on_consume_t)(rd_kafka_t *rk, rd_kafka_message_t *rkmessage, void *ic_opaque)
on_consume() is called just prior to passing the message to the application in rd_kafka_consumer_poll...
Definition: rdkafka.h:3952
RD_EXPORT rd_kafka_resp_err_t rd_kafka_flush(rd_kafka_t *rk, int timeout_ms)
Wait until all outstanding produce requests, et.al, are completed. This should typically be done prio...
Definition: rdkafka.h:339
RD_EXPORT int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms)
Polls the provided kafka handle for events.
RD_EXPORT int rd_kafka_topic_partition_available(const rd_kafka_topic_t *rkt, int32_t partition)
Check if partition is available (has a leader broker).
Definition: rdkafka.h:191
RD_EXPORT int64_t rd_kafka_message_latency(const rd_kafka_message_t *rkmessage)
Returns the latency for a produced message measured from the produce() call.
RD_EXPORT int32_t rd_kafka_msg_partitioner_murmur2(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque)
Murmur2 partitioner (Java compatible).
Definition: rdkafka.h:438
Definition: rdkafka.h:392
RD_EXPORT void rd_kafka_message_set_headers(rd_kafka_message_t *rkmessage, rd_kafka_headers_t *hdrs)
Replace the message's current headers with a new list.
RD_EXPORT void rd_kafka_conf_set_socket_cb(rd_kafka_conf_t *conf, int(*socket_cb)(int domain, int type, int protocol, void *opaque))
Set socket callback.
Definition: rdkafka.h:192
Definition: rdkafka.h:203
Definition: rdkafka.h:289
RD_EXPORT void rd_kafka_conf_set_events(rd_kafka_conf_t *conf, int events)
Enable event sourcing. events is a bitmask of RD_KAFKA_EVENT_* of events to enable for consumption by...
RD_EXPORT void rd_kafka_conf_set_dr_cb(rd_kafka_conf_t *conf, void(*dr_cb)(rd_kafka_t *rk, void *payload, size_t len, rd_kafka_resp_err_t err, void *opaque, void *msg_opaque))
Definition: rdkafka.h:285
RD_EXPORT rd_kafka_topic_conf_t * rd_kafka_topic_conf_dup(const rd_kafka_topic_conf_t *conf)
Creates a copy/duplicate of topic configuration object conf.
Definition: rdkafka.h:356
RD_EXPORT RD_DEPRECATED void rd_kafka_set_logger(rd_kafka_t *rk, void(*func)(const rd_kafka_t *rk, int level, const char *fac, const char *buf))
Set logger function.
Definition: rdkafka.h:260
Metadata container.
Definition: rdkafka.h:3153
Definition: rdkafka.h:319
RD_EXPORT rd_kafka_resp_err_t rd_kafka_unsubscribe(rd_kafka_t *rk)
Unsubscribe from the current subscription set.
RD_EXPORT void rd_kafka_metadata_destroy(const struct rd_kafka_metadata *metadata)
Release metadata memory.
RD_EXPORT int rd_kafka_produce_batch(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, rd_kafka_message_t *rkmessages, int message_cnt)
Produce multiple messages.
RD_EXPORT void rd_kafka_conf_set_open_cb(rd_kafka_conf_t *conf, int(*open_cb)(const char *pathname, int flags, mode_t mode, void *opaque))
Set open callback.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_header_remove(rd_kafka_headers_t *hdrs, const char *name)
Remove all headers for the given key (if any).
Definition: rdkafka.h:311
RD_EXPORT rd_kafka_resp_err_t rd_kafka_producev(rd_kafka_t *rk,...)
Produce and send a single message to broker.
Definition: rdkafka.h:436
RD_EXPORT rd_kafka_resp_err_t rd_kafka_seek(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset, int timeout_ms)
Seek consumer for topic+partition to offset which is either an absolute or logical offset...
RD_EXPORT void rd_kafka_topic_partition_list_add_range(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t start, int32_t stop)
Add range of partitions from start to stop inclusive.
RD_EXPORT size_t rd_kafka_event_message_count(rd_kafka_event_t *rkev)
Definition: rdkafka.h:1217
RD_EXPORT rd_kafka_topic_partition_t * rd_kafka_topic_partition_list_add(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition)
Add topic+partition to list.
Definition: rdkafka.h:283
RD_EXPORT int32_t rd_kafka_msg_partitioner_murmur2_random(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque)
Consistent-Random Murmur2 partitioner (Java compatible).
Definition: rdkafka.h:202
Definition: rdkafka.h:307
RD_EXPORT ssize_t rd_kafka_consume_batch_queue(rd_kafka_queue_t *rkqu, int timeout_ms, rd_kafka_message_t **rkmessages, size_t rkmessages_size)
Consume batch of messages from queue.
rd_kafka_timestamp_type_t
Definition: rdkafka.h:201
RD_EXPORT void * rd_kafka_topic_opaque(const rd_kafka_topic_t *rkt)
Get the rkt_opaque pointer that was set in the topic configuration.
RD_EXPORT void * rd_kafka_event_opaque(rd_kafka_event_t *rkev)
RD_EXPORT const char * rd_kafka_name(const rd_kafka_t *rk)
Returns Kafka handle name.
A Kafka message as returned by the rd_kafka_consume*() family of functions as well as provided to the...
Definition: rdkafka.h:1053
Definition: rdkafka.h:295
RD_EXPORT void rd_kafka_queue_io_event_enable(rd_kafka_queue_t *rkqu, int fd, const void *payload, size_t size)
Enable IO event triggering for queue.
RD_EXPORT void rd_kafka_topic_destroy(rd_kafka_topic_t *rkt)
Lose application's topic handle refcount as previously created with rd_kafka_topic_new().
int32_t partition
Definition: rdkafka.h:1056
Definition: rdkafka.h:348
RD_EXPORT rd_kafka_resp_err_t rd_kafka_conf_interceptor_add_on_conf_set(rd_kafka_conf_t *conf, const char *ic_name, rd_kafka_interceptor_f_on_conf_set_t *on_conf_set, void *ic_opaque)
Append an on_conf_set() interceptor.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk, const char *topic, int32_t partition, int64_t *low, int64_t *high, int timeout_ms)
Query broker for low (oldest/beginning) and high (newest/end) offsets for partition.
RD_EXPORT rd_kafka_message_t * rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms)
Consume a single message from topic rkt and partition.
Definition: rdkafka.h:299
RD_EXPORT rd_kafka_message_t * rd_kafka_consumer_poll(rd_kafka_t *rk, int timeout_ms)
Poll the consumer for messages or events.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_assignment(rd_kafka_t *rk, rd_kafka_topic_partition_list_t **partitions)
Returns the current partition assignment.
Definition: rdkafka.h:275
Definition: rdkafka.h:807
char * member_id
Definition: rdkafka.h:3215
List of groups.
Definition: rdkafka.h:3245
Definition: rdkafka.h:467
Definition: rdkafka.h:801
RD_EXPORT const char * rd_kafka_err2str(rd_kafka_resp_err_t err)
Returns a human readable representation of a kafka error.
void * metadata
Definition: rdkafka.h:609
RD_EXPORT void rd_kafka_topic_partition_list_sort(rd_kafka_topic_partition_list_t *rktparlist, int(*cmp)(const void *a, const void *b, void *opaque), void *opaque)
Sort list using comparator cmp.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_interceptor_add_on_destroy(rd_kafka_t *rk, const char *ic_name, rd_kafka_interceptor_f_on_destroy_t *on_destroy, void *ic_opaque)
Append an on_destroy() interceptor.
int64_t offset
Definition: rdkafka.h:608
RD_EXPORT rd_kafka_topic_partition_list_t * rd_kafka_event_topic_partition_list(rd_kafka_event_t *rkev)
RD_EXPORT RD_DEPRECATED rd_kafka_resp_err_t rd_kafka_errno2err(int errnox)
Converts the system errno value errnox to a rd_kafka_resp_err_t error code upon failure from the foll...
Definition: rdkafka.h:258
RD_EXPORT void rd_kafka_conf_destroy(rd_kafka_conf_t *conf)
Destroys a conf object.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_header_get_all(const rd_kafka_headers_t *hdrs, size_t idx, const char **namep, const void **valuep, size_t *sizep)
Iterator for all headers.
Definition: rdkafka.h:317
Broker information.
Definition: rdkafka.h:3120
Error code value, name and description. Typically for use with language bindings to automatically exp...
Definition: rdkafka.h:480
Definition: rdkafka.h:418
Definition: rdkafka.h:398
rd_kafka_resp_err_t err
Definition: rdkafka.h:612
Definition: rdkafka.h:315
RD_EXPORT rd_kafka_headers_t * rd_kafka_headers_copy(const rd_kafka_headers_t *src)
Make a copy of headers list src.
Definition: rdkafka.h:341
RD_EXPORT rd_kafka_resp_err_t rd_kafka_message_detach_headers(rd_kafka_message_t *rkmessage, rd_kafka_headers_t **hdrsp)
Get the message header list and detach the list from the message making the application the owner of ...
Topic+Partition place holder.
Definition: rdkafka.h:605
Definition: rdkafka.h:256
RD_EXPORT rd_kafka_topic_conf_t * rd_kafka_default_topic_conf_dup(rd_kafka_t *rk)
Creates a copy/duplicate of rk 's default topic configuration object.
Definition: rdkafka.h:345
struct rd_kafka_group_member_info * members
Definition: rdkafka.h:3236
size_t metadata_size
Definition: rdkafka.h:610
RD_EXPORT const char ** rd_kafka_topic_conf_dump(rd_kafka_topic_conf_t *conf, size_t *cntp)
Dump the topic configuration properties and values of conf to an array with "key", "value" pairs.
RD_EXPORT void rd_kafka_headers_destroy(rd_kafka_headers_t *hdrs)
Destroy the headers list. The object and any returned value pointers are not usable after this call...
int member_metadata_size
Definition: rdkafka.h:3220
RD_EXPORT size_t rd_kafka_event_message_array(rd_kafka_event_t *rkev, const rd_kafka_message_t **rkmessages, size_t size)
Extracts size message(s) from the event into the pre-allocated array rkmessages.
Definition: rdkafka.h:800
rd_kafka_type_t
rd_kafka_t handle type.
Definition: rdkafka.h:190
RD_EXPORT int32_t rd_kafka_msg_partitioner_random(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *opaque, void *msg_opaque)
Random partitioner.
RD_EXPORT void rd_kafka_conf_set_stats_cb(rd_kafka_conf_t *conf, int(*stats_cb)(rd_kafka_t *rk, char *json, size_t json_len, void *opaque))
Set statistics callback in provided conf object.
Definition: rdkafka.h:368
RD_EXPORT rd_kafka_topic_partition_t * rd_kafka_topic_partition_list_find(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition)
Find element by topic and partition.
RD_EXPORT void rd_kafka_topic_conf_destroy(rd_kafka_topic_conf_t *topic_conf)
Destroys a topic conf object.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_topic_partition_list_set_offset(rd_kafka_topic_partition_list_t *rktparlist, const char *topic, int32_t partition, int64_t offset)
Set offset to offset for topic and partition.
Definition: rdkafka.h:428
void * payload
Definition: rdkafka.h:1057
RD_EXPORT void rd_kafka_yield(rd_kafka_t *rk)
Cancels the current callback dispatcher (rd_kafka_poll(), rd_kafka_consume_callback(), etc).
Definition: rdkafka.h:358
RD_EXPORT rd_kafka_resp_err_t rd_kafka_header_get_last(const rd_kafka_headers_t *hdrs, const char *name, const void **valuep, size_t *sizep)
Find last header in list hdrs matching name.
RD_EXPORT const char * rd_kafka_event_stats(rd_kafka_event_t *rkev)
Extract stats from the event.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_set_log_queue(rd_kafka_t *rk, rd_kafka_queue_t *rkqu)
Forward librdkafka logs (and debug) to the specified queue for serving with one of the ...
void * member_metadata
Definition: rdkafka.h:3218
RD_EXPORT rd_kafka_resp_err_t rd_kafka_consumer_close(rd_kafka_t *rk)
Close down the KafkaConsumer.
RD_EXPORT void rd_kafka_conf_properties_show(FILE *fp)
Prints a table to fp of all supported configuration properties, their default values as well as a des...
RD_EXPORT void rd_kafka_conf_dump_free(const char **arr, size_t cnt)
Frees a configuration dump returned from rd_kafka_conf_dump() or rd_kafka_topic_conf_dump().
void * _private
Definition: rdkafka.h:1078
Definition: rdkafka.h:273
RD_EXPORT int32_t rd_kafka_msg_partitioner_consistent_random(const rd_kafka_topic_t *rkt, const void *key, size_t keylen, int32_t partition_cnt, void *opaque, void *msg_opaque)
Consistent-Random partitioner.
RD_EXPORT const char * rd_kafka_topic_name(const rd_kafka_topic_t *rkt)
Returns the topic name.
char * protocol
Definition: rdkafka.h:3235
RD_EXPORT rd_kafka_message_t * rd_kafka_consume_queue(rd_kafka_queue_t *rkqu, int timeout_ms)
Consume from queue.
RD_EXPORT rd_kafka_resp_err_t rd_kafka_last_error(void)
Returns the last error code generated by a legacy API call in the current thread. ...
RD_EXPORT int rd_kafka_consume_start_queue(rd_kafka_topic_t *rkt, int32_t partition, int64_t offset, rd_kafka_queue_t *rkqu)
Same as rd_kafka_consume_start() but re-routes incoming messages to the provided queue rkqu (which mu...
RD_EXPORT rd_kafka_queue_t * rd_kafka_queue_new(rd_kafka_t *rk)
Create a new message queue.
RD_EXPORT rd_kafka_topic_t * rd_kafka_topic_new(rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)
Creates a new topic handle for topic named topic.
Definition: rdkafka.h:805