.. _dotnet_client:
|ak| |dotnet|
=============
Confluent develops and maintains **confluent-kafka-dotnet**, a .NET library that provides
a high-level ``Producer``, ``Consumer`` and ``AdminClient`` compatible with all |ak| brokers >= v0.8,
|ccloud| and |cp|.
.. _installation_dotnet_client:
|dotnet| Installation
---------------------
confluent-kafka-dotnet is made available via `NuGet `_. It's a
binding to the C client librdkafka, which is provided automatically via the dependent
`librdkafka.redist `__ package for a number
of popular platforms (win-x64, win-x86, debian-x64, rhel-x64 and osx).
To reference confluent-kafka-dotnet from within a Visual Studio project, run the following command in the
Package Manager Console:
.. codewithvars:: bash
PM> Install-Package Confluent.Kafka
.. note::
The dependent librdkafka.redist package will be installed automatically.
To reference confluent-kafka-dotnet in a .NET Core project, execute the following command in your project's directory:
.. sourcecode:: bash
> dotnet add package Confluent.Kafka
confluent-kafka-dotnet is compatible with the .NET Framework >= v4.5.1, .NET Core >= v1.0 (on Windows,
Linux and Mac) and .NET Standard >= v1.3. Mono is not supported.
In addition to the ``Confluent.Kafka`` package, we provide the ``Confluent.SchemaRegistry`` and
``Confluent.SchemaRegistry.Serdes`` packages for integration with |sr-long|. For more information, refer
to `Working With Apache Avro `__
in the repo README file.
.. _producer_dotnet_client:
Producer
--------
To create a .NET Producer, first construct an instance of the strongly typed ``ProducerConfig`` class,
then pass this into the ``ProducerBuilder``'s constructor:
.. sourcecode:: csharp

    using Confluent.Kafka;
    using System.Net;

    ...

    var config = new ProducerConfig
    {
        BootstrapServers = "host1:9092,host2:9092",
        ClientId = Dns.GetHostName(),
        ...
    };

    using (var producer = new ProducerBuilder<Null, string>(config).Build())
    {
        ...
    }

To write messages to |ak|, use either the ``ProduceAsync`` or ``Produce`` method.

``ProduceAsync`` is very useful in highly concurrent scenarios, for example in ASP.NET request handlers:

.. sourcecode:: csharp

    ...
    await producer.ProduceAsync("weblog", new Message<Null, string> { Value="a log message" });
    ...

Unless your application is highly concurrent, avoid awaiting each ``ProduceAsync`` call one at a time
as in the example above. Each send then waits for the previous delivery to be acknowledged, which
greatly reduces maximum throughput.

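One way to keep several sends in flight from a single caller is to start all of the ``ProduceAsync`` calls first and await them together. The following is a minimal sketch under stated assumptions, not a recommended implementation: the broker address ``localhost:9092``, the class name ``ConcurrentProduceSketch`` and the helper ``ProduceAllAsync`` are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class ConcurrentProduceSketch
{
    // Assumed topic name, for illustration only.
    public const string Topic = "weblog";

    // Assumed broker address; replace with your own bootstrap servers.
    public static ProducerConfig BuildConfig() =>
        new ProducerConfig { BootstrapServers = "localhost:9092" };

    // Starts every send before awaiting any of them, so no message waits
    // for the acknowledgement of the one before it.
    public static async Task<int> ProduceAllAsync(
        IProducer<Null, string> producer, IEnumerable<string> lines)
    {
        var pending = new List<Task<DeliveryResult<Null, string>>>();
        foreach (var line in lines)
        {
            pending.Add(producer.ProduceAsync(
                Topic, new Message<Null, string> { Value = line }));
        }

        // Awaiting WhenAll rethrows the first failure, if any.
        var results = await Task.WhenAll(pending);
        return results.Length;
    }

    public static async Task Main()
    {
        using (var producer = new ProducerBuilder<Null, string>(BuildConfig()).Build())
        {
            try
            {
                var count = await ProduceAllAsync(
                    producer, new[] { "line 1", "line 2", "line 3" });
                Console.WriteLine($"Delivered {count} messages");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }
}
```

Running ``Main`` requires a reachable broker. The helper takes an ``IProducer<Null, string>`` so that a single long-lived producer instance can be shared across calls.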
To asynchronously handle delivery result notifications, you can use ``Task.ContinueWith``:
.. sourcecode:: csharp

    var t = producer.ProduceAsync("topic", new Message<Null, string> { Value="hello world" });
    t.ContinueWith(task => {
        if (task.IsFaulted)
        {
            ...
        }
        else
        {
            ...
            Console.WriteLine($"Wrote to offset: {task.Result.Offset}");
        }
    });

Or you can use the ``Produce`` method, which takes a delivery handler delegate as a parameter:
.. sourcecode:: csharp

    public static void handler(DeliveryReport<Null, string> deliveryReport)
    {
        ...
    }

    public static void process(...)
    {
        ...
        producer.Produce(
            "my-topic", new Message<Null, string> { Value = "hello world" }, handler);
    }

There are a couple of additional benefits of using the ``Produce`` method. First, notification
of message delivery (or failure) is strictly in the order of broker acknowledgement. With
``ProduceAsync``, this is not the case because ``Tasks`` may complete on any thread pool thread.
Second, ``Produce`` is more performant because there is unavoidable overhead in the higher
level ``Task`` based API.
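As a fuller illustration of the ``Produce`` pattern, the sketch below inspects the ``Error`` property of each delivery report and calls ``Flush`` before the producer is disposed so that outstanding reports have a chance to arrive. It is a sketch under assumptions: the broker address ``localhost:9092``, the ten-second flush timeout, and the names ``ProduceWithHandlerSketch`` and ``Describe`` are illustrative choices, not part of the client library.

```csharp
using System;
using Confluent.Kafka;

public static class ProduceWithHandlerSketch
{
    // Assumed broker address; replace with your own bootstrap servers.
    public static ProducerConfig BuildConfig() =>
        new ProducerConfig { BootstrapServers = "localhost:9092" };

    // Formats a delivery report. Kept free of I/O so it can be checked in isolation.
    public static string Describe(DeliveryReport<Null, string> report) =>
        report.Error.IsError
            ? $"Delivery failed: {report.Error.Reason}"
            : $"Delivered to {report.TopicPartitionOffset}";

    public static void Main()
    {
        using (var producer = new ProducerBuilder<Null, string>(BuildConfig()).Build())
        {
            for (var i = 0; i < 10; i++)
            {
                // Produce only enqueues the message; the handler is invoked
                // later, once the delivery outcome is known.
                producer.Produce(
                    "my-topic",
                    new Message<Null, string> { Value = $"message {i}" },
                    report => Console.WriteLine(Describe(report)));
            }

            // Wait up to 10 seconds (an arbitrary choice) for outstanding reports.
            producer.Flush(TimeSpan.FromSeconds(10));
        }
    }
}
```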
.. _consumer_dotnet_client:
Consumer
--------
Initialization
~~~~~~~~~~~~~~
To create a .NET Consumer, first construct an instance of the strongly typed
``ConsumerConfig`` class, then pass this into the ``ConsumerBuilder``’s constructor:
.. sourcecode:: csharp

    using System.Collections.Generic;
    using Confluent.Kafka;

    ...

    var config = new ConsumerConfig
    {
        BootstrapServers = "host1:9092,host2:9092",
        GroupId = "foo",
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        ...
    }

The ``GroupId`` property is mandatory and specifies which consumer group the
consumer is a member of. The ``AutoOffsetReset`` property specifies what
offset the consumer should start reading from in the event there are no
committed offsets for a partition, or the committed offset is invalid
(perhaps due to log truncation).
The Consume Loop
^^^^^^^^^^^^^^^^
A typical Kafka consumer application is centered around a consume loop, which
repeatedly calls the ``Consume`` method to retrieve records one-by-one that
have been efficiently pre-fetched by the consumer in background threads. Before
entering the consume loop, you'll typically use the ``Subscribe`` method to
specify which topics should be fetched from:
.. sourcecode:: csharp

    ...

    using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        consumer.Subscribe(topics);

        while (!cancelled)
        {
            var consumeResult = consumer.Consume(cancellationToken);

            // handle consumed message.
            ...
        }

        consumer.Close();
    }

Disposing the consumer instance after you are finished using it
(achieved with the ``using`` block in the above example) ensures
that active sockets are closed and internal state is cleaned up.
To leave the group cleanly (that is, to commit final offsets and
trigger a group rebalance so that any partitions owned by the
consumer are reassigned to other members of the group in a timely
fashion), you must also call the ``Close`` method before disposing.
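The shutdown sequence described above can be sketched as a complete program. This is one possible arrangement, assuming a broker at ``localhost:9092`` and a topic named ``my-topic``; both values and the class name ``ConsumeLoopSketch`` are hypothetical. Cancellation is wired to Ctrl+C, and ``Close`` is placed in a ``finally`` block so that it runs before the ``using`` block disposes the consumer.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public static class ConsumeLoopSketch
{
    // Assumed connection settings, for illustration only.
    public static ConsumerConfig BuildConfig() =>
        new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "foo",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        using (var consumer = new ConsumerBuilder<Ignore, string>(BuildConfig()).Build())
        {
            // Ctrl+C cancels the token instead of terminating the process.
            Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

            consumer.Subscribe("my-topic");
            try
            {
                while (true)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(cts.Token);
                        Console.WriteLine(
                            $"{consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
                    }
                    catch (ConsumeException e)
                    {
                        // Log the error and keep consuming.
                        Console.WriteLine($"Consume error: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Thrown by Consume once the token is cancelled.
            }
            finally
            {
                // Leave the group cleanly before the using block disposes.
                consumer.Close();
            }
        }
    }
}
```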
Auto Offset Commit
^^^^^^^^^^^^^^^^^^
By default, the .NET Consumer will commit offsets automatically. This
is done periodically by a background thread at an interval specified
by the ``AutoCommitIntervalMs`` config property. An offset becomes
eligible to be committed immediately prior to being delivered to
the application via the ``Consume`` method.
This strategy introduces the potential for messages to be missed
in the case of application failure because the application may terminate
before it finishes processing a particular message, whilst the offset
corresponding to that message may be successfully committed to Kafka
by the background thread.
Furthermore, this strategy may also introduce duplicate
processing in the case of application failure since offsets are only
committed periodically.
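For reference, the settings involved in automatic commits can be written out explicitly. The fragment below is only a sketch: the one-second interval is an arbitrary illustrative value, and the broker address and the class name ``AutoCommitConfigSketch`` are assumptions. A shorter interval narrows the window in which processed messages can be redelivered after a failure, at the cost of more frequent commit requests.

```csharp
using Confluent.Kafka;

public static class AutoCommitConfigSketch
{
    public static ConsumerConfig BuildConfig() =>
        new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // assumed broker address
            GroupId = "foo",
            EnableAutoCommit = true,             // the default, shown for clarity
            AutoCommitIntervalMs = 1000          // illustrative value only
        };
}
```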
Synchronous Commits
^^^^^^^^^^^^^^^^^^^
The C# client allows you to commit offsets explicitly via the
``Commit`` method. In the following example, a synchronous commit
is triggered every ``commitPeriod`` messages:
.. sourcecode:: csharp

    var config = new ConsumerConfig
    {
        ...
        // Disable auto-committing of offsets.
        EnableAutoCommit = false
    };

    ...

    while (!cancelled)
    {
        var consumeResult = consumer.Consume(cancellationToken);

        // process message here.

        if (consumeResult.Offset % commitPeriod == 0)
        {
            try
            {
                consumer.Commit(consumeResult);
            }
            catch (KafkaException e)
            {
                Console.WriteLine($"Commit error: {e.Error.Reason}");
            }
        }
    }

This approach gives "at least once" delivery semantics since the offset
corresponding to a message is only committed after the message has been
successfully processed.
If you reverse the order of the processing and commit, as well as commit
before every message (not just periodically), you will get "at most once"
delivery semantics.
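The reversed ordering can be sketched as follows: every message is committed before it is processed. This is a minimal illustration under assumptions, not a recommended implementation. The broker address, the topic ``my-topic``, the ``Process`` helper, and the policy of skipping a message whose commit fails are all hypothetical choices made for the example.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public static class AtMostOnceSketch
{
    public static ConsumerConfig BuildConfig() =>
        new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // assumed broker address
            GroupId = "foo",
            EnableAutoCommit = false             // commits are issued explicitly below
        };

    public static void Run(IConsumer<Ignore, string> consumer, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            var consumeResult = consumer.Consume(token);
            try
            {
                // Commit first: if the process dies while processing,
                // the message is skipped rather than redelivered.
                consumer.Commit(consumeResult);
            }
            catch (KafkaException e)
            {
                Console.WriteLine($"Commit error: {e.Error.Reason}");
                continue; // hypothetical policy: do not process uncommitted messages
            }
            Process(consumeResult.Message.Value);
        }
    }

    // Hypothetical stand-in for application logic.
    private static void Process(string value) => Console.WriteLine(value);

    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        using (var consumer = new ConsumerBuilder<Ignore, string>(BuildConfig()).Build())
        {
            Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
            consumer.Subscribe("my-topic");
            try { Run(consumer, cts.Token); }
            catch (OperationCanceledException) { /* cancelled via Ctrl+C */ }
            finally { consumer.Close(); }
        }
    }
}
```

Committing synchronously on every message adds a blocking network round trip per message, so this pattern trades throughput for the delivery guarantee.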
.. note::
You should generally avoid blocking network calls (including synchronous
use of ``Commit``) because of the ramifications for throughput.
Store Offsets
^^^^^^^^^^^^^
The auto offset commit capability in the .NET Client is actually quite
flexible. As outlined above, by default, the offsets to be committed to
Kafka are updated immediately prior to the ``Consume`` method
delivering messages to the application. However, you can prevent
this from happening by setting the ``EnableAutoOffsetStore`` config property
to ``false``. You can then use the ``StoreOffset`` method to specify
the offsets you would like the background thread to commit, and you can call
this precisely when you want. This approach is preferred over the
synchronous commit approach outlined in the previous section.

The following example uses this approach to achieve at least once delivery
semantics without blocking the main processing loop:

.. sourcecode:: csharp

    var config = new ConsumerConfig
    {
        ...
        EnableAutoCommit = true, // (the default)
        EnableAutoOffsetStore = false
    };

    ...

    while (!cancelled)
    {
        var consumeResult = consumer.Consume(cancellationToken);

        // process message here.

        consumer.StoreOffset(consumeResult);
    }