Class Consumer
Implements a high-level Apache Kafka consumer (without deserialization).
[UNSTABLE-API] We are considering making this class private in a future version so as to limit API surface area. Prefer to use the deserializing consumer Consumer<TKey, TValue> where possible.
Inheritance
Namespace: Confluent.Kafka
Assembly: cs.temp.dll.dll
Syntax
public class Consumer : IDisposable
Constructors
Consumer(IEnumerable<KeyValuePair<String, Object>>)
Create a new consumer with the supplied configuration.
Declaration
public Consumer(IEnumerable<KeyValuePair<string, object>> config)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<KeyValuePair<System.String, System.Object>> | config |
Remarks
Properties
Assignment
Gets the current partition assignment as set by Assign.
Declaration
public List<TopicPartition> Assignment { get; }
Property Value
Type | Description |
---|---|
List<TopicPartition> |
MemberId
Gets the (dynamic) group member id of this consumer (as set by the broker).
Declaration
public string MemberId { get; }
Property Value
Type | Description |
---|---|
System.String |
Name
Gets the name of this consumer instance. Contains (but is not equal to) the client.id configuration parameter.
Declaration
public string Name { get; }
Property Value
Type | Description |
---|---|
System.String |
Remarks
This name will be unique across all consumer instances in a given application which allows log messages to be associated with the corresponding instance.
Subscription
Gets the current topic subscription as set by Subscribe.
Declaration
public List<string> Subscription { get; }
Property Value
Type | Description |
---|---|
List<System.String> |
Methods
AddBrokers(String)
Adds one or more brokers to the Consumer's list of initial bootstrap brokers.
Note: Additional brokers are discovered automatically as soon as the Consumer connects to any broker by querying the broker metadata. Calling this method is only required in some scenarios where the address of all brokers in the cluster changes.
Declaration
public int AddBrokers(string brokers)
Parameters
Type | Name | Description |
---|---|---|
System.String | brokers | Coma-separated list of brokers in the same format as the bootstrap.server configuration parameter. |
Returns
Type | Description |
---|---|
System.Int32 | The number of brokers added. This value includes brokers that may have been specified a second time. |
Remarks
There is currently no API to remove existing configured, added or learnt brokers.
Assign(TopicPartition)
Update the assignment set to a single partition
.
The assignment set is the complete set of partitions to consume from and will replace any previous assignment.
Declaration
public void Assign(TopicPartition partition)
Parameters
Type | Name | Description |
---|---|---|
TopicPartition | partition | The partition to consume from. Consumption will resume from the last committed offset, or according to the 'auto.offset.reset' configuration parameter if no offsets have been committed yet. |
Assign(TopicPartitionOffset)
Update the assignment set to a single partition
.
The assignment set is the complete set of partitions to consume from and will replace any previous assignment.
Declaration
public void Assign(TopicPartitionOffset partition)
Parameters
Type | Name | Description |
---|---|---|
TopicPartitionOffset | partition | The partition to consume from. If an offset value of Offset.Invalid (-1001) is specified, consumption will resume from the last committed offset, or according to the 'auto.offset.reset' configuration parameter if no offsets have been committed yet. |
Assign(IEnumerable<TopicPartition>)
Update the assignment set to partitions
.
The assignment set is the complete set of partitions to consume from and will replace any previous assignment.
Declaration
public void Assign(IEnumerable<TopicPartition> partitions)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartition> | partitions | The set of partitions to consume from. Consumption will resume from the last committed offset on each partition, or according to the 'auto.offset.reset' configuration parameter if no offsets have been committed yet. |
Assign(IEnumerable<TopicPartitionOffset>)
Update the assignment set to partitions
.
The assignment set is the complete set of partitions to consume from and will replace any previous assignment.
Declaration
public void Assign(IEnumerable<TopicPartitionOffset> partitions)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartitionOffset> | partitions | The set of partitions to consume from. If an offset value of Offset.Invalid (-1001) is specified for a partition, consumption will resume from the last committed offset on that partition, or according to the 'auto.offset.reset' configuration parameter if no offsets have been committed yet. |
CommitAsync()
Commit offsets for the current assignment.
Declaration
public Task<CommittedOffsets> CommitAsync()
Returns
Type | Description |
---|---|
Task<CommittedOffsets> |
CommitAsync(Message)
Commits an offset based on the topic/partition/offset of a message.
The next message to be read will be that following message
.
Declaration
public Task<CommittedOffsets> CommitAsync(Message message)
Parameters
Type | Name | Description |
---|---|---|
Message | message | The message used to determine the committed offset. |
Returns
Type | Description |
---|---|
Task<CommittedOffsets> |
Remarks
A consumer which has position N has consumed records with offsets 0 through N-1 and will next receive the record with offset N.
Hence, this method commits an offset of message
.Offset + 1.
CommitAsync(IEnumerable<TopicPartitionOffset>)
Commit an explicit list of offsets.
Declaration
public Task<CommittedOffsets> CommitAsync(IEnumerable<TopicPartitionOffset> offsets)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartitionOffset> | offsets |
Returns
Type | Description |
---|---|
Task<CommittedOffsets> |
Committed(IEnumerable<TopicPartition>, TimeSpan)
Retrieve current committed offsets for topics + partitions.
The offset field of each requested partition will be set to the offset of the last consumed message, or RD_KAFKA_OFFSET_INVALID in case there was no previous message, or, alternately a partition specific error may also be returned.
throws KafkaException if there was a problem retrieving the above information.
Declaration
public List<TopicPartitionOffsetError> Committed(IEnumerable<TopicPartition> partitions, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartition> | partitions | |
TimeSpan | timeout |
Returns
Type | Description |
---|---|
List<TopicPartitionOffsetError> |
Consume(out Message, Int32)
Poll for new messages / consumer events. Blocks until a new
message or event is ready to be handled or the timeout period
millisecondsTimeout
has elapsed.
Declaration
public bool Consume(out Message message, int millisecondsTimeout)
Parameters
Type | Name | Description |
---|---|---|
Message | message | A consumed message, or null if no messages are available for consumption. |
System.Int32 | millisecondsTimeout | The maximum time to block (in milliseconds), or -1 to block indefinitely. You should typically use a relatively short timout period because this operation cannot be cancelled. |
Returns
Type | Description |
---|---|
System.Boolean | true: a message (with non-error state) was consumed. false: no message was available for consumption. |
Remarks
Will invoke events for OnPartitionsAssigned/Revoked, OnOffsetsCommitted, OnConsumeError etc. on the calling thread.
Consume(out Message, TimeSpan)
Refer to Consume(out Message, Int32)
Declaration
public bool Consume(out Message message, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
Message | message | |
TimeSpan | timeout |
Returns
Type | Description |
---|---|
System.Boolean |
Dispose()
Releases all resources used by this Consumer.
This call will block until the consumer has revoked its assignment, calling the rebalance event if it is configured, committed offsets to broker, and left the consumer group.
[UNSTABLE-API] - The Dispose method should not block. We will separate out consumer close functionality from this method.
Declaration
public void Dispose()
GetMetadata(Boolean)
Refer to Confluent.Kafka.Producer.GetMetadata(System.Boolean,System.String,System.Int32) for more information.
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public Metadata GetMetadata(bool allTopics)
Parameters
Type | Name | Description |
---|---|---|
System.Boolean | allTopics |
Returns
Type | Description |
---|---|
Metadata |
GetMetadata(Boolean, TimeSpan)
Refer to Confluent.Kafka.Producer.GetMetadata(System.Boolean,System.String,System.Int32) for more information.
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public Metadata GetMetadata(bool allTopics, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
System.Boolean | allTopics | |
TimeSpan | timeout |
Returns
Type | Description |
---|---|
Metadata |
GetWatermarkOffsets(TopicPartition)
Get last known low (oldest/beginning) and high (newest/end) offsets for a topic/partition.
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)
Parameters
Type | Name | Description |
---|---|---|
TopicPartition | topicPartition | The topic/partition of interest. |
Returns
Type | Description |
---|---|
WatermarkOffsets | The requested WatermarkOffsets. |
Remarks
The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each fetched message set from the broker.
If there is no cached offset (either low or high, or both) then Offset.Invalid will be returned for the respective offset.
ListGroup(String)
Get information pertaining to a particular group in the Kafka cluster (blocks, potentially indefinitely).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public GroupInfo ListGroup(string group)
Parameters
Type | Name | Description |
---|---|---|
System.String | group | The group of interest. |
Returns
Type | Description |
---|---|
GroupInfo | Returns information pertaining to the specified group or null if this group does not exist. |
ListGroup(String, TimeSpan)
Get information pertaining to a particular group in the Kafka cluster (blocking).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public GroupInfo ListGroup(string group, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
System.String | group | The group of interest. |
TimeSpan | timeout | The maximum period of time the call may block. |
Returns
Type | Description |
---|---|
GroupInfo | Returns information pertaining to the specified group or null if this group does not exist. |
ListGroups(TimeSpan)
Get information pertaining to all groups in the Kafka cluster (blocking).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public List<GroupInfo> ListGroups(TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | timeout | The maximum period of time the call may block. |
Returns
Type | Description |
---|---|
List<GroupInfo> |
OffsetsForTimes(IEnumerable<TopicPartitionTimestamp>, TimeSpan)
Declaration
public IEnumerable<TopicPartitionOffsetError> OffsetsForTimes(IEnumerable<TopicPartitionTimestamp> timestampsToSearch, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartitionTimestamp> | timestampsToSearch | |
TimeSpan | timeout |
Returns
Type | Description |
---|---|
IEnumerable<TopicPartitionOffsetError> |
Pause(IEnumerable<TopicPartition>)
Declaration
public List<TopicPartitionError> Pause(IEnumerable<TopicPartition> partitions)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartition> | partitions |
Returns
Type | Description |
---|---|
List<TopicPartitionError> |
Poll()
Poll for new consumer events, including new messages ready to be consumed (which will trigger the OnMessage event).
Declaration
public void Poll()
Remarks
Blocks indefinitely until a new event is ready.
Poll(Int32)
Poll for new consumer events, including new messages
ready to be consumed (which will trigger the OnMessage
event). Blocks until a new event is available to be
handled or the timeout period millisecondsTimeout
has elapsed.
Declaration
public void Poll(int millisecondsTimeout)
Parameters
Type | Name | Description |
---|---|---|
System.Int32 | millisecondsTimeout | The maximum time to block (in milliseconds), or -1 to block indefinitely. You should typically use a relatively short timout period because this operation cannot be cancelled. |
Poll(TimeSpan)
Poll for new consumer events, including new messages ready to be consumed (which will trigger the OnMessage event).
Declaration
public void Poll(TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TimeSpan | timeout | The maximum time to block. You should typically use a relatively short timout period because this operation cannot be cancelled. |
Position(IEnumerable<TopicPartition>)
Retrieve current positions (offsets) for topics + partitions.
The offset field of each requested partition will be set to the offset of the last consumed message + 1, or RD_KAFKA_OFFSET_INVALID in case there was no previous message, or, alternately a partition specific error may also be returned.
throws KafkaException if there was a problem retrieving the above information.
Declaration
public List<TopicPartitionOffsetError> Position(IEnumerable<TopicPartition> partitions)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartition> | partitions |
Returns
Type | Description |
---|---|
List<TopicPartitionOffsetError> |
QueryWatermarkOffsets(TopicPartition)
Query the Kafka cluster for low (oldest/beginning) and high (newest/end) offsets for the specified topic/partition (blocks, potentially indefinitely).
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition)
Parameters
Type | Name | Description |
---|---|---|
TopicPartition | topicPartition | The topic/partition of interest. |
Returns
Type | Description |
---|---|
WatermarkOffsets | The requested WatermarkOffsets. |
QueryWatermarkOffsets(TopicPartition, TimeSpan)
Query the Kafka cluster for low (oldest/beginning) and high (newest/end) offsets for the specified topic/partition (blocking)
[UNSTABLE-API] - The API associated with this functionality is subject to change.
Declaration
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
Parameters
Type | Name | Description |
---|---|---|
TopicPartition | topicPartition | The topic/partition of interest. |
TimeSpan | timeout | The maximum period of time the call may block. |
Returns
Type | Description |
---|---|
WatermarkOffsets | The requested WatermarkOffsets. |
Resume(IEnumerable<TopicPartition>)
Declaration
public List<TopicPartitionError> Resume(IEnumerable<TopicPartition> partitions)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartition> | partitions |
Returns
Type | Description |
---|---|
List<TopicPartitionError> |
Seek(TopicPartitionOffset)
Declaration
public void Seek(TopicPartitionOffset tpo)
Parameters
Type | Name | Description |
---|---|---|
TopicPartitionOffset | tpo |
StoreOffsets(IEnumerable<TopicPartitionOffset>)
Declaration
public List<TopicPartitionOffsetError> StoreOffsets(IEnumerable<TopicPartitionOffset> offsets)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<TopicPartitionOffset> | offsets |
Returns
Type | Description |
---|---|
List<TopicPartitionOffsetError> |
Subscribe(IEnumerable<String>)
Update the subscription set to topics.
Any previous subscription will be unassigned and unsubscribed first.
The subscription set denotes the desired topics to consume and this set is provided to the partition assignor (one of the elected group members) for all clients which then uses the configured partition.assignment.strategy to assign the subscription sets's topics's partitions to the consumers, depending on their subscription.
Declaration
public void Subscribe(IEnumerable<string> topics)
Parameters
Type | Name | Description |
---|---|---|
IEnumerable<System.String> | topics |
Subscribe(String)
Update the subscription set to a single topic.
Any previous subscription will be unassigned and unsubscribed first.
Declaration
public void Subscribe(string topic)
Parameters
Type | Name | Description |
---|---|---|
System.String | topic |
Unassign()
Stop consumption and remove the current topic/partition assignment.
Declaration
public void Unassign()
Unsubscribe()
Unsubscribe from the current subscription set.
Declaration
public void Unsubscribe()
Events
OnConsumeError
Raised when a consumed message has an error != NoError (both when Consume or Poll is used for polling).
Declaration
public event EventHandler<Message> OnConsumeError
Event Type
Type | Description |
---|---|
EventHandler<Message> |
Remarks
Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).
OnError
Raised on critical errors, e.g. connection failures or all brokers down. Note that the client will try to automatically recover from errors - these errors should be seen as informational rather than catastrophic
Declaration
public event EventHandler<Error> OnError
Event Type
Type | Description |
---|---|
EventHandler<Error> |
Remarks
Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).
OnLog
Raised when there is information that should be logged.
Declaration
public event EventHandler<LogMessage> OnLog
Event Type
Type | Description |
---|---|
EventHandler<LogMessage> |
Remarks
Note: By default not many log messages are generated.
You can specify one or more debug contexts using the 'debug' configuration property and a log level using the 'log_level' configuration property to enable more verbose logging, however you shouldn't typically need to do this.
Warning: Log handlers are called spontaneously from internal librdkafka threads and the application must not call any Confluent.Kafka APIs from within a log handler or perform any prolonged operations.
OnMessage
Raised when a new message is avaiable for consumption. NOT raised when Consumer.Consume is used for polling (only when Consmer.Poll is used for polling). NOT raised when the message has an Error (OnConsumeError is raised in that case).
Declaration
public event EventHandler<Message> OnMessage
Event Type
Type | Description |
---|---|
EventHandler<Message> |
Remarks
Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).
OnOffsetsCommitted
Raised to report the result of (automatic) offset commits. Not raised as a result of the use of the CommitAsync method.
Declaration
public event EventHandler<CommittedOffsets> OnOffsetsCommitted
Event Type
Type | Description |
---|---|
EventHandler<CommittedOffsets> |
Remarks
Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).
OnPartitionEOF
Raised when the consumer reaches the end of a topic/partition it is reading from.
Declaration
public event EventHandler<TopicPartitionOffset> OnPartitionEOF
Event Type
Type | Description |
---|---|
EventHandler<TopicPartitionOffset> |
Remarks
Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).
OnPartitionsAssigned
Raised on new partition assignment. You should typically call the Consumer.Assign method in this handler.
Declaration
public event EventHandler<List<TopicPartition>> OnPartitionsAssigned
Event Type
Type | Description |
---|---|
EventHandler<List<TopicPartition>> |
Remarks
Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).
OnPartitionsRevoked
Raised when a partition assignment is revoked. You should typically call the Consumer.Unassign method in this handler.
Declaration
public event EventHandler<List<TopicPartition>> OnPartitionsRevoked
Event Type
Type | Description |
---|---|
EventHandler<List<TopicPartition>> |
Remarks
Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).
OnStatistics
Raised on librdkafka statistics events. JSON formatted string as defined here: https://github.com/edenhill/librdkafka/wiki/Statistics
Declaration
public event EventHandler<string> OnStatistics
Event Type
Type | Description |
---|---|
EventHandler<System.String> |
Remarks
Executes on the same thread as every other Consumer event handler (except OnLog which may be called from an arbitrary thread).