confluent-kafka-dotnet
Show / Hide Table of Contents

Class Producer

Implements a high-level Apache Kafka producer (without serialization).

Inheritance
System.Object
Producer
Namespace: Confluent.Kafka
Assembly: cs.temp.dll.dll
Syntax
public class Producer : IDisposable

Constructors

Producer(IEnumerable<KeyValuePair<String, Object>>)

Initializes a new Producer instance.

Declaration
public Producer(IEnumerable<KeyValuePair<string, object>> config)
Parameters
Type Name Description
IEnumerable<KeyValuePair<System.String, System.Object>> config

librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).

Producer(IEnumerable<KeyValuePair<String, Object>>, Boolean, Boolean)

Initializes a new Producer instance.

Declaration
public Producer(IEnumerable<KeyValuePair<string, object>> config, bool manualPoll, bool disableDeliveryReports)
Parameters
Type Name Description
IEnumerable<KeyValuePair<System.String, System.Object>> config

librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).

System.Boolean manualPoll

If true, does not start a dedicated polling thread to trigger events or receive delivery reports - you must call the Poll method periodically instead. Typically you should set this parameter to false.

System.Boolean disableDeliveryReports

If true, disables delivery report notification. Note: if set to true and you use a ProduceAsync variant that returns a Task, the Tasks will never complete. Typically you should set this parameter to false. Set it to true for "fire and forget" semantics and a small boost in performance.

Properties

Name

Gets the name of this producer instance. Contains (but is not equal to) the client.id configuration parameter.

Declaration
public string Name { get; }
Property Value
Type Description
System.String
Remarks

This name will be unique across all producer instances in a given application which allows log messages to be associated with the corresponding instance.

Methods

AddBrokers(String)

Adds one or more brokers to the Producer's list of initial bootstrap brokers.

Note: Additional brokers are discovered automatically as soon as the Producer connects to any broker by querying the broker metadata. Calling this method is only required in some scenarios where the address of all brokers in the cluster changes.

Declaration
public int AddBrokers(string brokers)
Parameters
Type Name Description
System.String brokers

Coma-separated list of brokers in the same format as the bootstrap.server configuration parameter.

Returns
Type Description
System.Int32

The number of brokers added. This value includes brokers that may have been specified a second time.

Remarks

There is currently no API to remove existing configured, added or learnt brokers.

Dispose()

Releases all resources used by this Producer.

Declaration
public void Dispose()
Remarks

You will often want to call Flush() before disposing a Producer instance.

Flush()

Equivalent to Flush(Int32) with infinite timeout.

[UNSTABLE-API] - the semantics and/or type of the return value is subject to change.

Declaration
public int Flush()
Returns
Type Description
System.Int32

Refer to Flush(Int32).

Flush(Int32)

Wait until all outstanding produce requests and delievery report callbacks are completed.

[UNSTABLE-API] - the semantics and/or type of the return value is subject to change.

Declaration
public int Flush(int millisecondsTimeout)
Parameters
Type Name Description
System.Int32 millisecondsTimeout

The maximum time to block in milliseconds or -1 to block indefinitely. You should typically use a relatively short timout period because this operation cannot be cancelled.

Returns
Type Description
System.Int32

The current librdkafka out queue length. This should be interpreted as a rough indication of the number of messages waiting to be sent to or acknowledged by the broker. If zero, there are no outstanding messages or callbacks. Specifically, the value is equal to the sum of the number of produced messages for which a delivery report has not yet been handled and a number which is less than or equal to the number of pending delivery report callback events (as determined by an internal librdkafka implementation detail).

Remarks

This method should typically be called prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating. The wait time is bounded by the millisecondsTimeout parameter.

A related configuration parameter is message.timeout.ms which determines the maximum length of time librdkafka attempts to deliver a message before giving up and so also affects the maximum time a call to Flush may block.

Flush(TimeSpan)

Wait until all outstanding produce requests and delievery report callbacks are completed. Refer to Flush(Int32) for more information.

[UNSTABLE-API] - the semantics and/or type of the return value is subject to change.

Declaration
public int Flush(TimeSpan timeout)
Parameters
Type Name Description
TimeSpan timeout

The maximum length of time to block. You should typically use a relatively short timout period because this operation cannot be cancelled.

Returns
Type Description
System.Int32

The current librdkafka out queue length. Refer to Flush(Int32) for more information.

GetMetadata()

Refer to GetMetadata(Boolean, String, TimeSpan)

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public Metadata GetMetadata()
Returns
Type Description
Metadata

GetMetadata(Boolean, String)

Refer to GetMetadata(Boolean, String, TimeSpan)

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public Metadata GetMetadata(bool allTopics, string topic)
Parameters
Type Name Description
System.Boolean allTopics
System.String topic
Returns
Type Description
Metadata

GetMetadata(Boolean, String, TimeSpan)

Query the cluster for metadata (blocking).

  • allTopics = true - request all topics from cluster
  • allTopics = false, topic = null - request only locally known topics.
  • allTopics = false, topic = valid - request specific topic

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public Metadata GetMetadata(bool allTopics, string topic, TimeSpan timeout)
Parameters
Type Name Description
System.Boolean allTopics
System.String topic
TimeSpan timeout
Returns
Type Description
Metadata

GetSerializingProducer<TKey, TValue>(ISerializer<TKey>, ISerializer<TValue>)

Returns a serializing producer that uses this Producer to produce messages. The same underlying Producer can be used as the basis of many serializing producers (potentially with different TKey and TValue types). Threadsafe.

Declaration
public ISerializingProducer<TKey, TValue> GetSerializingProducer<TKey, TValue>(ISerializer<TKey> keySerializer, ISerializer<TValue> valueSerializer)
Parameters
Type Name Description
ISerializer<TKey> keySerializer

The key serializer.

ISerializer<TValue> valueSerializer

The value serializer.

Returns
Type Description
ISerializingProducer<TKey, TValue>
Type Parameters
Name Description
TKey

The key type.

TValue

The value type.

ListGroup(String)

Get information pertaining to a particular group in the Kafka cluster (blocks, potentially indefinitely).

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public GroupInfo ListGroup(string group)
Parameters
Type Name Description
System.String group

The group of interest.

Returns
Type Description
GroupInfo

Returns information pertaining to the specified group or null if this group does not exist.

ListGroup(String, TimeSpan)

Get information pertaining to a particular group in the Kafka cluster (blocking).

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public GroupInfo ListGroup(string group, TimeSpan timeout)
Parameters
Type Name Description
System.String group

The group of interest.

TimeSpan timeout

The maximum period of time the call may block.

Returns
Type Description
GroupInfo

Returns information pertaining to the specified group or null if this group does not exist.

ListGroups(TimeSpan)

Get information pertaining to all groups in the Kafka cluster (blocking)

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public List<GroupInfo> ListGroups(TimeSpan timeout)
Parameters
Type Name Description
TimeSpan timeout

The maximum period of time the call may block.

Returns
Type Description
List<GroupInfo>

Poll()

Poll for callback events. You will not typically need to call this method. Only call on producer instances where background polling is not enabled. Blocks until there is a callback event ready to be served.

Declaration
public int Poll()
Returns
Type Description
System.Int32

Returns the number of events served.

Poll(Int32)

Poll for callback events. You will not typically need to call this method. Only call on producer instances where background polling is not enabled.

Declaration
public int Poll(int millisecondsTimeout)
Parameters
Type Name Description
System.Int32 millisecondsTimeout

The maximum period of time to block (in milliseconds) if no callback events are waiting or -1 to block indefinitely. You should typically use a relatively short timout period because this operation cannot be cancelled.

Returns
Type Description
System.Int32

Returns the number of events served.

Poll(TimeSpan)

Poll for callback events. You will not typically need to call this method. Only call on producer instances where background polling is not enabled.

Declaration
public int Poll(TimeSpan timeout)
Parameters
Type Name Description
TimeSpan timeout

The maximum period of time to block if no callback events are waiting. You should typically use a relatively short timout period because this operation cannot be cancelled.

Returns
Type Description
System.Int32

Returns the number of events served.

ProduceAsync(String, Byte[], Byte[])

Asynchronously send a single message to the broker. Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean) for more information.

Declaration
public Task<Message> ProduceAsync(string topic, byte[] key, byte[] val)
Parameters
Type Name Description
System.String topic
System.Byte[] key
System.Byte[] val
Returns
Type Description
Task<Message>

ProduceAsync(String, Byte[], Byte[], IDeliveryHandler)

Asynchronously send a single message to the broker (order of delivery reports strictly guarenteed). Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean, IDeliveryHandler) for more information.

Declaration
public void ProduceAsync(string topic, byte[] key, byte[] val, IDeliveryHandler deliveryHandler)
Parameters
Type Name Description
System.String topic
System.Byte[] key
System.Byte[] val
IDeliveryHandler deliveryHandler

ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32)

Asynchronously send a single message to the broker. Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean) for more information.

Declaration
public Task<Message> ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength)
Parameters
Type Name Description
System.String topic
System.Byte[] key
System.Int32 keyOffset
System.Int32 keyLength
System.Byte[] val
System.Int32 valOffset
System.Int32 valLength
Returns
Type Description
Task<Message>

ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, IDeliveryHandler)

Asynchronously send a single message to the broker (order of delivery reports strictly guarenteed). Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean, IDeliveryHandler) for more information.

Declaration
public void ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, IDeliveryHandler deliveryHandler)
Parameters
Type Name Description
System.String topic
System.Byte[] key
System.Int32 keyOffset
System.Int32 keyLength
System.Byte[] val
System.Int32 valOffset
System.Int32 valLength
IDeliveryHandler deliveryHandler

ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Boolean)

Asynchronously send a single message to the broker. Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean) for more information.

Declaration
public Task<Message> ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, bool blockIfQueueFull)
Parameters
Type Name Description
System.String topic
System.Byte[] key
System.Int32 keyOffset
System.Int32 keyLength
System.Byte[] val
System.Int32 valOffset
System.Int32 valLength
System.Boolean blockIfQueueFull
Returns
Type Description
Task<Message>

ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Boolean, IDeliveryHandler)

Asynchronously send a single message to the broker (order of delivery reports strictly guarenteed). Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean, IDeliveryHandler) for more information.

Declaration
public void ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, bool blockIfQueueFull, IDeliveryHandler deliveryHandler)
Parameters
Type Name Description
System.String topic
System.Byte[] key
System.Int32 keyOffset
System.Int32 keyLength
System.Byte[] val
System.Int32 valOffset
System.Int32 valLength
System.Boolean blockIfQueueFull
IDeliveryHandler deliveryHandler

ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32)

Asynchronously send a single message to the broker. Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean) for more information.

Declaration
public Task<Message> ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, int partition)
Parameters
Type Name Description
System.String topic
System.Byte[] key
System.Int32 keyOffset
System.Int32 keyLength
System.Byte[] val
System.Int32 valOffset
System.Int32 valLength
System.Int32 partition
Returns
Type Description
Task<Message>

ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, IDeliveryHandler)

Asynchronously send a single message to the broker (order of delivery reports strictly guarenteed). Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean, IDeliveryHandler) for more information.

Declaration
public void ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, int partition, IDeliveryHandler deliveryHandler)
Parameters
Type Name Description
System.String topic
System.Byte[] key
System.Int32 keyOffset
System.Int32 keyLength
System.Byte[] val
System.Int32 valOffset
System.Int32 valLength
System.Int32 partition
IDeliveryHandler deliveryHandler

ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean)

Asynchronously send a single message to the broker.

Declaration
public Task<Message> ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, int partition, bool blockIfQueueFull)
Parameters
Type Name Description
System.String topic

The target topic.

System.Byte[] key

null, or a byte array that contains the message key.

System.Int32 keyOffset

for non-null values, the offset into the key array of the sub-array to use as the message key. if key is null, keyOffset must be 0.

System.Int32 keyLength

for non-null keys, the length of the sequence of bytes that constitutes the key. if key is null, keyOffset must be 0.

System.Byte[] val

null, or a byte array that contains the message value.

System.Int32 valOffset

for non-null values, the offset into the val array of the sub-array to use as the message value. if val is null, valOffset must be 0.

System.Int32 valLength

for non-null values, the length of the sequence of bytes that constitutes the value. if val is null, valLength must be 0.

System.Int32 partition

The target partition (if -1, this is determined by the partitioner configured for the topic).

System.Boolean blockIfQueueFull

Whether or not to block if the send queue is full. If false, a KafkaExcepion (with Error.Code == ErrorCode.Local_QueueFull) will be thrown if an attempt is made to produce a message and the send queue is full.

Warning: blockIfQueueFull is set to true, background polling is disabled and Poll is not being called in another thread, this will block indefinitely.

Returns
Type Description
Task<Message>

A Task which will complete with the corresponding delivery report for this request.

Remarks

If you require strict ordering of delivery reports to be maintained, you should use a variant of ProduceAsync that takes an IDeliveryHandler parameter, not a variant that returns a Task<Message> because Tasks are completed on arbitrary thread pool threads and can be executed out of order.

ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean, IDeliveryHandler)

Asynchronously send a single message to the broker (order of delivery reports strictly guarenteed).

Declaration
public void ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, int partition, bool blockIfQueueFull, IDeliveryHandler deliveryHandler)
Parameters
Type Name Description
System.String topic
System.Byte[] key
System.Int32 keyOffset
System.Int32 keyLength
System.Byte[] val
System.Int32 valOffset
System.Int32 valLength
System.Int32 partition
System.Boolean blockIfQueueFull
IDeliveryHandler deliveryHandler
Remarks

Notification of delivery reports is via an IDeliveryHandler instance. Use IDeliveryHandler variants of ProduceAsync if you require notification of delivery reports strictly in the order they were acknowledged by the broker / failed (failure may be via broker or local). IDeliveryHandler.HandleDeliveryReport callbacks are executed on the Poll thread.

Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean) for more information.

QueryWatermarkOffsets(TopicPartition)

Query the Kafka cluster for low (oldest/beginning) and high (newest/end) offsets for the specified topic/partition (blocks, potentially indefinitely).

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition)
Parameters
Type Name Description
TopicPartition topicPartition

The topic/partition of interest.

Returns
Type Description
WatermarkOffsets

The requested WatermarkOffsets.

QueryWatermarkOffsets(TopicPartition, TimeSpan)

Query the Kafka cluster for low (oldest/beginning) and high (newest/end) offsets for the specified topic/partition (blocking).

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
Parameters
Type Name Description
TopicPartition topicPartition

The topic/partition of interest.

TimeSpan timeout

The maximum period of time the call may block.

Returns
Type Description
WatermarkOffsets

The requested WatermarkOffsets.

Events

OnError

Raised on critical errors, e.g. connection failures or all brokers down. Note that the client will try to automatically recover from errors - these errors should be seen as informational rather than catastrophic

Declaration
public event EventHandler<Error> OnError
Event Type
Type Description
EventHandler<Error>
Remarks

Called on the Producer poll thread.

OnLog

Raised when there is information that should be logged.

Declaration
public event EventHandler<LogMessage> OnLog
Event Type
Type Description
EventHandler<LogMessage>
Remarks

By default not many log messages are generated.

You can specify one or more debug contexts using the 'debug' configuration property and a log level using the 'log_level' configuration property to enable more verbose logging, however you shouldn't typically need to do this.

Warning: Log handlers are called spontaneously from internal librdkafka threads and the application must not call any Confluent.Kafka APIs from within a log handler or perform any prolonged operations.

OnStatistics

Raised on librdkafka statistics events. JSON formatted string as defined here: https://github.com/edenhill/librdkafka/wiki/Statistics

Declaration
public event EventHandler<string> OnStatistics
Event Type
Type Description
EventHandler<System.String>
Remarks

You can enable statistics and set the statistics interval using the statistics.interval.ms configuration parameter (disabled by default).

Called on the Producer poll thread.