Interface IConsumer<TKey, TValue>
Defines a high-level Apache Kafka consumer (with key and value deserialization).
Namespace: Confluent.Kafka
Assembly: Confluent.Kafka.dll
Syntax
public interface IConsumer<TKey, TValue> : IClient, IDisposable
Type Parameters
Name | Description |
---|---|
TKey | |
TValue | |
Properties
Assignment
Gets the current partition assignment as set by Assign(TopicPartition) or implicitly.
Declaration
List<TopicPartition> Assignment { get; }
Property Value
Type | Description |
---|---|
List<TopicPartition> |
ConsumerGroupMetadata
The current consumer group metadata associated with this consumer, or null if a GroupId has not been specified for the consumer. This metadata object should be passed to the transactional producer's SendOffsetsToTransaction(IEnumerable<TopicPartitionOffset>, IConsumerGroupMetadata, TimeSpan) method.
Declaration
IConsumerGroupMetadata ConsumerGroupMetadata { get; }
Property Value
Type | Description |
---|---|
IConsumerGroupMetadata |
MemberId
Gets the (dynamic) group member id of this consumer (as set by the broker).
Declaration
string MemberId { get; }
Property Value
Type | Description |
---|---|
string |
Subscription
Gets the current topic subscription as set by Subscribe(string).
Declaration
List<string> Subscription { get; }
Property Value
Type | Description |
---|---|
List<string> |
Methods
Assign(TopicPartition)
Sets the current set of assigned partitions (the set of partitions the consumer will consume from) to a single partition.
Note: The newly specified set is the complete set of partitions to consume from. If the consumer is already assigned to a set of partitions, the previous set will be replaced.
Declaration
void Assign(TopicPartition partition)
Parameters
Type | Name | Description |
---|---|---|
TopicPartition | partition | The partition to consume from. Consumption will resume from the last committed offset, or according to the 'auto.offset.reset' configuration parameter if no offsets have been committed yet. |
Assign(TopicPartitionOffset)
Sets the current set of assigned partitions (the set of partitions the consumer will consume from) to a single partition.
Note: The newly specified set is the complete set of partitions to consume from. If the consumer is already assigned to a set of partitions, the previous set will be replaced.
Declaration
void Assign(TopicPartitionOffset partition)
Parameters
Type | Name | Description |
---|---|---|
TopicPartitionOffset | partition | The partition to consume from. If an offset value of Offset.Unset (-1001) is specified, consumption will resume from the last committed offset, or according to the 'auto.offset.reset' configuration parameter if no offsets have been committed yet. |
Assign(IEnumerable<TopicPartitionOffset>)
Sets the current set of assigned partitions (the set of partitions the consumer will consume from) to the specified partitions.
Note: The newly specified set is the complete set of partitions to consume from. If the consumer is already assigned to a set of partitions, the previous set will be replaced.
Declaration
void Assign(IEnumerable<TopicPartitionOffset> partitions)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartitionOffset> | partitions | The set of partitions to consume from. If an offset value of Offset.Unset (-1001) is specified for a partition, consumption will resume from the last committed offset on that partition, or according to the 'auto.offset.reset' configuration parameter if no offsets have been committed yet. |
Assign(IEnumerable<TopicPartition>)
Sets the current set of assigned partitions (the set of partitions the consumer will consume from) to the specified partitions.
Note: The newly specified set is the complete set of partitions to consume from. If the consumer is already assigned to a set of partitions, the previous set will be replaced.
Declaration
void Assign(IEnumerable<TopicPartition> partitions)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartition> | partitions | The set of partitions to consume from. Consumption will resume from the last committed offset on each partition, or according to the 'auto.offset.reset' configuration parameter if no offsets have been committed yet. |
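
The following is a minimal sketch of how the Assign overloads above might be used together. The topic name "orders", the partition numbers, and the helper name AssignManually are placeholders chosen for illustration, not part of the API.

```csharp
using System.Collections.Generic;
using Confluent.Kafka;

static class AssignExample
{
    // Hypothetical helper; "orders" and the partition numbers are placeholders.
    public static void AssignManually(IConsumer<string, string> consumer)
    {
        // Partition 0 starts from the beginning of the log. Partition 1 uses
        // Offset.Unset, so it resumes from its committed offset, falling back
        // to auto.offset.reset if nothing has been committed yet.
        consumer.Assign(new List<TopicPartitionOffset>
        {
            new TopicPartitionOffset("orders", 0, Offset.Beginning),
            new TopicPartitionOffset("orders", 1, Offset.Unset),
        });

        // A later call replaces the previous assignment rather than adding
        // to it: after this line only partition 2 is assigned.
        consumer.Assign(new TopicPartition("orders", 2));
    }
}
```

To add partitions without replacing the existing set, the IncrementalAssign overloads documented further down are the closer fit.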
Close()
Commits offsets (if auto commit is enabled), alerts the group coordinator that the consumer is exiting the group, then releases all resources used by this consumer. You should call Close() instead of (or just before) Dispose() to ensure a timely consumer group rebalance. If you do not call Close() or Unsubscribe(), the group will rebalance after the timeout specified by the group's session.timeout.ms.
Note: the partitions assigned and partitions revoked handlers may be called as a side-effect of calling this method.
Declaration
void Close()
Exceptions
Type | Condition |
---|---|
KafkaException | Thrown if the operation fails. |
Commit()
Commit all offsets for the current assignment.
Declaration
List<TopicPartitionOffset> Commit()
Returns
Type | Description |
---|---|
List<TopicPartitionOffset> |
Exceptions
Type | Condition |
---|---|
KafkaException | Thrown if the request failed. |
TopicPartitionOffsetException | Thrown if any of the constituent results is in error. The entire result (which may contain constituent results that are not in error) is available via the Results property of the exception. |
Commit(ConsumeResult<TKey, TValue>)
Commits an offset based on the topic/partition/offset of a ConsumeResult.
Declaration
void Commit(ConsumeResult<TKey, TValue> result)
Parameters
Type | Name | Description |
---|---|---|
ConsumeResult<TKey, TValue> | result | The ConsumeResult instance used to determine the committed offset. |
Remarks
A consumer at position N has consumed messages with offsets up to N-1 and will next receive the message with offset N. Hence, this method commits an offset of result.Offset + 1.
Exceptions
Type | Condition |
---|---|
KafkaException | Thrown if the request failed. |
TopicPartitionOffsetException | Thrown if the result is in error. |
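
As an illustration of the remark above, here is a sketch of a process-then-commit step. It assumes the consumer was built with enable.auto.commit set to false and that partition EOF events are not enabled (so result.Message is populated); the class and method names are hypothetical.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

static class CommitExample
{
    // Hypothetical helper: consume one message, process it, then commit it.
    public static void ProcessThenCommit(
        IConsumer<string, string> consumer, CancellationToken token)
    {
        var result = consumer.Consume(token);

        // Stand-in for real processing.
        Console.WriteLine($"Processing {result.TopicPartitionOffset}: {result.Message.Value}");

        try
        {
            // Commits result.Offset + 1, i.e. the offset of the next
            // message this group should receive on that partition.
            consumer.Commit(result);
        }
        catch (KafkaException e)
        {
            // TopicPartitionOffsetException derives from KafkaException,
            // so this handler covers both documented exception types.
            Console.WriteLine($"Commit failed: {e.Error.Reason}");
        }
    }
}
```

Committing only after processing succeeds gives at-least-once behavior: a crash between processing and commit leads to redelivery rather than loss.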
Commit(IEnumerable<TopicPartitionOffset>)
Commit an explicit list of offsets.
Declaration
void Commit(IEnumerable<TopicPartitionOffset> offsets)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartitionOffset> | offsets | The topic/partition offsets to commit. |
Exceptions
Type | Condition |
---|---|
KafkaException | Thrown if the request failed. |
TopicPartitionOffsetException | Thrown if any of the constituent results is in error. The entire result (which may contain constituent results that are not in error) is available via the Results property of the exception. |
Committed(IEnumerable<TopicPartition>, TimeSpan)
Retrieve current committed offsets for the specified topic partitions.
The offset field of each requested partition will be set to the last committed offset, or Offset.Unset if no offset has been committed. Alternatively, a partition-specific error may be returned.
Declaration
List<TopicPartitionOffset> Committed(IEnumerable<TopicPartition> partitions, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartition> | partitions | The partitions to get the committed offsets for. |
TimeSpan | timeout | The maximum period of time the call may block. |
Returns
Type | Description |
---|---|
List<TopicPartitionOffset> |
Exceptions
Type | Condition |
---|---|
KafkaException | Thrown if the request failed. |
TopicPartitionOffsetException | Thrown if any of the constituent results is in error. The entire result (which may contain constituent results that are not in error) is available via the Results property of the exception. |
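
A short sketch of reading committed offsets and inspecting per-partition errors through the Results property mentioned above. The 10 second timeout and the helper name are arbitrary choices for illustration.

```csharp
using System;
using Confluent.Kafka;

static class CommittedExample
{
    // Hypothetical helper: print the committed offset of each assigned partition.
    public static void PrintCommitted(IConsumer<string, string> consumer)
    {
        try
        {
            var committed = consumer.Committed(
                consumer.Assignment, TimeSpan.FromSeconds(10));

            foreach (var tpo in committed)
            {
                // Offset.Unset signals that nothing is recorded for the partition.
                var text = tpo.Offset == Offset.Unset
                    ? "none"
                    : tpo.Offset.Value.ToString();
                Console.WriteLine($"{tpo.TopicPartition}: committed offset {text}");
            }
        }
        catch (TopicPartitionOffsetException e)
        {
            // Results holds every partition result, including those that succeeded.
            foreach (var r in e.Results)
            {
                if (r.Error.IsError)
                {
                    Console.WriteLine($"{r.TopicPartition}: {r.Error.Reason}");
                }
            }
        }
    }
}
```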
Committed(TimeSpan)
Retrieve current committed offsets for the current assignment.
The offset field of each requested partition will be set to the last committed offset, or Offset.Unset if no offset has been committed. Alternatively, a partition-specific error may be returned.
Declaration
List<TopicPartitionOffset> Committed(TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | timeout | The maximum period of time the call may block. |
Returns
Type | Description |
---|---|
List<TopicPartitionOffset> |
Exceptions
Type | Condition |
---|---|
KafkaException | Thrown if the request failed. |
TopicPartitionOffsetException | Thrown if any of the constituent results is in error. The entire result (which may contain constituent results that are not in error) is available via the Results property of the exception. |
Consume(int)
Poll for new messages / events. Blocks until a consume result is available or the timeout period has elapsed.
Declaration
ConsumeResult<TKey, TValue> Consume(int millisecondsTimeout)
Parameters
Type | Name | Description |
---|---|---|
int | millisecondsTimeout | The maximum period of time (in milliseconds) the call may block. |
Returns
Type | Description |
---|---|
ConsumeResult<TKey, TValue> | The consume result. |
Remarks
The partitions assigned/revoked and offsets committed handlers may be invoked as a side-effect of calling this method (on the same thread).
Exceptions
Type | Condition |
---|---|
ConsumeException | Thrown when a call to this method is unsuccessful for any reason. Inspect the Error property of the exception for detailed information. |
Consume(CancellationToken)
Poll for new messages / events. Blocks until a consume result is available or the operation has been cancelled.
Declaration
ConsumeResult<TKey, TValue> Consume(CancellationToken cancellationToken = default)
Parameters
Type | Name | Description |
---|---|---|
CancellationToken | cancellationToken | A cancellation token that can be used to cancel this operation. |
Returns
Type | Description |
---|---|
ConsumeResult<TKey, TValue> | The consume result. |
Remarks
The partitions assigned/revoked and offsets committed handlers may be invoked as a side-effect of calling this method (on the same thread).
Exceptions
Type | Condition |
---|---|
ConsumeException | Thrown when a call to this method is unsuccessful for any reason (except cancellation by user). Inspect the Error property of the exception for detailed information. |
OperationCanceledException | Thrown on cancellation. |
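
The exception table above suggests a loop shaped roughly like the following sketch, which also calls Close() on the way out as recommended earlier. The broker address, group id, and topic name are placeholders, and a reachable broker is assumed.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

class ConsumeLoop
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // placeholder
            GroupId = "example-group",           // placeholder
            AutoOffsetReset = AutoOffsetReset.Earliest,
        };

        using var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true; // keep the process alive so cleanup can run
            cts.Cancel();
        };

        using var consumer = new ConsumerBuilder<string, string>(config).Build();
        consumer.Subscribe("example-topic"); // placeholder topic

        try
        {
            while (true)
            {
                try
                {
                    var result = consumer.Consume(cts.Token);
                    Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                    if (e.Error.IsFatal) break; // the consumer is no longer usable
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Ctrl+C was pressed; fall through to the cleanup below.
        }
        finally
        {
            // Leave the group promptly instead of waiting for session.timeout.ms.
            consumer.Close();
        }
    }
}
```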
Consume(TimeSpan)
Poll for new messages / events. Blocks until a consume result is available or the timeout period has elapsed.
Declaration
ConsumeResult<TKey, TValue> Consume(TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | timeout | The maximum period of time the call may block. |
Returns
Type | Description |
---|---|
ConsumeResult<TKey, TValue> | The consume result. |
Remarks
The partitions assigned/revoked and offsets committed handlers may be invoked as a side-effect of calling this method (on the same thread).
Exceptions
Type | Condition |
---|---|
ConsumeException | Thrown when a call to this method is unsuccessful for any reason. Inspect the Error property of the exception for detailed information. |
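
A sketch of a single timed poll. It relies on the assumption, not stated in the table above, that Consume returns null when the timeout elapses without a message, which is how current releases of the client are understood to behave. The 500 ms timeout and the helper name are arbitrary.

```csharp
using System;
using Confluent.Kafka;

static class TimedPollExample
{
    // Hypothetical helper: returns true if a message was handled.
    public static bool PollOnce(IConsumer<string, string> consumer)
    {
        var result = consumer.Consume(TimeSpan.FromMilliseconds(500));

        if (result == null)
        {
            // Assumed behavior: null means the timeout elapsed with nothing to deliver.
            Console.WriteLine("No message within 500 ms");
            return false;
        }

        Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
        return true;
    }
}
```

A timed poll like this is useful when the calling thread has other periodic work to do between messages.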
GetWatermarkOffsets(TopicPartition)
Get the last cached low (oldest available / beginning) and high (newest/end) offsets for a topic/partition. Does not block.
Declaration
WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)
Parameters
Type | Name | Description |
---|---|---|
TopicPartition | topicPartition | The topic partition of interest. |
Returns
Type | Description |
---|---|
WatermarkOffsets | The requested WatermarkOffsets (see that class for additional documentation). |
Remarks
The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each fetched message set from the broker. If there is no cached offset (either low or high, or both) then Offset.Unset will be returned for the respective offset.
IncrementalAssign(IEnumerable<TopicPartitionOffset>)
Incrementally add partitions to the current assignment, starting consumption from the specified offsets.
Declaration
void IncrementalAssign(IEnumerable<TopicPartitionOffset> partitions)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartitionOffset> | partitions | The set of additional partitions to consume from. If an offset value of Offset.Unset (-1001) is specified for a partition, consumption will resume from the last committed offset on that partition, or according to the 'auto.offset.reset' configuration parameter if no offsets have been committed yet. |
IncrementalAssign(IEnumerable<TopicPartition>)
Incrementally add partitions to the current assignment.
Declaration
void IncrementalAssign(IEnumerable<TopicPartition> partitions)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartition> | partitions | The set of additional partitions to consume from. Consumption will resume from the last committed offset on each partition, or according to the 'auto.offset.reset' configuration parameter if no offsets have been committed yet. |
IncrementalUnassign(IEnumerable<TopicPartition>)
Incrementally remove partitions from the current assignment.
Declaration
void IncrementalUnassign(IEnumerable<TopicPartition> partitions)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartition> | partitions | The set of partitions to remove from the current assignment. |
OffsetsForTimes(IEnumerable<TopicPartitionTimestamp>, TimeSpan)
Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset for which the timestamp is greater than or equal to the given timestamp. If the provided timestamp exceeds that of the last message in the partition, a value of Offset.End (-1) will be returned.
Declaration
List<TopicPartitionOffset> OffsetsForTimes(IEnumerable<TopicPartitionTimestamp> timestampsToSearch, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartitionTimestamp> | timestampsToSearch | The mapping from partition to the timestamp to look up. |
TimeSpan | timeout | The maximum period of time the call may block. |
Returns
Type | Description |
---|---|
List<TopicPartitionOffset> | A mapping from partition to the timestamp and offset of the first message with timestamp greater than or equal to the target timestamp. |
Remarks
The consumer does not need to be assigned to the requested partitions.
Exceptions
Type | Condition |
---|---|
KafkaException | Thrown if the operation fails. |
TopicPartitionOffsetException | Thrown if any of the constituent results is in error. The entire result (which may contain constituent results that are not in error) is available via the Results property of the exception. |
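
One possible use of this lookup is to start consuming from a point in time. The sketch below passes the returned offsets straight to Assign; the helper name, parameters, and 10 second timeout are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using Confluent.Kafka;

static class OffsetsForTimesExample
{
    // Hypothetical helper: assign one partition starting at the first
    // message whose timestamp is at or after startUtc.
    public static void AssignFromTime(
        IConsumer<string, string> consumer,
        string topic, int partition, DateTime startUtc)
    {
        var query = new[]
        {
            new TopicPartitionTimestamp(topic, partition, new Timestamp(startUtc)),
        };

        List<TopicPartitionOffset> offsets =
            consumer.OffsetsForTimes(query, TimeSpan.FromSeconds(10));

        // If startUtc is later than the last message, the offset is Offset.End,
        // so the assignment simply starts at the end of the partition.
        consumer.Assign(offsets);
    }
}
```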
Pause(IEnumerable<TopicPartition>)
Pause consumption for the provided list of partitions.
Declaration
void Pause(IEnumerable<TopicPartition> partitions)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartition> | partitions | The partitions to pause consumption of. |
Exceptions
Type | Condition |
---|---|
KafkaException | Thrown if the request failed. |
TopicPartitionException | Per partition success or error. |
Position(TopicPartition)
Gets the current position (offset) for the specified topic / partition.
The returned offset is the offset of the last consumed message + 1, or Offset.Unset if this consumer has not yet consumed a message from the partition.
Declaration
Offset Position(TopicPartition partition)
Parameters
Type | Name | Description |
---|---|---|
TopicPartition | partition |
Returns
Type | Description |
---|---|
Offset |
Exceptions
Type | Condition |
---|---|
KafkaException | Thrown if the request failed. |
QueryWatermarkOffsets(TopicPartition, TimeSpan)
Query the Kafka cluster for low (oldest available/beginning) and high (newest/end) offsets for the specified topic/partition. This is a blocking call that always contacts the cluster for the required information.
Declaration
WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TopicPartition | topicPartition | The topic/partition of interest. |
TimeSpan | timeout | The maximum period of time the call may block. |
Returns
Type | Description |
---|---|
WatermarkOffsets | The requested WatermarkOffsets (see that class for additional documentation). |
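
Watermarks are often combined with Position to estimate consumer lag. The following is a rough sketch under that assumption; the helper name and the 5 second timeout are illustrative.

```csharp
using System;
using Confluent.Kafka;

static class LagExample
{
    // Hypothetical helper: returns null when the consumer has no position
    // on the partition yet.
    public static long? EstimateLag(
        IConsumer<string, string> consumer, TopicPartition partition)
    {
        WatermarkOffsets watermarks =
            consumer.QueryWatermarkOffsets(partition, TimeSpan.FromSeconds(5));

        Offset position = consumer.Position(partition);
        if (position == Offset.Unset)
        {
            return null;
        }

        // High is the end offset of the partition and position is the next
        // offset to be consumed, so the difference is the number of messages
        // still waiting to be read.
        return watermarks.High.Value - position.Value;
    }
}
```

GetWatermarkOffsets could be substituted where a cached, non-blocking value is good enough.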
Resume(IEnumerable<TopicPartition>)
Resume consumption for the provided list of partitions.
Declaration
void Resume(IEnumerable<TopicPartition> partitions)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartition> | partitions | The partitions to resume consumption of. |
Exceptions
Type | Condition |
---|---|
KafkaException | Thrown if the request failed. |
TopicPartitionException | Per partition success or error. |
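
Pause and Resume are typically used for backpressure. The sketch below keeps polling while paused so that the consumer remains active in its group; the downstreamIsBusy and handle delegates are hypothetical stand-ins for application logic.

```csharp
using System;
using Confluent.Kafka;

static class PauseResumeExample
{
    // Hypothetical helper: stop fetching while a downstream system catches up.
    public static void WaitForDownstream(
        IConsumer<string, string> consumer,
        Func<bool> downstreamIsBusy,
        Action<ConsumeResult<string, string>> handle)
    {
        var assigned = consumer.Assignment;
        consumer.Pause(assigned);
        try
        {
            while (downstreamIsBusy())
            {
                // Keep calling Consume so rebalance and commit handlers still run.
                var result = consumer.Consume(TimeSpan.FromMilliseconds(100));
                if (result != null)
                {
                    // Do not drop anything that is still delivered while paused.
                    handle(result);
                }
            }
        }
        finally
        {
            consumer.Resume(assigned);
        }
    }
}
```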
Seek(TopicPartitionOffset)
Seeks to the specified offset on the specified topic partition.
Declaration
void Seek(TopicPartitionOffset tpo)
Parameters
Type | Name | Description |
---|---|---|
TopicPartitionOffset | tpo | The topic partition to seek on and the offset to seek to. |
Exceptions
Type | Condition |
---|---|
KafkaException | Thrown if the request failed. |
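
A small sketch of one use for Seek: rewinding to a message that could not be processed so that it is delivered again. The helper name is hypothetical, and the partition is assumed to be part of the current assignment.

```csharp
using System;
using Confluent.Kafka;

static class SeekExample
{
    // Hypothetical helper: make the next Consume call on this partition
    // return the same message again, for example after a transient failure.
    public static void RetryLater(
        IConsumer<string, string> consumer,
        ConsumeResult<string, string> failed)
    {
        try
        {
            // TopicPartitionOffset carries both the partition to seek on
            // and the offset to seek to.
            consumer.Seek(failed.TopicPartitionOffset);
        }
        catch (KafkaException e)
        {
            Console.WriteLine($"Seek failed: {e.Error.Reason}");
        }
    }
}
```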
StoreOffset(ConsumeResult<TKey, TValue>)
Store offsets for a single partition based on the topic/partition/offset of a consume result.
The offset will be committed according to auto.commit.interval.ms (and enable.auto.commit) or a manual offset-less Commit().
Declaration
void StoreOffset(ConsumeResult<TKey, TValue> result)
Parameters
Type | Name | Description |
---|---|---|
ConsumeResult<TKey, TValue> | result | A consume result used to determine the offset to store and topic/partition. |
Remarks
enable.auto.offset.store must be set to "false" when using this API.
Exceptions
Type | Condition |
---|---|
KafkaException | Thrown if the request failed. |
TopicPartitionOffsetException | Thrown if result is in error. |
StoreOffset(TopicPartitionOffset)
Store offsets for a single partition.
The offset will be committed (written) to the offset store according to auto.commit.interval.ms or a manual offset-less Commit(). Calling this method does not in itself commit offsets; it only stores them for a future commit.
Declaration
void StoreOffset(TopicPartitionOffset offset)
Parameters
Type | Name | Description |
---|---|---|
TopicPartitionOffset | offset | The offset to be committed. |
Remarks
enable.auto.offset.store must be set to "false" when using this API.
Exceptions
Type | Condition |
---|---|
KafkaException | Thrown if the request failed. |
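
The remarks above imply a configuration in which background commits stay enabled but offsets are stored only after processing. The following sketch shows one way that might look; the broker address, group id, and helper names are placeholders.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

static class StoreOffsetExample
{
    // Placeholder connection settings; only the last two properties matter here.
    public static ConsumerConfig CreateConfig() => new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "example-group",
        EnableAutoCommit = true,       // stored offsets are committed in the background
        EnableAutoOffsetStore = false, // the application decides what gets stored
    };

    // Hypothetical helper: process one message, then mark it as done.
    public static void HandleOne(
        IConsumer<string, string> consumer, CancellationToken token)
    {
        var result = consumer.Consume(token);

        // Stand-in for real processing.
        Console.WriteLine($"Processing {result.TopicPartitionOffset}");

        // Nothing is sent to the broker here; the offset is only recorded
        // locally and picked up by the next automatic or offset-less commit.
        consumer.StoreOffset(result);
    }
}
```

Compared with calling Commit after every message, this keeps the at-least-once guarantee while avoiding a blocking commit call per message.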
Subscribe(IEnumerable<string>)
Update the topic subscription.
Any previous subscription will be unassigned and unsubscribed first.
Declaration
void Subscribe(IEnumerable<string> topics)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<string> | topics | The topics to subscribe to. A regex can be specified to subscribe to the set of all matching topics (which is updated as topics are added / removed from the cluster). A regex must be front anchored to be recognized as a regex. e.g. ^myregex |
Remarks
The topic subscription set denotes the desired set of topics to consume from. This set is provided to the consumer group leader (one of the group members) which uses the configured partition.assignment.strategy to allocate partitions of topics in the subscription set to the consumers in the group.
Subscribe(string)
Sets the subscription set to a single topic.
Any previous subscription will be unassigned and unsubscribed first.
Declaration
void Subscribe(string topic)
Parameters
Type | Name | Description |
---|---|---|
string | topic | The topic to subscribe to. A regex can be specified to subscribe to the set of all matching topics (which is updated as topics are added / removed from the cluster). A regex must be front anchored to be recognized as a regex. e.g. ^myregex |
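
A brief sketch of mixing a literal topic name with a front-anchored regex in one subscription. The topic names and the pattern are placeholders.

```csharp
using System;
using Confluent.Kafka;

static class SubscribeExample
{
    // Hypothetical helper; "orders" and the "audit." prefix are placeholders.
    public static void SubscribeToTopics(IConsumer<string, string> consumer)
    {
        // The leading '^' is what marks the second entry as a regex.
        // It matches any topic whose name starts with "audit.".
        consumer.Subscribe(new[] { "orders", "^audit\\..*" });

        // Subscription reflects what was requested, not the partitions
        // eventually assigned by the group leader.
        Console.WriteLine(string.Join(", ", consumer.Subscription));
    }
}
```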
Unassign()
Remove the current set of assigned partitions and stop consumption.
Declaration
void Unassign()
Unsubscribe()
Unsubscribe from the current subscription set.
Declaration
void Unsubscribe()