confluent-kafka-dotnet
Show / Hide Table of Contents

Class ConsumerBuilder<TKey, TValue>

A builder class for IConsumer<TKey, TValue>.

Inheritance
System.Object
ConsumerBuilder<TKey, TValue>
Inherited Members
System.Object.ToString()
System.Object.Equals(System.Object)
System.Object.Equals(System.Object, System.Object)
System.Object.ReferenceEquals(System.Object, System.Object)
System.Object.GetHashCode()
System.Object.GetType()
System.Object.MemberwiseClone()
Namespace: Confluent.Kafka
Assembly: cs.temp.dll.dll
Syntax
public class ConsumerBuilder<TKey, TValue>
Type Parameters
Name Description
TKey
TValue

Constructors

ConsumerBuilder(IEnumerable<KeyValuePair<String, String>>)

Initialize a new ConsumerBuilder instance.

Declaration
public ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
Parameters
Type Name Description
System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<System.String, System.String>> config

A collection of librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) and parameters specific to this client (refer to: ConfigPropertyNames). At a minimum, 'bootstrap.servers' and 'group.id' must be specified.

Properties

Config

The config dictionary.

Declaration
protected IEnumerable<KeyValuePair<string, string>> Config { get; set; }
Property Value
Type Description
System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<System.String, System.String>>

ErrorHandler

The configured error handler.

Declaration
protected Action<IConsumer<TKey, TValue>, Error> ErrorHandler { get; set; }
Property Value
Type Description
System.Action<IConsumer<TKey, TValue>, Error>

KeyDeserializer

The configured key deserializer.

Declaration
protected IDeserializer<TKey> KeyDeserializer { get; set; }
Property Value
Type Description
IDeserializer<TKey>

LogHandler

The configured log handler.

Declaration
protected Action<IConsumer<TKey, TValue>, LogMessage> LogHandler { get; set; }
Property Value
Type Description
System.Action<IConsumer<TKey, TValue>, LogMessage>

OAuthBearerTokenRefreshHandler

The configured OAuthBearer Token Refresh handler.

Declaration
protected Action<IConsumer<TKey, TValue>, string> OAuthBearerTokenRefreshHandler { get; set; }
Property Value
Type Description
System.Action<IConsumer<TKey, TValue>, System.String>

OffsetsCommittedHandler

The configured offsets committed handler.

Declaration
protected Action<IConsumer<TKey, TValue>, CommittedOffsets> OffsetsCommittedHandler { get; set; }
Property Value
Type Description
System.Action<IConsumer<TKey, TValue>, CommittedOffsets>

PartitionsAssignedHandler

The configured partitions assigned handler.

Declaration
protected Func<IConsumer<TKey, TValue>, List<TopicPartition>, IEnumerable<TopicPartitionOffset>> PartitionsAssignedHandler { get; set; }
Property Value
Type Description
System.Func<IConsumer<TKey, TValue>, System.Collections.Generic.List<TopicPartition>, System.Collections.Generic.IEnumerable<TopicPartitionOffset>>

PartitionsRevokedHandler

The configured partitions revoked handler.

Declaration
protected Func<IConsumer<TKey, TValue>, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>> PartitionsRevokedHandler { get; set; }
Property Value
Type Description
System.Func<IConsumer<TKey, TValue>, System.Collections.Generic.List<TopicPartitionOffset>, System.Collections.Generic.IEnumerable<TopicPartitionOffset>>

StatisticsHandler

The configured statistics handler.

Declaration
protected Action<IConsumer<TKey, TValue>, string> StatisticsHandler { get; set; }
Property Value
Type Description
System.Action<IConsumer<TKey, TValue>, System.String>

ValueDeserializer

The configured value deserializer.

Declaration
protected IDeserializer<TValue> ValueDeserializer { get; set; }
Property Value
Type Description
IDeserializer<TValue>

Methods

Build()

Build a new IConsumer implementation instance.

Declaration
public virtual IConsumer<TKey, TValue> Build()
Returns
Type Description
IConsumer<TKey, TValue>

SetErrorHandler(Action<IConsumer<TKey, TValue>, Error>)

Set the handler to call on error events e.g. connection failures or all brokers down. Note that the client will try to automatically recover from errors that are not marked as fatal. Non-fatal errors should be interpreted as informational rather than catastrophic.

Declaration
public ConsumerBuilder<TKey, TValue> SetErrorHandler(Action<IConsumer<TKey, TValue>, Error> errorHandler)
Parameters
Type Name Description
System.Action<IConsumer<TKey, TValue>, Error> errorHandler
Returns
Type Description
ConsumerBuilder<TKey, TValue>
Remarks

Executes as a side-effect of the Consume method (on the same thread).

Exceptions: Any exception thrown by your error handler will be silently ignored.

SetKeyDeserializer(IDeserializer<TKey>)

Set the deserializer to use to deserialize keys.

Declaration
public ConsumerBuilder<TKey, TValue> SetKeyDeserializer(IDeserializer<TKey> deserializer)
Parameters
Type Name Description
IDeserializer<TKey> deserializer
Returns
Type Description
ConsumerBuilder<TKey, TValue>
Remarks

If your key deserializer throws an exception, this will be wrapped in a ConsumeException with ErrorCode Local_KeyDeserialization and thrown by the initiating call to Consume.

SetLogHandler(Action<IConsumer<TKey, TValue>, LogMessage>)

Set the handler to call when there is information available to be logged. If not specified, a default callback that writes to stderr will be used.

Declaration
public ConsumerBuilder<TKey, TValue> SetLogHandler(Action<IConsumer<TKey, TValue>, LogMessage> logHandler)
Parameters
Type Name Description
System.Action<IConsumer<TKey, TValue>, LogMessage> logHandler
Returns
Type Description
ConsumerBuilder<TKey, TValue>
Remarks

By default not many log messages are generated.

For more verbose logging, specify one or more debug contexts using the 'Debug' configuration property.

Warning: Log handlers are called spontaneously from internal librdkafka threads and the application must not call any Confluent.Kafka APIs from within a log handler or perform any prolonged operations.

Exceptions: Any exception thrown by your log handler will be silently ignored.

SetOAuthBearerTokenRefreshHandler(Action<IConsumer<TKey, TValue>, String>)

Set SASL/OAUTHBEARER token refresh callback in provided conf object. The SASL/OAUTHBEARER token refresh callback is triggered via Consume(Int32) (or any of its overloads) whenever OAUTHBEARER is the SASL mechanism and a token needs to be retrieved, typically based on the configuration defined in sasl.oauthbearer.config. The callback should invoke OAuthBearerSetToken(IClient, String, Int64, String, IDictionary<String, String>) or OAuthBearerSetTokenFailure(IClient, String) to indicate success or failure, respectively.

An unsecured JWT refresh handler is provided by librdkafka for development and testing purposes, it is enabled by setting the enable.sasl.oauthbearer.unsecure.jwt property to true and is mutually exclusive to using a refresh callback.

Declaration
public ConsumerBuilder<TKey, TValue> SetOAuthBearerTokenRefreshHandler(Action<IConsumer<TKey, TValue>, string> oAuthBearerTokenRefreshHandler)
Parameters
Type Name Description
System.Action<IConsumer<TKey, TValue>, System.String> oAuthBearerTokenRefreshHandler

the callback to set; callback function arguments: IConsumer - instance of the consumer which should be used to set token or token failure string - Value of configuration property sasl.oauthbearer.config

Returns
Type Description
ConsumerBuilder<TKey, TValue>

SetOffsetsCommittedHandler(Action<IConsumer<TKey, TValue>, CommittedOffsets>)

A handler that is called to report the result of (automatic) offset commits. It is not called as a result of the use of the Commit method.

Declaration
public ConsumerBuilder<TKey, TValue> SetOffsetsCommittedHandler(Action<IConsumer<TKey, TValue>, CommittedOffsets> offsetsCommittedHandler)
Parameters
Type Name Description
System.Action<IConsumer<TKey, TValue>, CommittedOffsets> offsetsCommittedHandler
Returns
Type Description
ConsumerBuilder<TKey, TValue>
Remarks

Executes as a side-effect of the Consumer.Consume call (on the same thread).

Exceptions: Any exception thrown by your offsets committed handler will be wrapped in a ConsumeException with ErrorCode ErrorCode.Local_Application and thrown by the initiating call to Consume/Close.

SetPartitionsAssignedHandler(Action<IConsumer<TKey, TValue>, List<TopicPartition>>)

This handler is called when a new consumer group partition assignment has been received by this consumer.

Note: corresponding to every call to this handler there will be a corresponding call to the partitions revoked handler (if one has been set using SetPartitionsRevokedHandler").

Consumption will resume from the last committed offset for each partition, or if there is no committed offset, in accordance with the auto.offset.reset configuration property.

Declaration
public ConsumerBuilder<TKey, TValue> SetPartitionsAssignedHandler(Action<IConsumer<TKey, TValue>, List<TopicPartition>> partitionAssignmentHandler)
Parameters
Type Name Description
System.Action<IConsumer<TKey, TValue>, System.Collections.Generic.List<TopicPartition>> partitionAssignmentHandler
Returns
Type Description
ConsumerBuilder<TKey, TValue>
Remarks

May execute as a side-effect of the Consumer.Consume call (on the same thread).

Assign/Unassign must not be called in the handler.

Exceptions: Any exception thrown by your partitions assigned handler will be wrapped in a ConsumeException with ErrorCode ErrorCode.Local_Application and thrown by the initiating call to Consume.

SetPartitionsAssignedHandler(Func<IConsumer<TKey, TValue>, List<TopicPartition>, IEnumerable<TopicPartitionOffset>>)

This handler is called when a new consumer group partition assignment has been received by this consumer.

Note: corresponding to every call to this handler there will be a corresponding call to the partitions revoked handler (if one has been set using SetPartitionsRevokedHandler).

The actual partitions to consume from and start offsets are specified by the return value of the handler. This set of partitions is not required to match the assignment provided by the consumer group, but typically will. Partition offsets may be a specific offset, or special value (Beginning, End or Unset). If Unset, consumption will resume from the last committed offset for each partition, or if there is no committed offset, in accordance with the auto.offset.reset configuration property.

Declaration
public ConsumerBuilder<TKey, TValue> SetPartitionsAssignedHandler(Func<IConsumer<TKey, TValue>, List<TopicPartition>, IEnumerable<TopicPartitionOffset>> partitionsAssignedHandler)
Parameters
Type Name Description
System.Func<IConsumer<TKey, TValue>, System.Collections.Generic.List<TopicPartition>, System.Collections.Generic.IEnumerable<TopicPartitionOffset>> partitionsAssignedHandler
Returns
Type Description
ConsumerBuilder<TKey, TValue>
Remarks

May execute as a side-effect of the Consumer.Consume call (on the same thread).

Assign/Unassign must not be called in the handler.

Exceptions: Any exception thrown by your partitions assigned handler will be wrapped in a ConsumeException with ErrorCode ErrorCode.Local_Application and thrown by the initiating call to Consume.

SetPartitionsRevokedHandler(Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>>)

This handler is called immediately prior to a group partition assignment being revoked. The second parameter provides the set of partitions the consumer is currently assigned to, and the current position of the consumer on each of these partitions.

The return value of the handler specifies the partitions/offsets the consumer should be assigned to following completion of this method (typically empty).

Declaration
public ConsumerBuilder<TKey, TValue> SetPartitionsRevokedHandler(Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> partitionsRevokedHandler)
Parameters
Type Name Description
System.Action<IConsumer<TKey, TValue>, System.Collections.Generic.List<TopicPartitionOffset>> partitionsRevokedHandler
Returns
Type Description
ConsumerBuilder<TKey, TValue>
Remarks

May execute as a side-effect of the Consumer.Consume call (on the same thread).

Assign/Unassign must not be called in the handler.

Exceptions: Any exception thrown by your partitions revoked handler will be wrapped in a ConsumeException with ErrorCode ErrorCode.Local_Application and thrown by the initiating call to Consume.

SetPartitionsRevokedHandler(Func<IConsumer<TKey, TValue>, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>>)

This handler is called immediately prior to a group partition assignment being revoked. The second parameter provides the set of partitions the consumer is currently assigned to, and the current position of the consumer on each of these partitions.

Declaration
public ConsumerBuilder<TKey, TValue> SetPartitionsRevokedHandler(Func<IConsumer<TKey, TValue>, List<TopicPartitionOffset>, IEnumerable<TopicPartitionOffset>> partitionsRevokedHandler)
Parameters
Type Name Description
System.Func<IConsumer<TKey, TValue>, System.Collections.Generic.List<TopicPartitionOffset>, System.Collections.Generic.IEnumerable<TopicPartitionOffset>> partitionsRevokedHandler
Returns
Type Description
ConsumerBuilder<TKey, TValue>
Remarks

May execute as a side-effect of the Consumer.Consume call (on the same thread).

Assign/Unassign must not be called in the handler.

Exceptions: Any exception thrown by your partitions revoked handler will be wrapped in a ConsumeException with ErrorCode ErrorCode.Local_Application and thrown by the initiating call to Consume/Close.

SetStatisticsHandler(Action<IConsumer<TKey, TValue>, String>)

Set the handler to call on statistics events. Statistics are provided as a JSON formatted string as defined here: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md

Declaration
public ConsumerBuilder<TKey, TValue> SetStatisticsHandler(Action<IConsumer<TKey, TValue>, string> statisticsHandler)
Parameters
Type Name Description
System.Action<IConsumer<TKey, TValue>, System.String> statisticsHandler
Returns
Type Description
ConsumerBuilder<TKey, TValue>
Remarks

You can enable statistics and set the statistics interval using the StatisticsIntervalMs configuration property (disabled by default).

Executes as a side-effect of the Consume method (on the same thread).

Exceptions: Any exception thrown by your statistics handler will be wrapped in a ConsumeException with ErrorCode ErrorCode.Local_Application and thrown by the initiating call to Consume.

SetValueDeserializer(IDeserializer<TValue>)

Set the deserializer to use to deserialize values.

Declaration
public ConsumerBuilder<TKey, TValue> SetValueDeserializer(IDeserializer<TValue> deserializer)
Parameters
Type Name Description
IDeserializer<TValue> deserializer
Returns
Type Description
ConsumerBuilder<TKey, TValue>
Remarks

If your value deserializer throws an exception, this will be wrapped in a ConsumeException with ErrorCode Local_ValueDeserialization and thrown by the initiating call to Consume.