Class Producer<TKey, TValue>
Implements a high-level Apache Kafka producer with key and value serialization.
Namespace: Confluent.Kafka
Assembly: cs.temp.dll.dll
Syntax
public class Producer<TKey, TValue> : ISerializingProducer<TKey, TValue>, IDisposable
Type Parameters
Name | Description |
---|---|
TKey | |
TValue |
Constructors
Producer(IEnumerable<KeyValuePair<String, Object>>, ISerializer<TKey>, ISerializer<TValue>)
Initializes a new Producer instance.
Declaration
public Producer(IEnumerable<KeyValuePair<string, object>> config, ISerializer<TKey> keySerializer, ISerializer<TValue> valueSerializer)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<KeyValuePair<System.String, System.Object>> | config | librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). |
ISerializer<TKey> | keySerializer | An ISerializer implementation instance that will be used to serialize keys. |
ISerializer<TValue> | valueSerializer | An ISerializer implementation instance that will be used to serialize values. |
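A minimal construction sketch follows. It assumes the Confluent.Kafka 0.11.x package, its StringSerializer class from the Confluent.Kafka.Serialization namespace, and a broker reachable at localhost:9092; the broker address and class name are placeholders, not values taken from this reference.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

class ConstructProducerExample
{
    static void Main()
    {
        // librdkafka configuration; the broker address is an assumed placeholder.
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" }
        };

        // One serializer for keys and one for values, both UTF-8 strings here.
        using (var producer = new Producer<string, string>(
            config,
            new StringSerializer(Encoding.UTF8),
            new StringSerializer(Encoding.UTF8)))
        {
            // Name contains the client.id plus a suffix that makes it unique.
            Console.WriteLine($"Created producer: {producer.Name}");
        }
    }
}
```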
Properties
KeySerializer
Gets the ISerializer implementation instance used to serialize keys.
Declaration
public ISerializer<TKey> KeySerializer { get; }
Property Value
Type | Description |
---|---|
ISerializer<TKey> |
Name
Gets the name of this producer instance. Contains (but is not equal to) the client.id configuration parameter.
Declaration
public string Name { get; }
Property Value
Type | Description |
---|---|
System.String |
Remarks
This name will be unique across all producer instances in a given application which allows log messages to be associated with the corresponding instance.
ValueSerializer
Gets the ISerializer implementation instance used to serialize values.
Declaration
public ISerializer<TValue> ValueSerializer { get; }
Property Value
Type | Description |
---|---|
ISerializer<TValue> |
Methods
AddBrokers(String)
Adds one or more brokers to the Producer's list of initial bootstrap brokers.
Note: Additional brokers are discovered automatically as soon as the Producer connects to any broker by querying the broker metadata. Calling this method is only required in scenarios where the addresses of all brokers in the cluster change.
Declaration
public int AddBrokers(string brokers)
Parameters
Type | Name | Description |
---|---|---|
System.String | brokers | Comma-separated list of brokers in the same format as the bootstrap.servers configuration parameter. |
Returns
Type | Description |
---|---|
System.Int32 | The number of brokers added. This value includes brokers that may have been specified a second time. |
Remarks
There is currently no API to remove existing configured, added or learnt brokers.
Dispose()
Releases all resources used by this Producer.
Declaration
public void Dispose()
Remarks
You will often want to call Flush(Int32) before disposing a Producer instance.
Flush(Int32)
Wait until all outstanding produce requests and delivery report callbacks are completed.
[UNSTABLE-API] - the semantics and/or type of the return value is subject to change.
Declaration
public int Flush(int millisecondsTimeout)
Parameters
Type | Name | Description |
---|---|---|
System.Int32 | millisecondsTimeout | The maximum time to block in milliseconds, or -1 to block indefinitely. You should typically use a relatively short timeout period because this operation cannot be cancelled. |
Returns
Type | Description |
---|---|
System.Int32 | The current librdkafka out queue length. This should be interpreted as a rough indication of the number of messages waiting to be sent to or acknowledged by the broker. If zero, there are no outstanding messages or callbacks. Specifically, the value is equal to the sum of the number of produced messages for which a delivery report has not yet been handled and a number which is less than or equal to the number of pending delivery report callback events (as determined by an internal librdkafka implementation detail). |
Remarks
This method should typically be called prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating. The wait time is bounded by the millisecondsTimeout parameter.
A related topic configuration parameter is message.timeout.ms which determines the maximum length of time librdkafka attempts to deliver a message before giving up and so also affects the maximum time a call to Flush may block.
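The pattern described in these remarks might look like the sketch below: flush in short slices so the application keeps control, then dispose. It assumes Confluent.Kafka 0.11.x, a broker at localhost:9092, and a topic named example-topic; the ten-attempt limit is an arbitrary illustrative choice.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

class FlushBeforeDisposeExample
{
    static void Main()
    {
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" } // assumed broker address
        };

        using (var producer = new Producer<string, string>(
            config,
            new StringSerializer(Encoding.UTF8),
            new StringSerializer(Encoding.UTF8)))
        {
            // Queue a message; the returned Task is not awaited in this sketch.
            producer.ProduceAsync("example-topic", "key", "value");

            // Flush cannot be cancelled, so block for one second at a time
            // and give up after an application-chosen number of attempts.
            int remaining;
            int attempts = 0;
            do
            {
                remaining = producer.Flush(1000);
                attempts++;
            } while (remaining > 0 && attempts < 10);

            if (remaining > 0)
            {
                Console.WriteLine($"Giving up with roughly {remaining} item(s) still queued.");
            }
        } // Dispose runs here, after the flush attempts.
    }
}
```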
Flush(TimeSpan)
Wait until all outstanding produce requests and delivery report callbacks are completed. Refer to Flush(Int32) for more information.
[UNSTABLE-API] - the semantics and/or type of the return value is subject to change.
Declaration
public int Flush(TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | timeout | The maximum length of time to block. You should typically use a relatively short timeout period because this operation cannot be cancelled. |
Returns
Type | Description |
---|---|
System.Int32 | The current librdkafka out queue length. Refer to Flush(Int32) for more information. |
GetMetadata(Boolean, String)
Refer to Producer.GetMetadata(bool, string) for more information.
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public Metadata GetMetadata(bool allTopics, string topic)
Parameters
Type | Name | Description |
---|---|---|
System.Boolean | allTopics | |
System.String | topic |
Returns
Type | Description |
---|---|
Metadata |
GetMetadata(Boolean, String, TimeSpan)
Refer to Producer.GetMetadata(bool, string, TimeSpan) for more information.
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public Metadata GetMetadata(bool allTopics, string topic, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
System.Boolean | allTopics | |
System.String | topic | |
TimeSpan | timeout |
Returns
Type | Description |
---|---|
Metadata |
ListGroup(String)
Get information pertaining to a particular group in the Kafka cluster (blocks, potentially indefinitely).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public GroupInfo ListGroup(string group)
Parameters
Type | Name | Description |
---|---|---|
System.String | group | The group of interest. |
Returns
Type | Description |
---|---|
GroupInfo | Returns information pertaining to the specified group or null if this group does not exist. |
ListGroup(String, TimeSpan)
Get information pertaining to a particular group in the Kafka cluster (blocking).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public GroupInfo ListGroup(string group, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
System.String | group | The group of interest. |
TimeSpan | timeout | The maximum period of time the call may block. |
Returns
Type | Description |
---|---|
GroupInfo | Returns information pertaining to the specified group or null if this group does not exist. |
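As a hedged illustration of the null return described above, the sketch below looks up a group with a bounded timeout. It assumes Confluent.Kafka 0.11.x, where GroupInfo is understood to expose Group, State and Members properties; the broker address and group name are placeholders.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

class ListGroupExample
{
    static void Main()
    {
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" } // assumed broker address
        };

        using (var producer = new Producer<string, string>(
            config,
            new StringSerializer(Encoding.UTF8),
            new StringSerializer(Encoding.UTF8)))
        {
            // Prefer the overload with a timeout; the single-argument
            // overload may block indefinitely.
            GroupInfo info = producer.ListGroup("example-group", TimeSpan.FromSeconds(5));

            if (info == null)
            {
                Console.WriteLine("Group does not exist.");
            }
            else
            {
                Console.WriteLine($"{info.Group}: state={info.State}, members={info.Members.Count}");
            }
        }
    }
}
```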
ListGroups(TimeSpan)
Get information pertaining to all groups in the Kafka cluster (blocking).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public List<GroupInfo> ListGroups(TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | timeout | The maximum period of time the call may block. |
Returns
Type | Description |
---|---|
List<GroupInfo> |
ProduceAsync(String, TKey, TValue)
Asynchronously send a single message to the broker. Refer to ProduceAsync(String, TKey, TValue, Int32, Boolean) for more information.
Declaration
public Task<Message<TKey, TValue>> ProduceAsync(string topic, TKey key, TValue val)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
TKey | key | |
TValue | val |
Returns
Type | Description |
---|---|
Task<Message<TKey, TValue>> |
Remarks
The partition the message is produced to is determined using the configured partitioner.
Blocks if the send queue is full. Warning: if background polling is disabled and Poll is not being called in another thread, this will block indefinitely.
ProduceAsync(String, TKey, TValue, IDeliveryHandler<TKey, TValue>)
Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed). See ProduceAsync(String, TKey, TValue, Int32, Boolean, IDeliveryHandler<TKey, TValue>) for more information.
Declaration
public void ProduceAsync(string topic, TKey key, TValue val, IDeliveryHandler<TKey, TValue> deliveryHandler)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
TKey | key | |
TValue | val | |
IDeliveryHandler<TKey, TValue> | deliveryHandler |
Remarks
The partition the message is produced to is determined using the configured partitioner.
Blocks if the send queue is full. Warning: if background polling is disabled and Poll is not being called in another thread, this will block indefinitely.
ProduceAsync(String, TKey, TValue, Boolean)
Asynchronously send a single message to the broker. Refer to ProduceAsync(String, TKey, TValue, Int32, Boolean) for more information.
Declaration
public Task<Message<TKey, TValue>> ProduceAsync(string topic, TKey key, TValue val, bool blockIfQueueFull)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
TKey | key | |
TValue | val | |
System.Boolean | blockIfQueueFull |
Returns
Type | Description |
---|---|
Task<Message<TKey, TValue>> |
Remarks
The partition the message is produced to is determined using the configured partitioner.
ProduceAsync(String, TKey, TValue, Boolean, IDeliveryHandler<TKey, TValue>)
Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed). See ProduceAsync(String, TKey, TValue, Int32, Boolean, IDeliveryHandler<TKey, TValue>) for more information.
Declaration
public void ProduceAsync(string topic, TKey key, TValue val, bool blockIfQueueFull, IDeliveryHandler<TKey, TValue> deliveryHandler)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
TKey | key | |
TValue | val | |
System.Boolean | blockIfQueueFull | |
IDeliveryHandler<TKey, TValue> | deliveryHandler |
Remarks
The partition the message is produced to is determined using the configured partitioner.
ProduceAsync(String, TKey, TValue, Int32)
Asynchronously send a single message to the broker. Refer to ProduceAsync(String, TKey, TValue, Int32, Boolean) for more information.
Declaration
public Task<Message<TKey, TValue>> ProduceAsync(string topic, TKey key, TValue val, int partition)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
TKey | key | |
TValue | val | |
System.Int32 | partition |
Returns
Type | Description |
---|---|
Task<Message<TKey, TValue>> |
Remarks
Blocks if the send queue is full. Warning: if background polling is disabled and Poll is not being called in another thread, this will block indefinitely.
ProduceAsync(String, TKey, TValue, Int32, IDeliveryHandler<TKey, TValue>)
Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed). See ProduceAsync(String, TKey, TValue, Int32, Boolean, IDeliveryHandler<TKey, TValue>) for more information.
Declaration
public void ProduceAsync(string topic, TKey key, TValue val, int partition, IDeliveryHandler<TKey, TValue> deliveryHandler)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
TKey | key | |
TValue | val | |
System.Int32 | partition | |
IDeliveryHandler<TKey, TValue> | deliveryHandler |
Remarks
Blocks if the send queue is full. Warning: if background polling is disabled and Poll is not being called in another thread, this will block indefinitely.
ProduceAsync(String, TKey, TValue, Int32, Boolean)
Asynchronously send a single message to the broker.
Declaration
public Task<Message<TKey, TValue>> ProduceAsync(string topic, TKey key, TValue val, int partition, bool blockIfQueueFull)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | The target topic. |
TKey | key | The message key (possibly null if allowed by the key serializer). |
TValue | val | The message value (possibly null if allowed by the value serializer). |
System.Int32 | partition | The target partition (if -1, this is determined by the partitioner configured for the topic). |
System.Boolean | blockIfQueueFull | Whether or not to block if the send queue is full. If false, a KafkaException (with Error.Code == ErrorCode.Local_QueueFull) will be thrown if an attempt is made to produce a message and the send queue is full. Warning: if blockIfQueueFull is set to true, background polling is disabled and Poll is not being called in another thread, this will block indefinitely. |
Returns
Type | Description |
---|---|
Task<Message<TKey, TValue>> | A Task which will complete with the corresponding delivery report for this request. |
Remarks
If you require strict ordering of delivery reports to be maintained, you should use a variant of ProduceAsync that takes an IDeliveryHandler parameter rather than a variant that returns a Task<Message>, because Tasks are completed on arbitrary thread pool threads and can be executed out of order.
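The Task-returning overload and the blockIfQueueFull behavior described above could be exercised as in this sketch. It assumes Confluent.Kafka 0.11.x, in which the delivery report is understood to carry failures in its Error property rather than faulting the Task; the broker address and topic name are placeholders.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

class ProduceWithTaskExample
{
    static void Main()
    {
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" } // assumed broker address
        };

        using (var producer = new Producer<string, string>(
            config,
            new StringSerializer(Encoding.UTF8),
            new StringSerializer(Encoding.UTF8)))
        {
            try
            {
                // partition: -1 lets the configured partitioner choose.
                // blockIfQueueFull: false makes a full queue throw instead of block.
                Message<string, string> report = producer
                    .ProduceAsync("example-topic", "key", "value", -1, false)
                    .Result;

                if (report.Error.Code != ErrorCode.NoError)
                {
                    Console.WriteLine($"Delivery failed: {report.Error.Reason}");
                }
                else
                {
                    Console.WriteLine($"Delivered to {report.TopicPartitionOffset}");
                }
            }
            catch (KafkaException e) when (e.Error.Code == ErrorCode.Local_QueueFull)
            {
                // Thrown synchronously by ProduceAsync when the send queue is full.
                Console.WriteLine("Send queue is full; retry later or slow down.");
            }
        }
    }
}
```

Blocking on .Result keeps the sketch short; in asynchronous code the Task would normally be awaited instead.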
ProduceAsync(String, TKey, TValue, Int32, Boolean, IDeliveryHandler<TKey, TValue>)
Asynchronously send a single message to the broker (order of delivery reports strictly guaranteed).
Declaration
public void ProduceAsync(string topic, TKey key, TValue val, int partition, bool blockIfQueueFull, IDeliveryHandler<TKey, TValue> deliveryHandler)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic | |
TKey | key | |
TValue | val | |
System.Int32 | partition | |
System.Boolean | blockIfQueueFull | |
IDeliveryHandler<TKey, TValue> | deliveryHandler |
Remarks
Notification of delivery reports is via an IDeliveryHandler instance. Use IDeliveryHandler variants of ProduceAsync if you require notification of delivery reports strictly in the order in which messages were acknowledged by the broker or failed (failures may originate at the broker or locally). IDeliveryHandler.HandleDeliveryReport callbacks are executed on the Poll thread.
Refer to ProduceAsync(String, TKey, TValue, Int32, Boolean) for more information.
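A delivery handler along the lines described above might be sketched as follows. This assumes the Confluent.Kafka 0.11.x IDeliveryHandler interface, which is understood to consist of a MarshalData property and a HandleDeliveryReport method; MarshalData is not documented on this page, so treat it as an assumption. The handler class name, broker address and topic are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

// Hypothetical handler that prints each delivery report in arrival order.
class LoggingDeliveryHandler : IDeliveryHandler<string, string>
{
    // Assumed 0.11.x member: false means key and value data are not
    // marshalled back into the report, so only metadata is read below.
    public bool MarshalData => false;

    // Runs on the producer's poll thread, so keep the work short.
    public void HandleDeliveryReport(Message<string, string> message)
    {
        if (message.Error.Code != ErrorCode.NoError)
        {
            Console.WriteLine($"Failed: {message.Error.Reason}");
        }
        else
        {
            Console.WriteLine($"Delivered: {message.TopicPartitionOffset}");
        }
    }
}

class DeliveryHandlerExample
{
    static void Main()
    {
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" } // assumed broker address
        };

        using (var producer = new Producer<string, string>(
            config,
            new StringSerializer(Encoding.UTF8),
            new StringSerializer(Encoding.UTF8)))
        {
            var handler = new LoggingDeliveryHandler();
            for (int i = 0; i < 3; i++)
            {
                // -1: use the partitioner; true: block if the send queue is full.
                producer.ProduceAsync("example-topic", "key", $"value-{i}", -1, true, handler);
            }

            // Give outstanding delivery reports a bounded chance to arrive.
            producer.Flush(10000);
        }
    }
}
```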
QueryWatermarkOffsets(TopicPartition)
Query the Kafka cluster for low (oldest/beginning) and high (newest/end) offsets for the specified topic/partition (blocks, potentially indefinitely).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition)
Parameters
Type | Name | Description |
---|---|---|
TopicPartition | topicPartition | The topic/partition of interest. |
Returns
Type | Description |
---|---|
WatermarkOffsets | The requested WatermarkOffsets. |
QueryWatermarkOffsets(TopicPartition, TimeSpan)
Query the Kafka cluster for low (oldest/beginning) and high (newest/end) offsets for the specified topic/partition (blocking).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TopicPartition | topicPartition | The topic/partition of interest. |
TimeSpan | timeout | The maximum period of time the call may block. |
Returns
Type | Description |
---|---|
WatermarkOffsets | The requested WatermarkOffsets. |
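A short sketch of querying watermark offsets with a timeout is shown below. It assumes Confluent.Kafka 0.11.x, where WatermarkOffsets is understood to expose Low and High properties and where a failed query surfaces as a KafkaException; the broker address, topic and partition number are placeholders.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

class WatermarkOffsetsExample
{
    static void Main()
    {
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" } // assumed broker address
        };

        using (var producer = new Producer<string, string>(
            config,
            new StringSerializer(Encoding.UTF8),
            new StringSerializer(Encoding.UTF8)))
        {
            var partition = new TopicPartition("example-topic", 0);
            try
            {
                // The timeout overload avoids blocking indefinitely.
                WatermarkOffsets offsets =
                    producer.QueryWatermarkOffsets(partition, TimeSpan.FromSeconds(5));
                Console.WriteLine($"Low: {offsets.Low}, High: {offsets.High}");
            }
            catch (KafkaException e)
            {
                Console.WriteLine($"Query failed: {e.Error.Reason}");
            }
        }
    }
}
```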
Events
OnError
Raised on critical errors, e.g. connection failures or all brokers down. Note that the client will try to automatically recover from errors, so these errors should be seen as informational rather than catastrophic.
Declaration
public event EventHandler<Error> OnError
Event Type
Type | Description |
---|---|
EventHandler<Error> |
Remarks
Called on the Producer poll thread.
OnLog
Raised when there is information that should be logged.
Declaration
public event EventHandler<LogMessage> OnLog
Event Type
Type | Description |
---|---|
EventHandler<LogMessage> |
Remarks
Note: By default not many log messages are generated.
You can specify one or more debug contexts using the 'debug' configuration property and a log level using the 'log_level' configuration property to enable more verbose logging; however, you shouldn't typically need to do this.
Warning: Log handlers are called spontaneously from internal librdkafka threads and the application must not call any Confluent.Kafka APIs from within a log handler or perform any prolonged operations.
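One way to respect the warning above is to have the log handler do nothing but enqueue, and to process entries on an application thread. The sketch below assumes Confluent.Kafka 0.11.x, where LogMessage is understood to expose Name, Level, Facility and Message properties; the 'debug' value, broker address and two-second wait are illustrative choices.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

class LogHandlerExample
{
    static void Main()
    {
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" }, // assumed broker address
            { "debug", "broker" }                      // optional: more verbose logging
        };

        // Thread-safe buffer filled from internal librdkafka threads.
        var pending = new ConcurrentQueue<LogMessage>();

        using (var producer = new Producer<string, string>(
            config,
            new StringSerializer(Encoding.UTF8),
            new StringSerializer(Encoding.UTF8)))
        {
            // The handler only enqueues: no Confluent.Kafka calls, no slow work.
            producer.OnLog += (sender, log) => pending.Enqueue(log);

            // Let the client run briefly so that some log lines can arrive.
            Thread.Sleep(TimeSpan.FromSeconds(2));

            // Drain and print on the application thread.
            LogMessage entry;
            while (pending.TryDequeue(out entry))
            {
                Console.WriteLine($"[{entry.Level}] {entry.Name} {entry.Facility}: {entry.Message}");
            }
        }
    }
}
```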
OnStatistics
Raised on librdkafka statistics events. The event argument is a JSON-formatted string as defined here: https://github.com/edenhill/librdkafka/wiki/Statistics
Declaration
public event EventHandler<string> OnStatistics
Event Type
Type | Description |
---|---|
EventHandler<System.String> |
Remarks
You can enable statistics and set the statistics interval using the statistics.interval.ms configuration parameter (disabled by default).
Called on the Producer poll thread.
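Enabling statistics and subscribing to the event could look like the sketch below. It assumes Confluent.Kafka 0.11.x with background polling left at its default, so that events are delivered without explicit Poll calls; the five-second interval, twelve-second wait and broker address are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

class StatisticsExample
{
    static void Main()
    {
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" }, // assumed broker address
            { "statistics.interval.ms", 5000 }         // emit statistics every 5 seconds
        };

        using (var producer = new Producer<string, string>(
            config,
            new StringSerializer(Encoding.UTF8),
            new StringSerializer(Encoding.UTF8)))
        {
            // The argument is the raw JSON document; parsing is left to the application.
            producer.OnStatistics += (sender, json) =>
                Console.WriteLine($"Statistics received ({json.Length} characters).");

            // Keep the producer alive long enough for a couple of intervals.
            Thread.Sleep(TimeSpan.FromSeconds(12));
        }
    }
}
```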