Important
You are viewing documentation for an older version of Confluent Platform. For the latest, click here.
Kafka C++ Client¶
Confluent Platform includes librdkafka, a C/C++ library offering a producer and a consumer for Apache Kafka®.
C++ Client installation¶
The C/C++ client named librdkafka is available in source form and as precompiled binaries for Debian and Red Hat-based Linux distributions, and macOS. Most users will want to use the precompiled binaries.
For Linux distributions, follow the instructions for Debian or Red Hat distributions to set up the repositories, then use yum or apt-get to install the appropriate packages. For example, a developer building a C application on a Red Hat-based distribution would use the librdkafka-devel package:
sudo yum install librdkafka-devel
On a Debian-based distribution, they would use the librdkafka-dev package:
sudo apt-get install librdkafka-dev
On macOS, the latest release is available via Homebrew:
brew install librdkafka
The source code is also available in the ZIP and TAR archives under the directory src/.
C++ Client example code¶
For Hello World examples of Kafka clients in C/C++, see C. All examples include a producer and consumer that can connect to any Kafka cluster running on-premises or in Confluent Cloud.
Producer¶
Initialization¶
char hostname[128];
char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (gethostname(hostname, sizeof(hostname))) {
    fprintf(stderr, "%% Failed to lookup hostname\n");
    exit(1);
}
if (rd_kafka_conf_set(conf, "client.id", hostname,
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%% %s\n", errstr);
    exit(1);
}
if (rd_kafka_conf_set(conf, "bootstrap.servers", "host1:9092,host2:9092",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%% %s\n", errstr);
    exit(1);
}
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
if (rd_kafka_topic_conf_set(topic_conf, "acks", "all",
                            errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%% %s\n", errstr);
    exit(1);
}
/* Create Kafka producer handle */
rd_kafka_t *rk;
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                        errstr, sizeof(errstr)))) {
    fprintf(stderr, "%% Failed to create new producer: %s\n", errstr);
    exit(1);
}
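librdkafka validates each property as it is set: on failure, rd_kafka_conf_set and rd_kafka_topic_conf_set return a value other than RD_KAFKA_CONF_OK and write a human-readable reason to errstr, so configuration errors surface immediately.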
Asynchronous writes¶
With librdkafka, you first need to create a rd_kafka_topic_t handle for the topic you want to write to. Then you can use rd_kafka_produce to send messages to it. For example:
rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, topic_conf);
if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA,
                     RD_KAFKA_MSG_F_COPY,
                     payload, payload_len,
                     key, key_len,
                     NULL) == -1) {
    /* rd_kafka_last_error() reports why the most recent produce call failed */
    fprintf(stderr, "%% Failed to produce to topic %s: %s\n",
            topic, rd_kafka_err2str(rd_kafka_last_error()));
}
You can pass topic-specific configuration in the third argument to rd_kafka_topic_new. The previous example passed topic_conf, which was seeded with a configuration for acknowledgments. Passing NULL will cause the producer to use the default configuration.
The second argument to rd_kafka_produce can be used to set the desired partition for the message. If set to RD_KAFKA_PARTITION_UA, as in this case, librdkafka will use the default partitioner to select the partition for this message. The third argument indicates that librdkafka should copy the payload and key, which lets you free them as soon as the call returns.
If you want to invoke some code after the write has completed, you have to configure it on initialization:
static void on_delivery(rd_kafka_t *rk,
                        const rd_kafka_message_t *rkmessage,
                        void *opaque) {
    if (rkmessage->err)
        fprintf(stderr, "%% Message delivery failed: %s\n",
                rd_kafka_message_errstr(rkmessage));
}
void init_rd_kafka() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set_dr_msg_cb(conf, on_delivery);
    // initialization omitted
}
The delivery callback in librdkafka is invoked in the user's thread by calling rd_kafka_poll. A common pattern is to call this function after every call to the produce API, but this may not be sufficient to ensure regular delivery reports if the message produce rate is not steady. Note also that this API does not provide a direct way to block for the result of any particular message delivery. If you need to do this, see the synchronous write example below.
Synchronous writes¶
librdkafka is asynchronous by design, because making writes synchronous is typically a bad idea: it severely limits throughput. Synchronous writes may be justified in some cases, but the implementation is not trivial.
A full example can be found here.
Warning
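The default partitioner in the Java producer uses the murmur2 hash function, while the default partitioner in librdkafka uses crc32. Because of the different hash functions, a message produced by a Java client and a message produced by a librdkafka client may be assigned to different partitions even with the same partition key. If both kinds of client write to the same topic and per-key ordering matters, configure them to use the same partitioner; librdkafka versions that support the partitioner topic property accept the value murmur2_random for Java-compatible assignment.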
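To make the warning concrete, the following sketch shows the general shape of keyed partition selection: hash the key, then take the result modulo the partition count. It assumes the standard CRC-32 (IEEE) variant and a plain modulo; the function names crc32_ieee and partition_for_key are hypothetical and are not part of the librdkafka API. A client that hashes the same key with murmur2 will generally land on a different partition.

```c
#include <stddef.h>
#include <stdint.h>

/* Standard CRC-32 (IEEE 802.3, reflected polynomial 0xEDB88320),
 * computed one bit at a time for clarity rather than speed. */
uint32_t crc32_ieee(const void *data, size_t len) {
    const unsigned char *p = (const unsigned char *)data;
    uint32_t crc = 0xFFFFFFFFu;
    for (size_t i = 0; i < len; i++) {
        crc ^= p[i];
        for (int bit = 0; bit < 8; bit++)
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
    }
    return crc ^ 0xFFFFFFFFu;
}

/* Hypothetical helper: choose a partition as "hash modulo partition count".
 * partition_cnt must be greater than zero. */
int32_t partition_for_key(const void *key, size_t key_len,
                          int32_t partition_cnt) {
    return (int32_t)(crc32_ieee(key, key_len) % (uint32_t)partition_cnt);
}
```

Because the result depends only on the key bytes, the hash function, and the partition count, changing any one of the three (for example, switching hash functions between clients) can move a key to a different partition.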
Consumer¶
Initialization¶
char hostname[128];
char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (gethostname(hostname, sizeof(hostname))) {
    fprintf(stderr, "%% Failed to lookup hostname\n");
    exit(1);
}
if (rd_kafka_conf_set(conf, "client.id", hostname,
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%% %s\n", errstr);
    exit(1);
}
if (rd_kafka_conf_set(conf, "group.id", "foo",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%% %s\n", errstr);
    exit(1);
}
if (rd_kafka_conf_set(conf, "bootstrap.servers", "host1:9092,host2:9092",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%% %s\n", errstr);
    exit(1);
}
/* Create Kafka consumer handle */
rd_kafka_t *rk;
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                        errstr, sizeof(errstr)))) {
    fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
    exit(1);
}
librdkafka reports configuration errors at the point where each property is set, through the return value of rd_kafka_conf_set and the message written to errstr.
Basic usage¶
librdkafka uses a multi-threaded approach to Kafka consumption. The API returns only a single message or event at a time:
while (running) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
    if (rkmessage) {
        msg_process(rkmessage);
        rd_kafka_message_destroy(rkmessage);
        if ((++msg_count % MIN_COMMIT_COUNT) == 0)
            rd_kafka_commit(rk, NULL, 0);
    }
}
librdkafka does all fetching and coordinator communication in background threads. This frees you from the complication of tuning the session timeout according to the expected processing time. However, since the background thread will keep the consumer alive until the client has been closed, you must ensure that your process does not become a zombie since it will continue to hold on to assigned partitions in that case.
Important
Partition rebalances also take place in a background thread, which means you still have to handle the potential for commit failures, as the consumer may no longer have the same partition assignment when the commit begins. This is unnecessary if you enable autocommit, since commit failures will be ignored silently, which also implies that you have no way to roll back processing. The following loop checks the result of each commit:
while (running) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 1000);
    if (!rkmessage)
        continue; // timeout: no message
    msg_process(rkmessage); // application-specific processing
    rd_kafka_message_destroy(rkmessage);
    if ((++msg_count % MIN_COMMIT_COUNT) == 0) {
        rd_kafka_resp_err_t err = rd_kafka_commit(rk, NULL, 0);
        if (err) {
            // application-specific rollback of processed records
        }
    }
}
C++ Client code examples¶
Basic poll loop¶
The consumer API is centered around the rd_kafka_consumer_poll function, which is used to retrieve records from the brokers. The rd_kafka_subscribe function controls which topics will be fetched in poll. Typically, consumer usage involves an initial call to rd_kafka_subscribe to set up the topics of interest and then a loop that calls rd_kafka_consumer_poll until the application is shut down.
/* Requires <signal.h>. Set to 0 (for example, from a signal handler) to stop the loop. */
static volatile sig_atomic_t running = 1;
static void msg_process(rd_kafka_message_t *message);
void basic_consume_loop(rd_kafka_t *rk,
                        rd_kafka_topic_partition_list_t *topics) {
    rd_kafka_resp_err_t err;
    if ((err = rd_kafka_subscribe(rk, topics))) {
        fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
        exit(1);
    }
    while (running) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
        if (rkmessage) {
            msg_process(rkmessage);
            rd_kafka_message_destroy(rkmessage);
        }
    }
    err = rd_kafka_consumer_close(rk);
    if (err)
        fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
    else
        fprintf(stderr, "%% Consumer closed\n");
}
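The poll timeout is hard-coded to 500 milliseconds. If no records are received before this timeout expires, then rd_kafka_consumer_poll will return NULL. A non-NULL return value can also be an error event rather than a record, so check rkmessage->err before treating the message as data.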
You should always call rd_kafka_consumer_close after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately, which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If the consumer is not closed properly, the broker will trigger the rebalance only after the session timeout has expired. In this example, rd_kafka_consumer_close is called as soon as the poll loop exits, so the consumer finishes closing before the function returns.
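The poll loop exits once running becomes zero, but the example does not show what clears the flag. One common approach, sketched here as an assumption rather than as part of the original example, is to clear it from a signal handler so that Ctrl-C or a termination request lets the loop finish and reach rd_kafka_consumer_close. The names stop_on_signal and install_shutdown_handlers are hypothetical.

```c
#include <signal.h>

/* Loop condition shared with the poll loop. sig_atomic_t is the only
 * object type the C standard guarantees can be safely written from a
 * signal handler. */
volatile sig_atomic_t running = 1;

/* Signal handler: only clears the flag; all cleanup happens in the
 * main thread after the poll loop exits. */
void stop_on_signal(int sig) {
    (void)sig;
    running = 0;
}

/* Hypothetical helper: route SIGINT and SIGTERM to stop_on_signal.
 * Returns 0 on success, -1 if a handler could not be installed. */
int install_shutdown_handlers(void) {
    if (signal(SIGINT, stop_on_signal) == SIG_ERR)
        return -1;
    if (signal(SIGTERM, stop_on_signal) == SIG_ERR)
        return -1;
    return 0;
}
```

Call install_shutdown_handlers once before entering the loop. Because the poll call uses a finite timeout, the loop notices the cleared flag within one timeout period.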
Synchronous commits¶
The simplest and most reliable way to manually commit offsets is using rd_kafka_commit for both synchronous and asynchronous commits. When committing, it's important to keep in mind that rd_kafka_consumer_poll returns single messages instead of batches.
void consume_loop(rd_kafka_t *rk,
                  rd_kafka_topic_partition_list_t *topics) {
    static const int MIN_COMMIT_COUNT = 1000;
    int msg_count = 0;
    rd_kafka_resp_err_t err;
    if ((err = rd_kafka_subscribe(rk, topics))) {
        fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
        exit(1);
    }
    while (running) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
        if (rkmessage) {
            msg_process(rkmessage);
            rd_kafka_message_destroy(rkmessage);
            if ((++msg_count % MIN_COMMIT_COUNT) == 0)
                rd_kafka_commit(rk, NULL, 0);
        }
    }
    err = rd_kafka_consumer_close(rk);
    if (err)
        fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
    else
        fprintf(stderr, "%% Consumer closed\n");
}
In this example, a synchronous commit is triggered every 1000 messages. The second argument to rd_kafka_commit is the list of offsets to be committed; if set to NULL, librdkafka will commit the latest offsets for the assigned partitions. The third argument in rd_kafka_commit is a flag which controls whether this call is asynchronous: 0 requests a synchronous commit, and a non-zero value an asynchronous one. You could also trigger the commit on expiration of a timeout to ensure that the committed position is updated regularly.
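The timeout-based trigger mentioned above can be combined with the message count. The following is a minimal sketch of that bookkeeping, kept separate from the Kafka calls; the commit_policy_t type and its functions are hypothetical names introduced for illustration and are not part of librdkafka.

```c
#include <stdbool.h>
#include <time.h>

/* Hypothetical bookkeeping for "commit every N messages or every
 * T seconds, whichever comes first". */
typedef struct {
    int    min_count;       /* commit once this many messages are pending */
    double max_interval_s;  /* ...or once this many seconds have elapsed  */
    int    pending;         /* messages processed since the last commit   */
    time_t last_commit;     /* time of the last commit (or of init)       */
} commit_policy_t;

void commit_policy_init(commit_policy_t *p, int min_count,
                        double max_interval_s, time_t now) {
    p->min_count = min_count;
    p->max_interval_s = max_interval_s;
    p->pending = 0;
    p->last_commit = now;
}

/* Call once for every message that has been processed. */
void commit_policy_note_message(commit_policy_t *p) {
    p->pending++;
}

/* True when there is something to commit and either threshold is reached. */
bool commit_policy_due(const commit_policy_t *p, time_t now) {
    if (p->pending == 0)
        return false;
    return p->pending >= p->min_count ||
           difftime(now, p->last_commit) >= p->max_interval_s;
}

/* Call after a commit has succeeded. */
void commit_policy_mark_committed(commit_policy_t *p, time_t now) {
    p->pending = 0;
    p->last_commit = now;
}
```

In the consume loop, one option is to check commit_policy_due(&policy, time(NULL)) after every poll, including polls that time out, then call rd_kafka_commit(rk, NULL, 0) and mark the policy as committed only if the commit returns no error.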
Delivery guarantees¶
In the previous example, you get “at least once” delivery because the commit follows the message processing. By changing the order, however, you can get “at most once” delivery, but you must handle commit failures carefully: a message whose commit failed must not be processed.
void consume_loop(rd_kafka_t *rk,
                  rd_kafka_topic_partition_list_t *topics) {
    rd_kafka_resp_err_t err;
    if ((err = rd_kafka_subscribe(rk, topics))) {
        fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
        exit(1);
    }
    while (running) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
        if (!rkmessage)
            continue;
        /* Process only if the synchronous commit succeeded */
        if (!rd_kafka_commit_message(rk, rkmessage, 0))
            msg_process(rkmessage);
        /* Always release the message, even when the commit failed */
        rd_kafka_message_destroy(rkmessage);
    }
    err = rd_kafka_consumer_close(rk);
    if (err)
        fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
    else
        fprintf(stderr, "%% Consumer closed\n");
}
For simplicity in this example, rd_kafka_commit_message is used prior to processing the message. Committing on every message would produce a lot of overhead in practice. A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded.
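The batch-oriented approach can be sketched independently of the Kafka calls. In the following illustration the commit and processing steps are passed in as function pointers so the control flow is easy to see; every name here (commit_fn, process_fn, process_batch_at_most_once, and the demo_ stand-ins) is hypothetical. In a real consumer, the commit callback would wrap a synchronous rd_kafka_commit call and the batch would hold polled messages rather than strings.

```c
#include <stddef.h>

/* Hypothetical callback types: commit returns 0 on success. */
typedef int  (*commit_fn)(void *ctx);
typedef void (*process_fn)(void *ctx, const char *msg);

/* Commit first, then process the batch only if the commit succeeded.
 * Returns the number of messages that were processed. */
size_t process_batch_at_most_once(const char *const *batch, size_t n,
                                  commit_fn commit, process_fn process,
                                  void *ctx) {
    if (n == 0)
        return 0;
    if (commit(ctx) != 0)
        return 0;                 /* commit failed: process nothing */
    for (size_t i = 0; i < n; i++)
        process(ctx, batch[i]);
    return n;
}

/* Stand-ins used only to exercise the function above. */
typedef struct {
    int commit_result;            /* value the fake commit returns  */
    int processed;                /* number of messages "processed" */
} demo_ctx_t;

int demo_commit(void *ctx) {
    return ((demo_ctx_t *)ctx)->commit_result;
}

void demo_process(void *ctx, const char *msg) {
    (void)msg;
    ((demo_ctx_t *)ctx)->processed++;
}
```

With this ordering, a crash after the commit but before processing finishes loses the rest of the batch instead of reprocessing it, which is the trade-off that “at most once” delivery accepts.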
Asynchronous Commits¶
void consume_loop(rd_kafka_t *rk,
                  rd_kafka_topic_partition_list_t *topics) {
    static const int MIN_COMMIT_COUNT = 1000;
    int msg_count = 0;
    rd_kafka_resp_err_t err;
    if ((err = rd_kafka_subscribe(rk, topics))) {
        fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
        exit(1);
    }
    while (running) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
        if (rkmessage) {
            msg_process(rkmessage);
            rd_kafka_message_destroy(rkmessage);
            if ((++msg_count % MIN_COMMIT_COUNT) == 0)
                rd_kafka_commit(rk, NULL, 1);
        }
    }
    err = rd_kafka_consumer_close(rk);
    if (err)
        fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
    else
        fprintf(stderr, "%% Consumer closed\n");
}
In this example, the consumer sends the commit request and returns immediately instead of waiting for the broker's response. This is achieved by passing a non-zero value as the third argument to rd_kafka_commit.
The API gives you a callback that is invoked when the commit either succeeds or fails. In librdkafka, this is configured on initialization:
static void on_commit(rd_kafka_t *rk,
                      rd_kafka_resp_err_t err,
                      rd_kafka_topic_partition_list_t *offsets,
                      void *opaque) {
    if (err)
        fprintf(stderr, "%% Failed to commit offsets: %s\n", rd_kafka_err2str(err));
}
void init_rd_kafka() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set_offset_commit_cb(conf, on_commit);
    // initialization omitted
}