confluent-kafka-dotnet

Class Consumer<TKey, TValue>

Implements a high-level Apache Kafka consumer (with key and value deserialization).

Inheritance
System.Object
Consumer<TKey, TValue>
Namespace: Confluent.Kafka
Assembly: Confluent.Kafka.dll
Syntax
public class Consumer<TKey, TValue> : IDisposable
Type Parameters
Name Description
TKey
TValue

Constructors

Consumer(IEnumerable<KeyValuePair<String, Object>>, IDeserializer<TKey>, IDeserializer<TValue>)

Creates a new Consumer instance.

Declaration
public Consumer(IEnumerable<KeyValuePair<string, object>> config, IDeserializer<TKey> keyDeserializer, IDeserializer<TValue> valueDeserializer)
Parameters
Type Name Description
IEnumerable<KeyValuePair<System.String, System.Object>> config

librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)

IDeserializer<TKey> keyDeserializer

An IDeserializer implementation instance for deserializing keys.

IDeserializer<TValue> valueDeserializer

An IDeserializer implementation instance for deserializing values.
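As a minimal sketch of what the config argument can look like: Dictionary<string, object> satisfies IEnumerable<KeyValuePair<string, object>>, so it can be passed directly. The class and method names below are hypothetical, and the broker address, group id and reset policy are placeholder values rather than recommendations.

```csharp
using System.Collections.Generic;

public static class ConsumerConfigSketch
{
    // Builds a key/value collection of librdkafka properties in the shape
    // expected by the constructor's config parameter.
    // Hypothetical helper: not part of Confluent.Kafka.
    public static Dictionary<string, object> Build(string brokers, string groupId)
    {
        return new Dictionary<string, object>
        {
            { "bootstrap.servers", brokers },    // comma-separated host:port list
            { "group.id", groupId },             // consumer group to join
            { "auto.offset.reset", "earliest" }  // used when no offset has been committed
        };
    }
}
```

The returned dictionary would then be handed to the constructor together with a key and a value deserializer.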

Properties

Assignment

Gets the current partition assignment as set by Assign.

Declaration
public List<TopicPartition> Assignment { get; }
Property Value
Type Description
List<TopicPartition>

KeyDeserializer

The IDeserializer implementation instance used to deserialize keys.

Declaration
public IDeserializer<TKey> KeyDeserializer { get; }
Property Value
Type Description
IDeserializer<TKey>

MemberId

Gets the (dynamic) group member id of this consumer (as set by the broker).

Declaration
public string MemberId { get; }
Property Value
Type Description
System.String

Name

Gets the name of this consumer instance. Contains (but is not equal to) the client.id configuration parameter.

Declaration
public string Name { get; }
Property Value
Type Description
System.String
Remarks

This name is unique across all consumer instances in a given application, which allows log messages to be associated with the corresponding instance.

Subscription

Gets the current topic subscription as set by Subscribe.

Declaration
public List<string> Subscription { get; }
Property Value
Type Description
List<System.String>

ValueDeserializer

The IDeserializer implementation instance used to deserialize values.

Declaration
public IDeserializer<TValue> ValueDeserializer { get; }
Property Value
Type Description
IDeserializer<TValue>

Methods

AddBrokers(String)

Adds one or more brokers to the Consumer's list of initial bootstrap brokers.

Note: Additional brokers are discovered automatically by querying the broker metadata as soon as the Consumer connects to any broker. Calling this method is only required in scenarios where the addresses of all brokers in the cluster change.

Declaration
public int AddBrokers(string brokers)
Parameters
Type Name Description
System.String brokers

Comma-separated list of brokers in the same format as the bootstrap.servers configuration parameter.

Returns
Type Description
System.Int32

The number of brokers added. This value includes brokers that may have been specified a second time.

Remarks

There is currently no API to remove existing configured, added or learnt brokers.

Assign(IEnumerable<TopicPartition>)

Update the assignment set to partitions.

The assignment set is the complete set of partitions to consume from and will replace any previous assignment.

Declaration
public void Assign(IEnumerable<TopicPartition> partitions)
Parameters
Type Name Description
IEnumerable<TopicPartition> partitions

The set of partitions to consume from. Consumption will resume from the last committed offset on each partition, or according to the 'auto.offset.reset' configuration parameter if no offsets have been committed yet.

Assign(IEnumerable<TopicPartitionOffset>)

Update the assignment set to partitions.

The assignment set is the complete set of partitions to consume from and will replace any previous assignment.

Declaration
public void Assign(IEnumerable<TopicPartitionOffset> partitions)
Parameters
Type Name Description
IEnumerable<TopicPartitionOffset> partitions

The set of partitions to consume from. If an offset value of Offset.Invalid (-1001) is specified for a partition, consumption will resume from the last committed offset on that partition, or according to the 'auto.offset.reset' configuration parameter if no offsets have been committed yet.
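The start-position rule described above can be modelled as a small decision function. This is only an illustration of the documented behaviour, not library code: StartOffsetSketch and its members are hypothetical names, and the constant simply mirrors the -1001 value quoted for Offset.Invalid.

```csharp
public static class StartOffsetSketch
{
    // Numeric value quoted above for Offset.Invalid.
    public const long Invalid = -1001;

    // Describes where consumption would start for one partition.
    // requested: the offset passed to Assign for that partition.
    // committed: the last committed offset, or null if none exists yet.
    // autoOffsetReset: the configured 'auto.offset.reset' value.
    public static string Describe(long requested, long? committed, string autoOffsetReset)
    {
        if (requested != Invalid)
        {
            return $"explicit offset {requested}";
        }
        if (committed.HasValue)
        {
            return $"last committed offset {committed.Value}";
        }
        return $"auto.offset.reset={autoOffsetReset}";
    }
}
```

The sketch only distinguishes Offset.Invalid from every other value; any other special offsets would need their own handling.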

CommitAsync()

Commit offsets for the current assignment.

Declaration
public Task<CommittedOffsets> CommitAsync()
Returns
Type Description
Task<CommittedOffsets>

CommitAsync(Message<TKey, TValue>)

Commits an offset based on the topic/partition/offset of a message. The next message to be read will be the one that follows message.

Declaration
public Task<CommittedOffsets> CommitAsync(Message<TKey, TValue> message)
Parameters
Type Name Description
Message<TKey, TValue> message

The message used to determine the committed offset.

Returns
Type Description
Task<CommittedOffsets>
Remarks

A consumer which has position N has consumed records with offsets 0 through N-1 and will next receive the record with offset N. Hence, this method commits an offset of message.Offset + 1.
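The off-by-one in the remark is easy to get wrong when committing explicit offsets, so here is the arithmetic as a tiny sketch. CommitOffsetSketch is a hypothetical helper used only to illustrate the rule; it does not call the library.

```csharp
public static class CommitOffsetSketch
{
    // A committed offset names the NEXT record to read, so after processing
    // the record at messageOffset the value to commit is messageOffset + 1.
    public static long OffsetToCommit(long messageOffset)
    {
        return messageOffset + 1;
    }
}
```

For example, after processing the record at offset 41, the committed offset is 42, and a consumer restarting from that commit resumes with record 42.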

CommitAsync(IEnumerable<TopicPartitionOffset>)

Commit an explicit list of offsets.

Declaration
public Task<CommittedOffsets> CommitAsync(IEnumerable<TopicPartitionOffset> offsets)
Parameters
Type Name Description
IEnumerable<TopicPartitionOffset> offsets
Returns
Type Description
Task<CommittedOffsets>

Committed(IEnumerable<TopicPartition>, TimeSpan)

Retrieve current committed offsets for topics + partitions.

The offset field of each requested partition will be set to the last committed offset, or to Offset.Invalid (RD_KAFKA_OFFSET_INVALID) if no offset has been committed. Alternatively, a partition-specific error may be returned.

Throws a KafkaException if there was a problem retrieving the above information.

Declaration
public List<TopicPartitionOffsetError> Committed(IEnumerable<TopicPartition> partitions, TimeSpan timeout)
Parameters
Type Name Description
IEnumerable<TopicPartition> partitions
TimeSpan timeout
Returns
Type Description
List<TopicPartitionOffsetError>

Consume(out Message<TKey, TValue>, Int32)

Poll for new messages / consumer events. Blocks until a new message or event is ready to be handled or the timeout period millisecondsTimeout has elapsed.

Declaration
public bool Consume(out Message<TKey, TValue> message, int millisecondsTimeout)
Parameters
Type Name Description
Message<TKey, TValue> message

A consumed message, or null if no messages are available for consumption.

System.Int32 millisecondsTimeout

The maximum time to block (in milliseconds), or -1 to block indefinitely. You should typically use a relatively short timeout period because this operation cannot be cancelled.

Returns
Type Description
System.Boolean

true: a message (with non-error state) was consumed. false: no message was available for consumption.

Remarks

Will invoke events for OnPartitionsAssigned/Revoked, OnOffsetsCommitted, OnConsumeError etc. on the calling thread.
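Because a blocked Consume call cannot be cancelled, a common pattern is to poll with a short timeout and check for shutdown between calls. The sketch below shows that loop under stated assumptions: TryConsume is a hypothetical delegate with the same shape as Consume(out Message<TKey, TValue>, Int32) so the example runs without a broker, and ConsumeLoopSketch and the 100 ms default are illustrative choices.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class ConsumeLoopSketch
{
    // Stand-in with the same shape as Consume(out message, millisecondsTimeout).
    public delegate bool TryConsume<T>(out T message, int millisecondsTimeout);

    // Polls repeatedly with a short timeout so the token is checked often.
    // Returns the number of messages passed to handle.
    public static int Run<T>(
        TryConsume<T> consume,
        Action<T> handle,
        CancellationToken token,
        int millisecondsTimeout = 100)
    {
        int handled = 0;
        while (!token.IsCancellationRequested)
        {
            T message;
            // false means the timeout elapsed (or only events were served).
            if (consume(out message, millisecondsTimeout))
            {
                handle(message);
                handled++;
            }
        }
        return handled;
    }
}
```

With a real consumer, the method group consumer.Consume should fit the delegate, since its signature matches the declaration shown above; event handlers would still run on the thread that executes this loop.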

Consume(out Message<TKey, TValue>, TimeSpan)

Refer to Consume(out Message<TKey, TValue>, Int32).

Declaration
public bool Consume(out Message<TKey, TValue> message, TimeSpan timeout)
Parameters
Type Name Description
Message<TKey, TValue> message
TimeSpan timeout
Returns
Type Description
System.Boolean

Dispose()

Releases all resources used by this Consumer.

This call will block until the consumer has revoked its assignment (raising the rebalance event if a handler is configured), committed offsets to the broker, and left the consumer group.

[UNSTABLE-API] - The Dispose method should not block. We will separate out consumer close functionality from this method.

Declaration
public void Dispose()

GetMetadata(Boolean)

Refer to Confluent.Kafka.Producer.GetMetadata(System.Boolean,System.String,System.Int32) for more information.

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public Metadata GetMetadata(bool allTopics)
Parameters
Type Name Description
System.Boolean allTopics
Returns
Type Description
Metadata

GetMetadata(Boolean, TimeSpan)

Refer to Confluent.Kafka.Producer.GetMetadata(System.Boolean,System.String,System.Int32) for more information.

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public Metadata GetMetadata(bool allTopics, TimeSpan timeout)
Parameters
Type Name Description
System.Boolean allTopics
TimeSpan timeout
Returns
Type Description
Metadata

GetWatermarkOffsets(TopicPartition)

Get last known low (oldest/beginning) and high (newest/end) offsets for a topic/partition.

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)
Parameters
Type Name Description
TopicPartition topicPartition

The topic/partition of interest.

Returns
Type Description
WatermarkOffsets

The requested WatermarkOffsets.

Remarks

The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each fetched message set from the broker.

If there is no cached offset (either low or high, or both) then Offset.Invalid will be returned for the respective offset.
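Since either cached watermark may be Offset.Invalid, callers typically need to guard before doing arithmetic on the pair. A minimal sketch, assuming the -1001 value quoted elsewhere on this page for Offset.Invalid; WatermarkSketch and OffsetSpan are hypothetical names and work on plain long values rather than the library's types.

```csharp
using System;

public static class WatermarkSketch
{
    // Numeric value quoted on this page for Offset.Invalid.
    public const long Invalid = -1001;

    // Returns the number of offsets between the low and high watermark,
    // or null when either watermark has not been cached yet.
    public static long? OffsetSpan(long low, long high)
    {
        if (low == Invalid || high == Invalid)
        {
            return null;
        }
        return Math.Max(0L, high - low);
    }
}
```

A null result is a hint to fall back to QueryWatermarkOffsets, which asks the cluster instead of reading the cache.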

ListGroup(String)

Get information pertaining to a particular group in the Kafka cluster (blocks, potentially indefinitely).

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public GroupInfo ListGroup(string group)
Parameters
Type Name Description
System.String group

The group of interest.

Returns
Type Description
GroupInfo

Returns information pertaining to the specified group or null if this group does not exist.

ListGroup(String, TimeSpan)

Get information pertaining to a particular group in the Kafka cluster (blocking).

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public GroupInfo ListGroup(string group, TimeSpan timeout)
Parameters
Type Name Description
System.String group

The group of interest.

TimeSpan timeout

The maximum period of time the call may block.

Returns
Type Description
GroupInfo

Returns information pertaining to the specified group or null if this group does not exist.

ListGroups(TimeSpan)

Get information pertaining to all groups in the Kafka cluster (blocking).

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public List<GroupInfo> ListGroups(TimeSpan timeout)
Parameters
Type Name Description
TimeSpan timeout

The maximum period of time the call may block.

Returns
Type Description
List<GroupInfo>

OffsetsForTimes(IEnumerable<TopicPartitionTimestamp>, TimeSpan)

Declaration
public IEnumerable<TopicPartitionOffsetError> OffsetsForTimes(IEnumerable<TopicPartitionTimestamp> timestampsToSearch, TimeSpan timeout)
Parameters
Type Name Description
IEnumerable<TopicPartitionTimestamp> timestampsToSearch
TimeSpan timeout
Returns
Type Description
IEnumerable<TopicPartitionOffsetError>

Pause(IEnumerable<TopicPartition>)

Declaration
public List<TopicPartitionError> Pause(IEnumerable<TopicPartition> partitions)
Parameters
Type Name Description
IEnumerable<TopicPartition> partitions
Returns
Type Description
List<TopicPartitionError>

Poll()

Poll for new consumer events, including new messages ready to be consumed (which will trigger the OnMessage event).

Declaration
public void Poll()
Remarks

Blocks indefinitely until a new event is ready.

Poll(Int32)

Poll for new consumer events, including new messages ready to be consumed (which will trigger the OnMessage event). Blocks until a new event is available to be handled or the timeout period millisecondsTimeout has elapsed.

Declaration
public void Poll(int millisecondsTimeout)
Parameters
Type Name Description
System.Int32 millisecondsTimeout

The maximum time to block (in milliseconds), or -1 to block indefinitely. You should typically use a relatively short timeout period because this operation cannot be cancelled.

Poll(TimeSpan)

Poll for new consumer events, including new messages ready to be consumed (which will trigger the OnMessage event). Blocks until a new event is available to be handled or the timeout period timeout has elapsed.

Declaration
public void Poll(TimeSpan timeout)
Parameters
Type Name Description
TimeSpan timeout

The maximum time to block. You should typically use a relatively short timeout period because this operation cannot be cancelled.

Position(IEnumerable<TopicPartition>)

Retrieve current positions (offsets) for topics + partitions.

The offset field of each requested partition will be set to the offset of the last consumed message + 1, or to Offset.Invalid (RD_KAFKA_OFFSET_INVALID) if there was no previous message. Alternatively, a partition-specific error may be returned.

Throws a KafkaException if there was a problem retrieving the above information.

Declaration
public List<TopicPartitionOffsetError> Position(IEnumerable<TopicPartition> partitions)
Parameters
Type Name Description
IEnumerable<TopicPartition> partitions
Returns
Type Description
List<TopicPartitionOffsetError>

QueryWatermarkOffsets(TopicPartition)

Query the Kafka cluster for low (oldest/beginning) and high (newest/end) offsets for the specified topic/partition (blocks, potentially indefinitely).

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition)
Parameters
Type Name Description
TopicPartition topicPartition

The topic/partition of interest.

Returns
Type Description
WatermarkOffsets

The requested WatermarkOffsets.

QueryWatermarkOffsets(TopicPartition, TimeSpan)

Query the Kafka cluster for low (oldest/beginning) and high (newest/end) offsets for the specified topic/partition (blocking).

[UNSTABLE-API] - The API associated with this functionality is subject to change.

Declaration
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
Parameters
Type Name Description
TopicPartition topicPartition

The topic/partition of interest.

TimeSpan timeout

The maximum period of time the call may block.

Returns
Type Description
WatermarkOffsets

The requested WatermarkOffsets.

Resume(IEnumerable<TopicPartition>)

Declaration
public List<TopicPartitionError> Resume(IEnumerable<TopicPartition> partitions)
Parameters
Type Name Description
IEnumerable<TopicPartition> partitions
Returns
Type Description
List<TopicPartitionError>

Seek(TopicPartitionOffset)

Declaration
public void Seek(TopicPartitionOffset tpo)
Parameters
Type Name Description
TopicPartitionOffset tpo

StoreOffset(Message<TKey, TValue>)

Store offsets for a single partition based on the topic/partition/offset of a message.

The offset will be committed (written) to the offset store according to auto.commit.interval.ms, or by a manual commit that does not specify offsets.

Declaration
public TopicPartitionOffsetError StoreOffset(Message<TKey, TValue> message)
Parameters
Type Name Description
Message<TKey, TValue> message

A message used to determine the offset to store and topic/partition.

Returns
Type Description
TopicPartitionOffsetError

Current stored offset or a partition specific error.

Remarks

enable.auto.offset.store must be set to "false" when using this API.
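A configuration fragment for this mode might look like the sketch below: automatic offset storing is switched off so that only offsets passed to StoreOffset are picked up by the periodic commit. StoreOffsetConfigSketch is a hypothetical name, and the 5000 ms interval is a placeholder, not a recommendation.

```csharp
using System.Collections.Generic;

public static class StoreOffsetConfigSketch
{
    // Settings relevant to manual offset storing; merge these with the rest
    // of the consumer configuration (brokers, group id, and so on).
    public static Dictionary<string, object> Build()
    {
        return new Dictionary<string, object>
        {
            { "enable.auto.offset.store", "false" }, // required when using StoreOffset
            { "enable.auto.commit", "true" },        // stored offsets are committed periodically
            { "auto.commit.interval.ms", 5000 }      // placeholder commit interval
        };
    }
}
```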

StoreOffsets(IEnumerable<TopicPartitionOffset>)

Declaration
public List<TopicPartitionOffsetError> StoreOffsets(IEnumerable<TopicPartitionOffset> offsets)
Parameters
Type Name Description
IEnumerable<TopicPartitionOffset> offsets
Returns
Type Description
List<TopicPartitionOffsetError>

Subscribe(IEnumerable<String>)

Update the subscription set to topics.

Any previous subscription will be unassigned and unsubscribed first.

The subscription set denotes the desired topics to consume. For all clients in the group, this set is provided to the partition assignor (one of the elected group members), which then uses the configured partition.assignment.strategy to assign the partitions of the subscribed topics to the consumers, depending on their subscriptions.

Declaration
public void Subscribe(IEnumerable<string> topics)
Parameters
Type Name Description
IEnumerable<System.String> topics

Subscribe(String)

Update the subscription set to a single topic.

Any previous subscription will be unassigned and unsubscribed first.

Declaration
public void Subscribe(string topic)
Parameters
Type Name Description
System.String topic

Unassign()

Stop consumption and remove the current assignment.

Declaration
public void Unassign()

Unsubscribe()

Unsubscribe from the current subscription set.

Declaration
public void Unsubscribe()

Events

OnConsumeError

Raised when a consumed message has an error != NoError (both when Consume or Poll is used for polling). Also raised on deserialization errors.

Declaration
public event EventHandler<Message> OnConsumeError
Event Type
Type Description
EventHandler<Message>
Remarks

Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).

OnError

Raised on critical errors, e.g. connection failures or all brokers down. Note that the client will try to recover from errors automatically, so these errors should be seen as informational rather than catastrophic.

Declaration
public event EventHandler<Error> OnError
Event Type
Type Description
EventHandler<Error>
Remarks

Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).

OnLog

Raised when there is information that should be logged.

Declaration
public event EventHandler<LogMessage> OnLog
Event Type
Type Description
EventHandler<LogMessage>
Remarks

Note: By default not many log messages are generated.

To enable more verbose logging, you can specify one or more debug contexts using the 'debug' configuration property and a log level using the 'log_level' configuration property. You shouldn't typically need to do this.

Warning: Log handlers are called spontaneously from internal librdkafka threads and the application must not call any Confluent.Kafka APIs from within a log handler or perform any prolonged operations.
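One way to respect this warning is to keep the handler body to a single thread-safe enqueue and do the actual writing elsewhere. The sketch below illustrates that hand-off with plain strings; LogBufferSketch is a hypothetical class, and converting a LogMessage to a string before enqueuing is an assumption about how it would be wired to the real event.

```csharp
using System;
using System.Collections.Concurrent;

public sealed class LogBufferSketch
{
    private readonly ConcurrentQueue<string> _pending = new ConcurrentQueue<string>();

    // Intended to be called from the log handler: enqueue and return at once,
    // with no I/O and no calls back into the client.
    public void Handle(object sender, string line)
    {
        _pending.Enqueue(line);
    }

    // Intended to be called from an application thread: writes out everything
    // buffered so far and returns the number of lines written.
    public int Drain(Action<string> write)
    {
        int written = 0;
        string line;
        while (_pending.TryDequeue(out line))
        {
            write(line);
            written++;
        }
        return written;
    }
}
```

Drain could be called from the same loop that polls the consumer, which keeps slow sinks such as files or network loggers off the internal librdkafka threads.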

OnMessage

Raised when a new message is available for consumption. NOT raised when Consumer.Consume is used for polling (only when Consumer.Poll is used for polling). NOT raised when the message has an Error (OnConsumeError is raised in that case).

Declaration
public event EventHandler<Message<TKey, TValue>> OnMessage
Event Type
Type Description
EventHandler<Message<TKey, TValue>>
Remarks

Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).

OnOffsetsCommitted

Raised to report the result of (automatic) offset commits. Not raised as a result of the use of the CommitAsync method.

Declaration
public event EventHandler<CommittedOffsets> OnOffsetsCommitted
Event Type
Type Description
EventHandler<CommittedOffsets>
Remarks

Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).

OnPartitionEOF

Raised when the consumer reaches the end of a topic/partition it is reading from.

Declaration
public event EventHandler<TopicPartitionOffset> OnPartitionEOF
Event Type
Type Description
EventHandler<TopicPartitionOffset>
Remarks

Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).

OnPartitionsAssigned

Raised on new partition assignment. You should typically call the Consumer.Assign method in this handler.

Declaration
public event EventHandler<List<TopicPartition>> OnPartitionsAssigned
Event Type
Type Description
EventHandler<List<TopicPartition>>
Remarks

Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).

OnPartitionsRevoked

Raised when a partition assignment is revoked. You should typically call the Consumer.Unassign method in this handler.

Declaration
public event EventHandler<List<TopicPartition>> OnPartitionsRevoked
Event Type
Type Description
EventHandler<List<TopicPartition>>
Remarks

Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).

OnStatistics

Raised on librdkafka statistics events. JSON formatted string as defined here: https://github.com/edenhill/librdkafka/wiki/Statistics

Declaration
public event EventHandler<string> OnStatistics
Event Type
Type Description
EventHandler<System.String>
Remarks

Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).