Manage Offsets for Fully-Managed Connectors¶
Custom offsets for managed connectors is Early Access
Confluent uses Early Access releases to gather feedback. This service should be used only for evaluation and non-production testing purposes, or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions.
Early Access is intended for evaluation use in development and testing environments only and not for production use. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Early Access. Confluent considers Early Access to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access releases at any time at the sole discretion of Confluent.
You can manage offsets for fully-managed connectors. Offsets indicate the point in the source system from which the connector accesses data. The source system differs between sink and source connectors. For sink connectors, the source system is Apache Kafka®, where data is organized into topic partitions and each topic partition has an associated offset. For source connectors, the source system is the external system from which the connector pulls data. For example, in a database where data is organized into tables, a timestamp can be used as the offset; in a CDC system where data is organized into a single binlog (binary log), the position in the binlog can be used as the offset.
To set an offset, fully-managed connectors use a JSON format in the API request. Because the source systems differ, sink and source connectors use different formats. The format is consistent across all sink connectors, while the format for source connectors can vary. This is because sink connectors always collect data from Kafka, while source connectors interact with a variety of external systems, and each of these external systems can require its own format to manage the offset.
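For example, a sink connector offset always pairs a Kafka topic partition with a Kafka offset (the exact JSON format appears later on this page), while a source connector offset is expressed in terms of the external system. The sketch below contrasts the two shapes as plain Python dictionaries; the source examples are hypothetical and illustrative only, so always use the format documented on the specific source connector's installation page.
# Sink connector offset: a position in a Kafka topic partition.
sink_offset = {
    "partition": {"kafka_topic": "topic-0", "kafka_partition": 0},
    "offset": {"kafka_offset": 2000},
}

# Source connector offsets (hypothetical shapes): a position in the external
# system, such as a table timestamp or a binlog file and position.
source_offset_table = {
    "partition": {"table": "orders"},
    "offset": {"timestamp": 1709740906000},
}
source_offset_binlog = {
    "partition": {"server": "db-1"},
    "offset": {"file": "binlog.000002", "pos": 4567},
}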
Considerations:
- When you reset an offset for a connector that is running, the connector stops processing and its status changes to PAUSED. After the offset is altered, the connector resumes automatically. During this time, the connector does not process data.
- The sink connector JSON format appears below. Use this format for all sink connectors.
- If you are using a service account, you need to assign RBAC permissions to manage offsets for sink connectors. For more information, see Sink connector offset management.
- For source connectors, use the format specified on the supported connector’s installation page.
Supported connectors¶
Confluent supports the following connectors:
Source connectors¶
To manage source connector offsets, use the documentation on the connector's installation page:
Sink connectors¶
- Confluent supports all fully-managed sink connectors. Confluent resends your topic data to the sink system. You must ensure that the sink system and downstream services can handle possible data duplication and out-of-order data.
To manage sink connector offsets, see Manage sink connector offsets.
Manage sink connector offsets¶
To manage sink connector offsets:
- Manage offsets using Confluent Cloud APIs. For more information, see Cluster API reference.
To get the current offset, make a GET
request that specifies the environment, Kafka cluster, and connector name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets
Host: https://api.confluent.cloud
Response:
Successful calls return HTTP 200
with a JSON payload that describes the offset.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"kafka_partition": 2,
"kafka_topic": "topic-2"
},
"offset": {
"kafka_offset": 3000
}
},
{
"partition": {
"kafka_partition": 1,
"kafka_topic": "topic-1"
},
"offset": {
"kafka_offset": 7000
}
},
{
"partition": {
"kafka_partition": 0,
"kafka_topic": "topic-0"
},
"offset": {
"kafka_offset": 2000
}
}
],
"metadata": {
"observed_at": "2024-03-06T16:41:46.445116967Z"
}
}
Responses include the following information:
- The position of all current offsets, along with the partition and topic
- The observed time of the offset in the metadata portion of the payload. The observed_at time indicates a snapshot in time for when the API retrieved the offset. A running connector is always updating its offsets. Use observed_at to get a sense of the gap between real time and the time at which the request was made. By default, offsets are observed every minute. Calling GET repeatedly fetches more recently observed offsets.
- Information about the connector.
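The following is a minimal sketch of calling this endpoint with the Python requests library, assuming placeholder environment, cluster, and connector values and a Cloud API key and secret supplied through HTTP basic authentication.
import requests

BASE_URL = "https://api.confluent.cloud"
ENV_ID = "env-abc123"              # placeholder environment ID
CLUSTER_ID = "lkc-xyz789"          # placeholder Kafka cluster ID
CONNECTOR = "GcsSinkConnector_0"   # placeholder connector name

# Placeholder Cloud API key and secret, passed as HTTP basic auth.
auth = ("<cloud-api-key>", "<cloud-api-secret>")

resp = requests.get(
    f"{BASE_URL}/connect/v1/environments/{ENV_ID}/clusters/{CLUSTER_ID}"
    f"/connectors/{CONNECTOR}/offsets",
    auth=auth,
)
resp.raise_for_status()

# Print each topic partition and its current offset from the response payload.
for entry in resp.json()["offsets"]:
    partition = entry["partition"]
    print(partition["kafka_topic"], partition["kafka_partition"],
          entry["offset"]["kafka_offset"])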
To update the offset, make a POST
request that specifies the environment, Kafka cluster, and connector
name. Include a JSON payload that specifies the new offset and a patch type.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type":"PATCH",
"offsets": [
{
"partition": {
"kafka_partition": 2,
"kafka_topic": "topic-2"
},
"offset": {
"kafka_offset": 1000
}
},
{
"partition": {
"kafka_partition": 1,
"kafka_topic": "topic-1"
},
"offset": {
"kafka_offset": 1000
}
},
{
"partition": {
"kafka_partition": 0,
"kafka_topic": "topic-0"
},
"offset": {
"kafka_offset": 1000
}
}
]
}
Considerations:
- You can only make one offset change at a time for a given connector.
- This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
- Patch requests move the offsets to the provided point in the topic partition. If the offset provided is not present in the topic partition, it resets to the earliest offset in the topic partition.
- You don’t need to provide partition and offset information for all the topic partitions that the connector is processing. A partial set also works.
Response:
Successful calls return HTTP 202 Accepted
with a JSON payload that describes the offset.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"kafka_partition": 2,
"kafka_topic": "topic-2"
},
"offset": {
"kafka_offset": 1000
}
},
{
"partition": {
"kafka_partition": 1,
"kafka_topic": "topic-1"
},
"offset": {
"kafka_offset": 1000
}
},
{
"partition": {
"kafka_partition": 0,
"kafka_topic": "topic-0"
},
"offset": {
"kafka_offset": 1000
}
}
],
"requested_at": "2024-03-06T16:45:07.221304236Z",
"type": "PATCH"
}
Responses include the following information:
- The requested position of the offsets, along with a partition and topic
- The time of the request to update the offset.
- Information about the connector.
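As a sketch, the same update can be issued with the Python requests library. The values below are the same placeholders used in the GET example, and because a partial set of offsets is allowed, only the single topic partition listed in the payload is changed.
import requests

BASE_URL = "https://api.confluent.cloud"
ENV_ID = "env-abc123"
CLUSTER_ID = "lkc-xyz789"
CONNECTOR = "GcsSinkConnector_0"
auth = ("<cloud-api-key>", "<cloud-api-secret>")

# Partial patch: only topic-2, partition 2 is moved back to offset 1000.
payload = {
    "type": "PATCH",
    "offsets": [
        {
            "partition": {"kafka_partition": 2, "kafka_topic": "topic-2"},
            "offset": {"kafka_offset": 1000},
        }
    ],
}

resp = requests.post(
    f"{BASE_URL}/connect/v1/environments/{ENV_ID}/clusters/{CLUSTER_ID}"
    f"/connectors/{CONNECTOR}/offsets/request",
    json=payload,
    auth=auth,
)

# The request is asynchronous (HTTP 202 Accepted); confirm it with the
# offset request status endpoint.
print(resp.status_code, resp.json().get("requested_at"))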
To delete the offset, make a POST
request that specifies the environment, Kafka cluster, and connector
name. Include a JSON payload that specifies the delete type.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "DELETE"
}
Considerations:
- Delete requests delete the connector's offsets and reset it to the base state. A delete request is as if you created a fresh connector.
- This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
- Do not issue delete and patch requests at the same time.
- Delete requests move the offset to the provided point in the topic partition. If the offset provided is not present in the topic partition, it resets to the earliest offset in the topic partition.
Response:
Successful calls return HTTP 202 Accepted
with a JSON payload that describes the result.
{
"id": "lcc-example123",
"name": "GcsSinkConnector_0",
"offsets": [],
"requested_at": "2024-03-06T20:11:56.005564011Z",
"type": "DELETE"
}
Responses include the following information:
- Empty offsets.
- The time of the request to delete the offset.
- Information about Kafka cluster and connector.
- The type of request.
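A corresponding sketch with the Python requests library and the same placeholder values follows; as with the patch, the reset is asynchronous and should be confirmed through the status endpoint.
import requests

BASE_URL = "https://api.confluent.cloud"
ENV_ID = "env-abc123"
CLUSTER_ID = "lkc-xyz789"
CONNECTOR = "GcsSinkConnector_0"
auth = ("<cloud-api-key>", "<cloud-api-secret>")

# A DELETE-type request resets the connector's offsets.
resp = requests.post(
    f"{BASE_URL}/connect/v1/environments/{ENV_ID}/clusters/{CLUSTER_ID}"
    f"/connectors/{CONNECTOR}/offsets/request",
    json={"type": "DELETE"},
    auth=auth,
)
print(resp.status_code, resp.json().get("requested_at"))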
To get the status of a previous offset request, make a GET
request that specifies the environment, Kafka cluster, and connector
name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request/status
Host: https://api.confluent.cloud
Considerations:
- The status endpoint always shows the status of the most recent PATCH/DELETE operation.
Response:
Successful calls return HTTP 200
with a JSON payload that describes the result. The following is an example
of an applied patch.
{
"request": {
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [],
"requested_at": "2024-03-06T20:11:56.005564011Z",
"type": "DELETE"
},
"status": {
"phase": "APPLIED",
"message": "The Connect framework-managed offsets for this connector have been reset successfully. However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."
},
"previous_offsets": [
{
"partition": {
"kafka_partition": 2,
"kafka_topic": "topic-2"
},
"offset": {
"kafka_offset": 58000
}
},
{
"partition": {
"kafka_partition": 1,
"kafka_topic": "topic-1"
},
"offset": {
"kafka_offset": 147000
}
},
{
"partition": {
"kafka_partition": 0,
"kafka_topic": "topic-0"
},
"offset": {
"kafka_offset": 58000
}
}
],
"applied_at": "2024-03-06T20:11:57.766328908Z"
}
Responses include the following information:
- The original request, including the time it was made.
- The status of the request: applied, pending, or failed.
- The time the offset request was applied (applied_at), if it has been applied.
- The previous offsets. These are the offsets the connector last committed before the request was applied. Use them to restore the connector to its previous state if a patch causes the connector to fail, or to roll back an offset change.
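To tie the steps together, the following sketch polls the status endpoint with the Python requests library until the most recent offset request leaves the pending phase. It reuses the placeholder values from the earlier examples and assumes the phase values are reported in uppercase, as in the APPLIED example above.
import time
import requests

BASE_URL = "https://api.confluent.cloud"
ENV_ID = "env-abc123"
CLUSTER_ID = "lkc-xyz789"
CONNECTOR = "GcsSinkConnector_0"
auth = ("<cloud-api-key>", "<cloud-api-secret>")

status_url = (
    f"{BASE_URL}/connect/v1/environments/{ENV_ID}/clusters/{CLUSTER_ID}"
    f"/connectors/{CONNECTOR}/offsets/request/status"
)

# Poll until the most recent PATCH or DELETE request is no longer pending.
while True:
    status = requests.get(status_url, auth=auth).json()["status"]
    print(status["phase"], status.get("message", ""))
    if status["phase"] != "PENDING":
        break
    time.sleep(10)  # re-check every 10 seconds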