Class Producer
Implements a high-level Apache Kafka producer (without serialization).
Inheritance
Namespace: Confluent.Kafka
Assembly: cs.temp.dll.dll
Syntax
public class Producer : IDisposable
Constructors
Producer(IEnumerable<KeyValuePair<String, Object>>)
Initializes a new Producer instance.
Declaration
public Producer(IEnumerable<KeyValuePair<string, object>> config)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<KeyValuePair<System.String, System.Object>> | config | librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). |
Producer(IEnumerable<KeyValuePair<String, Object>>, Boolean, Boolean)
Initializes a new Producer instance.
Declaration
public Producer(IEnumerable<KeyValuePair<string, object>> config, bool manualPoll, bool disableDeliveryReports)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<KeyValuePair<System.String, System.Object>> | config | librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). |
System.Boolean | manualPoll | If true, does not start a dedicated polling thread to trigger events or receive delivery reports - you must call the Poll method periodically instead. Typically you should set this parameter to false. |
System.Boolean | disableDeliveryReports | If true, disables delivery report notification. Note: if set to true and you use a ProduceAsync variant that returns a Task, the Tasks will never complete. Typically you should set this parameter to false. Set it to true for "fire and forget" semantics and a small boost in performance. |
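The two constructors above can be sketched as follows. This is a minimal illustration, not a definitive setup: the broker address `localhost:9092` and the class name `ProducerSetupExample` are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

public static class ProducerSetupExample
{
    public static void Main()
    {
        // Assumption: a broker is reachable at localhost:9092.
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" }
        };

        // Single-argument constructor: only the librdkafka configuration is supplied.
        using (var producer = new Producer(config))
        {
            Console.WriteLine("Created producer: " + producer.Name);
        }

        // Three-argument constructor with manualPoll set to true:
        // no dedicated polling thread is started, so the application
        // must call Poll periodically itself.
        using (var producer = new Producer(config, manualPoll: true, disableDeliveryReports: false))
        {
            producer.Poll(TimeSpan.FromMilliseconds(100));
        }
    }
}
```

Passing `false` for both boolean parameters is the setting the parameter descriptions recommend as typical.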
Properties
Name
Gets the name of this producer instance. Contains (but is not equal to) the client.id configuration parameter.
Declaration
public string Name { get; }
Property Value
Type | Description |
---|---|
System.String |
Remarks
This name will be unique across all producer instances in a given application which allows log messages to be associated with the corresponding instance.
Methods
AddBrokers(String)
Adds one or more brokers to the Producer's list of initial bootstrap brokers.
Note: Additional brokers are discovered automatically as soon as the Producer connects to any broker by querying the broker metadata. Calling this method is only required in some scenarios where the addresses of all brokers in the cluster change.
Declaration
public int AddBrokers(string brokers)
Parameters
Type | Name | Description |
---|---|---|
System.String | brokers | Comma-separated list of brokers in the same format as the bootstrap.servers configuration parameter. |
Returns
Type | Description |
---|---|
System.Int32 | The number of brokers added. This value includes brokers that may have been specified a second time. |
Remarks
There is currently no API to remove existing configured, added or learnt brokers.
Dispose()
Releases all resources used by this Producer.
Declaration
public void Dispose()
Remarks
You will often want to call Flush() before disposing a Producer instance.
Flush()
Equivalent to Flush(Int32) with infinite timeout.
[UNSTABLE-API] - the semantics and/or type of the return value is subject to change.
Declaration
public int Flush()
Returns
Type | Description |
---|---|
System.Int32 | Refer to Flush(Int32). |
Flush(Int32)
Wait until all outstanding produce requests and delivery report callbacks are completed.
[UNSTABLE-API] - the semantics and/or type of the return value is subject to change.
Declaration
public int Flush(int millisecondsTimeout)
Parameters
Type | Name | Description |
---|---|---|
System.Int32 | millisecondsTimeout | The maximum time to block in milliseconds, or -1 to block indefinitely. You should typically use a relatively short timeout period because this operation cannot be cancelled. |
Returns
Type | Description |
---|---|
System.Int32 | The current librdkafka out queue length. This should be interpreted as a rough indication of the number of messages waiting to be sent to or acknowledged by the broker. If zero, there are no outstanding messages or callbacks. Specifically, the value is equal to the sum of the number of produced messages for which a delivery report has not yet been handled and a number which is less than or equal to the number of pending delivery report callback events (as determined by an internal librdkafka implementation detail). |
Remarks
This method should typically be called prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating. The wait time is bounded by the millisecondsTimeout parameter.
A related configuration parameter is message.timeout.ms which determines the maximum length of time librdkafka attempts to deliver a message before giving up and so also affects the maximum time a call to Flush may block.
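As a rough sketch of the advice above, the following flushes in short slices before the producer is disposed instead of issuing one unbounded call. The broker address, the topic name `example-topic`, and the retry limit of 10 are assumptions chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;

public static class FlushExample
{
    public static void Main()
    {
        // Assumption: a broker is reachable at localhost:9092.
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" }
        };

        using (var producer = new Producer(config))
        {
            // Queue one message; the returned Task is not awaited here.
            producer.ProduceAsync("example-topic", null, Encoding.UTF8.GetBytes("hello"));

            // Flush cannot be cancelled, so block for at most one second
            // per call and give up after a bounded number of attempts.
            int remaining;
            int attempts = 0;
            do
            {
                remaining = producer.Flush(1000);
                attempts++;
            } while (remaining > 0 && attempts < 10);

            Console.WriteLine("Out queue length after flushing: " + remaining);
        }
    }
}
```

A return value of zero indicates that no messages or callbacks are outstanding; any other value is only a rough indication, as described in the Returns table.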
Flush(TimeSpan)
Wait until all outstanding produce requests and delivery report callbacks are completed. Refer to Flush(Int32) for more information.
[UNSTABLE-API] - the semantics and/or type of the return value is subject to change.
Declaration
public int Flush(TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | timeout | The maximum length of time to block. You should typically use a relatively short timeout period because this operation cannot be cancelled. |
Returns
Type | Description |
---|---|
System.Int32 | The current librdkafka out queue length. Refer to Flush(Int32) for more information. |
GetMetadata()
Refer to GetMetadata(Boolean, String, TimeSpan).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public Metadata GetMetadata()
Returns
Type | Description |
---|---|
Metadata |
GetMetadata(Boolean, String)
Refer to GetMetadata(Boolean, String, TimeSpan).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public Metadata GetMetadata(bool allTopics, string topic)
Parameters
Type | Name | Description |
---|---|---|
System.Boolean | allTopics | |
System.String | topic |
Returns
Type | Description |
---|---|
Metadata |
GetMetadata(Boolean, String, TimeSpan)
Query the cluster for metadata (blocking).
- allTopics = true: request all topics from the cluster.
- allTopics = false, topic = null: request only locally known topics.
- allTopics = false, topic = valid: request the specified topic.
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public Metadata GetMetadata(bool allTopics, string topic, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
System.Boolean | allTopics | |
System.String | topic | |
TimeSpan | timeout |
Returns
Type | Description |
---|---|
Metadata |
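A minimal sketch of a metadata query for all topics follows. The broker address and the five-second timeout are assumptions. The members `Topics`, `Topic`, and `Partitions` are not described on this page; they are used here on the assumption that the `Metadata` type exposes them as in the 0.11.x client, so check them against the `Metadata` reference before relying on this.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

public static class MetadataExample
{
    public static void Main()
    {
        // Assumption: a broker is reachable at localhost:9092.
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" }
        };

        using (var producer = new Producer(config))
        {
            // allTopics = true requests every topic in the cluster;
            // the topic argument is not used in that case, so null is passed.
            Metadata metadata = producer.GetMetadata(true, null, TimeSpan.FromSeconds(5));

            // Assumed members: Metadata.Topics, TopicMetadata.Topic,
            // TopicMetadata.Partitions.
            foreach (var topic in metadata.Topics)
            {
                Console.WriteLine(topic.Topic + ": " + topic.Partitions.Count + " partition(s)");
            }
        }
    }
}
```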
GetSerializingProducer<TKey, TValue>(ISerializer<TKey>, ISerializer<TValue>)
Returns a serializing producer that uses this Producer to produce messages. The same underlying Producer can be used as the basis of many serializing producers (potentially with different TKey and TValue types). Threadsafe.
Declaration
public ISerializingProducer<TKey, TValue> GetSerializingProducer<TKey, TValue>(ISerializer<TKey> keySerializer, ISerializer<TValue> valueSerializer)
Parameters
Type | Name | Description |
---|---|---|
ISerializer<TKey> | keySerializer | The key serializer. |
ISerializer<TValue> | valueSerializer | The value serializer. |
Returns
Type | Description |
---|---|
ISerializingProducer<TKey, TValue> |
Type Parameters
Name | Description |
---|---|
TKey | The key type. |
TValue | The value type. |
ListGroup(String)
Get information pertaining to a particular group in the Kafka cluster (blocks, potentially indefinitely).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public GroupInfo ListGroup(string group)
Parameters
Type | Name | Description |
---|---|---|
System.String | group | The group of interest. |
Returns
Type | Description |
---|---|
GroupInfo | Returns information pertaining to the specified group or null if this group does not exist. |
ListGroup(String, TimeSpan)
Get information pertaining to a particular group in the Kafka cluster (blocking).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public GroupInfo ListGroup(string group, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
System.String | group | The group of interest. |
TimeSpan | timeout | The maximum period of time the call may block. |
Returns
Type | Description |
---|---|
GroupInfo | Returns information pertaining to the specified group or null if this group does not exist. |
ListGroups(TimeSpan)
Get information pertaining to all groups in the Kafka cluster (blocking).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public List<GroupInfo> ListGroups(TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | timeout | The maximum period of time the call may block. |
Returns
Type | Description |
---|---|
List<GroupInfo> |
Poll()
Poll for callback events. You will not typically need to call this method. Only call on producer instances where background polling is not enabled. Blocks until there is a callback event ready to be served.
Declaration
public int Poll()
Returns
Type | Description |
---|---|
System.Int32 | Returns the number of events served. |
Poll(Int32)
Poll for callback events. You will not typically need to call this method. Only call on producer instances where background polling is not enabled.
Declaration
public int Poll(int millisecondsTimeout)
Parameters
Type | Name | Description |
---|---|---|
System.Int32 | millisecondsTimeout | The maximum period of time to block (in milliseconds) if no callback events are waiting, or -1 to block indefinitely. You should typically use a relatively short timeout period because this operation cannot be cancelled. |
Returns
Type | Description |
---|---|
System.Int32 | Returns the number of events served. |
Poll(TimeSpan)
Poll for callback events. You will not typically need to call this method. Only call on producer instances where background polling is not enabled.
Declaration
public int Poll(TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | timeout | The maximum period of time to block if no callback events are waiting. You should typically use a relatively short timeout period because this operation cannot be cancelled. |
Returns
Type | Description |
---|---|
System.Int32 | Returns the number of events served. |
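The Poll overloads only matter when background polling is disabled. A minimal sketch of such a producer, assuming a broker at `localhost:9092` and a topic named `example-topic`, might drive delivery reports like this:

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class ManualPollExample
{
    public static void Main()
    {
        // Assumption: a broker is reachable at localhost:9092.
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" }
        };

        // manualPoll: true disables the dedicated polling thread.
        using (var producer = new Producer(config, manualPoll: true, disableDeliveryReports: false))
        {
            Task<Message> task = producer.ProduceAsync(
                "example-topic", null, Encoding.UTF8.GetBytes("hello"));

            // Without background polling, the Task completes only once
            // Poll has served the corresponding delivery report.
            int served = 0;
            while (!task.IsCompleted)
            {
                served += producer.Poll(TimeSpan.FromMilliseconds(100));
            }

            Console.WriteLine("Events served while waiting: " + served);
        }
    }
}
```

The short timeout keeps each Poll call bounded, in line with the note that the operation cannot be cancelled.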
ProduceAsync(String, Byte[], Byte[])
Asynchronously send a single message to the broker. Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean) for more information.
Declaration
public Task<Message> ProduceAsync(string topic, byte[] key, byte[] val)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
System.Byte[] | key | |
System.Byte[] | val |
Returns
Type | Description |
---|---|
Task<Message> |
ProduceAsync(String, Byte[], Byte[], IDeliveryHandler)
Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed). Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean, IDeliveryHandler) for more information.
Declaration
public void ProduceAsync(string topic, byte[] key, byte[] val, IDeliveryHandler deliveryHandler)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
System.Byte[] | key | |
System.Byte[] | val | |
IDeliveryHandler | deliveryHandler |
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32)
Asynchronously send a single message to the broker. Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean) for more information.
Declaration
public Task<Message> ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
System.Byte[] | key | |
System.Int32 | keyOffset | |
System.Int32 | keyLength | |
System.Byte[] | val | |
System.Int32 | valOffset | |
System.Int32 | valLength |
Returns
Type | Description |
---|---|
Task<Message> |
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, IDeliveryHandler)
Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed). Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean, IDeliveryHandler) for more information.
Declaration
public void ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, IDeliveryHandler deliveryHandler)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
System.Byte[] | key | |
System.Int32 | keyOffset | |
System.Int32 | keyLength | |
System.Byte[] | val | |
System.Int32 | valOffset | |
System.Int32 | valLength | |
IDeliveryHandler | deliveryHandler |
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Boolean)
Asynchronously send a single message to the broker. Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean) for more information.
Declaration
public Task<Message> ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, bool blockIfQueueFull)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
System.Byte[] | key | |
System.Int32 | keyOffset | |
System.Int32 | keyLength | |
System.Byte[] | val | |
System.Int32 | valOffset | |
System.Int32 | valLength | |
System.Boolean | blockIfQueueFull |
Returns
Type | Description |
---|---|
Task<Message> |
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Boolean, IDeliveryHandler)
Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed). Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean, IDeliveryHandler) for more information.
Declaration
public void ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, bool blockIfQueueFull, IDeliveryHandler deliveryHandler)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
System.Byte[] | key | |
System.Int32 | keyOffset | |
System.Int32 | keyLength | |
System.Byte[] | val | |
System.Int32 | valOffset | |
System.Int32 | valLength | |
System.Boolean | blockIfQueueFull | |
IDeliveryHandler | deliveryHandler |
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32)
Asynchronously send a single message to the broker. Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean) for more information.
Declaration
public Task<Message> ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, int partition)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
System.Byte[] | key | |
System.Int32 | keyOffset | |
System.Int32 | keyLength | |
System.Byte[] | val | |
System.Int32 | valOffset | |
System.Int32 | valLength | |
System.Int32 | partition |
Returns
Type | Description |
---|---|
Task<Message> |
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, IDeliveryHandler)
Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed). Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean, IDeliveryHandler) for more information.
Declaration
public void ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, int partition, IDeliveryHandler deliveryHandler)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
System.Byte[] | key | |
System.Int32 | keyOffset | |
System.Int32 | keyLength | |
System.Byte[] | val | |
System.Int32 | valOffset | |
System.Int32 | valLength | |
System.Int32 | partition | |
IDeliveryHandler | deliveryHandler |
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean)
Asynchronously send a single message to the broker.
Declaration
public Task<Message> ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, int partition, bool blockIfQueueFull)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | The target topic. |
System.Byte[] | key | null, or a byte array that contains the message key. |
System.Int32 | keyOffset | For non-null keys, the offset into the key array of the sub-array to use as the message key. |
System.Int32 | keyLength | For non-null keys, the length of the sequence of bytes that constitutes the key. |
System.Byte[] | val | null, or a byte array that contains the message value. |
System.Int32 | valOffset | For non-null values, the offset into the val array of the sub-array to use as the message value. |
System.Int32 | valLength | For non-null values, the length of the sequence of bytes that constitutes the value. |
System.Int32 | partition | The target partition (if -1, this is determined by the partitioner configured for the topic). |
System.Boolean | blockIfQueueFull | Whether or not to block if the send queue is full. If false, a KafkaException (with Error.Code == ErrorCode.Local_QueueFull) will be thrown if an attempt is made to produce a message and the send queue is full. Warning: if blockIfQueueFull is set to true, background polling is disabled, and Poll is not being called in another thread, this call will block indefinitely. |
Returns
Type | Description |
---|---|
Task<Message> | A Task which will complete with the corresponding delivery report for this request. |
Remarks
If you require strict ordering of delivery reports to be maintained, you should use a variant of ProduceAsync that takes an IDeliveryHandler parameter, not a variant that returns a Task<Message> because Tasks are completed on arbitrary thread pool threads and can be executed out of order.
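The offset and length parameters allow the key and value to be slices of a larger buffer. The sketch below packs both into one array and uses the Task-returning overload. The broker address, topic name, and payload are assumptions, and the `Partition`, `Offset`, and `Error` members of `Message` are used as I understand them from the 0.11.x client rather than from this page.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class ProduceTaskExample
{
    public static void Main()
    {
        // Assumption: a broker is reachable at localhost:9092.
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" }
        };

        using (var producer = new Producer(config))
        {
            // One buffer holding the key ("key1", 4 bytes) followed by
            // the value ("value1", 6 bytes).
            byte[] buffer = Encoding.UTF8.GetBytes("key1value1");

            Task<Message> task = producer.ProduceAsync(
                "example-topic",
                buffer, 0, 4,   // key: offset 0, length 4
                buffer, 4, 6,   // value: offset 4, length 6
                -1,             // partition: let the configured partitioner decide
                true);          // blockIfQueueFull

            // Blocks until the delivery report for this message arrives.
            Message report = task.Result;

            if (report.Error.Code != ErrorCode.NoError)
            {
                Console.WriteLine("Delivery failed: " + report.Error.Reason);
            }
            else
            {
                Console.WriteLine("Delivered to partition " + report.Partition + " at offset " + report.Offset);
            }
        }
    }
}
```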
ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean, IDeliveryHandler)
Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed).
Declaration
public void ProduceAsync(string topic, byte[] key, int keyOffset, int keyLength, byte[] val, int valOffset, int valLength, int partition, bool blockIfQueueFull, IDeliveryHandler deliveryHandler)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
System.Byte[] | key | |
System.Int32 | keyOffset | |
System.Int32 | keyLength | |
System.Byte[] | val | |
System.Int32 | valOffset | |
System.Int32 | valLength | |
System.Int32 | partition | |
System.Boolean | blockIfQueueFull | |
IDeliveryHandler | deliveryHandler |
Remarks
Notification of delivery reports is via an IDeliveryHandler instance. Use IDeliveryHandler variants of ProduceAsync if you require notification of delivery reports strictly in the order they were acknowledged by the broker / failed (failure may be via broker or local). IDeliveryHandler.HandleDeliveryReport callbacks are executed on the Poll thread.
Refer to ProduceAsync(String, Byte[], Int32, Int32, Byte[], Int32, Int32, Int32, Boolean) for more information.
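A minimal IDeliveryHandler implementation might look like the sketch below. The class name `ConsoleDeliveryHandler`, the broker address, and the topic are hypothetical. The `MarshalData` property is not documented on this page; it is included on the assumption that the interface requires it, as in the 0.11.x client, so verify it against the IDeliveryHandler reference.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;

// Hypothetical handler that prints each delivery report.
public class ConsoleDeliveryHandler : IDeliveryHandler
{
    // Assumed interface member: when false, key and value data are
    // not copied back into the delivery report.
    public bool MarshalData { get { return false; } }

    // Executed on the Poll thread, in delivery report order.
    public void HandleDeliveryReport(Message message)
    {
        if (message.Error.Code != ErrorCode.NoError)
        {
            Console.WriteLine("Delivery failed: " + message.Error.Reason);
            return;
        }
        Console.WriteLine("Delivered: " + message.Topic + " [" + message.Partition + "] @ " + message.Offset);
    }
}

public static class DeliveryHandlerExample
{
    public static void Main()
    {
        // Assumption: a broker is reachable at localhost:9092.
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" }
        };

        var handler = new ConsoleDeliveryHandler();

        using (var producer = new Producer(config))
        {
            for (int i = 0; i < 3; i++)
            {
                byte[] value = Encoding.UTF8.GetBytes("message " + i);
                producer.ProduceAsync("example-topic", null, value, handler);
            }

            // Wait up to ten seconds for the reports to be handled.
            producer.Flush(TimeSpan.FromSeconds(10));
        }
    }
}
```

Because the callbacks run on the Poll thread, the handler should return quickly; long-running work there delays subsequent delivery reports.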
QueryWatermarkOffsets(TopicPartition)
Query the Kafka cluster for low (oldest/beginning) and high (newest/end) offsets for the specified topic/partition (blocks, potentially indefinitely).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition)
Parameters
Type | Name | Description |
---|---|---|
TopicPartition | topicPartition | The topic/partition of interest. |
Returns
Type | Description |
---|---|
WatermarkOffsets | The requested WatermarkOffsets. |
QueryWatermarkOffsets(TopicPartition, TimeSpan)
Query the Kafka cluster for low (oldest/beginning) and high (newest/end) offsets for the specified topic/partition (blocking).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TopicPartition | topicPartition | The topic/partition of interest. |
TimeSpan | timeout | The maximum period of time the call may block. |
Returns
Type | Description |
---|---|
WatermarkOffsets | The requested WatermarkOffsets. |
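A sketch of a bounded watermark query follows. The broker address, topic name, and partition 0 are assumptions. The `TopicPartition(string, int)` constructor and the `Low` and `High` members of `WatermarkOffsets` are not described on this page and are assumed from the 0.11.x client, as is the use of KafkaException to signal failure.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

public static class WatermarkExample
{
    public static void Main()
    {
        // Assumption: a broker is reachable at localhost:9092.
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" }
        };

        using (var producer = new Producer(config))
        {
            var topicPartition = new TopicPartition("example-topic", 0);

            try
            {
                // Prefer the overload with a timeout: the single-argument
                // overload may block indefinitely.
                WatermarkOffsets offsets = producer.QueryWatermarkOffsets(
                    topicPartition, TimeSpan.FromSeconds(5));

                Console.WriteLine("Low: " + offsets.Low + ", High: " + offsets.High);
            }
            catch (KafkaException e)
            {
                // Assumed failure mode for timeouts and broker errors.
                Console.WriteLine("Query failed: " + e.Error.Reason);
            }
        }
    }
}
```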
Events
OnError
Raised on critical errors, e.g. connection failures or all brokers down. Note that the client will try to automatically recover from errors, so these errors should be seen as informational rather than catastrophic.
Declaration
public event EventHandler<Error> OnError
Event Type
Type | Description |
---|---|
EventHandler<Error> |
Remarks
Called on the Producer poll thread.
OnLog
Raised when there is information that should be logged.
Declaration
public event EventHandler<LogMessage> OnLog
Event Type
Type | Description |
---|---|
EventHandler<LogMessage> |
Remarks
By default not many log messages are generated.
You can specify one or more debug contexts using the 'debug' configuration property and a log level using the 'log_level' configuration property to enable more verbose logging, however you shouldn't typically need to do this.
Warning: Log handlers are called spontaneously from internal librdkafka threads and the application must not call any Confluent.Kafka APIs from within a log handler or perform any prolonged operations.
OnStatistics
Raised on librdkafka statistics events. JSON formatted string as defined here: https://github.com/edenhill/librdkafka/wiki/Statistics
Declaration
public event EventHandler<string> OnStatistics
Event Type
Type | Description |
---|---|
EventHandler<System.String> |
Remarks
You can enable statistics and set the statistics interval using the statistics.interval.ms configuration parameter (disabled by default).
Called on the Producer poll thread.
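The three events can be wired up as in the sketch below. The broker address, topic name, and the five-second statistics interval are assumptions. The `Level`, `Facility`, and `Message` members of `LogMessage` and the `Code` and `Reason` members of `Error` are assumed from the 0.11.x client.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;

public static class EventsExample
{
    public static void Main()
    {
        var config = new Dictionary<string, object>
        {
            // Assumption: a broker is reachable at localhost:9092.
            { "bootstrap.servers", "localhost:9092" },
            // Statistics are disabled by default; emit them every 5 seconds.
            { "statistics.interval.ms", 5000 }
        };

        using (var producer = new Producer(config))
        {
            // Log handlers run on internal librdkafka threads: keep them
            // short and do not call Confluent.Kafka APIs from inside them.
            producer.OnLog += (sender, log) =>
                Console.WriteLine("[" + log.Level + "] " + log.Facility + ": " + log.Message);

            // Informational: the client attempts to recover automatically.
            producer.OnError += (sender, error) =>
                Console.WriteLine("Error " + error.Code + ": " + error.Reason);

            // The payload is a JSON-formatted string.
            producer.OnStatistics += (sender, json) =>
                Console.WriteLine("Statistics received (" + json.Length + " characters)");

            producer.ProduceAsync("example-topic", null, Encoding.UTF8.GetBytes("hello"));
            producer.Flush(TimeSpan.FromSeconds(10));
        }
    }
}
```

Handlers are attached before any message is produced so that early errors and log lines are not missed.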