confluent-kafka-dotnet

Interface IProducer<TKey, TValue>

Defines a high-level Apache Kafka producer client that provides key and value serialization.

Inherited Members
IClient.Handle
IClient.Name
IClient.AddBrokers(String)
System.IDisposable.Dispose()
Namespace: Confluent.Kafka
Assembly: Confluent.Kafka.dll
Syntax
public interface IProducer<TKey, TValue> : IClient, IDisposable
Type Parameters
Name Description
TKey
TValue

Methods

AbortTransaction()

Aborts the ongoing transaction.

This function should also be used to recover from non-fatal abortable transaction errors.

Any outstanding messages will be purged and fail.

Declaration
void AbortTransaction()
Exceptions
Type Condition
KafkaRetriableException

Thrown if an error occurred and the operation may be retried.

KafkaException

Thrown on all other errors.

AbortTransaction(TimeSpan)

Aborts the ongoing transaction.

This function should also be used to recover from non-fatal abortable transaction errors.

Any outstanding messages will be purged and fail.

IMPORTANT NOTE: It is currently strongly recommended that the application call AbortTransaction without specifying a timeout (which will block up to the remaining transaction timeout - ProducerConfig.TransactionTimeoutMs) because the Transactional Producer's API timeout handling is inconsistent with the underlying protocol requests (known issue).

Declaration
void AbortTransaction(TimeSpan timeout)
Parameters
Type Name Description
System.TimeSpan timeout

The maximum length of time this method may block.

Exceptions
Type Condition
KafkaRetriableException

Thrown if an error occurred and the operation may be retried.

KafkaException

Thrown on all other errors.

BeginTransaction()

Begin a new transaction.

InitTransactions must have been called successfully (once) before this function is called.

Any messages produced, offsets sent (SendOffsetsToTransaction), etc., after the successful return of this function will be part of the transaction and committed or aborted atomically.

Finish the transaction by calling CommitTransaction or abort the transaction by calling AbortTransaction.

Declaration
void BeginTransaction()
Exceptions
Type Condition
KafkaException

Thrown on all errors.

CommitTransaction()

Commit the current transaction (as started with BeginTransaction).

Any outstanding messages will be flushed (delivered) before actually committing the transaction.

If any of the outstanding messages fail permanently, the current transaction will enter the abortable error state. In this case, the application must call AbortTransaction before attempting a new transaction with BeginTransaction.

Declaration
void CommitTransaction()
Exceptions
Type Condition
KafkaTxnRequiresAbortException

Thrown if the application must call AbortTransaction and start a new transaction with BeginTransaction if it wishes to proceed with transactions.

KafkaRetriableException

Thrown if an error occurred and the operation may be retried.

KafkaException

Thrown on all other errors.
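
The commit and abort rules above can be sketched as follows. This is a minimal illustration, not the library's prescribed pattern: the class name TransactionSketch, the method ProduceInTransaction, and the maxCommitAttempts parameter are hypothetical, and the sketch assumes the producer was configured with a TransactionalId and that InitTransactions has already succeeded once.

```csharp
using System;
using Confluent.Kafka;

public static class TransactionSketch
{
    // Produces one message inside a transaction.
    // Returns true if the transaction committed, false if it had to be aborted.
    // Assumes InitTransactions was called successfully before this method.
    public static bool ProduceInTransaction(
        IProducer<string, string> producer,
        string topic, string key, string value,
        int maxCommitAttempts = 3) // hypothetical retry bound
    {
        producer.BeginTransaction();
        try
        {
            producer.Produce(topic, new Message<string, string> { Key = key, Value = value });

            for (int attempt = 1; ; attempt++)
            {
                try
                {
                    // Flushes outstanding messages, then commits.
                    producer.CommitTransaction();
                    return true;
                }
                catch (KafkaRetriableException) when (attempt < maxCommitAttempts)
                {
                    // Retriable error: try the commit again.
                }
            }
        }
        catch (KafkaTxnRequiresAbortException)
        {
            // Abortable error state: abort before starting a new transaction.
            producer.AbortTransaction();
            return false;
        }
    }
}
```

Any other KafkaException (including a retriable error on the final attempt) propagates to the caller, which then decides whether to abort or dispose the producer.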

CommitTransaction(TimeSpan)

Commit the current transaction (as started with BeginTransaction).

Any outstanding messages will be flushed (delivered) before actually committing the transaction.

If any of the outstanding messages fail permanently, the current transaction will enter the abortable error state. In this case, the application must call AbortTransaction before attempting a new transaction with BeginTransaction.

IMPORTANT NOTE: It is currently strongly recommended that the application call CommitTransaction without specifying a timeout (which will block up to the remaining transaction timeout - ProducerConfig.TransactionTimeoutMs) because the Transactional Producer's API timeout handling is inconsistent with the underlying protocol requests (known issue).

Declaration
void CommitTransaction(TimeSpan timeout)
Parameters
Type Name Description
System.TimeSpan timeout

The maximum length of time this method may block.

Exceptions
Type Condition
KafkaTxnRequiresAbortException

Thrown if the application must call AbortTransaction and start a new transaction with BeginTransaction if it wishes to proceed with transactions.

KafkaRetriableException

Thrown if an error occurred and the operation may be retried.

KafkaException

Thrown on all other errors.

Flush(CancellationToken)

Wait until all outstanding produce requests and delivery report callbacks are completed.

Declaration
void Flush(CancellationToken cancellationToken = default(CancellationToken))
Parameters
Type Name Description
CancellationToken cancellationToken

A cancellation token to observe whilst waiting for the flush to complete.

Remarks

This method should typically be called prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating.

A related configuration parameter is message.timeout.ms which determines the maximum length of time librdkafka attempts to deliver a message before giving up and so also affects the maximum time a call to Flush may block.

Where this Producer instance shares a Handle with one or more other producer instances, the Flush method will wait on messages produced by the other producer instances as well.

Exceptions
Type Condition
System.OperationCanceledException

Thrown if the operation is cancelled.

Flush(TimeSpan)

Wait until all outstanding produce requests and delivery report callbacks are completed.

[API-SUBJECT-TO-CHANGE] - the semantics and/or type of the return value is subject to change.

Declaration
int Flush(TimeSpan timeout)
Parameters
Type Name Description
System.TimeSpan timeout

The maximum length of time to block. You should typically use a relatively short timeout period and loop until the return value becomes zero because this operation cannot be cancelled.

Returns
Type Description
System.Int32

The current librdkafka out queue length. This should be interpreted as a rough indication of the number of messages waiting to be sent to or acknowledged by the broker. If zero, there are no outstanding messages or callbacks. Specifically, the value is equal to the sum of the number of produced messages for which a delivery report has not yet been handled and a number which is less than or equal to the number of pending delivery report callback events (as determined by the number of outstanding protocol requests).

Remarks

This method should typically be called prior to destroying a producer instance to make sure all queued and in-flight produce requests are completed before terminating. The wait time is bounded by the timeout parameter.

A related configuration parameter is message.timeout.ms which determines the maximum length of time librdkafka attempts to deliver a message before giving up and so also affects the maximum time a call to Flush may block.

Where this Producer instance shares a Handle with one or more other producer instances, the Flush method will wait on messages produced by the other producer instances as well.
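
Because Flush(TimeSpan) cannot be cancelled, the recommendation to loop with a short timeout might look like the sketch below. The helper FlushUntilEmpty and its step and maxWait parameters are hypothetical; the flush delegate stands in for producer.Flush(TimeSpan) so that the loop can be exercised without a broker.

```csharp
using System;
using System.Diagnostics;

public static class FlushHelper
{
    // Calls `flush` repeatedly with a short timeout until it reports an empty
    // out queue (returns true) or `maxWait` has elapsed (returns false).
    public static bool FlushUntilEmpty(
        Func<TimeSpan, int> flush, TimeSpan step, TimeSpan maxWait)
    {
        var stopwatch = Stopwatch.StartNew();
        while (true)
        {
            // The return value is a rough count of outstanding messages.
            int remaining = flush(step);
            if (remaining == 0) return true;
            if (stopwatch.Elapsed >= maxWait) return false;
        }
    }
}
```

With a real producer, the call could be written as FlushHelper.FlushUntilEmpty(t => producer.Flush(t), TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(10)). Checking a shutdown flag between iterations is one way to make the wait interruptible.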

InitTransactions(TimeSpan)

Initialize transactions for the producer instance. This function ensures any transactions initiated by previous instances of the producer with the same TransactionalId are completed. If the previous instance failed with a transaction in progress, the previous transaction will be aborted. When the TransactionalId is configured, this function must be called before any other transactional or produce functions.

If the last transaction had begun completion (following transaction commit) but not yet finished, this function will await the previous transaction's completion.

When any previous transactions have been fenced, this function will acquire the internal producer id and epoch, used in all future transactional messages issued by this producer instance.

Upon successful return from this function, the application has to perform at least one of the following operations within TransactionTimeoutMs to avoid timing out the transaction on the broker:

  • ProduceAsync (et al.)
  • SendOffsetsToTransaction
  • CommitTransaction
  • AbortTransaction
Declaration
void InitTransactions(TimeSpan timeout)
Parameters
Type Name Description
System.TimeSpan timeout

The maximum length of time this method may block.

Exceptions
Type Condition
KafkaRetriableException

Thrown if an error occurred and the operation may be retried.

KafkaException

Thrown on all other errors.
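
A transactional producer might be created and initialized as sketched below. The class InitSketch, the method CreateTransactionalProducer, and the 30-second timeout are illustrative assumptions; the bootstrap servers and transactional id are supplied by the caller.

```csharp
using System;
using Confluent.Kafka;

public static class InitSketch
{
    // Builds a producer with a TransactionalId and calls InitTransactions once.
    // The producer is disposed if initialization fails.
    public static IProducer<string, string> CreateTransactionalProducer(
        string bootstrapServers, string transactionalId)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
            TransactionalId = transactionalId
        };

        var producer = new ProducerBuilder<string, string>(config).Build();
        try
        {
            // Must succeed before BeginTransaction or any produce call.
            producer.InitTransactions(TimeSpan.FromSeconds(30)); // illustrative timeout
            return producer;
        }
        catch
        {
            producer.Dispose();
            throw;
        }
    }
}
```

A caller that wants to retry on KafkaRetriableException can wrap the InitTransactions call in a bounded loop instead of rethrowing immediately.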

Poll(TimeSpan)

Poll for callback events.

Declaration
int Poll(TimeSpan timeout)
Parameters
Type Name Description
System.TimeSpan timeout

The maximum period of time to block if no callback events are waiting. You should typically use a relatively short timeout period because this operation cannot be cancelled.

Returns
Type Description
System.Int32

Returns the number of events served since the last call to this method or, if this method has not yet been called, over the lifetime of the producer.

Produce(TopicPartition, Message<TKey, TValue>, Action<DeliveryReport<TKey, TValue>>)

Asynchronously send a single message to a Kafka topic partition.

Declaration
void Produce(TopicPartition topicPartition, Message<TKey, TValue> message, Action<DeliveryReport<TKey, TValue>> deliveryHandler = null)
Parameters
Type Name Description
TopicPartition topicPartition

The topic partition to produce the message to.

Message<TKey, TValue> message

The message to produce.

System.Action<DeliveryReport<TKey, TValue>> deliveryHandler

A delegate that will be called with a delivery report corresponding to the produce request (if enabled).

Exceptions
Type Condition
ProduceException<TKey, TValue>

Thrown in response to any error that is known immediately (excluding user application logic errors), for example ErrorCode.Local_QueueFull. Asynchronous notification of unsuccessful produce requests is made available via the deliveryHandler parameter (if specified). The Error property of the exception / delivery report provides more detailed information.

System.ArgumentException

Thrown in response to invalid argument values.

System.InvalidOperationException

Thrown in response to error conditions that reflect an error in the application logic of the calling application.

Produce(String, Message<TKey, TValue>, Action<DeliveryReport<TKey, TValue>>)

Asynchronously send a single message to a Kafka topic. The partition the message is sent to is determined by the partitioner defined using the 'partitioner' configuration property.

Declaration
void Produce(string topic, Message<TKey, TValue> message, Action<DeliveryReport<TKey, TValue>> deliveryHandler = null)
Parameters
Type Name Description
System.String topic

The topic to produce the message to.

Message<TKey, TValue> message

The message to produce.

System.Action<DeliveryReport<TKey, TValue>> deliveryHandler

A delegate that will be called with a delivery report corresponding to the produce request (if enabled).

Exceptions
Type Condition
ProduceException<TKey, TValue>

Thrown in response to any error that is known immediately (excluding user application logic errors), for example ErrorCode.Local_QueueFull. Asynchronous notification of unsuccessful produce requests is made available via the deliveryHandler parameter (if specified). The Error property of the exception / delivery report provides more detailed information.

System.ArgumentException

Thrown in response to invalid argument values.

System.InvalidOperationException

Thrown in response to error conditions that reflect an error in the application logic of the calling application.
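
The split between immediate errors (thrown as ProduceException) and asynchronous errors (reported to deliveryHandler) can be illustrated as follows. This is a sketch only: the class ProduceSketch and the method ProduceWithHandler are hypothetical, and retrying indefinitely on Local_QueueFull is a simplification that a real application would likely bound.

```csharp
using System;
using Confluent.Kafka;

public static class ProduceSketch
{
    // Enqueues one message and logs the delivery report when it arrives.
    public static void ProduceWithHandler(
        IProducer<Null, string> producer, string topic, string value)
    {
        // Invoked later with the outcome of the produce request.
        Action<DeliveryReport<Null, string>> handler = report =>
        {
            if (report.Error.IsError)
                Console.Error.WriteLine($"Delivery failed: {report.Error.Reason}");
            else
                Console.WriteLine($"Delivered to {report.TopicPartitionOffset}");
        };

        while (true)
        {
            try
            {
                producer.Produce(topic, new Message<Null, string> { Value = value }, handler);
                return;
            }
            catch (ProduceException<Null, string> ex)
                when (ex.Error.Code == ErrorCode.Local_QueueFull)
            {
                // The local queue is full: wait for delivery reports to be
                // served, which frees queue space, then try again.
                producer.Poll(TimeSpan.FromMilliseconds(100));
            }
        }
    }
}
```

Other ProduceException errors, ArgumentException, and InvalidOperationException are left to propagate to the caller.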

ProduceAsync(TopicPartition, Message<TKey, TValue>, CancellationToken)

Asynchronously send a single message to a Kafka topic/partition.

Declaration
Task<DeliveryResult<TKey, TValue>> ProduceAsync(TopicPartition topicPartition, Message<TKey, TValue> message, CancellationToken cancellationToken = default(CancellationToken))
Parameters
Type Name Description
TopicPartition topicPartition

The topic partition to produce the message to.

Message<TKey, TValue> message

The message to produce.

CancellationToken cancellationToken

A cancellation token to observe whilst waiting for the returned task to complete.

Returns
Type Description
System.Threading.Tasks.Task<DeliveryResult<TKey, TValue>>

A Task which will complete with a delivery report corresponding to the produce request, or an exception if an error occurred.

Exceptions
Type Condition
ProduceException<TKey, TValue>

Thrown in response to any produce request that was unsuccessful for any reason (excluding user application logic errors). The Error property of the exception provides more detailed information.

System.ArgumentException

Thrown in response to invalid argument values.

ProduceAsync(String, Message<TKey, TValue>, CancellationToken)

Asynchronously send a single message to a Kafka topic. The partition the message is sent to is determined by the partitioner defined using the 'partitioner' configuration property.

Declaration
Task<DeliveryResult<TKey, TValue>> ProduceAsync(string topic, Message<TKey, TValue> message, CancellationToken cancellationToken = default(CancellationToken))
Parameters
Type Name Description
System.String topic

The topic to produce the message to.

Message<TKey, TValue> message

The message to produce.

CancellationToken cancellationToken

A cancellation token to observe whilst waiting for the returned task to complete.

Returns
Type Description
System.Threading.Tasks.Task<DeliveryResult<TKey, TValue>>

A Task which will complete with a delivery report corresponding to the produce request, or an exception if an error occurred.

Exceptions
Type Condition
ProduceException<TKey, TValue>

Thrown in response to any produce request that was unsuccessful for any reason (excluding user application logic errors). The Error property of the exception provides more detailed information.

System.ArgumentException

Thrown in response to invalid argument values.
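
Awaiting ProduceAsync surfaces both the delivery result and any failure in one place. The sketch below assumes string keys and values; the class ProduceAsyncSketch and the method TryProduceAsync are hypothetical, and returning null on failure is just one possible convention.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class ProduceAsyncSketch
{
    // Returns the topic, partition, and offset the message was written to,
    // or null if the produce request failed.
    public static async Task<TopicPartitionOffset> TryProduceAsync(
        IProducer<string, string> producer, string topic, string key, string value)
    {
        try
        {
            DeliveryResult<string, string> result = await producer.ProduceAsync(
                topic, new Message<string, string> { Key = key, Value = value });
            return result.TopicPartitionOffset;
        }
        catch (ProduceException<string, string> ex)
        {
            // The Error property carries the error code and reason.
            Console.Error.WriteLine($"Produce failed: {ex.Error.Code}: {ex.Error.Reason}");
            return null;
        }
    }
}
```

Awaiting each call before sending the next limits throughput, so high-volume senders may prefer Produce with a delivery handler.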

SendOffsetsToTransaction(IEnumerable<TopicPartitionOffset>, IConsumerGroupMetadata, TimeSpan)

Sends a list of topic partition offsets to the consumer group coordinator for groupMetadata, and marks the offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully.

The offsets should be those of the next message your application will consume, i.e., the last processed message's offset + 1 for each partition. Either track the offsets manually during processing or use the Position method (on the consumer) to get the current offsets for the partitions assigned to the consumer.

Use this method at the end of a consume-transform-produce loop prior to committing the transaction with CommitTransaction.

Declaration
void SendOffsetsToTransaction(IEnumerable<TopicPartitionOffset> offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout)
Parameters
Type Name Description
System.Collections.Generic.IEnumerable<TopicPartitionOffset> offsets

List of offsets to commit to the consumer group upon successful commit of the transaction. Offsets should be the next message to consume, e.g., last processed message + 1.

IConsumerGroupMetadata groupMetadata

The consumer group metadata acquired via the consumer's ConsumerGroupMetadata property.

System.TimeSpan timeout

The maximum length of time this method may block.

Exceptions
Type Condition
System.ArgumentException

Thrown if group metadata is invalid.

KafkaTxnRequiresAbortException

Thrown if the application must call AbortTransaction and start a new transaction with BeginTransaction if it wishes to proceed with transactions.

KafkaRetriableException

Thrown if an error occurred and the operation may be retried.

KafkaException

Thrown on all other errors.
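
The "last processed offset + 1" rule, applied at the end of a consume-transform-produce loop, might be sketched as below. The class OffsetsSketch, its methods, the lastProcessed dictionary of manually tracked offsets, and the 30-second timeout are all illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

public static class OffsetsSketch
{
    // Converts the last processed offset per partition into the offset of
    // the next message to consume (last processed + 1).
    public static List<TopicPartitionOffset> NextOffsets(
        IDictionary<TopicPartition, long> lastProcessed)
    {
        return lastProcessed
            .Select(kv => new TopicPartitionOffset(kv.Key, new Offset(kv.Value + 1)))
            .ToList();
    }

    // Adds the consumed offsets to the current transaction, then commits it.
    // Assumes BeginTransaction was called and the output was already produced.
    public static void SendOffsetsAndCommit(
        IProducer<string, string> producer,
        IConsumer<string, string> consumer,
        IDictionary<TopicPartition, long> lastProcessed)
    {
        producer.SendOffsetsToTransaction(
            NextOffsets(lastProcessed),
            consumer.ConsumerGroupMetadata,
            TimeSpan.FromSeconds(30)); // illustrative timeout
        producer.CommitTransaction();
    }
}
```

If either call throws KafkaTxnRequiresAbortException, the caller is expected to call AbortTransaction and reprocess from the last committed offsets.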
