Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

Kafka .NET Client

Confluent develops and maintains confluent-kafka-dotnet, a .NET library that provides a high-level Producer, Consumer and AdminClient compatible with all Kafka brokers >= v0.8, Confluent Cloud and Confluent Platform.

.NET Client Installation

confluent-kafka-dotnet is made available via NuGet. It’s a binding to the C client librdkafka, which is provided automatically via the dependent librdkafka.redist package for a number of popular platforms (win-x64, win-x86, debian-x64, rhel-x64 and osx).

To reference confluent-kafka-dotnet from within a Visual Studio project, run the following command in the Package Manager Console:

PM> Install-Package Confluent.Kafka

Note

The dependent librdkafka.redist package will be installed automatically.

To reference confluent-kafka-dotnet in a .NET Core project, execute the following command in your project’s directory:

> dotnet add package Confluent.Kafka

confluent-kafka-dotnet is compatible with the .NET Framework >= v4.5.1, .NET Core >= v1.0 (on Windows, Linux and Mac) and .NET Standard >= v1.3. Mono is not supported.

In addition to the Confluent.Kafka package, we provide the Confluent.SchemaRegistry and Confluent.SchemaRegistry.Serdes packages for integration with Confluent Schema Registry. For more information, refer to Working With Apache Avro in the repo README file.

Producer

To create a .NET Producer, first construct an instance of the strongly typed ProducerConfig class, then pass this into the ProducerBuilder’s constructor:

using Confluent.Kafka;
using System.Net;

...

var config = new ProducerConfig
{
    BootstrapServers: "host1:9092,host2:9092",
    ClientId: Dns.GetHostName(),
    ...
};


using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
    ...
}

To write messages to Kafka, use either the ProduceAsync or Produce method.

ProduceAsync is very useful in highly concurrent scenarios, for example in ASP.NET request handlers:

...

await producer.ProduceAsync("weblog", new Message<Null, string> { Value="a log message" });

...

Unless your application is highly concurrent though, it’s best to avoid synchronous execution like the above as it will reduce maximum throughput enormously.

To asynchronously handle delivery result notifications, you can use Task.ContinueWith:

var t = producer.ProduceAsync("topic", new Message<Null, string> { Value="hello world" });
t.ContinueWith(task => {
    if (task.IsFaulted)
    {
        ...
    }
    else
    {
        ...

        Console.WriteLine($"Wrote to offset: {task.Result.Offset}");
    }
});

Or you can use the Produce method, which takes a delivery handler delegate as a parameter:

public static void handler(DeliveryReport<Null, string>)
{
    ...
}

public static process(...)
{
    ...
    producer.Produce(
        "my-topic", new Message<Null, string> { Value = "hello world" }, handler);
}

There are a couple of additional benefits of using the Produce method. First, notification of message delivery (or failure) is strictly in the order of broker acknowledgement. With ProduceAsync, this is not the case because Tasks may complete on any thread pool thread. Second, Produce is more performant because there is unavoidable overhead in the higher level Task based API.

Consumer

Initialization

To create a .NET Consumer, first construct an instance of the strongly typed ConsumerConfig class, then pass this into the ConsumerBuilder’s constructor:

using System.Collections.Generic;
using Confluent.Kafka;

...

var config = new ConsumerConfig
{
    BootstrapServers = "host1:9092,host2:9092",
    GroupId: "foo",
    AutoOffsetReset: AutoOffsetReset.Earliest
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    ...
}

The GroupId property is mandatory and specifies which consumer group the consumer is a member of. The AutoOffsetReset property specifies what offset the consumer should start reading from in the event there are no committed offsets for a partition, or the committed offset is invalid (perhaps due to log truncation).

The Consume Loop

A typical Kafka consumer application is centered around a consume loop, which repeatedly calls the Consume method to retrieve records one-by-one that have been efficiently pre-fetched by the consumer in background threads. Before entering the consume loop, you’ll typically use the Subscribe method to specify which topics should be fetched from:

...

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    consumer.Subscribe(topics);

    while (!cancelled)
    {
        var consumeResult = consumer.Consume(cancellationToken);

        // handle consumed message.
        ...
    }

    consumer.Close();
}

Note that disposing the consumer instance after you are finished using it (achieved with the using block in the above example) will ensure that active sockets are closed and internal state is cleaned up. In order to leave the group cleanly - i.e. commit final offsets and trigger a group rebalance which ensures that any partitions owned by the consumer are re-assigned to other members in the group in a timely fashion - you additionally need to call the Close method prior to disposing.

Auto Offset Commit

By default, the .NET Consumer will commit offsets automatically. This is done periodically by a background thread at an interval specified by the AutoCommitIntervalMs config property. An offset becomes eligible to be committed immediately prior to being delivered to the application via the Consume method.

This strategy introduces the potential for messages to be missed in the case of application failure because the application may terminate before it finishes processing a particular message, whilst the offset corresponding to that message may be successfully committed to Kafka by the background thread.

Furthermore, this strategy may also introduce duplicate processing in the case of application failure since offsets are only committed periodically.

Synchronous Commits

The C# client allows you to commit offsets explicitly via the Commit method. In the following example, a synchronous commit is triggered every commitPeriod messages:

var config = new ConsumerConfig
{
    ...
    // Disable auto-committing of offsets.
    EnableAutoCommit = false
}

...

while (!cancelled)
{
    var consumeResult = consumer.Consume(cancellationToken);

    // process message here.

    if (consumeResult.Offset % commitPeriod == 0)
    {
        try
        {
            consumer.Commit(consumeResult);
        }
        catch (KafkaException e)
        {
            Console.WriteLine($"Commit error: {e.Error.Reason}");
        }
    }
}

This approach gives “at least once” delivery semantics since the offset corresponding to a message is only committed after the message has been successfully processed.

If you reverse the order of the processing and commit, as well as commit before every message (not just periodically), you will get “at most once” delivery semantics.

Note

You should generally avoid blocking network calls (including synchronous use of Commit) because of the ramifications for throughput.

Store Offsets

The auto offset commit capability in the .NET Client is actually quite flexible. As outlined above, by default, the offsets to be commited to Kafka are updated immediately prior to the Consume method deliverying messages to the application. However, you can prevent this from happening by setting the EnableAutoOffsetStore config property to false. You can then use the StoreOffsets method to specify the offsets you would like the background thread to commit, and you can call this precisely when you want. This approach is preferred over the synchronous commit approach outlined in the previous section.

The below example uses this approach to achieve at least once delivery semantics without blocking the main processing loop:

var config = new ConsumerConfig
{
    ...
    EnableAutoCommit = true // (the default)
    EnableAutoOffsetStore = false
}

...

while (!cancelled)
{
    var consumeResult = consumer.Consume(cancellationToken);

    // process message here.

    consumer.StoreOffset(consumeResult);
}