public abstract class SinkTask extends Object implements Task
SinkTask is a Task that takes records loaded from Kafka and sends them to another system. Task instances are handed records via the put(Collection) API, which should either write them to the downstream system or batch them for later writing. Periodically, Connect will call flush(Map) to ensure that batched records are actually pushed to the downstream system. Below we describe the lifecycle of a SinkTask.
1. Initialization: SinkTasks are first initialized using initialize(SinkTaskContext) to prepare the task's context and start(Map) to accept configuration and start any services needed for processing.
2. Partition Assignment: After initialization, Connect will assign the task a set of partitions using open(Collection). These partitions are owned exclusively by this task until they have been closed with close(Collection).
3. Record Processing: Once partitions have been opened for writing, Connect will begin forwarding records from Kafka using the put(Collection) API. Periodically, Connect will ask the task to flush records using flush(Map) as described above.
4. Partition Rebalancing: Occasionally, Connect will need to change the assignment of this task. When this happens, the currently assigned partitions will be closed with close(Collection) and the new assignment will be opened using open(Collection).
5. Shutdown: When the task needs to be shut down, Connect will close active partitions (if there are any) and stop the task using stop().
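The sketch below maps these five stages onto concrete method overrides. It is a minimal illustration rather than a reference implementation: SystemClient and the connect factory are hypothetical stand-ins for whatever client the downstream system actually provides.

```java
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class ExampleSinkTask extends SinkTask {

    /** Hypothetical client for the downstream system. */
    protected interface SystemClient {
        void createWriter(TopicPartition tp);
        void closeWriter(TopicPartition tp);
        void buffer(SinkRecord record);
        void flush();
        void close();
    }

    /** Hypothetical factory implemented by a concrete subclass. */
    protected abstract SystemClient connect(Map<String, String> props);

    private SystemClient client;

    @Override
    public String version() {
        return "1.0"; // required by the Task interface
    }

    @Override
    public void start(Map<String, String> props) {
        // 1. Initialization: initialize(SinkTaskContext) has already set the
        // context; here we parse configuration and start needed services.
        client = connect(props);
    }

    @Override
    public void open(Collection<TopicPartition> partitions) {
        // 2. Partition assignment: create writers for newly assigned partitions.
        for (TopicPartition tp : partitions) {
            client.createWriter(tp);
        }
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // 3. Record processing: write or batch each record.
        for (SinkRecord record : records) {
            client.buffer(record);
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // 3. Record processing (cont.): push batched records downstream.
        client.flush();
    }

    @Override
    public void close(Collection<TopicPartition> partitions) {
        // 4. Partition rebalancing: release writers for revoked partitions.
        for (TopicPartition tp : partitions) {
            client.closeWriter(tp);
        }
    }

    @Override
    public void stop() {
        // 5. Shutdown: final cleanup after the last flush and offset commit.
        if (client != null) {
            client.close();
        }
    }
}
```

Connect drives every call in this flow, so the task itself needs no threads or polling loop for the basic lifecycle.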
| Modifier and Type | Field and Description |
|---|---|
static String | TOPICS_CONFIG The configuration key that provides the list of topics that are inputs for this SinkTask. |
static String | TOPICS_REGEX_CONFIG The configuration key that provides a regex specifying which topics to include as inputs for this SinkTask. |
| Constructor and Description |
|---|
| SinkTask() |
| Modifier and Type | Method and Description |
|---|---|
void | close(Collection<org.apache.kafka.common.TopicPartition> partitions) The SinkTask uses this method to close writers for partitions that are no longer assigned to the SinkTask. |
void | flush(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> currentOffsets) Flush all records that have been put(Collection) for the specified topic-partitions. |
void | initialize(SinkTaskContext context) Initialize the context of this task. |
void | onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions) Deprecated. Use open(Collection) for partition initialization. |
void | onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions) Deprecated. Use close(Collection) instead for partition cleanup. |
void | open(Collection<org.apache.kafka.common.TopicPartition> partitions) The SinkTask uses this method to create writers for newly assigned partitions in case of partition rebalance. |
Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> | preCommit(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> currentOffsets) Pre-commit hook invoked prior to an offset commit. |
abstract void | put(Collection<SinkRecord> records) Put the records in the sink. |
abstract void | start(Map<String,String> props) Start the Task. |
abstract void | stop() Perform any cleanup to stop this task. |
public static final String TOPICS_CONFIG
The configuration key that provides the list of topics that are inputs for this SinkTask.
public static final String TOPICS_REGEX_CONFIG
The configuration key that provides a regex specifying which topics to include as inputs for this SinkTask.
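As an illustration of these keys, a task could recover its topic list in start(Map), assuming (as is common but not guaranteed) that the connector copies its original configuration into each task's configuration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkTask;

public class TopicsConfigExample {
    // "props" stands for the Map<String, String> handed to SinkTask.start(Map).
    static List<String> subscribedTopics(Map<String, String> props) {
        String topics = props.get(SinkTask.TOPICS_CONFIG); // e.g. "orders,payments"
        return topics == null ? List.of() : Arrays.asList(topics.split(","));
    }
}
```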
public void initialize(SinkTaskContext context)
Initialize the context of this task. Note that the partition assignment will be changed later when the task's partitions are assigned via open(Collection).
Parameters:
context - The sink task's context

public abstract void start(Map<String,String> props)
Start the Task. This should handle any configuration parsing and one-time setup of the task.
Parameters:
props - initial configuration
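Beyond receiving configuration, the context saved by initialize(SinkTaskContext) is the task's handle back into the framework. The hedged sketch below uses it for backpressure, pausing consumption while a hypothetical downstream backlog drains; downstreamOverloaded and buffer are assumptions, not Connect APIs.

```java
import java.util.Collection;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class BackpressureSinkTask extends SinkTask {

    /** Hypothetical check on the downstream system's backlog. */
    protected abstract boolean downstreamOverloaded();

    /** Hypothetical buffering of a batch for later writing. */
    protected abstract void buffer(Collection<SinkRecord> records);

    @Override
    public void put(Collection<SinkRecord> records) {
        buffer(records); // this batch must still be handled even under load
        // "context" is the protected SinkTaskContext field populated by
        // initialize(SinkTaskContext) before start(Map) is called.
        TopicPartition[] assigned = context.assignment().toArray(new TopicPartition[0]);
        if (downstreamOverloaded()) {
            context.pause(assigned);  // stop fetching while the backlog drains
        } else {
            context.resume(assigned); // no-op for partitions that are not paused
        }
    }
}
```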
public abstract void put(Collection<SinkRecord> records)
Put the records in the sink. Usually this should send the records asynchronously and immediately return; in that case the task should use flush(Map) or preCommit(Map) to ensure that offsets are only committed for records that have been written to the downstream system (hence avoiding data loss during failures). If this operation fails, the SinkTask may throw a RetriableException to indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to be stopped immediately. SinkTaskContext.timeout(long) can be used to set the maximum time before the batch will be retried.
Parameters:
records - the collection of records to send
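To make the retry contract concrete, the sketch below throws RetriableException on a transient downstream failure and bounds the backoff with SinkTaskContext.timeout(long); the Downstream interface and TransientException are hypothetical.

```java
import java.util.Collection;

import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class RetryingSinkTask extends SinkTask {

    /** Hypothetical failure that is worth retrying. */
    public static class TransientException extends Exception {
    }

    /** Hypothetical downstream client whose writes can fail transiently. */
    protected interface Downstream {
        void write(Collection<SinkRecord> records) throws TransientException;
    }

    protected Downstream downstream; // initialized in start(Map), not shown

    @Override
    public void put(Collection<SinkRecord> records) {
        try {
            downstream.write(records);
        } catch (TransientException e) {
            // Give the downstream system up to 5 seconds to recover before
            // Connect retries this same batch of records.
            context.timeout(5000);
            throw new RetriableException(e);
        }
    }
}
```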
public void flush(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> currentOffsets)
Flush all records that have been put(Collection) for the specified topic-partitions.
Parameters:
currentOffsets - the current offset state as of the last call to put(Collection), provided for convenience but could also be determined by tracking all offsets included in the SinkRecords passed to put(Collection).
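A common way to satisfy this contract is to batch cheaply in put(Collection) and write durably in flush(Map). The abstract sketch below assumes a hypothetical write method supplied by a subclass.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class BufferingSinkTask extends SinkTask {

    private final List<SinkRecord> buffer = new ArrayList<>();

    /** Hypothetical durable write to the downstream system. */
    protected abstract void write(List<SinkRecord> records);

    @Override
    public void put(Collection<SinkRecord> records) {
        buffer.addAll(records); // cheap: defer the actual write to flush
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Connect may commit currentOffsets once this returns, so every
        // buffered record must be durably written before returning.
        write(buffer);
        buffer.clear();
    }
}
```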
public Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> preCommit(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> currentOffsets)
Pre-commit hook invoked prior to an offset commit. The default implementation simply invokes flush(Map) and is thus able to assume all currentOffsets are safe to commit.
Parameters:
currentOffsets - the current offset state as of the last call to put(Collection), provided for convenience but could also be determined by tracking all offsets included in the SinkRecords passed to put(Collection).
Returns:
an empty map if Connect-managed offset commits are not desired, otherwise a map of offsets by topic-partition that are safe to commit.
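When writes are asynchronous, overriding preCommit(Map) lets the task commit only what the sink has acknowledged. In the hedged sketch below, AsyncWriter and its ackedOffset method are assumptions standing in for a real async client.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class AsyncAwareSinkTask extends SinkTask {

    /** Hypothetical async writer that tracks acknowledged offsets per partition. */
    protected interface AsyncWriter {
        /** Highest acknowledged offset for the partition, or -1 if none yet. */
        long ackedOffset(TopicPartition tp);
    }

    protected AsyncWriter asyncWriter; // initialized in start(Map), not shown

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        Map<TopicPartition, OffsetAndMetadata> safe = new HashMap<>();
        for (TopicPartition tp : currentOffsets.keySet()) {
            long acked = asyncWriter.ackedOffset(tp);
            if (acked >= 0) {
                // Commit the position *after* the last acknowledged record.
                safe.put(tp, new OffsetAndMetadata(acked + 1));
            }
        }
        return safe;
    }
}
```

Returning currentOffsets unchanged would reproduce the default flush-based behavior; returning an empty map hands offset management entirely to the task.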
public void open(Collection<org.apache.kafka.common.TopicPartition> partitions)
The SinkTask uses this method to create writers for newly assigned partitions in case of partition rebalance. This method will be called after partition re-assignment completes and before the SinkTask starts fetching data. Note that any errors raised from this method will cause the task to stop.
Parameters:
partitions - The list of partitions that are now assigned to the task (may include partitions previously assigned to the task)
@Deprecated
public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions)
Deprecated. Use open(Collection) for partition initialization.
public void close(Collection<org.apache.kafka.common.TopicPartition> partitions)
The SinkTask uses this method to close writers for partitions that are no longer assigned to the SinkTask. This method will be called before a rebalance operation starts and after the SinkTask stops fetching data. After being closed, Connect will not write any records to the task until a new set of partitions has been opened.
Parameters:
partitions - The list of partitions that should be closed
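Since open(Collection) and close(Collection) bracket a partition's ownership, a natural pattern is one writer per TopicPartition, created on open and released on close. In the sketch below, PartitionWriter and newWriter are hypothetical.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class PartitionedSinkTask extends SinkTask {

    /** Hypothetical per-partition writer abstraction. */
    protected interface PartitionWriter {
        void close();
    }

    /** Subclasses supply the actual writer for a partition (hypothetical factory). */
    protected abstract PartitionWriter newWriter(TopicPartition tp);

    private final Map<TopicPartition, PartitionWriter> writers = new HashMap<>();

    @Override
    public void open(Collection<TopicPartition> partitions) {
        // Called after re-assignment completes and before fetching resumes; the
        // collection may include partitions we already own, hence computeIfAbsent.
        for (TopicPartition tp : partitions) {
            writers.computeIfAbsent(tp, this::newWriter);
        }
    }

    @Override
    public void close(Collection<TopicPartition> partitions) {
        // Called before a rebalance; these partitions receive no further records
        // until they are opened again (possibly on another task).
        for (TopicPartition tp : partitions) {
            PartitionWriter writer = writers.remove(tp);
            if (writer != null) {
                writer.close();
            }
        }
    }
}
```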
@Deprecated
public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions)
Deprecated. Use close(Collection) instead for partition cleanup.
public abstract void stop()
Perform any cleanup to stop this task. In SinkTasks, this method is invoked only once outstanding calls to other methods have completed (e.g., put(Collection) has returned) and a final flush(Map) and offset commit has completed. Implementations of this method should only need to perform final cleanup operations, such as closing network connections to the sink system.