public abstract class SinkTask extends Object implements Task
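SinkTask is a Task that takes records loaded from Kafka and sends them to another system. As records are fetched from Kafka, they are passed to the sink task using the put(Collection) API, which should either write them to the downstream system or batch them for later writing. Periodically, Connect will call flush(Map) to ensure that batched records are actually pushed to the downstream system.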
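As a rough illustration of the put/flush split described above, the sketch below batches record values in put(Collection) and writes them out in flush(Map). It assumes the Kafka Connect API (the org.apache.kafka:connect-api artifact) is on the classpath; BufferingSinkTask and its in-memory downstream list are hypothetical stand-ins for a real connector and external system. Concrete tasks must also implement version(), which SinkTask inherits from Task without implementing.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical task that batches records in put() and writes them in flush().
// The "downstream system" is an in-memory list so the sketch runs on its own.
class BufferingSinkTask extends SinkTask {
    final List<Object> buffer = new ArrayList<>();      // accepted but not yet written
    final List<Object> downstream = new ArrayList<>();  // stand-in for the external system

    @Override
    public String version() {
        return "0.1-sketch";
    }

    @Override
    public void start(Map<String, String> props) {
        // A real task would parse props and open connections here.
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Batch the record values for later writing instead of writing immediately.
        for (SinkRecord record : records) {
            buffer.add(record.value());
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Push everything batched so far, so offsets committed after this call
        // cover only data that has reached the sink.
        downstream.addAll(buffer);
        buffer.clear();
    }

    @Override
    public void stop() {
        buffer.clear();
    }
}
```

Because this flush(Map) writes everything accepted so far, the default preCommit(Map), which calls flush(Map), can safely treat all of currentOffsets as committable.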
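Below we describe the lifecycle of a SinkTask.
1. Initialization: SinkTasks are first initialized using initialize(SinkTaskContext) to prepare the task's context and start(Map) to accept configuration and start any services needed for processing.
2. Partition Assignment: After initialization, Connect will assign the task a set of partitions using open(Collection). These partitions are owned exclusively by this task until they have been closed with close(Collection).
3. Record Processing: Once partitions have been opened for writing, Connect will begin forwarding records from Kafka using the put(Collection) API. Periodically, Connect will ask the task to flush records using flush(Map) as described above.
4. Partition Rebalancing: Occasionally, Connect will need to change the assignment of this task. When this happens, the currently assigned partitions will be closed with close(Collection) and the new assignment will be opened using open(Collection).
5. Shutdown: When the task needs to be shut down, Connect will close active partitions (if there are any) and stop the task using stop().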
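The partition-assignment and rebalancing steps of this lifecycle can be sketched with one writer per partition: open(Collection) creates writers, put(Collection) routes each record to its partition's writer, and close(Collection) releases them. This is a minimal sketch assuming connect-api 3.6 or later (for SinkRecord.originalTopic() and SinkRecord.originalKafkaPartition()); PartitionedSinkTask is hypothetical and a StringBuilder stands in for a real per-partition writer.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical task keeping one "writer" per assigned partition.
class PartitionedSinkTask extends SinkTask {
    final Map<TopicPartition, StringBuilder> writers = new HashMap<>();
    final Map<TopicPartition, String> closedOutput = new HashMap<>(); // contents at close time

    @Override
    public String version() {
        return "0.1-sketch";
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public void open(Collection<TopicPartition> partitions) {
        // The collection may include partitions this task already owns.
        for (TopicPartition tp : partitions) {
            writers.computeIfAbsent(tp, ignored -> new StringBuilder());
        }
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // Route by the original (pre-transformation) topic partition,
            // which is what open() and close() receive. Requires Kafka 3.6+.
            TopicPartition tp = new TopicPartition(record.originalTopic(), record.originalKafkaPartition());
            StringBuilder writer = writers.get(tp);
            if (writer == null) {
                throw new ConnectException("Received record for unassigned partition " + tp);
            }
            writer.append(record.value());
        }
    }

    @Override
    public void close(Collection<TopicPartition> partitions) {
        // Invoked on rebalance and at shutdown: release writers no longer owned.
        for (TopicPartition tp : partitions) {
            StringBuilder writer = writers.remove(tp);
            if (writer != null) {
                closedOutput.put(tp, writer.toString());
            }
        }
    }

    @Override
    public void stop() {
        writers.clear();
    }
}
```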
Modifier and Type | Field and Description |
---|---|
static String | TOPICS_CONFIG: The configuration key that provides the list of topics that are inputs for this SinkTask. |
static String | TOPICS_REGEX_CONFIG: The configuration key that provides a regex specifying which topics to include as inputs for this SinkTask. |
Constructor and Description |
---|
SinkTask() |
Modifier and Type | Method and Description |
---|---|
void | close(Collection<org.apache.kafka.common.TopicPartition> partitions): The SinkTask uses this method to close writers for partitions that are no longer assigned to the SinkTask. |
void | flush(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> currentOffsets): Flush all records that have been put(Collection) for the specified topic-partitions. |
void | initialize(SinkTaskContext context): Initialize the context of this task. |
void | onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions): Deprecated. Use open(Collection) for partition initialization. |
void | onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions): Deprecated. Use close(Collection) instead for partition cleanup. |
void | open(Collection<org.apache.kafka.common.TopicPartition> partitions): The SinkTask uses this method to create writers for newly assigned partitions in case of partition rebalance. |
Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> | preCommit(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> currentOffsets): Pre-commit hook invoked prior to an offset commit. |
abstract void | put(Collection<SinkRecord> records): Put the records in the sink. |
abstract void | start(Map<String,String> props): Start the Task. |
abstract void | stop(): Perform any cleanup to stop this task. |
public static final String TOPICS_CONFIG
The configuration key that provides the list of topics that are inputs for this SinkTask.
public static final String TOPICS_REGEX_CONFIG
The configuration key that provides a regex specifying which topics to include as inputs for this SinkTask.
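A minimal sketch of how the two keys are typically used when assembling a sink connector's configuration. TopicSelection is a hypothetical helper; the literal key values checked in the test ("topics" and "topics.regex") are the values of these constants in Kafka Connect, and the regex form is a Java regular expression matched against topic names. Connect rejects a sink configuration that sets both keys or neither, so each helper sets exactly one.

```java
import java.util.Map;

import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical helper building the topic-selection part of a sink connector config.
class TopicSelection {
    // Explicit comma-separated topic list, e.g. "orders,payments".
    static Map<String, String> byList(String... topics) {
        return Map.of(SinkTask.TOPICS_CONFIG, String.join(",", topics));
    }

    // Java regular expression selecting every matching topic, e.g. "orders-.*".
    static Map<String, String> byRegex(String regex) {
        return Map.of(SinkTask.TOPICS_REGEX_CONFIG, regex);
    }
}
```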
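public void initialize(SinkTaskContext context)
Initialize the context of this task. Note that the partition assignment will be empty until Connect has opened the partitions for writing with open(Collection).
Parameters: context - The sink task's context

public abstract void start(Map<String,String> props)
Start the Task. This should handle any configuration parsing and one-time setup of the task, including starting any services needed for processing.
Parameters: props - initial configuration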
public abstract void put(Collection<SinkRecord> records)
Put the records in the sink. This should either write them to the downstream system or batch them for later writing. If this method returns before the records are written to the downstream system, the task must implement flush(Map) or preCommit(Map) to ensure that offsets are only committed for records that have been written to the downstream system (hence avoiding data loss during failures).
If this operation fails, the SinkTask may throw a RetriableException to indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to be stopped immediately. SinkTaskContext.timeout(long) can be used to set the maximum time before the batch will be retried.
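The failure-handling contract of put(Collection) can be sketched as follows: a transient failure sets a back-off with SinkTaskContext.timeout(long) and throws RetriableException, while any other exception stops the task. DownstreamClient, FatalWriteException, RetryingSinkTask and the 5-second back-off are hypothetical. The client is passed through a constructor only so the sketch can be exercised directly; Connect instantiates tasks through a public no-argument constructor, so a real task would create its client in start(Map).

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical client for the downstream system (not part of Kafka Connect).
interface DownstreamClient {
    void write(List<Object> values) throws IOException;
}

// Hypothetical marker for failures that retrying cannot fix (e.g. bad credentials).
class FatalWriteException extends IOException {
    FatalWriteException(String message) {
        super(message);
    }
}

class RetryingSinkTask extends SinkTask {
    static final long RETRY_BACKOFF_MS = 5_000L; // assumed back-off, tune per sink
    private final DownstreamClient client;

    RetryingSinkTask(DownstreamClient client) {
        this.client = client;
    }

    @Override
    public String version() {
        return "0.1-sketch";
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public void stop() {
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        List<Object> values = new ArrayList<>();
        for (SinkRecord record : records) {
            values.add(record.value());
        }
        try {
            client.write(values); // synchronous: nothing stays pending after put() returns
        } catch (FatalWriteException e) {
            // A non-retriable exception stops the task immediately.
            throw new ConnectException("Unrecoverable write failure", e);
        } catch (IOException e) {
            // Ask Connect to wait before redelivering the same batch, then signal a retry.
            context.timeout(RETRY_BACKOFF_MS);
            throw new RetriableException("Transient write failure, will retry", e);
        }
    }
}
```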
Parameters: records - the collection of records to send

public void flush(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> currentOffsets)
Flush all records that have been put(Collection) for the specified topic-partitions.
Parameters: currentOffsets - the current offset state as of the last call to put(Collection), provided for convenience but could also be determined by tracking all offsets included in the SinkRecords passed to put(java.util.Collection<org.apache.kafka.connect.sink.SinkRecord>). Note that the topic, partition and offset here correspond to the original Kafka topic partition and offset, before any transformations have been applied. These can be tracked by the task through the SinkRecord.originalTopic(), SinkRecord.originalKafkaPartition() and SinkRecord.originalKafkaOffset() methods.

public Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> preCommit(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> currentOffsets)
Pre-commit hook invoked prior to an offset commit.
The default implementation simply invokes flush(Map) and is thus able to assume all currentOffsets are safe to commit.
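When writes complete asynchronously, preCommit(Map) can be overridden to report only offsets whose records the sink has acknowledged. This sketch assumes connect-api 3.6 or later (for the SinkRecord original* accessors) and a hypothetical client that acknowledges each partition's records in offset order through the hypothetical onAck callback; AsyncAckSinkTask is likewise hypothetical. Returned offsets are the last acknowledged offset plus one, because a committed offset names the next record to consume.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical task whose writes are acknowledged asynchronously.
class AsyncAckSinkTask extends SinkTask {
    // Highest acknowledged original offset per original topic partition.
    // Concurrent because acks may arrive on a client callback thread.
    private final Map<TopicPartition, Long> acked = new ConcurrentHashMap<>();

    @Override
    public String version() {
        return "0.1-sketch";
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public void stop() {
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // A real task would hand records to an async client here and call
        // onAck(...) from the client's completion callback.
    }

    // Hypothetical completion callback. Taking the max is only safe because we
    // assume the client acknowledges a partition's records in offset order.
    void onAck(SinkRecord record) {
        TopicPartition tp = new TopicPartition(record.originalTopic(), record.originalKafkaPartition());
        acked.merge(tp, record.originalKafkaOffset(), Long::max);
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        Map<TopicPartition, OffsetAndMetadata> safe = new HashMap<>();
        for (TopicPartition tp : currentOffsets.keySet()) {
            Long last = acked.get(tp);
            if (last != null) {
                // A committed offset names the next record to consume, hence the +1.
                safe.put(tp, new OffsetAndMetadata(last + 1));
            }
        }
        return safe; // partitions with nothing acknowledged are left uncommitted
    }
}
```

Partitions absent from the returned map are not committed in that cycle, so records that were put but never acknowledged are consumed again after a restart.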
Parameters: currentOffsets - the current offset state as of the last call to put(Collection), provided for convenience but could also be determined by tracking all offsets included in the SinkRecords passed to put(java.util.Collection<org.apache.kafka.connect.sink.SinkRecord>). Note that the topic, partition and offset here correspond to the original Kafka topic partition and offset, before any transformations have been applied. These can be tracked by the task through the SinkRecord.originalTopic(), SinkRecord.originalKafkaPartition() and SinkRecord.originalKafkaOffset() methods.
Returns: an empty map if Connect-managed offset commit is not desired, otherwise a map of offsets by topic-partition that are safe to commit.

public void open(Collection<org.apache.kafka.common.TopicPartition> partitions)
The SinkTask uses this method to create writers for newly assigned partitions in case of partition rebalance. Note that the topic partitions here correspond to the original Kafka topic partitions, before any transformations have been applied.
Parameters: partitions - The list of partitions that are now assigned to the task (may include partitions previously assigned to the task)

@Deprecated public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions)
Deprecated. Use open(Collection) for partition initialization.

public void close(Collection<org.apache.kafka.common.TopicPartition> partitions)
The SinkTask uses this method to close writers for partitions that are no longer assigned to the SinkTask. Note that the topic partitions here correspond to the original Kafka topic partitions, before any transformations have been applied.
Parameters: partitions - The list of partitions that should be closed

@Deprecated public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions)
Deprecated. Use close(Collection) instead for partition cleanup.

public abstract void stop()
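Perform any cleanup to stop this task. In SinkTasks, this method is invoked only once outstanding calls to other methods have completed (e.g., put(Collection) has returned) and a final flush(Map) and offset commit has completed. Implementations of this method should only need to perform final cleanup operations, such as closing network connections to the sink system.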