.. _c_cpp_client:

|ak| |cplus|
============

|cp| includes librdkafka, a C/C++ library offering a producer and a consumer for |ak-tm|.

.. _installation_c_cpp_client:

|cplus| installation
--------------------

The C/C++ client named librdkafka is available in source form and as precompiled binaries for Debian and Red Hat-based Linux distributions, and macOS. Most users will want to use the precompiled binaries.

For Linux distributions, follow the instructions for :ref:`Debian ` or :ref:`Red Hat ` distributions to set up the repositories, then use ``yum`` or ``apt-get`` to install the appropriate :ref:`packages `. For example, a developer building a C application on a Red Hat-based distribution would use the ``librdkafka-devel`` package:

.. codewithvars:: bash

   sudo yum install librdkafka-devel

And on a Debian-based distribution they would use the ``librdkafka-dev`` package:

.. codewithvars:: bash

   sudo apt-get install librdkafka-dev

On macOS, the latest release is available via `Homebrew `_:

.. codewithvars:: bash

   brew install librdkafka

The source code is also available in the :ref:`ZIP and TAR archives ` under the directory ``src/``.

.. _producer_c_cpp_client:

Producer
--------

Initialization
~~~~~~~~~~~~~~
.. sourcecode:: c

   char hostname[128];
   char errstr[512];

   rd_kafka_conf_t *conf = rd_kafka_conf_new();

   if (gethostname(hostname, sizeof(hostname))) {
       fprintf(stderr, "%% Failed to lookup hostname\n");
       exit(1);
   }

   if (rd_kafka_conf_set(conf, "client.id", hostname,
                         errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
       fprintf(stderr, "%% %s\n", errstr);
       exit(1);
   }

   if (rd_kafka_conf_set(conf, "bootstrap.servers", "host1:9092,host2:9092",
                         errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
       fprintf(stderr, "%% %s\n", errstr);
       exit(1);
   }

   rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();

   if (rd_kafka_topic_conf_set(topic_conf, "acks", "all",
                               errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
       fprintf(stderr, "%% %s\n", errstr);
       exit(1);
   }

   /* Create the Kafka producer handle */
   rd_kafka_t *rk;
   if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                           errstr, sizeof(errstr)))) {
       fprintf(stderr, "%% Failed to create new producer: %s\n", errstr);
       exit(1);
   }

librdkafka handles the errors for each setting directly.

Asynchronous writes
~~~~~~~~~~~~~~~~~~~

With librdkafka, you first create a ``rd_kafka_topic_t`` handle for the topic you want to write to, then use ``rd_kafka_produce`` to send messages to it. For example:

.. sourcecode:: c

   rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, topic_conf);

   if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                        payload, payload_len, key, key_len, NULL) == -1) {
       fprintf(stderr, "%% Failed to produce to topic %s: %s\n",
               topic, rd_kafka_err2str(rd_kafka_errno2err(errno)));
   }

You can pass topic-specific configuration in the third argument to ``rd_kafka_topic_new``. The previous example passed ``topic_conf``, which was seeded with a configuration for acknowledgments. Passing ``NULL`` causes the producer to use the default configuration.

The second argument to ``rd_kafka_produce`` sets the desired partition for the message.
If set to ``RD_KAFKA_PARTITION_UA``, as in this case, librdkafka uses the default partitioner to select the partition for the message. The third argument, ``RD_KAFKA_MSG_F_COPY``, indicates that librdkafka should copy the payload and key, which lets the application free them as soon as the call returns.

If you want to invoke some code after the write has completed, you have to configure it on initialization:

.. sourcecode:: c

   static void on_delivery(rd_kafka_t *rk,
                           const rd_kafka_message_t *rkmessage,
                           void *opaque) {
       if (rkmessage->err)
           fprintf(stderr, "%% Message delivery failed: %s\n",
                   rd_kafka_message_errstr(rkmessage));
   }

   void init_rd_kafka() {
       rd_kafka_conf_t *conf = rd_kafka_conf_new();
       rd_kafka_conf_set_dr_msg_cb(conf, on_delivery);

       /* remaining initialization omitted */
   }

The delivery callback in librdkafka is invoked in the user's thread by calling ``rd_kafka_poll``. A common pattern is to call this function after every call to the produce API, but this may not be sufficient to ensure regular delivery reports if the message produce rate is not steady. This API does not, however, provide a direct way to block for the result of a particular message delivery. If you need to do that, see the synchronous write example below.

Synchronous writes
~~~~~~~~~~~~~~~~~~

Making writes synchronous is typically a bad idea since it kills throughput, so librdkafka is asynchronous by nature. Synchronous writes may be justified in some cases, but the implementation is not trivial. A full example can be found `here `_.

.. _consumer_c_cpp_client:

.. warning:: The default partitioner in the Java producer uses the murmur2 hash function, while the default partitioner in librdkafka uses crc32. Because of the different hash functions, a message produced by a Java client and a message produced by a librdkafka client may be assigned to different partitions even with the same partition key.

Consumer
--------

Initialization
~~~~~~~~~~~~~~
.. literalinclude:: consumer-initialization.c
   :language: c

librdkafka handles configuration errors directly when setting properties.

Basic usage
~~~~~~~~~~~

librdkafka uses a multi-threaded approach to |ak| consumption. The API returns only a single message or event at a time:

.. literalinclude:: consumer-usage-1.c
   :language: c

librdkafka does all fetching and coordinator communication in background threads. This frees you from the complication of tuning the session timeout according to the expected processing time. However, since the background threads keep the consumer alive until the client has been closed, you must ensure that your process does not become a zombie, since it would continue to hold on to its assigned partitions in that case.

.. important:: Partition rebalances also take place in a background thread, which means you still have to handle the potential for commit failures: the consumer may no longer have the same partition assignment when the commit begins. This concern does not arise if you enable autocommit, but only because commit failures are then ignored silently, which also implies that you have no way to roll back processing.

.. literalinclude:: consumer-usage-2.c
   :language: c

|cplus| code examples
~~~~~~~~~~~~~~~~~~~~~~~~~

Basic poll loop
^^^^^^^^^^^^^^^

The consumer API is centered around the ``rd_kafka_consumer_poll`` method, which is used to retrieve records from the brokers. The ``rd_kafka_subscribe`` method controls which topics will be fetched in poll. Typically, consumer usage involves an initial call to ``rd_kafka_subscribe`` to set up the topics of interest, and then a loop that calls ``rd_kafka_consumer_poll`` until the application is shut down.

.. literalinclude:: consumer-detailed-example.c
   :language: c

The poll timeout is hard-coded to 500 milliseconds. If no records are received before this timeout expires, ``rd_kafka_consumer_poll`` returns ``NULL``.
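
The loop described above can be sketched as follows. This is a minimal sketch, not the full detailed example: ``rk`` is assumed to be a consumer handle already created with ``RD_KAFKA_CONSUMER`` and a configured ``group.id``, while ``mytopic``, the ``running`` flag, and the ``process`` function are hypothetical application names.

.. sourcecode:: c

   /* Minimal poll loop sketch. rk is assumed to be a configured
    * consumer handle; "mytopic" and process() are hypothetical. */
   static volatile sig_atomic_t running = 1;

   void consume_loop(rd_kafka_t *rk) {
       rd_kafka_topic_partition_list_t *topics =
           rd_kafka_topic_partition_list_new(1);
       rd_kafka_topic_partition_list_add(topics, "mytopic",
                                         RD_KAFKA_PARTITION_UA);
       rd_kafka_subscribe(rk, topics);
       rd_kafka_topic_partition_list_destroy(topics);

       while (running) {
           rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 500);
           if (!msg)
               continue;          /* poll timed out */
           if (msg->err)
               fprintf(stderr, "%% Consumer error: %s\n",
                       rd_kafka_message_errstr(msg));
           else
               process(msg);      /* hypothetical application function */
           rd_kafka_message_destroy(msg);
       }

       rd_kafka_consumer_close(rk);
   }

Note that every message returned by ``rd_kafka_consumer_poll`` must be released with ``rd_kafka_message_destroy``, and that error events are delivered through the same queue as messages.
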

You should always call ``rd_kafka_consumer_close`` after you are finished using the consumer. Doing so ensures that active sockets are closed and internal state is cleaned up. It also triggers a group rebalance immediately, which ensures that any partitions owned by the consumer are reassigned to another member of the group. If the consumer is not closed properly, the broker triggers the rebalance only after the session timeout has expired. A latch is added to this example to ensure that the consumer has time to finish closing before the shutdown completes.

Synchronous commits
^^^^^^^^^^^^^^^^^^^

The simplest and most reliable way to commit offsets manually is ``rd_kafka_commit``, which handles both synchronous and asynchronous commits. When committing, keep in mind that ``rd_kafka_consumer_poll`` returns single messages instead of batches.

.. literalinclude:: consumer-synchronous-commits-1.c
   :language: c

In this example, a synchronous commit is triggered every 1000 messages. The second argument to ``rd_kafka_commit`` is the list of offsets to be committed; if set to ``NULL``, librdkafka commits the latest offsets for the assigned positions. The third argument to ``rd_kafka_commit`` is a flag which controls whether this call is asynchronous. You could also trigger the commit on expiration of a timeout to ensure that the committed position is updated regularly.

Delivery guarantees
^^^^^^^^^^^^^^^^^^^

In the previous example, you get "at least once" delivery because the commit follows the message processing. By changing the order, however, you can get "at most once" delivery, but you must be a little careful with commit failures.

.. literalinclude:: consumer-synchronous-commits-2.c
   :language: c

For simplicity in this example, ``rd_kafka_commit_message`` is used prior to processing the message. Committing on every message would produce a lot of overhead in practice.
A better approach would be to collect a batch of messages, execute the synchronous commit, and then process the messages only if the commit succeeded.

Asynchronous commits
^^^^^^^^^^^^^^^^^^^^

.. literalinclude:: consumer-asynchronous-commits-1.c
   :language: c

In this example, the consumer sends the commit request and returns immediately. This is achieved by enabling the asynchronous flag in the call to ``rd_kafka_commit``. The API also gives you a callback that is invoked when the commit either succeeds or fails. In librdkafka, this is configured on initialization:

.. literalinclude:: consumer-asynchronous-commits-2.c
   :language: c
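
As a sketch of that wiring: ``rd_kafka_conf_set_offset_commit_cb`` registers the commit-result callback on the configuration object before the consumer handle is created. The callback and function names below are illustrative; only the librdkafka calls are real API.

.. sourcecode:: c

   /* Commit-result callback, invoked from rd_kafka_consumer_poll()
    * when an asynchronous offset commit completes. */
   static void on_commit(rd_kafka_t *rk, rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *offsets,
                         void *opaque) {
       if (err)
           fprintf(stderr, "%% Offset commit failed: %s\n",
                   rd_kafka_err2str(err));
   }

   void init_consumer(void) {
       rd_kafka_conf_t *conf = rd_kafka_conf_new();
       rd_kafka_conf_set_offset_commit_cb(conf, on_commit);

       /* remaining consumer initialization omitted */
   }

The commit itself is then issued with ``rd_kafka_commit(rk, NULL, 1)``, where the non-zero third argument requests an asynchronous commit; the callback fires on a later call to ``rd_kafka_consumer_poll``.
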