Kafka C++ Client

Confluent Platform includes librdkafka, a C/C++ library offering a producer and a consumer for Apache Kafka®.

C++ Client installation

The C/C++ client named librdkafka is available in source form and as precompiled binaries for Debian and Red Hat-based Linux distributions, and macOS. Most users will want to use the precompiled binaries.

For Linux distributions, follow the instructions for Debian or Red Hat distributions to set up the repositories, then use yum or apt-get to install the appropriate packages. For example, a developer building a C application on a Red Hat-based distribution would use the librdkafka-devel package:

sudo yum install librdkafka-devel

And on a Debian-based distribution they would use the librdkafka-dev package:

sudo apt-get install librdkafka-dev

On macOS, the latest release is available via Homebrew:

brew install librdkafka

The source code is also available in the ZIP and TAR archives under the directory src/.

Producer

Initialization

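#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>              /* gethostname() */
#include <librdkafka/rdkafka.h>

char hostname[128];
char errstr[512];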

rd_kafka_conf_t *conf = rd_kafka_conf_new();

if (gethostname(hostname, sizeof(hostname))) {
  fprintf(stderr, "%% Failed to lookup hostname\n");
  exit(1);
}

if (rd_kafka_conf_set(conf, "client.id", hostname,
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%% %s\n", errstr);
  exit(1);
}

if (rd_kafka_conf_set(conf, "bootstrap.servers", "host1:9092,host2:9092",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%% %s\n", errstr);
  exit(1);
}

rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();

if (rd_kafka_topic_conf_set(topic_conf, "acks", "all",
                            errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%% %s\n", errstr);
  exit(1);
}

/* Create Kafka producer handle */
rd_kafka_t *rk;
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                        errstr, sizeof(errstr)))) {
  fprintf(stderr, "%% Failed to create new producer: %s\n", errstr);
  exit(1);
}

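With librdkafka, configuration errors are reported as each property is set: rd_kafka_conf_set (and rd_kafka_topic_conf_set) returns a value other than RD_KAFKA_CONF_OK and writes a description of the problem into errstr.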

Asynchronous writes

With librdkafka, you first need to create an rd_kafka_topic_t handle for the topic you want to write to. Then you can use rd_kafka_produce to send messages to it. For example:

rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, topic_conf);

if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA,
                     RD_KAFKA_MSG_F_COPY,
                     payload, payload_len,
                     key, key_len,
                     NULL) == -1) {
  fprintf(stderr, "%% Failed to produce to topic %s: %s\n",
          topic, rd_kafka_err2str(rd_kafka_last_error()));
}

/* Serve delivery reports for previously produced messages */
rd_kafka_poll(rk, 0);

You can pass topic-specific configuration as the third argument to rd_kafka_topic_new. The example above passes the topic_conf created during initialization, which sets acks to all. Passing NULL makes the topic use the default topic configuration. Note that rd_kafka_topic_new takes ownership of topic_conf, so the application must not use or destroy it afterwards.

The second argument to rd_kafka_produce sets the partition for the message. If it is RD_KAFKA_PARTITION_UA, as in this case, librdkafka uses the default partitioner to select the partition. The third argument, RD_KAFKA_MSG_F_COPY, tells librdkafka to make its own copy of the payload, so the application can free or reuse its buffer as soon as rd_kafka_produce returns.

To run code after a write has completed, register a delivery report callback on the configuration object before creating the producer:

static void on_delivery(rd_kafka_t *rk,
                        const rd_kafka_message_t *rkmessage,
                        void *opaque) {
  if (rkmessage->err)
    fprintf(stderr, "%% Message delivery failed: %s\n",
            rd_kafka_message_errstr(rkmessage));
}

void init_rd_kafka() {
  rd_kafka_conf_t *conf = rd_kafka_conf_new();
  rd_kafka_conf_set_dr_msg_cb(conf, on_delivery);

  // initialization omitted
}

librdkafka invokes the delivery callback from the application's own thread, during calls to rd_kafka_poll. A common pattern is to call rd_kafka_poll after every call to the produce API, but if messages are produced at an uneven rate this may not serve delivery reports regularly, so you may also need to call it periodically from your main loop. Note that this API does not provide a direct way to block for the result of a particular message delivery; if you need that, see the synchronous write section below.

Synchronous writes

librdkafka is asynchronous by design, and making writes synchronous is typically a bad idea because it severely limits throughput. Synchronous writes may be justified in some cases, but implementing them is not trivial.


Consumer

Initialization

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>              /* gethostname() */
#include <librdkafka/rdkafka.h>

char hostname[128];
char errstr[512];

rd_kafka_conf_t *conf = rd_kafka_conf_new();

if (gethostname(hostname, sizeof(hostname))) {
  fprintf(stderr, "%% Failed to lookup hostname\n");
  exit(1);
}

if (rd_kafka_conf_set(conf, "client.id", hostname,
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%% %s\n", errstr);
  exit(1);
}

if (rd_kafka_conf_set(conf, "group.id", "foo",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%% %s\n", errstr);
  exit(1);
}

if (rd_kafka_conf_set(conf, "bootstrap.servers", "host1:9092,host2:9092",
                      errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
  fprintf(stderr, "%% %s\n", errstr);
  exit(1);
}

/* Create Kafka consumer handle */
rd_kafka_t *rk;
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                        errstr, sizeof(errstr)))) {
  fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
  exit(1);
}

As with the producer, each rd_kafka_conf_set call reports a configuration error as soon as the property is set, through its return value and errstr.

Basic usage

Librdkafka consumes from Kafka using background threads, but rd_kafka_consumer_poll returns only a single message or event per call:

while (running) {
  rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
  if (rkmessage) {
    msg_process(rkmessage);
    rd_kafka_message_destroy(rkmessage);

    if ((++msg_count % MIN_COMMIT_COUNT) == 0)
      rd_kafka_commit(rk, NULL, 0);
  }
}

Librdkafka does all fetching and coordinator communication, including heartbeats, in background threads, so you do not need to tune the session timeout according to your expected processing time. You must still call rd_kafka_consumer_poll at least once every max.poll.interval.ms (300000 ms by default); otherwise the consumer is considered failed and leaves the group. Because the background threads keep the consumer alive until the client has been closed, make sure your process does not become a zombie, since it would continue to hold on to its assigned partitions in that case.
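One way to make sure the process closes the consumer instead of lingering is to stop the poll loop on SIGINT or SIGTERM and let the normal close path run. This sketch assumes the loops test a flag named running, as the examples in this section do; install_stop_handlers is a hypothetical name. The handler only clears a volatile sig_atomic_t flag, which is async-signal-safe; calling rd_kafka_consumer_close from inside the handler would not be.

```c
#include <signal.h>

/* Loop flag checked by the poll loops: while (running) { ... } */
static volatile sig_atomic_t running = 1;

/* Only flips the flag; the loop exits on its next iteration and the
 * code after it closes the consumer. */
static void stop(int sig) {
  (void)sig;
  running = 0;
}

/* Hypothetical helper: stop on Ctrl-C and on termination requests */
static void install_stop_handlers(void) {
  signal(SIGINT, stop);
  signal(SIGTERM, stop);
}
```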

Important

Partition rebalances also take place in a background thread, which means you still have to handle the potential for commit failures, as the consumer may no longer own the same partitions by the time the commit is sent. You do not need to handle this if you enable autocommit, because autocommit failures are ignored silently; this also means you have no way to roll back processing.

while (running) {
  rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 1000);
  if (!rkmessage)
    continue; // timeout: no message

  msg_process(rkmessage); // application-specific processing
  rd_kafka_message_destroy(rkmessage);

  if ((++msg_count % MIN_COMMIT_COUNT) == 0) {
    rd_kafka_resp_err_t err = rd_kafka_commit(rk, NULL, 0);
    if (err) {
      // application-specific rollback of processed records
    }
  }
}

C++ Client code examples

Basic poll loop

The consumer API is centered around the rd_kafka_consumer_poll function, which retrieves records from the brokers. The rd_kafka_subscribe function controls which topics are fetched by the poll. Typically, consumer usage involves an initial call to rd_kafka_subscribe to set up the topics of interest and then a loop that calls rd_kafka_consumer_poll until the application is shut down.

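#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <librdkafka/rdkafka.h>

/* Cleared (for example, from a signal handler) to stop the poll loop */
static volatile sig_atomic_t running = 1;
void msg_process(rd_kafka_message_t *rkmessage); /* application-specific */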

void basic_consume_loop(rd_kafka_t *rk,
                        rd_kafka_topic_partition_list_t *topics) {
  rd_kafka_resp_err_t err;

  if ((err = rd_kafka_subscribe(rk, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
    exit(1);
  }

  while (running) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
    if (rkmessage) {
      msg_process(rkmessage);
      rd_kafka_message_destroy(rkmessage);
    }
  }

  err = rd_kafka_consumer_close(rk);
  if (err)
    fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
  else
    fprintf(stderr, "%% Consumer closed\n");
}

The poll timeout is hard-coded to 500 milliseconds. If no message is received before this timeout expires, rd_kafka_consumer_poll returns NULL.

You should always call rd_kafka_consumer_close after you are finished using the consumer. It revokes the consumer's assignment, commits final offsets if auto-commit is enabled, and leaves the group, which triggers a rebalance immediately so that any partitions owned by the consumer are reassigned to another member of the group. If the consumer is not closed properly, the broker triggers the rebalance only after the session timeout has expired. After rd_kafka_consumer_close returns, call rd_kafka_destroy to close the remaining sockets and free the handle.

Synchronous commits

The simplest and most reliable way to manually commit offsets is to use rd_kafka_commit, which supports both synchronous and asynchronous commits. When committing, keep in mind that rd_kafka_consumer_poll returns single messages instead of batches, so you decide how many messages to process between commits.

void consume_loop(rd_kafka_t *rk,
                  rd_kafka_topic_partition_list_t *topics) {
  static const int MIN_COMMIT_COUNT = 1000;

  int msg_count = 0;
  rd_kafka_resp_err_t err;

  if ((err = rd_kafka_subscribe(rk, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
    exit(1);
  }

  while (running) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
    if (rkmessage) {
      msg_process(rkmessage);
      rd_kafka_message_destroy(rkmessage);

      if ((++msg_count % MIN_COMMIT_COUNT) == 0)
        rd_kafka_commit(rk, NULL, 0);
    }
  }

  err = rd_kafka_consumer_close(rk);
  if (err)
    fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
  else
    fprintf(stderr, "%% Consumer closed\n");
}

In this example, a synchronous commit is triggered every 1000 messages. The second argument to rd_kafka_commit is the list of offsets to commit; if it is NULL, librdkafka commits the current positions of the assigned partitions. The third argument controls whether the call is asynchronous (non-zero) or synchronous (0). You could also trigger the commit when a timeout expires, to ensure that the committed position is updated regularly even when few messages arrive.
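The timeout-based variant can be sketched as a small decision function. The thresholds and the name should_commit are hypothetical; the design point is to evaluate it on every loop iteration, including those where rd_kafka_consumer_poll returned NULL, so that a quiet topic still gets its offsets committed.

```c
#include <time.h>

/* Hypothetical thresholds: commit after 1000 messages or 5 seconds */
#define COMMIT_MAX_MESSAGES 1000
#define COMMIT_MAX_SECONDS 5.0

/* Returns 1 if offsets should be committed now, 0 otherwise.
 * uncommitted: messages processed since the last successful commit
 * last_commit: wall-clock time of that commit */
static int should_commit(int uncommitted, time_t last_commit, time_t now) {
  if (uncommitted == 0)
    return 0; /* nothing new to commit */
  return uncommitted >= COMMIT_MAX_MESSAGES ||
         difftime(now, last_commit) >= COMMIT_MAX_SECONDS;
}

/* Sketch of its use inside the poll loop:
 *
 *   rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
 *   if (rkmessage) {
 *     msg_process(rkmessage);
 *     rd_kafka_message_destroy(rkmessage);
 *     uncommitted++;
 *   }
 *   if (should_commit(uncommitted, last_commit, time(NULL)) &&
 *       !rd_kafka_commit(rk, NULL, 0)) {
 *     uncommitted = 0;          // reset only after a successful commit
 *     last_commit = time(NULL);
 *   }
 */
```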

Delivery guarantees

In the previous example, you get “at least once” delivery because the commit follows the message processing. By changing the order, you can get “at most once” delivery instead, but you must handle commit failures carefully.

void consume_loop(rd_kafka_t *rk,
                  rd_kafka_topic_partition_list_t *topics) {
  rd_kafka_resp_err_t err;

  if ((err = rd_kafka_subscribe(rk, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
    exit(1);
  }

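  while (running) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
    if (!rkmessage)
      continue; // timeout: no message

    /* Commit first; process the message only if the commit succeeded */
    if (!rd_kafka_commit_message(rk, rkmessage, 0))
      msg_process(rkmessage);
    rd_kafka_message_destroy(rkmessage); // always free, even on failure
  }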

  err = rd_kafka_consumer_close(rk);
  if (err)
    fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
  else
    fprintf(stderr, "%% Consumer closed\n");
}

For simplicity in this example, rd_kafka_commit_message is used prior to processing the message. Committing on every message would produce a lot of overhead in practice. A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded.

Asynchronous commits

void consume_loop(rd_kafka_t *rk,
                  rd_kafka_topic_partition_list_t *topics) {
  static const int MIN_COMMIT_COUNT = 1000;

  int msg_count = 0;
  rd_kafka_resp_err_t err;

  if ((err = rd_kafka_subscribe(rk, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
    exit(1);
  }

  while (running) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(rk, 500);
    if (rkmessage) {
      msg_process(rkmessage);
      rd_kafka_message_destroy(rkmessage);

      if ((++msg_count % MIN_COMMIT_COUNT) == 0)
        rd_kafka_commit(rk, NULL, 1);
    }
  }

  err = rd_kafka_consumer_close(rk);
  if (err)
    fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
  else
    fprintf(stderr, "%% Consumer closed\n");
}

This example differs from the previous one only in the third argument to rd_kafka_commit: passing 1 makes the commit asynchronous, so the consumer sends the commit request and returns immediately instead of waiting for the broker's response.

Because the call returns before the outcome is known, librdkafka lets you register a callback that is invoked when the commit either succeeds or fails. It is configured on initialization and is served from rd_kafka_consumer_poll:

static void on_commit(rd_kafka_t *rk,
                      rd_kafka_resp_err_t err,
                      rd_kafka_topic_partition_list_t *offsets,
                      void *opaque) {
  if (err)
    fprintf(stderr, "%% Failed to commit offsets: %s\n", rd_kafka_err2str(err));
}

void init_rd_kafka() {
  rd_kafka_conf_t *conf = rd_kafka_conf_new();
  rd_kafka_conf_set_offset_commit_cb(conf, on_commit);

  // initialization omitted
}