@InterfaceStability.Evolving public class KafkaShareConsumer<K,V> extends Object implements ShareConsumer<K,V>
This is an early access feature under development which is introduced by KIP-932. It is not suitable for production use until it is fully implemented and released.
An UnsupportedVersionException is thrown when invoking an API that is not available on the running broker version.
Consumers configured with the same group.id will be part of the same share group.
Each consumer in a group can dynamically set the list of topics it wants to subscribe to using the subscribe(Collection) method. Kafka will deliver each message in the subscribed topics to one consumer in the share group. Unlike consumer groups, share groups balance the partitions between all members of the share group, permitting multiple consumers to consume from the same partitions. This gives more flexible sharing of records than a consumer group, at the expense of record ordering.
Membership in a share group is maintained dynamically: if a consumer fails, the partitions assigned to it will be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, the partition assignment is re-evaluated and partitions can be moved from existing consumers to the new one. This is known as rebalancing the group and is discussed in more detail below. Group rebalancing is also used when new partitions are added to one of the subscribed topics. The group will automatically detect the new partitions through periodic metadata refreshes and assign them to the members of the group.
Conceptually, you can think of a share group as a single logical subscriber made up of multiple consumers. In fact, in other messaging systems, a share group is roughly equivalent to a durable shared subscription. You can have multiple share groups and consumer groups independently consuming from the same topics.
After subscribing to a set of topics, the consumer will automatically join the group when poll(Duration) is invoked. This method is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive records from the partitions it was assigned. Under the covers, the consumer sends periodic heartbeats to the broker. If the consumer crashes or is unable to send heartbeats for the duration of the share group's session time-out, then the consumer will be considered dead and its partitions will be reassigned.
It is also possible that the consumer could encounter a "livelock" situation where it is continuing to send heartbeats in the background, but no progress is being made. To prevent the consumer from holding onto its partitions indefinitely in this case, we provide a liveness detection mechanism using the max.poll.interval.ms setting. If you don't call poll at least as frequently as this, the client will proactively leave the share group. So to stay in the group, you must continue to call poll.
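As a rough illustration of the liveness rule above, the following sketch sets max.poll.interval.ms in the consumer properties and models the check locally. The class name, the helper methods, and the value 60000 are assumptions made for illustration; the real check is performed inside the client and is not exposed like this.

```java
import java.util.Properties;

class LivenessConfigSketch {
    // Builds consumer properties with an explicit max.poll.interval.ms (illustrative values).
    static Properties build(long maxPollIntervalMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        return props;
    }

    // Toy model only: true when the gap since the last poll exceeds the configured interval,
    // which is the situation in which the client would proactively leave the share group.
    static boolean wouldLeaveGroup(long lastPollMs, long nowMs, long maxPollIntervalMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        Properties props = build(60000L);
        System.out.println(props.getProperty("max.poll.interval.ms"));
        System.out.println(wouldLeaveGroup(0L, 60001L, 60000L));
    }
}
```

In practice this means the time spent processing one batch between two calls to poll should stay comfortably below the configured interval.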
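When a consumer in a share group fetches records using poll(Duration), it receives available records from any of the topic-partitions that match its subscriptions. Records are acquired for delivery to this consumer with a time-limited acquisition lock. While a record is acquired, it is not available for another consumer. By default, the lock duration is 30 seconds, but it can also be controlled using the group.share.record.lock.duration.ms configuration parameter. The lock is automatically released once the lock duration has elapsed, and then the record is available to be given to another consumer. The consumer which holds the lock can deal with the record in the following ways:
- Acknowledge successful processing of the record (AcknowledgeType.ACCEPT).
- Release the record, which leaves it eligible for another delivery attempt (AcknowledgeType.RELEASE).
- Reject the record, which makes it ineligible for further delivery attempts (AcknowledgeType.REJECT).
- Do nothing, in which case the lock is released automatically when the lock duration has elapsed.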
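The number of records that can be acquired for each topic-partition in a share group is limited by the broker configuration property group.share.record.lock.partition.limit. By limiting the duration of the acquisition lock and automatically releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.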
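The behaviour of a time-limited acquisition lock can be pictured with a small in-memory model. This is only a toy sketch: the class AcquisitionLockSketch and its method are hypothetical, time is passed in explicitly, and nothing here reflects how the broker actually stores lock state.

```java
import java.util.HashMap;
import java.util.Map;

class AcquisitionLockSketch {
    private final long lockDurationMs;
    // offset -> time (ms) at which the current acquisition lock expires
    private final Map<Long, Long> lockExpiry = new HashMap<>();

    AcquisitionLockSketch(long lockDurationMs) {
        this.lockDurationMs = lockDurationMs;
    }

    // Tries to acquire the record at the given offset at time nowMs.
    // Returns false while an unexpired lock exists, true otherwise.
    boolean tryAcquire(long offset, long nowMs) {
        Long expiry = lockExpiry.get(offset);
        if (expiry != null && nowMs < expiry) {
            return false;
        }
        lockExpiry.put(offset, nowMs + lockDurationMs);
        return true;
    }

    public static void main(String[] args) {
        AcquisitionLockSketch locks = new AcquisitionLockSketch(30_000L);
        System.out.println(locks.tryAcquire(42L, 0L));      // first consumer acquires the record
        System.out.println(locks.tryAcquire(42L, 10_000L)); // still locked for another consumer
        System.out.println(locks.tryAcquire(42L, 30_000L)); // lock duration elapsed, available again
    }
}
```

The third call succeeds without any action from the first holder, which is the property that keeps delivery moving when a consumer fails.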
The consumer can choose to use implicit or explicit acknowledgement of the records it processes.
If the application calls acknowledge(ConsumerRecord, AcknowledgeType) for any record in the batch, it is using explicit acknowledgement. In this case:
- The application calls commitSync() or commitAsync(), which commits the acknowledgements to Kafka. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.
- The application calls poll(Duration) without committing first, which commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.
- The application calls close(), which attempts to commit any pending acknowledgements and releases any remaining acquired records.
If the application does not call acknowledge(ConsumerRecord, AcknowledgeType) for any record in the batch, it is using implicit acknowledgement. In this case:
- The application calls commitSync() or commitAsync(), which implicitly acknowledges all of the delivered records as processed successfully and commits the acknowledgements to Kafka.
- The application calls poll(Duration) without committing, which also implicitly acknowledges all of the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements.
- The application calls close(), which releases any acquired records without acknowledgement.
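The difference between the two modes at commit time can be summarised with a toy model. The class and method below are hypothetical and operate on plain offsets rather than Kafka records; they only restate the rules described above and do not call the client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class AckModeSketch {
    // Returns the offsets that remain acquired after a commit.
    // An empty explicitlyAcked set models implicit acknowledgement, where every
    // delivered record is acknowledged; otherwise only unacknowledged records remain.
    static List<Long> remainingAcquired(List<Long> delivered, Set<Long> explicitlyAcked) {
        List<Long> remaining = new ArrayList<>();
        if (explicitlyAcked.isEmpty()) {
            return remaining;
        }
        for (Long offset : delivered) {
            if (!explicitlyAcked.contains(offset)) {
                remaining.add(offset);
            }
        }
        return remaining;
    }

    public static void main(String[] args) {
        List<Long> delivered = List.of(10L, 11L, 12L);
        System.out.println(remainingAcquired(delivered, Set.of()));         // implicit mode
        System.out.println(remainingAcquired(delivered, Set.of(10L, 11L))); // explicit mode
    }
}
```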
The consumer guarantees that the records returned in the ConsumerRecords object for a specific topic-partition are in order of increasing offset. For each topic-partition, Kafka guarantees that acknowledgements for the records in a batch are performed atomically. This makes error handling significantly more straightforward because there can be one error code per partition.
This example demonstrates implicit acknowledgement using poll(Duration) to acknowledge the records which were delivered in the previous poll. All the records delivered are implicitly marked as successfully consumed and acknowledged synchronously with Kafka as the consumer fetches more records.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            doProcessing(record);
        }
    }

Alternatively, you can use commitSync() or commitAsync() to commit the acknowledgements, but this is slightly less efficient because there is an additional request sent to Kafka.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            doProcessing(record);
        }
        // Implicitly acknowledges the whole batch and commits it in a separate request
        consumer.commitSync();
    }
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            try {
                doProcessing(record);
                consumer.acknowledge(record, AcknowledgeType.ACCEPT);
            } catch (Exception e) {
                consumer.acknowledge(record, AcknowledgeType.REJECT);
            }
        }
        consumer.commitSync();
    }

Each record processed is separately acknowledged using a call to acknowledge(ConsumerRecord, AcknowledgeType).
The AcknowledgeType argument indicates whether the record was processed successfully or not. In this case, the bad records are rejected, meaning that they’re not eligible for further delivery attempts. For a permanent error such as a semantic error, this is appropriate. For a transient error which might not affect a subsequent processing attempt, AcknowledgeType.RELEASE is more appropriate because the record remains eligible for further delivery attempts.
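One way to apply this distinction is to map the outcome of processing to an acknowledgement type in a single place. The sketch below is an assumption-laden illustration: AckType is a local stand-in for the three AcknowledgeType values named above, and TransientProcessingException is a hypothetical marker that an application would define for errors worth retrying.

```java
class AckChoiceSketch {
    // Local stand-in mirroring the acknowledgement types named in the text.
    enum AckType { ACCEPT, RELEASE, REJECT }

    // Hypothetical application-defined marker for errors that may succeed on a later attempt.
    static class TransientProcessingException extends RuntimeException {
        TransientProcessingException(String message) {
            super(message);
        }
    }

    // Chooses an acknowledgement type from the processing outcome:
    // no error -> ACCEPT, transient error -> RELEASE, anything else -> REJECT.
    static AckType choose(Exception error) {
        if (error == null) {
            return AckType.ACCEPT;
        }
        if (error instanceof TransientProcessingException) {
            return AckType.RELEASE;
        }
        return AckType.REJECT;
    }

    public static void main(String[] args) {
        System.out.println(choose(null));
        System.out.println(choose(new TransientProcessingException("downstream timeout")));
        System.out.println(choose(new IllegalArgumentException("malformed payload")));
    }
}
```

In a real consumer loop, the result of such a helper would be translated to the corresponding AcknowledgeType and passed to acknowledge(ConsumerRecord, AcknowledgeType).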
The calls to acknowledge(ConsumerRecord, AcknowledgeType) are simply updating local information in the consumer. It is only once commitSync() is called that the acknowledgements are committed by sending the new state information to Kafka.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            try {
                doProcessing(record);
                consumer.acknowledge(record, AcknowledgeType.ACCEPT);
            } catch (Exception e) {
                consumer.acknowledge(record, AcknowledgeType.REJECT);
                // Stop processing this batch on the first error
                break;
            }
        }
        consumer.commitSync();
    }

There are the following cases in this example:
- The batch contains no records. The call to commitSync() just does nothing because the batch was empty.
- All of the records in the batch are processed successfully. The calls to acknowledge(ConsumerRecord, AcknowledgeType) specifying AcknowledgeType.ACCEPT mark all records in the batch as successfully processed.
- One of the records encounters an exception. The call to acknowledge(ConsumerRecord, AcknowledgeType) specifying AcknowledgeType.REJECT rejects that record. Earlier records in the batch have already been marked as successfully processed. The call to commitSync() commits the acknowledgements, but the records after the failed record remain acquired as part of the same delivery attempt and will be presented to the application in response to another poll.

The isolation level is controlled by the group.share.isolation.level configuration property. In a share group, the isolation level applies to the entire share group, not just individual consumers.
In read_uncommitted isolation level, the share group consumes all non-transactional and transactional records. The consumption is bounded by the high-water mark.
In read_committed isolation level (not yet supported), the share group only consumes non-transactional records and committed transactional records. The set of records which are eligible to become in-flight records are non-transactional records and committed transactional records only. The consumption is bounded by the last stable offset, so an open transaction blocks the progress of the share group with read_committed isolation level.
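The effect of the two isolation levels on how far a share group can read can be expressed as a small function. This is a conceptual sketch only: the class, the method, and the sample offsets are hypothetical, and the function simply restates the bounds described above.

```java
class IsolationBoundSketch {
    // Returns the offset that bounds consumption for the given isolation level:
    // the high-water mark for read_uncommitted, the last stable offset for read_committed.
    static long consumptionBound(String isolationLevel, long highWaterMark, long lastStableOffset) {
        switch (isolationLevel) {
            case "read_uncommitted":
                return highWaterMark;
            case "read_committed":
                return lastStableOffset;
            default:
                throw new IllegalArgumentException("Unknown isolation level: " + isolationLevel);
        }
    }

    public static void main(String[] args) {
        // With an open transaction, the last stable offset lags behind the high-water mark.
        System.out.println(consumptionBound("read_uncommitted", 100L, 80L));
        System.out.println(consumptionBound("read_committed", 100L, 80L));
    }
}
```

The gap between the two results is the range of records that an open transaction holds back under read_committed.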
The consumer is not thread-safe. Unsynchronized access from multiple threads will result in ConcurrentModificationException.
The only exception to this rule is wakeup(), which can safely be used from an external thread to interrupt an active operation. In this case, a WakeupException will be thrown from the thread blocking on the operation. This can be used to shut down the consumer from another thread. The following snippet shows the typical pattern:
    public class KafkaShareConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaShareConsumer consumer;

        public KafkaShareConsumerRunner(KafkaShareConsumer consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            try {
                consumer.subscribe(Arrays.asList("topic"));
                while (!closed.get()) {
                    ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
                    // Handle new records
                }
            } catch (WakeupException e) {
                // Ignore exception if closing
                if (!closed.get()) throw e;
            } finally {
                consumer.close();
            }
        }

        // Shutdown hook which can be called from a separate thread
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }

Then in a separate thread, the consumer can be shut down by setting the closed flag and waking up the consumer.

    closed.set(true);
    consumer.wakeup();
Note that while it is possible to use thread interrupts instead of wakeup() to abort a blocking operation (in which case, InterruptException will be raised), we discourage their use since they may cause a clean shutdown of the consumer to be aborted. Interrupts are mainly supported for those cases where using wakeup() is impossible, such as when a consumer thread is managed by code that is unaware of the Kafka client.
We have intentionally avoided implementing a particular threading model for processing. Various options for multithreaded processing are possible, of which the most straightforward is to dedicate a thread to each consumer.
Constructor | Description |
---|---|
KafkaShareConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) | |
KafkaShareConsumer(Map<String,Object> configs) | A consumer is instantiated by providing a set of key-value pairs as configuration. |
KafkaShareConsumer(Map<String,Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) | A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value Deserializer. |
KafkaShareConsumer(Properties properties) | A consumer is instantiated by providing a Properties object as configuration. |
KafkaShareConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) | A consumer is instantiated by providing a Properties object as configuration, and a key and a value Deserializer. |
Modifier and Type | Method | Description |
---|---|---|
void | acknowledge(ConsumerRecord<K,V> record) | Acknowledge successful delivery of a record returned on the last poll(Duration) call. |
void | acknowledge(ConsumerRecord<K,V> record, AcknowledgeType type) | Acknowledge delivery of a record returned on the last poll(Duration) call indicating whether it was processed successfully. |
Uuid | clientInstanceId(Duration timeout) | Determines the client's unique client instance ID used for telemetry. |
void | close() | Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. |
void | close(Duration timeout) | Tries to close the consumer cleanly within the specified timeout. |
void | commitAsync() | Commit the acknowledgements for the records returned. |
Map<TopicIdPartition,Optional<KafkaException>> | commitSync() | Commit the acknowledgements for the records returned. |
Map<TopicIdPartition,Optional<KafkaException>> | commitSync(Duration timeout) | Commit the acknowledgements for the records returned. |
Map<MetricName,? extends Metric> | metrics() | Get the metrics kept by the consumer. |
ConsumerRecords<K,V> | poll(Duration timeout) | Fetch data for the topics specified using subscribe(Collection). |
void | setAcknowledgementCommitCallback(AcknowledgementCommitCallback callback) | Sets the acknowledgement commit callback which can be used to handle acknowledgement completion. |
void | subscribe(Collection<String> topics) | Subscribe to the given list of topics to get dynamically assigned partitions. |
Set<String> | subscription() | Get the current subscription. |
void | unsubscribe() | Unsubscribe from topics currently subscribed with subscribe(Collection). |
void | wakeup() | Wake up the consumer. |
public KafkaShareConsumer(Map<String,Object> configs)
A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings are documented at ConsumerConfig.
Note: after creating a KafkaShareConsumer you must always close() it to avoid resource leaks.
Parameters:
configs - The consumer configs
public KafkaShareConsumer(Properties properties)
A consumer is instantiated by providing a Properties object as configuration. Valid configuration strings are documented at ConsumerConfig.
Note: after creating a KafkaShareConsumer you must always close() it to avoid resource leaks.
Parameters:
properties - The consumer configuration properties
public KafkaShareConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
A consumer is instantiated by providing a Properties object as configuration, and a key and a value Deserializer. Valid configuration strings are documented at ConsumerConfig.
Note: after creating a KafkaShareConsumer you must always close() it to avoid resource leaks.
Parameters:
properties - The consumer configuration properties
keyDeserializer - The deserializer for key that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
valueDeserializer - The deserializer for value that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
public KafkaShareConsumer(Map<String,Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value Deserializer. Valid configuration strings are documented at ConsumerConfig.
Note: after creating a KafkaShareConsumer you must always close() it to avoid resource leaks.
Parameters:
configs - The consumer configs
keyDeserializer - The deserializer for key that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
valueDeserializer - The deserializer for value that implements Deserializer. The configure() method won't be called in the consumer when the deserializer is passed in directly.
public KafkaShareConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
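public Set<String> subscription()
Get the current subscription. Returns the topics from the most recent call to subscribe(Collection), or an empty set if no such call has been made.
Specified by: subscription in interface ShareConsumer<K,V>
See Also: subscription()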
public void subscribe(Collection<String> topics)
Subscribe to the given list of topics to get dynamically assigned partitions. An empty list is treated the same as unsubscribe().
As part of group management, the coordinator keeps track of the consumers that belong to a particular group and triggers a rebalance operation when group membership changes (a consumer joins, leaves, or fails) or when the partitions of a subscribed topic change.
Specified by: subscribe in interface ShareConsumer<K,V>
Parameters:
topics - The list of topics to subscribe to
Throws:
IllegalArgumentException - if topics is null or contains null or empty elements
KafkaException - for any other unrecoverable errors
See Also: subscribe(Collection)
public void unsubscribe()
Unsubscribe from topics currently subscribed with subscribe(Collection).
Specified by: unsubscribe in interface ShareConsumer<K,V>
Throws:
KafkaException - for any other unrecoverable errors
See Also: unsubscribe()
public ConsumerRecords<K,V> poll(Duration timeout)
Fetch data for the topics specified using subscribe(Collection). It is an error to not have subscribed to any topics before polling for data.
This method returns immediately if there are records available. Otherwise, it will await the passed timeout. If the timeout expires, an empty record set will be returned.
Specified by: poll in interface ShareConsumer<K,V>
Parameters:
timeout - The maximum time to block (must not be greater than Long.MAX_VALUE milliseconds)
Throws:
AuthenticationException - if authentication fails. See the exception for more details
AuthorizationException - if caller lacks Read access to any of the subscribed topics or to the share group. See the exception for more details
IllegalArgumentException - if the timeout value is negative
IllegalStateException - if the consumer is not subscribed to any topics
ArithmeticException - if the timeout is greater than Long.MAX_VALUE milliseconds
InvalidTopicException - if the current subscription contains any invalid topic (per Topic.validate(String))
WakeupException - if wakeup() is called before or while this method is called
InterruptException - if the calling thread is interrupted before or while this method is called
KafkaException - for any other unrecoverable errors
See Also: poll(Duration)
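The two timeout-related error cases listed for poll can be reproduced with plain java.time.Duration. The helper below is a hypothetical pre-check written for illustration, not code from the client; it relies on Duration.toMillis() throwing ArithmeticException when the value does not fit in a long.

```java
import java.time.Duration;

class PollTimeoutSketch {
    // Converts a timeout to milliseconds, mirroring the documented error cases:
    // IllegalArgumentException for a negative timeout, and ArithmeticException
    // (raised by Duration.toMillis) when it exceeds Long.MAX_VALUE milliseconds.
    static long toTimeoutMs(Duration timeout) {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("Timeout must not be negative");
        }
        return timeout.toMillis();
    }

    public static void main(String[] args) {
        System.out.println(toTimeoutMs(Duration.ofMillis(100)));
        try {
            toTimeoutMs(Duration.ofSeconds(Long.MAX_VALUE));
        } catch (ArithmeticException e) {
            System.out.println("timeout too large");
        }
    }
}
```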
public void acknowledge(ConsumerRecord<K,V> record)
Acknowledge successful delivery of a record returned on the last poll(Duration) call. The acknowledgement is committed on the next commitSync(), commitAsync() or poll(Duration) call.
Specified by: acknowledge in interface ShareConsumer<K,V>
Parameters:
record - The record to acknowledge
Throws:
IllegalStateException - if the record is not waiting to be acknowledged, or the consumer has already used implicit acknowledgement
See Also: acknowledge(ConsumerRecord)
public void acknowledge(ConsumerRecord<K,V> record, AcknowledgeType type)
Acknowledge delivery of a record returned on the last poll(Duration) call indicating whether it was processed successfully. The acknowledgement is committed on the next commitSync(), commitAsync() or poll(Duration) call. By using this method, the consumer is using explicit acknowledgement.
Specified by: acknowledge in interface ShareConsumer<K,V>
Parameters:
record - The record to acknowledge
type - The acknowledgement type which indicates whether it was processed successfully
Throws:
IllegalStateException - if the record is not waiting to be acknowledged, or the consumer has already used implicit acknowledgement
See Also: acknowledge(ConsumerRecord, AcknowledgeType)
public Map<TopicIdPartition,Optional<KafkaException>> commitSync()
Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement, the acknowledgements to commit are those specified using acknowledge(ConsumerRecord) or acknowledge(ConsumerRecord, AcknowledgeType). If the consumer is using implicit acknowledgement, all the records returned by the latest call to poll(Duration) are acknowledged.
This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is encountered (in which case it is thrown to the caller), or the timeout specified by default.api.timeout.ms expires.
Specified by: commitSync in interface ShareConsumer<K,V>
Throws:
WakeupException - if wakeup() is called before or while this method is called
InterruptException - if the thread is interrupted while blocked
KafkaException - for any other unrecoverable errors
See Also: commitSync()
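Since commitSync() returns a map from partition to an Optional exception, a caller will typically want to pick out the partitions whose commit failed. The generic helper below is a hypothetical sketch that works on any map of that shape; it assumes that a present Optional signals a failed acknowledgement for that partition, which this page does not state explicitly.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class CommitResultSketch {
    // Returns the keys whose Optional holds an exception.
    // Assumption: a present exception means the commit failed for that partition.
    static <P, E extends Exception> List<P> failedPartitions(Map<P, Optional<E>> results) {
        List<P> failed = new ArrayList<>();
        for (Map.Entry<P, Optional<E>> entry : results.entrySet()) {
            if (entry.getValue().isPresent()) {
                failed.add(entry.getKey());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Strings stand in for TopicIdPartition keys in this illustration.
        Map<String, Optional<RuntimeException>> results = new LinkedHashMap<>();
        results.put("foo-0", Optional.empty());
        results.put("foo-1", Optional.of(new RuntimeException("commit failed")));
        System.out.println(failedPartitions(results));
    }
}
```

With the real client, the same helper could be called with the map returned by commitSync(), using TopicIdPartition and KafkaException as the type arguments.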
public Map<TopicIdPartition,Optional<KafkaException>> commitSync(Duration timeout)
Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement, the acknowledgements to commit are those specified using acknowledge(ConsumerRecord) or acknowledge(ConsumerRecord, AcknowledgeType). If the consumer is using implicit acknowledgement, all the records returned by the latest call to poll(Duration) are acknowledged.
This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is encountered (in which case it is thrown to the caller), or the timeout expires.
Specified by: commitSync in interface ShareConsumer<K,V>
Parameters:
timeout - The maximum amount of time to await completion of the acknowledgement
Throws:
IllegalArgumentException - if the timeout is negative
WakeupException - if wakeup() is called before or while this method is called
InterruptException - if the thread is interrupted while blocked
KafkaException - for any other unrecoverable errors
See Also: commitSync(Duration)
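public void commitAsync()
Commit the acknowledgements for the records returned. If the consumer is using explicit acknowledgement, the acknowledgements to commit are those specified using acknowledge(ConsumerRecord) or acknowledge(ConsumerRecord, AcknowledgeType). If the consumer is using implicit acknowledgement, all the records returned by the latest call to poll(Duration) are acknowledged.
Specified by: commitAsync in interface ShareConsumer<K,V>
Throws:
KafkaException - for any other unrecoverable errors
See Also: commitAsync()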
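public void setAcknowledgementCommitCallback(AcknowledgementCommitCallback callback)
Sets the acknowledgement commit callback which can be used to handle acknowledgement completion.
Specified by: setAcknowledgementCommitCallback in interface ShareConsumer<K,V>
Parameters:
callback - The acknowledgement commit callback
See Also: setAcknowledgementCommitCallback(AcknowledgementCommitCallback)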
public Uuid clientInstanceId(Duration timeout)
Determines the client's unique client instance ID used for telemetry.
If telemetry is enabled, this will first require a connection to the cluster to generate the unique client instance ID. This method waits up to timeout for the consumer client to complete the request.
Client telemetry is controlled by the ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG configuration option.
Specified by: clientInstanceId in interface ShareConsumer<K,V>
Parameters:
timeout - The maximum time to wait for consumer client to determine its client instance ID. The value must be non-negative. Specifying a timeout of zero means do not wait for the initial request to complete if it hasn't already.
Throws:
IllegalArgumentException - if the timeout is negative
IllegalStateException - if telemetry is not enabled
WakeupException - if wakeup() is called before or while this method is called
InterruptException - if the thread is interrupted while blocked
KafkaException - if an unexpected error occurs while trying to determine the client instance ID, though this error does not necessarily imply the consumer client is otherwise unusable
public Map<MetricName,? extends Metric> metrics()
Get the metrics kept by the consumer.
Specified by: metrics in interface ShareConsumer<K,V>
See Also: metrics()
public void close()
Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. See close(Duration) for details. Note that wakeup() cannot be used to interrupt close.
Specified by: close in interface Closeable
Specified by: close in interface AutoCloseable
Specified by: close in interface ShareConsumer<K,V>
Throws:
WakeupException - if wakeup() is called before or while this method is called
InterruptException - if the thread is interrupted before or while this method is called
KafkaException - for any other error during close
See Also: close()
public void close(Duration timeout)
Tries to close the consumer cleanly within the specified timeout, waiting up to timeout for the consumer to complete acknowledgements and leave the group. If the consumer is unable to complete acknowledgements and gracefully leave the group before the timeout expires, the consumer is force closed. Note that wakeup() cannot be used to interrupt close.
Specified by: close in interface ShareConsumer<K,V>
Parameters:
timeout - The maximum time to wait for consumer to close gracefully. The value must be non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
Throws:
IllegalArgumentException - if the timeout is negative
WakeupException - if wakeup() is called before or while this method is called
InterruptException - if the thread is interrupted before or while this method is called
KafkaException - for any other error during close
See Also: close(Duration)
public void wakeup()
Wake up the consumer. The thread which is blocking in an operation will throw WakeupException. If no thread is blocking in a method which can throw WakeupException, the next call to such a method will raise it instead.
Specified by: wakeup in interface ShareConsumer<K,V>
See Also: wakeup()