Kafka .NET Client¶
Confluent develops and maintains confluent-kafka-dotnet, a .NET library that provides a high-level Producer, Consumer and AdminClient compatible with all Kafka brokers >= v0.8, Confluent Cloud and Confluent Platform.
For a step-by-step guide on building a .NET client application for Kafka, see Getting Started with Apache Kafka and .NET on Confluent Developer.
.NET Client Installation¶
confluent-kafka-dotnet is made available via NuGet. It’s a binding to the C client librdkafka, which is provided automatically via the dependent librdkafka.redist package for a number of popular platforms (win-x64, win-x86, debian-x64, rhel-x64 and osx).
To reference confluent-kafka-dotnet from within a Visual Studio project, run the following command in the Package Manager Console:
PM> Install-Package Confluent.Kafka
Note
The dependent librdkafka.redist package will be installed automatically.
To reference confluent-kafka-dotnet in a .NET Core project, execute the following command in your project’s directory:
> dotnet add package Confluent.Kafka
confluent-kafka-dotnet is compatible with the .NET Framework >= v4.5.1, .NET Core >= v1.0 (on Windows, Linux and Mac) and .NET Standard >= v1.3. Mono is not supported.
In addition to the Confluent.Kafka package, we provide the Confluent.SchemaRegistry and Confluent.SchemaRegistry.Serdes packages for integration with Confluent Schema Registry. For more information, refer to Working With Apache Avro in the repo README file.
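As a minimal sketch of how these packages fit together, the following wires an Avro serializer into a producer. The Schema Registry URL, topic name, and the User class (assumed to be generated from an Avro schema, for example with the avrogen tool) are all illustrative:

```csharp
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// ...

var schemaRegistryConfig = new SchemaRegistryConfig { Url = "localhost:8081" };
var producerConfig = new ProducerConfig { BootstrapServers = "host1:9092" };

using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
using (var producer = new ProducerBuilder<string, User>(producerConfig)
    // Serialize message values as Avro, registering the schema
    // with Schema Registry as required.
    .SetValueSerializer(new AvroSerializer<User>(schemaRegistry))
    .Build())
{
    await producer.ProduceAsync("users",
        new Message<string, User> { Key = "key1", Value = new User { Name = "alice" } });
}
```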
.NET Client example code¶
For Hello World examples of Kafka clients in .NET, see .NET. All examples include a producer and consumer that can connect to any Kafka cluster running on-premises or in Confluent Cloud.
Producer¶
To create a .NET Producer, first construct an instance of the strongly typed ProducerConfig class, then pass this into the ProducerBuilder’s constructor:
using Confluent.Kafka;
using System.Net;
...
var config = new ProducerConfig
{
BootstrapServers = "host1:9092,host2:9092",
ClientId = Dns.GetHostName(),
...
};
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
...
}
To write messages to Kafka, use either the ProduceAsync or Produce method. ProduceAsync is very useful in highly concurrent scenarios, for example in ASP.NET request handlers:
...
await producer.ProduceAsync("weblog", new Message<Null, string> { Value="a log message" });
...
Unless your application is highly concurrent though, it’s best to avoid awaiting each ProduceAsync call individually as in the above example, since this effectively serializes message production and will reduce maximum throughput enormously.
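One way to keep the producer pipeline full without awaiting each call in turn is to start many ProduceAsync calls and await them together. A sketch (the topic name and message count are illustrative):

```csharp
using Confluent.Kafka;
using System.Linq;
using System.Threading.Tasks;

// ...

// Start all produce calls without awaiting each one individually,
// then await the whole batch.
var tasks = Enumerable.Range(0, 100)
    .Select(i => producer.ProduceAsync(
        "weblog", new Message<Null, string> { Value = $"log message {i}" }));

var deliveryReports = await Task.WhenAll(tasks);
```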
To asynchronously handle delivery result notifications, you can use Task.ContinueWith:
var t = producer.ProduceAsync("topic", new Message<Null, string> { Value="hello world" });
t.ContinueWith(task => {
if (task.IsFaulted)
{
...
}
else
{
...
Console.WriteLine($"Wrote to offset: {task.Result.Offset}");
}
});
Or you can use the Produce method, which takes a delivery handler delegate as a parameter:
public static void handler(DeliveryReport<Null, string> deliveryReport)
{
    ...
}

public static void process(...)
{
    ...
    producer.Produce(
        "my-topic", new Message<Null, string> { Value = "hello world" }, handler);
}
There are a couple of additional benefits of using the Produce method. First, notification of message delivery (or failure) is strictly in the order of broker acknowledgement. With ProduceAsync, this is not the case because Tasks may complete on any thread pool thread. Second, Produce is more performant because there is unavoidable overhead in the higher level Task based API.
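When using Produce, delivery handlers may still be outstanding when your application shuts down. A common pattern (the timeout value is illustrative) is to call Flush before disposing the producer, which blocks until all in-flight messages have been delivered or the timeout elapses:

```csharp
using System;
using Confluent.Kafka;

// ...

producer.Produce(
    "my-topic", new Message<Null, string> { Value = "last message" }, handler);

// Block until all outstanding produce requests have completed
// (successfully or otherwise), or the timeout is reached.
producer.Flush(TimeSpan.FromSeconds(10));
```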
Consumer¶
Initialization¶
To create a .NET Consumer, first construct an instance of the strongly typed ConsumerConfig class, then pass this into the ConsumerBuilder’s constructor:
using System.Collections.Generic;
using Confluent.Kafka;
...
var config = new ConsumerConfig
{
BootstrapServers = "host1:9092,host2:9092",
GroupId = "foo",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
...
}
The GroupId property is mandatory and specifies which consumer group the consumer is a member of. The AutoOffsetReset property specifies what offset the consumer should start reading from in the event there are no committed offsets for a partition, or the committed offset is invalid (perhaps due to log truncation).
The Consume Loop¶
A typical Kafka consumer application is centered around a consume loop, which repeatedly calls the Consume method to retrieve records one-by-one that have been efficiently pre-fetched by the consumer in background threads. Before entering the consume loop, you’ll typically use the Subscribe method to specify which topics should be fetched from:
...
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe(topics);
while (!cancelled)
{
var consumeResult = consumer.Consume(cancellationToken);
// handle consumed message.
...
}
consumer.Close();
}
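The cancelled flag and cancellationToken in the loop above need to be wired to a shutdown signal. One common approach (a sketch; names are illustrative) is to cancel a CancellationTokenSource from a Ctrl+C handler and let Consume throw OperationCanceledException:

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// ...

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // prevent the process from terminating immediately.
    cts.Cancel();
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    consumer.Subscribe(topics);
    try
    {
        while (true)
        {
            // Throws OperationCanceledException when cts is cancelled.
            var consumeResult = consumer.Consume(cts.Token);
            // handle consumed message.
        }
    }
    catch (OperationCanceledException) { }
    finally
    {
        // Commit final offsets and leave the group cleanly.
        consumer.Close();
    }
}
```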
Note that disposing the consumer instance after you are finished using it (achieved with the using block in the above example) will ensure that active sockets are closed and internal state is cleaned up. In order to leave the group cleanly - i.e. commit final offsets and trigger a group rebalance which ensures that any partitions owned by the consumer are re-assigned to other members in the group in a timely fashion - you additionally need to call the Close method prior to disposing.
Auto Offset Commit¶
By default, the .NET Consumer will commit offsets automatically. This is done periodically by a background thread at an interval specified by the AutoCommitIntervalMs config property. An offset becomes eligible to be committed immediately prior to being delivered to the application via the Consume method.
This strategy introduces the potential for messages to be missed in the case of application failure because the application may terminate before it finishes processing a particular message, whilst the offset corresponding to that message may be successfully committed to Kafka by the background thread.
Furthermore, this strategy may also introduce duplicate processing in the case of application failure since offsets are only committed periodically.
Synchronous Commits¶
The C# client allows you to commit offsets explicitly via the Commit method. In the following example, a synchronous commit is triggered every commitPeriod messages:
var config = new ConsumerConfig
{
    ...
    // Disable auto-committing of offsets.
    EnableAutoCommit = false
};
...
while (!cancelled)
{
var consumeResult = consumer.Consume(cancellationToken);
// process message here.
if (consumeResult.Offset % commitPeriod == 0)
{
try
{
consumer.Commit(consumeResult);
}
catch (KafkaException e)
{
Console.WriteLine($"Commit error: {e.Error.Reason}");
}
}
}
This approach gives “at least once” delivery semantics since the offset corresponding to a message is only committed after the message has been successfully processed.
If you reverse the order of processing and commit, and additionally commit before every message (not just periodically), you will get “at most once” delivery semantics.
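As a sketch, an “at most once” consume loop commits the offset synchronously before processing, so a crash during processing skips the message rather than reprocessing it (assumes EnableAutoCommit = false, as in the example above):

```csharp
while (!cancelled)
{
    var consumeResult = consumer.Consume(cancellationToken);

    // Commit first: if processing subsequently fails, the message
    // will not be redelivered after a restart.
    consumer.Commit(consumeResult);

    // process message here.
}
```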
Note
You should generally avoid blocking network calls (including synchronous use of Commit) because of the ramifications for throughput.
Store Offsets¶
The auto offset commit capability in the .NET Client is actually quite flexible. As outlined above, by default, the offsets to be committed to Kafka are updated immediately prior to the Consume method delivering messages to the application. However, you can prevent this from happening by setting the EnableAutoOffsetStore config property to false. You can then use the StoreOffset method to specify the offsets you would like the background thread to commit, and you can call this precisely when you want. This approach is preferred over the synchronous commit approach outlined in the previous section.
The below example uses this approach to achieve at least once delivery semantics without blocking the main processing loop:
var config = new ConsumerConfig
{
    ...
    EnableAutoCommit = true, // (the default)
    EnableAutoOffsetStore = false
};
...
while (!cancelled)
{
var consumeResult = consumer.Consume(cancellationToken);
// process message here.
consumer.StoreOffset(consumeResult);
}
Suggested resource¶
For a step-by-step guide on building a .NET Client client application for Kafka, see Getting Started with Apache Kafka and .NET on Confluent Developer.
Learn about the design details of the Kafka .NET API in Designing the .NET API for Apache Kafka.