Interface IProducer<TKey, TValue>
Defines a high-level Apache Kafka producer client that provides key and value serialization.
Namespace: Confluent.Kafka
Assembly: Confluent.Kafka.dll
Syntax
public interface IProducer<TKey, TValue> : IClient, IDisposable
Type Parameters
Name | Description |
---|---|
TKey | |
TValue | |
Methods
AbortTransaction()
Aborts the ongoing transaction.
This function should also be used to recover from non-fatal abortable transaction errors.
Any outstanding messages will be purged and fail.
Declaration
void AbortTransaction()
Exceptions
Type | Condition |
---|---|
KafkaRetriableException | Thrown if an error occurred, and the operation may be retried. |
KafkaException | Thrown on all other errors. |
AbortTransaction(TimeSpan)
Aborts the ongoing transaction.
This function should also be used to recover from non-fatal abortable transaction errors.
Any outstanding messages will be purged and fail.
IMPORTANT NOTE: It is currently strongly recommended that the application call AbortTransaction without specifying a timeout, in which case the call blocks for up to the remaining transaction timeout (ProducerConfig.TransactionTimeoutMs). The reason is that the transactional producer's API timeout handling is inconsistent with the underlying protocol requests (known issue).
Declaration
void AbortTransaction(TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | timeout | The maximum length of time this method may block. |
Exceptions
Type | Condition |
---|---|
KafkaRetriableException | Thrown if an error occurred, and the operation may be retried. |
KafkaException | Thrown on all other errors. |
BeginTransaction()
Begin a new transaction.
InitTransactions must have been called successfully (once) before this function is called.
Any messages produced, offsets sent (SendOffsetsToTransaction), etc., after the successful return of this function will be part of the transaction and committed or aborted atomically.
Finish the transaction by calling CommitTransaction or abort the transaction by calling AbortTransaction.
Declaration
void BeginTransaction()
Exceptions
Type | Condition |
---|---|
KafkaException | Thrown on all errors. |
CommitTransaction()
Commit the current transaction (as started with BeginTransaction).
Any outstanding messages will be flushed (delivered) before actually committing the transaction.
If any of the outstanding messages fail permanently, the current transaction will enter the abortable error state. In this case the application must call AbortTransaction before attempting a new transaction with BeginTransaction.
Declaration
void CommitTransaction()
Exceptions
Type | Condition |
---|---|
KafkaTxnRequiresAbortException | Thrown if the application must call AbortTransaction and start a new transaction with BeginTransaction if it wishes to proceed with transactions. |
KafkaRetriableException | Thrown if an error occurred, and the operation may be retried. |
KafkaException | Thrown on all other errors. |
CommitTransaction(TimeSpan)
Commit the current transaction (as started with BeginTransaction).
Any outstanding messages will be flushed (delivered) before actually committing the transaction.
If any of the outstanding messages fail permanently, the current transaction will enter the abortable error state. In this case the application must call AbortTransaction before attempting a new transaction with BeginTransaction.
IMPORTANT NOTE: It is currently strongly recommended that the application call CommitTransaction without specifying a timeout, in which case the call blocks for up to the remaining transaction timeout (ProducerConfig.TransactionTimeoutMs). The reason is that the transactional producer's API timeout handling is inconsistent with the underlying protocol requests (known issue).
Declaration
void CommitTransaction(TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | timeout | The maximum length of time this method may block. |
Exceptions
Type | Condition |
---|---|
KafkaTxnRequiresAbortException | Thrown if the application must call AbortTransaction and start a new transaction with BeginTransaction if it wishes to proceed with transactions. |
KafkaRetriableException | Thrown if an error occurred, and the operation may be retried. |
KafkaException | Thrown on all other errors. |
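The exception table above suggests three outcomes for a commit: success, a retriable error, or an abortable error. The following is a minimal sketch of how an application might handle them, not a definitive pattern. The class name, the helper method, and the retry budget of three attempts are illustrative assumptions, and the sketch assumes InitTransactions has already succeeded on the producer.

```csharp
using System;
using Confluent.Kafka;

public static class TransactionCommitSketch
{
    // Hypothetical helper: produces one message inside a transaction and commits it.
    // Assumes InitTransactions has already been called successfully on this producer.
    public static void ProduceInTransaction(
        IProducer<string, string> producer, string topic, string key, string value)
    {
        producer.BeginTransaction();
        try
        {
            producer.Produce(topic, new Message<string, string> { Key = key, Value = value });

            const int maxAttempts = 3; // illustrative retry budget, not a library default
            for (var attempt = 1; ; attempt++)
            {
                try
                {
                    // Flushes outstanding messages, then commits the transaction.
                    producer.CommitTransaction();
                    return;
                }
                catch (KafkaRetriableException) when (attempt < maxAttempts)
                {
                    // The commit may be retried; loop and call CommitTransaction again.
                }
            }
        }
        catch (KafkaTxnRequiresAbortException)
        {
            // Abortable error state: abort before any new BeginTransaction call.
            producer.AbortTransaction();
            throw;
        }
        // Any other KafkaException propagates to the caller unchanged.
    }
}
```

The parameterless CommitTransaction and AbortTransaction overloads are used here, in line with the recommendation in the note above.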
Flush(CancellationToken)
Wait until all outstanding produce requests and delivery report callbacks are completed.
Declaration
void Flush(CancellationToken cancellationToken = default)
Parameters
Type | Name | Description |
---|---|---|
CancellationToken | cancellationToken | A cancellation token to observe while waiting for the flush to complete. |
Remarks
This method should typically be called prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating.
A related configuration parameter is message.timeout.ms which determines the maximum length of time librdkafka attempts to deliver a message before giving up and so also affects the maximum time a call to Flush may block.
Where this Producer instance shares a Handle with one or more other producer instances, the Flush method will wait on messages produced by the other producer instances as well.
Exceptions
Type | Condition |
---|---|
OperationCanceledException | Thrown if the operation is cancelled. |
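As a rough illustration of this overload, the sketch below bounds the wait with a CancellationTokenSource and reports whether the flush finished. The class name, the TryFlush helper, and the maxWait parameter are hypothetical and not part of the library.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public static class FlushWithCancellationSketch
{
    // Hypothetical shutdown helper: flush, but stop waiting once maxWait has elapsed.
    public static bool TryFlush(IProducer<string, string> producer, TimeSpan maxWait)
    {
        using (var cts = new CancellationTokenSource(maxWait))
        {
            try
            {
                // Blocks until all outstanding requests and callbacks complete,
                // or until the token is cancelled.
                producer.Flush(cts.Token);
                return true;
            }
            catch (OperationCanceledException)
            {
                // Cancelled: messages may still be queued or in flight.
                return false;
            }
        }
    }
}
```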
Flush(TimeSpan)
Wait until all outstanding produce requests and delivery report callbacks are completed.
[API-SUBJECT-TO-CHANGE] - the semantics and/or type of the return value are subject to change.
Declaration
int Flush(TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | timeout | The maximum length of time to block. You should typically use a relatively short timeout period and loop until the return value becomes zero because this operation cannot be cancelled. |
Returns
Type | Description |
---|---|
int | The current librdkafka out queue length. This should be interpreted as a rough indication of the number of messages waiting to be sent to or acknowledged by the broker. If zero, there are no outstanding messages or callbacks. Specifically, the value is equal to the sum of the number of produced messages for which a delivery report has not yet been handled and a number which is less than or equal to the number of pending delivery report callback events (as determined by the number of outstanding protocol requests). |
Remarks
This method should typically be called prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating. The wait time is bounded by the timeout parameter.
A related configuration parameter is message.timeout.ms which determines the maximum length of time librdkafka attempts to deliver a message before giving up and so also affects the maximum time a call to Flush may block.
Where this Producer instance shares a Handle with one or more other producer instances, the Flush method will wait on messages produced by the other producer instances as well.
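The timeout description recommends a short timeout in a loop until the return value reaches zero. One way this might look is sketched below. The class name, the helper, and its step and budget parameters are assumptions made for illustration.

```csharp
using System;
using System.Diagnostics;
using Confluent.Kafka;

public static class FlushLoopSketch
{
    // Hypothetical helper: calls Flush in short steps until the out queue is empty
    // or the overall budget is spent. Returns the last reported queue length.
    public static int FlushUntilEmpty(
        IProducer<string, string> producer, TimeSpan step, TimeSpan budget)
    {
        var stopwatch = Stopwatch.StartNew();
        int remaining;
        do
        {
            // Each call blocks for at most 'step'.
            remaining = producer.Flush(step);
        }
        while (remaining > 0 && stopwatch.Elapsed < budget);

        return remaining; // zero means no outstanding messages or callbacks
    }
}
```

A caller could, for example, invoke `FlushUntilEmpty(producer, TimeSpan.FromMilliseconds(500), TimeSpan.FromSeconds(30))` before disposing the producer and log a warning if the result is non-zero.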
InitTransactions(TimeSpan)
Initialize transactions for the producer instance. This function ensures any transactions initiated by previous instances of the producer with the same TransactionalId are completed. If the previous instance failed with a transaction in progress the previous transaction will be aborted. This function needs to be called before any other transactional or produce functions are called when the TransactionalId is configured.
If the last transaction had begun completion (following transaction commit) but not yet finished, this function will await the previous transaction's completion.
When any previous transactions have been fenced, this function will acquire the internal producer id and epoch, used in all future transactional messages issued by this producer instance.
Upon successful return from this function the application has to perform at least one of the following operations within ProducerConfig.TransactionTimeoutMs to avoid timing out the transaction on the broker:
* ProduceAsync (et al.)
* SendOffsetsToTransaction
* CommitTransaction
* AbortTransaction
Declaration
void InitTransactions(TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | timeout | The maximum length of time this method may block. |
Exceptions
Type | Condition |
---|---|
KafkaRetriableException | Thrown if an error occurred, and the operation may be retried. |
KafkaException | Thrown on all other errors. |
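A minimal sketch of setting up a transactional producer follows. It assumes string keys and values, and the class name, the helper, the parameter names, and the 30-second timeout are illustrative choices rather than recommended values.

```csharp
using System;
using Confluent.Kafka;

public static class InitTransactionsSketch
{
    // Hypothetical factory: builds a producer with a TransactionalId and
    // initializes transactions before handing it to the caller.
    public static IProducer<string, string> CreateTransactionalProducer(
        string bootstrapServers, string transactionalId)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
            TransactionalId = transactionalId
        };

        var producer = new ProducerBuilder<string, string>(config).Build();
        try
        {
            // Must succeed before any other transactional or produce call.
            producer.InitTransactions(TimeSpan.FromSeconds(30)); // illustrative timeout
            return producer;
        }
        catch
        {
            // Do not leak the native handle if initialization fails.
            producer.Dispose();
            throw;
        }
    }
}
```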
Poll(TimeSpan)
Poll for callback events.
Declaration
int Poll(TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | timeout | The maximum period of time to block if no callback events are waiting. You should typically use a relatively short timeout period because this operation cannot be cancelled. |
Returns
Type | Description |
---|---|
int | The number of events served since the last call to this method or, if this method has not yet been called, over the lifetime of the producer. |
Produce(TopicPartition, Message<TKey, TValue>, Action<DeliveryReport<TKey, TValue>>)
Asynchronously send a single message to a Kafka topic partition.
Declaration
void Produce(TopicPartition topicPartition, Message<TKey, TValue> message, Action<DeliveryReport<TKey, TValue>> deliveryHandler = null)
Parameters
Type | Name | Description |
---|---|---|
TopicPartition | topicPartition | The topic partition to produce the message to. |
Message<TKey, TValue> | message | The message to produce. |
Action<DeliveryReport<TKey, TValue>> | deliveryHandler | A delegate that will be called with a delivery report corresponding to the produce request (if enabled). |
Exceptions
Type | Condition |
---|---|
ProduceException<TKey, TValue> | Thrown in response to any error that is known immediately (excluding user application logic errors), for example ErrorCode.Local_QueueFull. Asynchronous notification of unsuccessful produce requests is made available via the deliveryHandler parameter. |
ArgumentException | Thrown in response to invalid argument values. |
InvalidOperationException | Thrown in response to error conditions that reflect an error in the application logic of the calling application. |
Produce(string, Message<TKey, TValue>, Action<DeliveryReport<TKey, TValue>>)
Asynchronously send a single message to a Kafka topic. The partition the message is sent to is determined by the partitioner defined using the 'partitioner' configuration property.
Declaration
void Produce(string topic, Message<TKey, TValue> message, Action<DeliveryReport<TKey, TValue>> deliveryHandler = null)
Parameters
Type | Name | Description |
---|---|---|
string | topic | The topic to produce the message to. |
Message<TKey, TValue> | message | The message to produce. |
Action<DeliveryReport<TKey, TValue>> | deliveryHandler | A delegate that will be called with a delivery report corresponding to the produce request (if enabled). |
Exceptions
Type | Condition |
---|---|
ProduceException<TKey, TValue> | Thrown in response to any error that is known immediately (excluding user application logic errors), for example ErrorCode.Local_QueueFull. Asynchronous notification of unsuccessful produce requests is made available via the deliveryHandler parameter. |
ArgumentException | Thrown in response to invalid argument values. |
InvalidOperationException | Thrown in response to error conditions that reflect an error in the application logic of the calling application. |
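The two failure paths described for Produce (an immediate ProduceException and an asynchronous delivery report) can be sketched as follows. The class name, the helper, the retry budget, and the 100 ms wait are assumptions made for illustration. Waiting in Poll before retrying on Local_QueueFull is one possible backoff strategy, not one prescribed by this page.

```csharp
using System;
using Confluent.Kafka;

public static class ProduceWithHandlerSketch
{
    // Hypothetical helper: produces one message and logs the delivery report.
    public static void Send(
        IProducer<string, string> producer, string topic, string key, string value)
    {
        // Asynchronous outcome: invoked once the delivery report is available.
        Action<DeliveryReport<string, string>> handler = report =>
        {
            if (report.Error.IsError)
            {
                Console.Error.WriteLine($"Delivery failed: {report.Error.Reason}");
            }
            else
            {
                Console.WriteLine($"Delivered to {report.TopicPartitionOffset}");
            }
        };

        const int maxAttempts = 5; // illustrative retry budget
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                producer.Produce(
                    topic, new Message<string, string> { Key = key, Value = value }, handler);
                return;
            }
            catch (ProduceException<string, string> e)
                when (e.Error.Code == ErrorCode.Local_QueueFull && attempt < maxAttempts)
            {
                // Immediate error: the local queue is full. Wait briefly for
                // delivery reports to be served, then try again.
                producer.Poll(TimeSpan.FromMilliseconds(100));
            }
        }
    }
}
```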
ProduceAsync(TopicPartition, Message<TKey, TValue>, CancellationToken)
Asynchronously send a single message to a Kafka topic partition.
Declaration
Task<DeliveryResult<TKey, TValue>> ProduceAsync(TopicPartition topicPartition, Message<TKey, TValue> message, CancellationToken cancellationToken = default)
Parameters
Type | Name | Description |
---|---|---|
TopicPartition | topicPartition | The topic partition to produce the message to. |
Message<TKey, TValue> | message | The message to produce. |
CancellationToken | cancellationToken | A cancellation token to observe while waiting for the returned task to complete. |
Returns
Type | Description |
---|---|
Task<DeliveryResult<TKey, TValue>> | A Task which will complete with a delivery report corresponding to the produce request, or an exception if an error occurred. |
Exceptions
Type | Condition |
---|---|
ProduceException<TKey, TValue> | Thrown in response to any produce request that was unsuccessful for any reason (excluding user application logic errors). The Error property of the exception provides more detailed information. |
ArgumentException | Thrown in response to invalid argument values. |
ProduceAsync(string, Message<TKey, TValue>, CancellationToken)
Asynchronously send a single message to a Kafka topic. The partition the message is sent to is determined by the partitioner defined using the 'partitioner' configuration property.
Declaration
Task<DeliveryResult<TKey, TValue>> ProduceAsync(string topic, Message<TKey, TValue> message, CancellationToken cancellationToken = default)
Parameters
Type | Name | Description |
---|---|---|
string | topic | The topic to produce the message to. |
Message<TKey, TValue> | message | The message to produce. |
CancellationToken | cancellationToken | A cancellation token to observe while waiting for the returned task to complete. |
Returns
Type | Description |
---|---|
Task<DeliveryResult<TKey, TValue>> | A Task which will complete with a delivery report corresponding to the produce request, or an exception if an error occurred. |
Exceptions
Type | Condition |
---|---|
ProduceException<TKey, TValue> | Thrown in response to any produce request that was unsuccessful for any reason (excluding user application logic errors). The Error property of the exception provides more detailed information. |
ArgumentException | Thrown in response to invalid argument values. |
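A short sketch of awaiting ProduceAsync and inspecting a failure through the Error property of the exception is given below. The class and helper names are hypothetical, and string keys and values are assumed.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class ProduceAsyncSketch
{
    // Hypothetical helper: awaits delivery and returns where the message landed.
    public static async Task<TopicPartitionOffset> SendAsync(
        IProducer<string, string> producer, string topic, string key, string value)
    {
        try
        {
            DeliveryResult<string, string> result = await producer.ProduceAsync(
                topic, new Message<string, string> { Key = key, Value = value });

            return result.TopicPartitionOffset;
        }
        catch (ProduceException<string, string> e)
        {
            // The Error property carries the detailed failure information.
            Console.Error.WriteLine($"Produce failed: {e.Error.Reason} (code {e.Error.Code})");
            throw;
        }
    }
}
```

Awaiting each call serializes sends. Where throughput matters, the Produce overloads with a delivery handler may be a better fit.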
SendOffsetsToTransaction(IEnumerable<TopicPartitionOffset>, IConsumerGroupMetadata, TimeSpan)
Sends a list of topic partition offsets to the consumer group coordinator for groupMetadata, and marks the offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully.
The offsets should be the next message your application will consume, i.e., the last processed message's offset + 1 for each partition. Either track the offsets manually during processing or use the Position method (on the consumer) to get the current offsets for the partitions assigned to the consumer.
Use this method at the end of a consume-transform-produce loop prior to committing the transaction with CommitTransaction.
Declaration
void SendOffsetsToTransaction(IEnumerable<TopicPartitionOffset> offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartitionOffset> | offsets | List of offsets to commit to the consumer group upon successful commit of the transaction. Offsets should be the next message to consume, i.e., the last processed message's offset + 1. |
IConsumerGroupMetadata | groupMetadata | The consumer group metadata acquired via ConsumerGroupMetadata. |
TimeSpan | timeout | The maximum length of time this method may block. |
Exceptions
Type | Condition |
---|---|
ArgumentException | Thrown if group metadata is invalid. |
KafkaTxnRequiresAbortException | Thrown if the application must call AbortTransaction and start a new transaction with BeginTransaction if it wishes to proceed with transactions. |
KafkaRetriableException | Thrown if an error occurred, and the operation may be retried. |
KafkaException | Thrown on all other errors. |
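One iteration of a consume-transform-produce loop might be sketched as below. This is an illustration under several assumptions: the consumer has auto-commit disabled, InitTransactions has already succeeded on the producer, and the upper-casing step stands in for real processing. The class name, the helper, and the timeouts are hypothetical.

```csharp
using System;
using Confluent.Kafka;

public static class ConsumeTransformProduceSketch
{
    // Hypothetical helper: processes at most one record transactionally.
    // Returns true if a record was consumed and its transaction committed.
    public static bool ProcessOne(
        IConsumer<string, string> consumer,
        IProducer<string, string> producer,
        string outputTopic)
    {
        var record = consumer.Consume(TimeSpan.FromSeconds(1));
        if (record == null)
        {
            return false; // nothing available within the timeout
        }

        producer.BeginTransaction();
        try
        {
            // Placeholder transform: upper-case the value.
            producer.Produce(outputTopic, new Message<string, string>
            {
                Key = record.Message.Key,
                Value = record.Message.Value?.ToUpperInvariant()
            });

            // Offset of the next message to consume: last processed offset + 1.
            var offsets = new[]
            {
                new TopicPartitionOffset(record.TopicPartition, record.Offset + 1)
            };

            producer.SendOffsetsToTransaction(
                offsets, consumer.ConsumerGroupMetadata, TimeSpan.FromSeconds(30));
            producer.CommitTransaction();
            return true;
        }
        catch (KafkaTxnRequiresAbortException)
        {
            // Neither the output message nor the offsets are committed.
            producer.AbortTransaction();
            throw;
        }
    }
}
```

After an abort the consumer's in-memory position has still advanced past the record, so a real loop would also need to rewind the consumer (or restart it) before reprocessing.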