public interface ErrorHandlerContext
ErrorHandlerContext
instances are passed into DeserializationExceptionHandler
,
ProcessingExceptionHandler
, or ProductionExceptionHandler
dependent on what error occurred.
Modifier and Type | Method and Description |
---|---|
org.apache.kafka.common.header.Headers |
headers()
Return the headers of the current source record; could be an empty header if it is not
available.
|
long |
offset()
Return the offset of the current input record; could be
-1 if it is not
available. |
int |
partition()
Return the partition ID of the current input record; could be
-1 if it is not
available. |
String |
processorNodeId()
Return the current processor node ID.
|
TaskId |
taskId()
Return the task ID.
|
long |
timestamp()
Return the current timestamp; could be
-1 if it is not available. |
String |
topic()
Return the topic name of the current input record; could be
null if it is not
available. |
String topic()
null
if it is not
available.
For example, if this method is invoked within a punctuation callback
, or while processing a record that was forwarded by a punctuation
callback, the record won't have an associated topic.
Another example is
KTable.transformValues(ValueTransformerWithKeySupplier, String...)
(and siblings), that do not always guarantee to provide a valid topic name, as they might be
executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
Additionally, when writing into a changelog topic, there is no associated input record,
and thus no topic name is available.
int partition()
-1
if it is not
available.
For example, if this method is invoked within a punctuation callback
, or while processing a record that was forwarded by a punctuation
callback, the record won't have an associated partition ID.
Another example is
KTable.transformValues(ValueTransformerWithKeySupplier, String...)
(and siblings), that do not always guarantee to provide a valid partition ID, as they might be
executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
Additionally, when writing into a changelog topic, there is no associated input record,
and thus no partition is available.
long offset()
-1
if it is not
available.
For example, if this method is invoked within a punctuation callback
, or while processing a record that was forwarded by a punctuation
callback, the record won't have an associated offset.
Another example is
KTable.transformValues(ValueTransformerWithKeySupplier, String...)
(and siblings), that do not always guarantee to provide a valid offset, as they might be
executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
Additionally, when writing into a changelog topic, there is no associated input record,
and thus no offset is available.
org.apache.kafka.common.header.Headers headers()
For example, if this method is invoked within a punctuation callback
, or while processing a record that was forwarded by a punctuation
callback, the record might not have any associated headers.
Another example is
KTable.transformValues(ValueTransformerWithKeySupplier, String...)
(and siblings), that do not always guarantee to provide valid headers, as they might be
executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
Additionally, when writing into a changelog topic, there is no associated input record,
and thus no headers are available.
String processorNodeId()
TaskId taskId()
long timestamp()
-1
if it is not available.
For example, when writing into a changelog topic, there is no associated input record, and thus no timestamp is available.
If it is triggered while processing a record streamed from the source processor,
timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
ConsumerRecord
by TimestampExtractor
.
Note, that an upstream Processor
might have set a new timestamp by calling
forward(record.withTimestamp(...))
.
In particular, some Kafka Streams DSL operators set result record timestamps explicitly,
to guarantee deterministic results.
If it is triggered while processing a record generated not from the source processor (for example, if this method is invoked from the punctuate call):
PunctuationType.STREAM_TIME
timestamp is defined as the current task's stream time,
which is defined as the largest timestamp of any record processed by the task
PunctuationType.WALL_CLOCK_TIME
timestamp is defined the current system time
If it is triggered from a deserialization failure, timestamp is defined as the timestamp of the
current rawRecord ConsumerRecord
.