confluent-kafka-dotnet

Class ProducerBuilder<TKey, TValue>

A builder class for IProducer<TKey, TValue>.

Inheritance
object
ProducerBuilder<TKey, TValue>
Inherited Members
object.Equals(object)
object.Equals(object, object)
object.GetHashCode()
object.GetType()
object.MemberwiseClone()
object.ReferenceEquals(object, object)
object.ToString()
Namespace: Confluent.Kafka
Assembly: Confluent.Kafka.dll
Syntax
public class ProducerBuilder<TKey, TValue>
Type Parameters
Name Description
TKey
TValue

Constructors

ProducerBuilder(IEnumerable<KeyValuePair<string, string>>)

Initializes a new ProducerBuilder instance from a collection of librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) and parameters specific to this client (refer to ConfigPropertyNames). At a minimum, 'bootstrap.servers' must be specified.

Declaration
public ProducerBuilder(IEnumerable<KeyValuePair<string, string>> config)
Parameters
Type Name Description
IEnumerable<KeyValuePair<string, string>> config
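
As a minimal sketch of the constructor in use, the example below passes a plain dictionary as the configuration and then calls Build(). The broker address "localhost:9092" and the class name BuildExample are assumptions made for illustration; only 'bootstrap.servers' is required by the description above.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

public static class BuildExample
{
    public static void Main()
    {
        // "localhost:9092" is an assumed broker address; replace it with your own.
        var config = new Dictionary<string, string>
        {
            ["bootstrap.servers"] = "localhost:9092",
        };

        // Dictionary<string, string> implements
        // IEnumerable<KeyValuePair<string, string>>, so it can be passed directly.
        using (IProducer<string, string> producer =
            new ProducerBuilder<string, string>(config).Build())
        {
            Console.WriteLine($"Created producer: {producer.Name}");
        }
    }
}
```

Any other type that enumerates string key/value pairs, such as the library's ProducerConfig class, can be passed in the same way.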

Properties

AsyncKeySerializer

The configured async key serializer.

Declaration
protected IAsyncSerializer<TKey> AsyncKeySerializer { get; set; }
Property Value
Type Description
IAsyncSerializer<TKey>

AsyncValueSerializer

The configured async value serializer.

Declaration
protected IAsyncSerializer<TValue> AsyncValueSerializer { get; set; }
Property Value
Type Description
IAsyncSerializer<TValue>

Config

The config dictionary.

Declaration
protected IEnumerable<KeyValuePair<string, string>> Config { get; set; }
Property Value
Type Description
IEnumerable<KeyValuePair<string, string>>

DefaultPartitioner

The default custom partitioner.

Declaration
protected PartitionerDelegate DefaultPartitioner { get; set; }
Property Value
Type Description
PartitionerDelegate

ErrorHandler

The configured error handler.

Declaration
protected Action<IProducer<TKey, TValue>, Error> ErrorHandler { get; set; }
Property Value
Type Description
Action<IProducer<TKey, TValue>, Error>

KeySerializer

The configured key serializer.

Declaration
protected ISerializer<TKey> KeySerializer { get; set; }
Property Value
Type Description
ISerializer<TKey>

LogHandler

The configured log handler.

Declaration
protected Action<IProducer<TKey, TValue>, LogMessage> LogHandler { get; set; }
Property Value
Type Description
Action<IProducer<TKey, TValue>, LogMessage>

OAuthBearerTokenRefreshHandler

The configured OAuthBearer Token Refresh handler.

Declaration
protected Action<IProducer<TKey, TValue>, string> OAuthBearerTokenRefreshHandler { get; set; }
Property Value
Type Description
Action<IProducer<TKey, TValue>, string>

Partitioners

The per-topic custom partitioners.

Declaration
protected Dictionary<string, PartitionerDelegate> Partitioners { get; set; }
Property Value
Type Description
Dictionary<string, PartitionerDelegate>

StatisticsHandler

The configured statistics handler.

Declaration
protected Action<IProducer<TKey, TValue>, string> StatisticsHandler { get; set; }
Property Value
Type Description
Action<IProducer<TKey, TValue>, string>

ValueSerializer

The configured value serializer.

Declaration
protected ISerializer<TValue> ValueSerializer { get; set; }
Property Value
Type Description
ISerializer<TValue>

Methods

Build()

Build a new IProducer implementation instance.

Declaration
public virtual IProducer<TKey, TValue> Build()
Returns
Type Description
IProducer<TKey, TValue>

SetDefaultPartitioner(PartitionerDelegate)

Set a custom partitioner that will be used for all topics except those for which a partitioner has been explicitly configured.

Declaration
public ProducerBuilder<TKey, TValue> SetDefaultPartitioner(PartitionerDelegate partitioner)
Parameters
Type Name Description
PartitionerDelegate partitioner
Returns
Type Description
ProducerBuilder<TKey, TValue>
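
The following sketch shows one way a default partitioner and a per-topic override might be combined. It assumes the PartitionerDelegate signature (string topic, int partitionCount, ReadOnlySpan<byte> keyData, bool keyIsNull) returning a Partition, which is not shown on this page; the first-byte hashing scheme, the topic name "audit-events", and the broker address are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

public static class PartitionerExample
{
    // Illustrative scheme only: pick a partition from the first key byte.
    // Null or empty keys are sent to partition 0.
    private static Partition FirstBytePartitioner(
        string topic, int partitionCount, ReadOnlySpan<byte> keyData, bool keyIsNull)
    {
        if (keyIsNull || keyData.Length == 0)
        {
            return new Partition(0);
        }
        return new Partition(keyData[0] % partitionCount);
    }

    public static void Main()
    {
        var config = new Dictionary<string, string>
        {
            ["bootstrap.servers"] = "localhost:9092", // assumed broker address
        };

        using (var producer = new ProducerBuilder<string, string>(config)
            // Used for every topic without an explicit partitioner.
            .SetDefaultPartitioner(FirstBytePartitioner)
            // Hypothetical topic that should always go to partition 0.
            .SetPartitioner("audit-events",
                (topic, partitionCount, keyData, keyIsNull) => new Partition(0))
            .Build())
        {
            Console.WriteLine($"Created producer: {producer.Name}");
        }
    }
}
```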

SetErrorHandler(Action<IProducer<TKey, TValue>, Error>)

Set the handler to call on error events, e.g. connection failures or all brokers down. Note that the client will try to automatically recover from errors that are not marked as fatal. Non-fatal errors should be interpreted as informational rather than catastrophic.

Declaration
public ProducerBuilder<TKey, TValue> SetErrorHandler(Action<IProducer<TKey, TValue>, Error> errorHandler)
Parameters
Type Name Description
Action<IProducer<TKey, TValue>, Error> errorHandler
Returns
Type Description
ProducerBuilder<TKey, TValue>
Remarks

Executes on the poll thread (by default, a background thread managed by the producer).

Exceptions: Any exception thrown by your error handler will be silently ignored.
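
A hedged sketch of an error handler that separates fatal from non-fatal errors, as the description above suggests. It assumes the Error type exposes IsFatal, Code, and Reason; the broker address and the choice to write to the console are illustrative.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

public static class ErrorHandlerExample
{
    public static void Main()
    {
        var config = new Dictionary<string, string>
        {
            ["bootstrap.servers"] = "localhost:9092", // assumed broker address
        };

        using (var producer = new ProducerBuilder<string, string>(config)
            .SetErrorHandler((client, error) =>
            {
                if (error.IsFatal)
                {
                    // The client will not recover from this on its own.
                    Console.Error.WriteLine($"FATAL {error.Code}: {error.Reason}");
                }
                else
                {
                    // Informational: the client keeps trying to recover.
                    Console.WriteLine($"Non-fatal {error.Code}: {error.Reason}");
                }
            })
            .Build())
        {
            Console.WriteLine($"Created producer: {producer.Name}");
        }
    }
}
```

Because exceptions thrown from the handler are silently ignored, any failure inside it (for example, in a logging call) will not surface elsewhere.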

SetKeySerializer(IAsyncSerializer<TKey>)

The serializer to use to serialize keys.

Declaration
public ProducerBuilder<TKey, TValue> SetKeySerializer(IAsyncSerializer<TKey> serializer)
Parameters
Type Name Description
IAsyncSerializer<TKey> serializer
Returns
Type Description
ProducerBuilder<TKey, TValue>
Remarks

If your key serializer throws an exception, this will be wrapped in a ProduceException with ErrorCode Local_KeySerialization and thrown by the initiating call to Produce or ProduceAsync.

SetKeySerializer(ISerializer<TKey>)

The serializer to use to serialize keys.

Declaration
public ProducerBuilder<TKey, TValue> SetKeySerializer(ISerializer<TKey> serializer)
Parameters
Type Name Description
ISerializer<TKey> serializer
Returns
Type Description
ProducerBuilder<TKey, TValue>
Remarks

If your key serializer throws an exception, this will be wrapped in a ProduceException with ErrorCode Local_KeySerialization and thrown by the initiating call to Produce or ProduceAsync.
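
The sketch below illustrates the behaviour described in the remarks: a custom key serializer that throws, and a caller that catches the resulting ProduceException. GuidSerializer, the topic name "example-topic", and the broker address are hypothetical; the example assumes ISerializer<T> declares byte[] Serialize(T data, SerializationContext context).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical serializer: writes a Guid as 16 bytes and rejects Guid.Empty.
public sealed class GuidSerializer : ISerializer<Guid>
{
    public byte[] Serialize(Guid data, SerializationContext context)
    {
        if (data == Guid.Empty)
        {
            throw new ArgumentException("An empty Guid is not a valid key.");
        }
        return data.ToByteArray();
    }
}

public static class KeySerializerExample
{
    public static async Task Main()
    {
        var config = new Dictionary<string, string>
        {
            ["bootstrap.servers"] = "localhost:9092", // assumed broker address
        };

        using (var producer = new ProducerBuilder<Guid, string>(config)
            .SetKeySerializer(new GuidSerializer())
            .Build())
        {
            try
            {
                // Guid.Empty makes the serializer above throw.
                await producer.ProduceAsync(
                    "example-topic",
                    new Message<Guid, string> { Key = Guid.Empty, Value = "hello" });
            }
            catch (ProduceException<Guid, string> ex)
                when (ex.Error.Code == ErrorCode.Local_KeySerialization)
            {
                Console.Error.WriteLine(
                    $"Key serialization failed: {ex.InnerException?.Message}");
            }
        }
    }
}
```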

SetLogHandler(Action<IProducer<TKey, TValue>, LogMessage>)

Set the handler to call when there is information available to be logged. If not specified, a default callback that writes to stderr will be used.

Declaration
public ProducerBuilder<TKey, TValue> SetLogHandler(Action<IProducer<TKey, TValue>, LogMessage> logHandler)
Parameters
Type Name Description
Action<IProducer<TKey, TValue>, LogMessage> logHandler
Returns
Type Description
ProducerBuilder<TKey, TValue>
Remarks

By default not many log messages are generated.

For more verbose logging, specify one or more debug contexts using the Debug configuration property.

Warning: Log handlers are called spontaneously from internal librdkafka threads and the application must not call any Confluent.Kafka APIs from within a log handler or perform any prolonged operations.

Exceptions: Any exception thrown by your log handler will be silently ignored.
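
As an illustration of the remarks above, this sketch installs a log handler and raises verbosity through the 'debug' configuration key. The debug contexts "broker,topic" and the broker address are assumptions; the handler only formats and writes a line, in keeping with the warning against prolonged work or calls into Confluent.Kafka APIs.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

public static class LogHandlerExample
{
    public static void Main()
    {
        var config = new Dictionary<string, string>
        {
            ["bootstrap.servers"] = "localhost:9092", // assumed broker address
            ["debug"] = "broker,topic",               // assumed debug contexts
        };

        using (var producer = new ProducerBuilder<string, string>(config)
            .SetLogHandler((client, log) =>
            {
                // Keep this short: it runs on internal librdkafka threads.
                Console.WriteLine(
                    $"[{log.Level}] {log.Facility} {log.Name}: {log.Message}");
            })
            .Build())
        {
            Console.WriteLine($"Created producer: {producer.Name}");
        }
    }
}
```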

SetOAuthBearerTokenRefreshHandler(Action<IProducer<TKey, TValue>, string>)

Set the SASL/OAUTHBEARER token refresh callback. The callback is triggered via Poll(TimeSpan) whenever OAUTHBEARER is the SASL mechanism and a token needs to be retrieved, typically based on the configuration defined in sasl.oauthbearer.config. The callback should invoke OAuthBearerSetToken(IClient, string, long, string, IDictionary<string, string>) or OAuthBearerSetTokenFailure(IClient, string) to indicate success or failure, respectively.

An unsecured JWT refresh handler is provided by librdkafka for development and testing purposes. It is enabled by setting the enable.sasl.oauthbearer.unsecure.jwt property to true and is mutually exclusive with using a refresh callback.

Declaration
public ProducerBuilder<TKey, TValue> SetOAuthBearerTokenRefreshHandler(Action<IProducer<TKey, TValue>, string> oAuthBearerTokenRefreshHandler)
Parameters
Type Name Description
Action<IProducer<TKey, TValue>, string> oAuthBearerTokenRefreshHandler

The callback to set. Callback function arguments: IProducer<TKey, TValue> - the producer instance that should be used to set the token or token failure; string - the value of the configuration property sasl.oauthbearer.config.

Returns
Type Description
ProducerBuilder<TKey, TValue>
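
A minimal sketch of a refresh handler that reports either success or failure, as the description requires. FetchToken is a hypothetical stand-in for a call to your identity provider, and the token value, principal name, five-minute lifetime, and broker address are placeholders. The example assumes the lifetime argument is an absolute expiry time in milliseconds since the Unix epoch.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

public static class OAuthExample
{
    // Hypothetical helper: a real implementation would request a token
    // from an identity provider, possibly using the supplied config string.
    private static (string Value, long ExpiresAtUnixMs, string Principal) FetchToken(
        string oauthConfig)
    {
        long expiry = DateTimeOffset.UtcNow.AddMinutes(5).ToUnixTimeMilliseconds();
        return ("placeholder-token", expiry, "example-principal");
    }

    public static void Main()
    {
        var config = new Dictionary<string, string>
        {
            ["bootstrap.servers"] = "localhost:9092", // assumed broker address
            ["security.protocol"] = "SASL_SSL",
            ["sasl.mechanism"] = "OAUTHBEARER",
        };

        using (var producer = new ProducerBuilder<string, string>(config)
            .SetOAuthBearerTokenRefreshHandler((client, oauthConfig) =>
            {
                try
                {
                    var token = FetchToken(oauthConfig);
                    client.OAuthBearerSetToken(
                        token.Value,
                        token.ExpiresAtUnixMs,
                        token.Principal,
                        new Dictionary<string, string>()); // no SASL extensions
                }
                catch (Exception ex)
                {
                    // Report the failure so the client can retry the refresh later.
                    client.OAuthBearerSetTokenFailure(ex.ToString());
                }
            })
            .Build())
        {
            Console.WriteLine($"Created producer: {producer.Name}");
        }
    }
}
```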

SetPartitioner(string, PartitionerDelegate)

Set a custom partitioner to use when producing messages to the specified topic.

Declaration
public ProducerBuilder<TKey, TValue> SetPartitioner(string topic, PartitionerDelegate partitioner)
Parameters
Type Name Description
string topic
PartitionerDelegate partitioner
Returns
Type Description
ProducerBuilder<TKey, TValue>

SetStatisticsHandler(Action<IProducer<TKey, TValue>, string>)

Set the handler to call on statistics events. Statistics are provided as a JSON formatted string as defined here: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md

Declaration
public ProducerBuilder<TKey, TValue> SetStatisticsHandler(Action<IProducer<TKey, TValue>, string> statisticsHandler)
Parameters
Type Name Description
Action<IProducer<TKey, TValue>, string> statisticsHandler
Returns
Type Description
ProducerBuilder<TKey, TValue>
Remarks

You can enable statistics and set the statistics interval using the StatisticsIntervalMs configuration property (disabled by default).

Executes on the poll thread (by default, a background thread managed by the producer).

Exceptions: Any exception thrown by your statistics handler will be delivered to your error handler if one is set; otherwise it will be silently ignored.
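
The sketch below enables statistics through the 'statistics.interval.ms' configuration key and reads a single field from the JSON payload. The five-second interval, the broker address, and the choice of the top-level "txmsgs" field are assumptions for illustration; see STATISTICS.md for the actual schema.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using Confluent.Kafka;

public static class StatisticsHandlerExample
{
    public static void Main()
    {
        var config = new Dictionary<string, string>
        {
            ["bootstrap.servers"] = "localhost:9092", // assumed broker address
            ["statistics.interval.ms"] = "5000",      // emit statistics every 5 s
        };

        using (var producer = new ProducerBuilder<string, string>(config)
            .SetStatisticsHandler((client, json) =>
            {
                using (JsonDocument doc = JsonDocument.Parse(json))
                {
                    // "txmsgs" is assumed to be a top-level counter in the payload.
                    if (doc.RootElement.TryGetProperty("txmsgs", out JsonElement txmsgs))
                    {
                        Console.WriteLine($"{client.Name}: txmsgs={txmsgs.GetInt64()}");
                    }
                }
            })
            .Build())
        {
            // Keep the producer alive long enough for a couple of intervals to elapse.
            Thread.Sleep(TimeSpan.FromSeconds(12));
        }
    }
}
```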

SetValueSerializer(IAsyncSerializer<TValue>)

The serializer to use to serialize values.

Declaration
public ProducerBuilder<TKey, TValue> SetValueSerializer(IAsyncSerializer<TValue> serializer)
Parameters
Type Name Description
IAsyncSerializer<TValue> serializer
Returns
Type Description
ProducerBuilder<TKey, TValue>
Remarks

If your value serializer throws an exception, this will be wrapped in a ProduceException with ErrorCode Local_ValueSerialization and thrown by the initiating call to Produce or ProduceAsync.
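
As a sketch of an async value serializer, the example below serializes a value to JSON and handles the serialization error described in the remarks. JsonValueSerializer<T>, the Order type, the topic name "orders", and the broker address are hypothetical; the example assumes IAsyncSerializer<T> declares Task<byte[]> SerializeAsync(T data, SerializationContext context).

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical message type used only for this illustration.
public sealed class Order
{
    public string Id { get; set; }
    public decimal Amount { get; set; }
}

// Hypothetical serializer: encodes any value as UTF-8 JSON.
public sealed class JsonValueSerializer<T> : IAsyncSerializer<T>
{
    public Task<byte[]> SerializeAsync(T data, SerializationContext context)
    {
        return Task.FromResult(JsonSerializer.SerializeToUtf8Bytes(data));
    }
}

public static class ValueSerializerExample
{
    public static async Task Main()
    {
        var config = new Dictionary<string, string>
        {
            ["bootstrap.servers"] = "localhost:9092", // assumed broker address
        };

        using (var producer = new ProducerBuilder<string, Order>(config)
            .SetValueSerializer(new JsonValueSerializer<Order>())
            .Build())
        {
            try
            {
                await producer.ProduceAsync(
                    "orders",
                    new Message<string, Order>
                    {
                        Key = "order-1",
                        Value = new Order { Id = "order-1", Amount = 9.99m },
                    });
            }
            catch (ProduceException<string, Order> ex)
                when (ex.Error.Code == ErrorCode.Local_ValueSerialization)
            {
                Console.Error.WriteLine(
                    $"Value serialization failed: {ex.InnerException?.Message}");
            }
        }
    }
}
```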

SetValueSerializer(ISerializer<TValue>)

The serializer to use to serialize values.

Declaration
public ProducerBuilder<TKey, TValue> SetValueSerializer(ISerializer<TValue> serializer)
Parameters
Type Name Description
ISerializer<TValue> serializer
Returns
Type Description
ProducerBuilder<TKey, TValue>
Remarks

If your value serializer throws an exception, this will be wrapped in a ProduceException with ErrorCode Local_ValueSerialization and thrown by the initiating call to Produce or ProduceAsync.
