confluent-kafka-dotnet

Class Producer<TKey, TValue>

Implements a high-level Apache Kafka producer with key and value serialization.

Inheritance
System.Object
Producer<TKey, TValue>
Implements
ISerializingProducer<TKey, TValue>
IDisposable
Inherited Members
System.Object.ToString()
System.Object.Equals(System.Object)
System.Object.Equals(System.Object, System.Object)
System.Object.ReferenceEquals(System.Object, System.Object)
System.Object.GetHashCode()
System.Object.GetType()
System.Object.MemberwiseClone()
Namespace: Confluent.Kafka
Syntax
public class Producer<TKey, TValue> : ISerializingProducer<TKey, TValue>, IDisposable
Type Parameters
Name Description
TKey
TValue

Constructors

Producer(IEnumerable<KeyValuePair<String, Object>>, ISerializer<TKey>, ISerializer<TValue>)

Initializes a new Producer instance.

Declaration
public Producer(IEnumerable<KeyValuePair<string, object>> config, ISerializer<TKey> keySerializer, ISerializer<TValue> valueSerializer)
Parameters
Type Name Description
IEnumerable<KeyValuePair<System.String, System.Object>> config

librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).

ISerializer<TKey> keySerializer

An ISerializer implementation instance that will be used to serialize keys.

ISerializer<TValue> valueSerializer

An ISerializer implementation instance that will be used to serialize values.
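The constructor parameters above can be sketched as follows. This is a minimal illustration, not the library's canonical setup: it assumes the `StringSerializer` class from the `Confluent.Kafka.Serialization` namespace that ships with the 0.11.x releases, and the `ProducerSetup` class, its method names, and the `client.id` value are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

public static class ProducerSetup
{
    // Builds the librdkafka configuration. Keys are librdkafka property names.
    public static Dictionary<string, object> BuildConfig(string bootstrapServers)
    {
        return new Dictionary<string, object>
        {
            { "bootstrap.servers", bootstrapServers },
            { "client.id", "example-producer" } // hypothetical value
        };
    }

    // Creates a producer whose keys and values are both UTF-8 strings.
    // Assumption: StringSerializer(Encoding) is available in this library version.
    public static Producer<string, string> Create(string bootstrapServers)
    {
        return new Producer<string, string>(
            BuildConfig(bootstrapServers),
            new StringSerializer(Encoding.UTF8),
            new StringSerializer(Encoding.UTF8));
    }
}
```

A caller would typically wrap the result in a `using` block, for example `using (var producer = ProducerSetup.Create("localhost:9092")) { ... }`, so that Dispose() runs when the producer is no longer needed.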

Properties

KeySerializer

Gets the ISerializer implementation instance used to serialize keys.

Declaration
public ISerializer<TKey> KeySerializer { get; }
Property Value
Type Description
ISerializer<TKey>

Name

Gets the name of this producer instance. Contains (but is not equal to) the client.id configuration parameter.

Declaration
public string Name { get; }
Property Value
Type Description
System.String
Remarks

This name will be unique across all producer instances in a given application which allows log messages to be associated with the corresponding instance.

ValueSerializer

Gets the ISerializer implementation instance used to serialize values.

Declaration
public ISerializer<TValue> ValueSerializer { get; }
Property Value
Type Description
ISerializer<TValue>

Methods

AddBrokers(String)

Adds one or more brokers to the Producer's list of initial bootstrap brokers.

Note: Additional brokers are discovered automatically as soon as the Producer connects to any broker by querying the broker metadata. Calling this method is only required in some scenarios where the addresses of all brokers in the cluster change.

Declaration
public int AddBrokers(string brokers)
Parameters
Type Name Description
System.String brokers

Comma-separated list of brokers in the same format as the bootstrap.servers configuration parameter.

Returns
Type Description
System.Int32

The number of brokers added. This value includes brokers that may have been specified a second time.

Remarks

There is currently no API to remove existing configured, added or learnt brokers.

Dispose()

Releases all resources used by this Producer.

Declaration
public void Dispose()
Remarks

You will often want to call Flush(Int32) before disposing a Producer instance.

Flush(Int32)

Wait until all outstanding produce requests and delivery report callbacks are completed.

[UNSTABLE-API] - the semantics and/or type of the return value is subject to change.

Declaration
public int Flush(int millisecondsTimeout)
Parameters
Type Name Description
System.Int32 millisecondsTimeout

The maximum time to block in milliseconds, or -1 to block indefinitely. You should typically use a relatively short timeout period because this operation cannot be cancelled.

Returns
Type Description
System.Int32

The current librdkafka out queue length. This should be interpreted as a rough indication of the number of messages waiting to be sent to or acknowledged by the broker. If zero, there are no outstanding messages or callbacks. Specifically, the value is equal to the sum of the number of produced messages for which a delivery report has not yet been handled and a number which is less than or equal to the number of pending delivery report callback events (as determined by an internal librdkafka implementation detail).

Remarks

This method should typically be called prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating. The wait time is bounded by the millisecondsTimeout parameter.

A related topic configuration parameter is message.timeout.ms which determines the maximum length of time librdkafka attempts to deliver a message before giving up and so also affects the maximum time a call to Flush may block.
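Because Flush cannot be cancelled, one way to follow the advice above is to call it repeatedly with a short timeout and stop once the returned queue length reaches zero. The sketch below is only one possible shutdown routine: the `ProducerShutdown` class, the `Drain` helper, and the choice of 20 attempts of 500 ms are assumptions made for illustration.

```csharp
using System;
using Confluent.Kafka;

public static class ProducerShutdown
{
    // Calls flushOnce until it reports an empty queue or maxAttempts is reached.
    // Returns the last reported queue length (0 means fully drained).
    public static int Drain(Func<int> flushOnce, int maxAttempts)
    {
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException("maxAttempts");
        }

        int remaining = 0;
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            remaining = flushOnce();
            if (remaining == 0)
            {
                break;
            }
        }
        return remaining;
    }

    // Flushes in short slices (at most roughly 10 seconds in total), then disposes.
    public static void FlushAndDispose<TKey, TValue>(Producer<TKey, TValue> producer)
    {
        int remaining = Drain(() => producer.Flush(500), 20);
        if (remaining > 0)
        {
            Console.Error.WriteLine(
                remaining + " message(s) or callback(s) still outstanding at shutdown.");
        }
        producer.Dispose();
    }
}
```

Splitting the wait into short slices gives the application a chance to log progress or abandon the wait between calls, which a single long Flush call would not allow.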

Flush(TimeSpan)

Wait until all outstanding produce requests and delivery report callbacks are completed. Refer to Flush(Int32) for more information.

[UNSTABLE-API] - the semantics and/or type of the return value is subject to change.

Declaration
public int Flush(TimeSpan timeout)
Parameters
Type Name Description
TimeSpan timeout

The maximum length of time to block. You should typically use a relatively short timeout period because this operation cannot be cancelled.

Returns
Type Description
System.Int32

The current librdkafka out queue length. Refer to Flush(Int32) for more information.

GetMetadata(Boolean, String)

Refer to Producer.GetMetadata(bool, string) for more information.

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public Metadata GetMetadata(bool allTopics, string topic)
Parameters
Type Name Description
System.Boolean allTopics
System.String topic
Returns
Type Description
Metadata

GetMetadata(Boolean, String, TimeSpan)

Refer to Producer.GetMetadata(bool, string, TimeSpan) for more information.

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public Metadata GetMetadata(bool allTopics, string topic, TimeSpan timeout)
Parameters
Type Name Description
System.Boolean allTopics
System.String topic
TimeSpan timeout
Returns
Type Description
Metadata

ListGroup(String)

Get information pertaining to a particular group in the Kafka cluster (blocks, potentially indefinitely).

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public GroupInfo ListGroup(string group)
Parameters
Type Name Description
System.String group

The group of interest.

Returns
Type Description
GroupInfo

Returns information pertaining to the specified group or null if this group does not exist.

ListGroup(String, TimeSpan)

Get information pertaining to a particular group in the Kafka cluster (blocks).

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public GroupInfo ListGroup(string group, TimeSpan timeout)
Parameters
Type Name Description
System.String group

The group of interest.

TimeSpan timeout

The maximum period of time the call may block.

Returns
Type Description
GroupInfo

Returns information pertaining to the specified group or null if this group does not exist.

ListGroups(TimeSpan)

Get information pertaining to all groups in the Kafka cluster (blocking).

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public List<GroupInfo> ListGroups(TimeSpan timeout)
Parameters
Type Name Description
TimeSpan timeout

The maximum period of time the call may block.

Returns
Type Description
List<GroupInfo>

ProduceAsync(String, TKey, TValue)

Asynchronously send a single message to the broker. Refer to ProduceAsync(String, TKey, TValue, Int32, Boolean) for more information.

Declaration
public Task<Message<TKey, TValue>> ProduceAsync(string topic, TKey key, TValue val)
Parameters
Type Name Description
System.String topic
TKey key
TValue val
Returns
Type Description
Task<Message<TKey, TValue>>
Remarks

The partition the message is produced to is determined using the configured partitioner.

Blocks if the send queue is full. Warning: if background polling is disabled and Poll is not being called in another thread, this will block indefinitely.

ProduceAsync(String, TKey, TValue, IDeliveryHandler<TKey, TValue>)

Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed). See ProduceAsync(String, TKey, TValue, Int32, Boolean, IDeliveryHandler<TKey, TValue>) for more information.

Declaration
public void ProduceAsync(string topic, TKey key, TValue val, IDeliveryHandler<TKey, TValue> deliveryHandler)
Parameters
Type Name Description
System.String topic
TKey key
TValue val
IDeliveryHandler<TKey, TValue> deliveryHandler
Remarks

The partition the message is produced to is determined using the configured partitioner.

Blocks if the send queue is full. Warning: if background polling is disabled and Poll is not being called in another thread, this will block indefinitely.

ProduceAsync(String, TKey, TValue, Boolean)

Asynchronously send a single message to the broker. Refer to ProduceAsync(String, TKey, TValue, Int32, Boolean) for more information.

Declaration
public Task<Message<TKey, TValue>> ProduceAsync(string topic, TKey key, TValue val, bool blockIfQueueFull)
Parameters
Type Name Description
System.String topic
TKey key
TValue val
System.Boolean blockIfQueueFull
Returns
Type Description
Task<Message<TKey, TValue>>
Remarks

The partition the message is produced to is determined using the configured partitioner.

ProduceAsync(String, TKey, TValue, Boolean, IDeliveryHandler<TKey, TValue>)

Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed). See ProduceAsync(String, TKey, TValue, Int32, Boolean, IDeliveryHandler<TKey, TValue>) for more information.

Declaration
public void ProduceAsync(string topic, TKey key, TValue val, bool blockIfQueueFull, IDeliveryHandler<TKey, TValue> deliveryHandler)
Parameters
Type Name Description
System.String topic
TKey key
TValue val
System.Boolean blockIfQueueFull
IDeliveryHandler<TKey, TValue> deliveryHandler
Remarks

The partition the message is produced to is determined using the configured partitioner.

ProduceAsync(String, TKey, TValue, Int32)

Asynchronously send a single message to the broker. Refer to ProduceAsync(String, TKey, TValue, Int32, Boolean) for more information.

Declaration
public Task<Message<TKey, TValue>> ProduceAsync(string topic, TKey key, TValue val, int partition)
Parameters
Type Name Description
System.String topic
TKey key
TValue val
System.Int32 partition
Returns
Type Description
Task<Message<TKey, TValue>>
Remarks

Blocks if the send queue is full. Warning: if background polling is disabled and Poll is not being called in another thread, this will block indefinitely.

ProduceAsync(String, TKey, TValue, Int32, IDeliveryHandler<TKey, TValue>)

Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed). See ProduceAsync(String, TKey, TValue, Int32, Boolean, IDeliveryHandler<TKey, TValue>) for more information.

Declaration
public void ProduceAsync(string topic, TKey key, TValue val, int partition, IDeliveryHandler<TKey, TValue> deliveryHandler)
Parameters
Type Name Description
System.String topic
TKey key
TValue val
System.Int32 partition
IDeliveryHandler<TKey, TValue> deliveryHandler
Remarks

Blocks if the send queue is full. Warning: if background polling is disabled and Poll is not being called in another thread, this will block indefinitely.

ProduceAsync(String, TKey, TValue, Int32, Boolean)

Asynchronously send a single message to the broker.

Declaration
public Task<Message<TKey, TValue>> ProduceAsync(string topic, TKey key, TValue val, int partition, bool blockIfQueueFull)
Parameters
Type Name Description
System.String topic

The target topic.

TKey key

The message key (possibly null if allowed by the key serializer).

TValue val

The message value (possibly null if allowed by the value serializer).

System.Int32 partition

The target partition (if -1, this is determined by the partitioner configured for the topic).

System.Boolean blockIfQueueFull

Whether or not to block if the send queue is full. If false, a KafkaException (with Error.Code == ErrorCode.Local_QueueFull) will be thrown if an attempt is made to produce a message and the send queue is full.

Warning: if blockIfQueueFull is set to true, background polling is disabled, and Poll is not being called in another thread, this will block indefinitely.

Returns
Type Description
Task<Message<TKey, TValue>>

A Task which will complete with the corresponding delivery report for this request.

Remarks

If you require strict ordering of delivery reports, use a variant of ProduceAsync that takes an IDeliveryHandler parameter rather than a variant that returns a Task<Message>. Tasks are completed on arbitrary thread pool threads and can be executed out of order.
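As an illustration of the parameters described above, the following sketch produces one message with partition set to -1 (partitioner decides) and blockIfQueueFull set to false, and treats the Local_QueueFull exception as a recoverable condition. The `ProduceExample` class and its method names are hypothetical, and the check against `ErrorCode.NoError` is an assumption about how a successful delivery report is represented.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class ProduceExample
{
    // True when a delivery report carries no error.
    public static bool IsSuccess(ErrorCode code)
    {
        return code == ErrorCode.NoError;
    }

    // Returns true if the delivery report carries no error, false if the local
    // send queue was full or the delivery report carried an error.
    public static async Task<bool> TrySendAsync(
        Producer<string, string> producer, string topic, string key, string value)
    {
        try
        {
            // -1: let the configured partitioner choose the partition.
            // false: throw instead of blocking when the send queue is full.
            Message<string, string> report =
                await producer.ProduceAsync(topic, key, value, -1, false);
            return IsSuccess(report.Error.Code);
        }
        catch (KafkaException e) when (e.Error.Code == ErrorCode.Local_QueueFull)
        {
            // The caller may retry later, for example after a short delay.
            return false;
        }
    }
}
```

Failing fast on a full queue keeps the calling thread responsive, at the cost of pushing the retry decision onto the caller.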

ProduceAsync(String, TKey, TValue, Int32, Boolean, IDeliveryHandler<TKey, TValue>)

Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed).

Declaration
public void ProduceAsync(string topic, TKey key, TValue val, int partition, bool blockIfQueueFull, IDeliveryHandler<TKey, TValue> deliveryHandler)
Parameters
Type Name Description
System.String topic
TKey key
TValue val
System.Int32 partition
System.Boolean blockIfQueueFull
IDeliveryHandler<TKey, TValue> deliveryHandler
Remarks

Notification of delivery reports is via an IDeliveryHandler instance. Use IDeliveryHandler variants of ProduceAsync if you require notification of delivery reports strictly in the order they were acknowledged by the broker / failed (failure may be via broker or local). IDeliveryHandler.HandleDeliveryReport callbacks are executed on the Poll thread.

Refer to ProduceAsync(String, TKey, TValue, Int32, Boolean) for more information.
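A delivery handler for the ordered variant might look like the sketch below. It is an assumption-laden illustration: the `OrderedReportCollector` class is hypothetical, the `MarshalData` property is assumed to be part of the 0.11.x IDeliveryHandler interface, and `Offset.Value` is assumed to expose the offset as a long.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

// Collects delivery reports in the order the Poll thread hands them over.
public class OrderedReportCollector : IDeliveryHandler<string, string>
{
    private readonly List<string> reports = new List<string>();
    private readonly object sync = new object();

    // Assumption: the interface exposes MarshalData; returning true requests
    // that key and value data be populated in the delivery report.
    public bool MarshalData
    {
        get { return true; }
    }

    // Formats one report as "topic[partition]@offset: ErrorCode".
    public static string Format(string topic, int partition, long offset, ErrorCode code)
    {
        return topic + "[" + partition + "]@" + offset + ": " + code;
    }

    // Executed on the Poll thread, so keep the work short.
    public void HandleDeliveryReport(Message<string, string> message)
    {
        string line = Format(
            message.Topic, message.Partition, message.Offset.Value, message.Error.Code);
        lock (sync)
        {
            reports.Add(line);
        }
    }

    // Returns a copy so callers on other threads never see a list being mutated.
    public List<string> Snapshot()
    {
        lock (sync)
        {
            return new List<string>(reports);
        }
    }
}
```

An instance would be passed as the last argument, for example `producer.ProduceAsync("my-topic", "key", "value", -1, true, collector)`, where the topic name is a placeholder.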

QueryWatermarkOffsets(TopicPartition)

Query the Kafka cluster for low (oldest/beginning) and high (newest/end) offsets for the specified topic/partition (blocks, potentially indefinitely).

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition)
Parameters
Type Name Description
TopicPartition topicPartition

The topic/partition of interest.

Returns
Type Description
WatermarkOffsets

The requested WatermarkOffsets.

QueryWatermarkOffsets(TopicPartition, TimeSpan)

Query the Kafka cluster for low (oldest/beginning) and high (newest/end) offsets for the specified topic/partition (blocking).

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
Parameters
Type Name Description
TopicPartition topicPartition

The topic/partition of interest.

TimeSpan timeout

The maximum period of time the call may block.

Returns
Type Description
WatermarkOffsets

The requested WatermarkOffsets.
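One use of the watermark offsets is a rough estimate of how many offsets a partition currently spans. The sketch below assumes that WatermarkOffsets exposes `Low` and `High` properties whose `Value` is a long; the `WatermarkExample` class, its method names, and the 5 second timeout are hypothetical. The difference between the two offsets is only an upper bound on the message count, since compaction can leave gaps.

```csharp
using System;
using Confluent.Kafka;

public static class WatermarkExample
{
    // Number of offsets between the low and high watermark, never negative.
    public static long OffsetSpan(long low, long high)
    {
        return Math.Max(0L, high - low);
    }

    // Queries the cluster with a bounded wait rather than blocking indefinitely.
    public static long QuerySpan<TKey, TValue>(
        Producer<TKey, TValue> producer, string topic, int partition)
    {
        WatermarkOffsets offsets = producer.QueryWatermarkOffsets(
            new TopicPartition(topic, partition), TimeSpan.FromSeconds(5));
        return OffsetSpan(offsets.Low.Value, offsets.High.Value);
    }
}
```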

Events

OnError

Raised on critical errors, e.g. connection failures or all brokers down. Note that the client will try to automatically recover from errors, so these errors should be seen as informational rather than catastrophic.

Declaration
public event EventHandler<Error> OnError
Event Type
Type Description
EventHandler<Error>
Remarks

Called on the Producer poll thread.

OnLog

Raised when there is information that should be logged.

Declaration
public event EventHandler<LogMessage> OnLog
Event Type
Type Description
EventHandler<LogMessage>
Remarks

Note: By default not many log messages are generated.

You can specify one or more debug contexts using the 'debug' configuration property and a log level using the 'log_level' configuration property to enable more verbose logging. However, you shouldn't typically need to do this.

Warning: Log handlers are called spontaneously from internal librdkafka threads and the application must not call any Confluent.Kafka APIs from within a log handler or perform any prolonged operations.

OnStatistics

Raised on librdkafka statistics events. JSON formatted string as defined here: https://github.com/edenhill/librdkafka/wiki/Statistics

Declaration
public event EventHandler<string> OnStatistics
Event Type
Type Description
EventHandler<System.String>
Remarks

You can enable statistics and set the statistics interval using the statistics.interval.ms configuration parameter (disabled by default).

Called on the Producer poll thread.
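The three events above can be wired up as in the following sketch. Each handler only enqueues a string, which respects the warning that log handlers must not call Confluent.Kafka APIs or do prolonged work. The `ProducerEvents` class and the use of a `ConcurrentQueue<string>` as a sink are illustrative choices, and the `Code`, `Reason`, `Level`, `Facility`, `Name`, and `Message` property names are assumed from the 0.11.x Error and LogMessage types.

```csharp
using System;
using System.Collections.Concurrent;
using Confluent.Kafka;

public static class ProducerEvents
{
    // Formats an error as "Kafka error <code>: <reason>".
    public static string FormatError(ErrorCode code, string reason)
    {
        return "Kafka error " + code + ": " + reason;
    }

    // Attaches lightweight handlers that hand messages to another thread via a queue.
    public static void Attach<TKey, TValue>(
        Producer<TKey, TValue> producer, ConcurrentQueue<string> sink)
    {
        // Informational: the client attempts to recover on its own.
        producer.OnError += (sender, error) =>
            sink.Enqueue(FormatError(error.Code, error.Reason));

        // Called from internal librdkafka threads: enqueue only, no Kafka API calls.
        producer.OnLog += (sender, log) =>
            sink.Enqueue(log.Level + " " + log.Facility + " " + log.Name + ": " + log.Message);

        // Only raised when statistics.interval.ms is set in the configuration.
        producer.OnStatistics += (sender, json) =>
            sink.Enqueue("stats: " + json);
    }
}
```

A separate consumer thread in the application can then drain the queue and write to whatever logging system is in use.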
