.. _dotnet_client: |ak| |dotnet| ============= Confluent develops and maintains **confluent-kafka-dotnet**, a .NET library that provides a high-level ``Producer``, ``Consumer`` and ``AdminClient`` compatible with all |ak| brokers >= v0.8, |ccloud| and |cp|. .. _installation_dotnet_client: |dotnet| Installation --------------------- confluent-kafka-dotnet is made available via `NuGet `_. It's a binding to the C client librdkafka, which is provided automatically via the dependent `librdkafka.redist `__ package for a number of popular platforms (win-x64, win-x86, debian-x64, rhel-x64 and osx). To reference confluent-kafka-dotnet from within a Visual Studio project, run the following command in the Package Manager Console: .. codewithvars:: bash PM> Install-Package Confluent.Kafka .. note:: The dependent librdkafka.redist package will be installed automatically. To reference confluent-kafka-dotnet in a .NET Core project, execute the following command in your project's directory: .. sourcecode:: bash > dotnet add package Confluent.Kafka confluent-kafka-dotnet is compatible with the .NET Framework >= v4.5.1, .NET Core >= v1.0 (on Windows, Linux and Mac) and .NET Standard >= v1.3. Mono is not supported. In addition to the ``Confluent.Kafka`` package, we provide the ``Confluent.SchemaRegistry`` and ``Confluent.SchemaRegistry.Serdes`` packages for integration with |sr-long|. For more information, refer to `Working With Apache Avro `__ in the repo README file. .. _producer_dotnet_client: Producer -------- To create a .NET Producer, first construct an instance of the strongly typed ``ProducerConfig`` class, then pass this into the ``ProducerBuilder``'s constructor: .. sourcecode:: bash using Confluent.Kafka; using System.Net; ... var config = new ProducerConfig { BootstrapServers: "host1:9092,host2:9092", ClientId: Dns.GetHostName(), ... }; using (var producer = new ProducerBuilder(config).Build()) { ... } To write messages to |ak|, use either the ``ProduceAsync`` or ``Produce`` method. ``ProduceAsync`` is very useful in highly concurrent scenarios, for example in ASP.NET request handlers: .. sourcecode:: bash ... await producer.ProduceAsync("weblog", new Message { Value="a log message" }); ... Unless your application is highly concurrent though, it's best to avoid synchronous execution like the above as it will reduce maximum throughput enormously. To asynchronously handle delivery result notifications, you can use ``Task.ContinueWith``: .. sourcecode:: bash var t = producer.ProduceAsync("topic", new Message { Value="hello world" }); t.ContinueWith(task => { if (task.IsFaulted) { ... } else { ... Console.WriteLine($"Wrote to offset: {task.Result.Offset}"); } }); Or you can use the ``Produce`` method, which takes a delivery handler delegate as a parameter: .. sourcecode:: bash public static void handler(DeliveryReport) { ... } public static process(...) { ... producer.Produce( "my-topic", new Message { Value = "hello world" }, handler); } There are a couple of additional benefits of using the ``Produce`` method. First, notification of message delivery (or failure) is strictly in the order of broker acknowledgement. With ``ProduceAsync``, this is not the case because ``Tasks`` may complete on any thread pool thread. Second, ``Produce`` is more performant because there is unavoidable overhead in the higher level ``Task`` based API. .. _consumer_dotnet_client: Consumer -------- Initialization ~~~~~~~~~~~~~~ To create a .NET Consumer, first construct an instance of the strongly typed ``ConsumerConfig`` class, then pass this into the ``ConsumerBuilder``’s constructor: .. sourcecode:: bash using System.Collections.Generic; using Confluent.Kafka; ... var config = new ConsumerConfig { BootstrapServers = "host1:9092,host2:9092", GroupId: "foo", AutoOffsetReset: AutoOffsetReset.Earliest }; using (var consumer = new ConsumerBuilder(config).Build()) { ... } The ``GroupId`` property is mandatory and specifies which consumer group the consumer is a member of. The ``AutoOffsetReset`` property specifies what offset the consumer should start reading from in the event there are no committed offsets for a partition, or the committed offset is invalid (perhaps due to log truncation). The Consume Loop ^^^^^^^^^^^^^^^^ A typical Kafka consumer application is centered around a consume loop, which repeatedly calls the ``Consume`` method to retrieve records one-by-one that have been efficiently pre-fetched by the consumer in background threads. Before entering the consume loop, you'll typically use the ``Subscribe`` method to specify which topics should be fetched from: .. sourcecode:: bash ... using (var consumer = new ConsumerBuilder(config).Build()) { consumer.Subscribe(topics); while (!cancelled) { var consumeResult = consumer.Consume(cancellationToken); // handle consumed message. ... } consumer.Close(); } Note that disposing the consumer instance after you are finished using it (achieved with the ``using`` block in the above example) will ensure that active sockets are closed and internal state is cleaned up. In order to leave the group cleanly - i.e. commit final offsets and trigger a group rebalance which ensures that any partitions owned by the consumer are re-assigned to other members in the group in a timely fashion - you additionally need to call the ``Close`` method prior to disposing. Auto Offset Commit ^^^^^^^^^^^^^^^^^^ By default, the .NET Consumer will commit offsets automatically. This is done periodically by a background thread at an interval specified by the ``AutoCommitIntervalMs`` config property. An offset becomes eligible to be committed immediately prior to being delivered to the application via the ``Consume`` method. This strategy introduces the potential for messages to be missed in the case of application failure because the application may terminate before it finishes processing a particular message, whilst the offset corresponding to that message may be successfully committed to Kafka by the background thread. Furthermore, this strategy may also introduce duplicate processing in the case of application failure since offsets are only committed periodically. Synchronous Commits ^^^^^^^^^^^^^^^^^^^ The C# client allows you to commit offsets explicitly via the ``Commit`` method. In the following example, a synchronous commit is triggered every ``commitPeriod`` messages: .. sourcecode:: bash var config = new ConsumerConfig { ... // Disable auto-committing of offsets. EnableAutoCommit = false } ... while (!cancelled) { var consumeResult = consumer.Consume(cancellationToken); // process message here. if (consumeResult.Offset % commitPeriod == 0) { try { consumer.Commit(consumeResult); } catch (KafkaException e) { Console.WriteLine($"Commit error: {e.Error.Reason}"); } } } This approach gives "at least once" delivery semantics since the offset corresponding to a message is only committed after the message has been successfully processed. If you reverse the order of the processing and commit, as well as commit before every message (not just periodically), you will get "at most once" delivery semantics. .. note:: You should generally avoid blocking network calls (including synchronous use of ``Commit``) because of the ramifications for throughput. Store Offsets ^^^^^^^^^^^^^ The auto offset commit capability in the .NET Client is actually quite flexible. As outlined above, by default, the offsets to be commited to Kafka are updated immediately prior to the ``Consume`` method deliverying messages to the application. However, you can prevent this from happening by setting the ``EnableAutoOffsetStore`` config property to ``false``. You can then use the ``StoreOffsets`` method to specify the offsets you would like the background thread to commit, and you can call this precisely when you want. This approach is preferred over the synchronous commit approach outlined in the previous section. The below example uses this approach to achieve at least once delivery semantics without blocking the main processing loop: .. sourcecode:: bash var config = new ConsumerConfig { ... EnableAutoCommit = true // (the default) EnableAutoOffsetStore = false } ... while (!cancelled) { var consumeResult = consumer.Consume(cancellationToken); // process message here. consumer.StoreOffset(consumeResult); }