Manage Offsets for Fully-Managed Connectors in Confluent Cloud¶
You can manage offsets for fully-managed connectors. Offsets provide information on the point in the source system from which the connector accesses data. The source system is different for sink and source connectors. For sink connectors, the source system is Apache Kafka®, where data is organized into topic partitions and each topic partition has an associated offset. For source connectors, the source system is an external system from which the connector pulls data. Examples include a database where data is organized into tables and a timestamp can be used as the offset, or a CDC system where data is organized into a single binlog (binary log) and the position in the binlog can be used as the offset.
To set an offset, fully-managed connectors use a JSON format in the API request. Because the source systems are different, sink and source connectors use different formats. Further, the format for sink connectors is consistent between sink connectors, while the format for source connectors can vary. This is because sink connectors collect data from Kafka, while source connectors interact with data using various external systems and each of these external systems can require a special format to manage the offset.
Considerations:
- When you reset an offset for a connector that is running, the connector stops working and the status changes to PAUSED. After the offset is altered, the connector resumes automatically. During this time, the connector does not process data.
- The sink connector JSON format appears below. Use this format for all sink connectors.
- If you are using a service account, you need to assign RBAC permissions to manage offsets for sink connectors. For more information, see Sink connector offset management.
- For source connectors, use the format specified on the supported connector’s installation page.
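As a minimal sketch of the sink connector format, the following Python helper builds one offset entry using the `partition` and `offset` keys that appear in the sink examples on this page. The function name `sink_offset_entry` is hypothetical and is not part of any Confluent client library.

```python
import json


def sink_offset_entry(topic: str, partition: int, offset: int) -> dict:
    """Build one sink connector offset entry (hypothetical helper)."""
    if partition < 0 or offset < 0:
        raise ValueError("partition and offset must be non-negative")
    return {
        "partition": {"kafka_partition": partition, "kafka_topic": topic},
        "offset": {"kafka_offset": offset},
    }


entry = sink_offset_entry("topic-2", 2, 3000)
print(json.dumps(entry, indent=2))
```

Source connectors cannot use this helper, because their partition and offset keys are defined by each connector.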
Limitations¶
- Custom connectors do not support offset management.
- If fully-managed source connectors have multiple versions available, the earlier versions do not support offset management. Upgrade to the latest version to get offset management capabilities.
Supported connectors¶
The following connectors support offset management:
Source connectors¶
The following fully-managed source connectors support offset management:
- Amazon DynamoDB Source connector
- Amazon Kinesis Source connector
- Amazon S3 Source connector
- Azure Blob Storage Source connector
- Azure Cosmos DB Source connector
- GitHub Source connector
- Google Cloud Storage (GCS) Source connector
- HTTP Source V2 connector
- InfluxDB 2 Source connector
- Jira Source connector
- Microsoft SQL Server Change Data Capture (CDC) Source V2 (Debezium) connector
- Microsoft SQL Server Source (JDBC)
- MongoDB Atlas Source connector
- MySQL CDC Source (Debezium) [Legacy] connector
- MySQL CDC Source V2 (Debezium) connector
- MySQL Source (JDBC)
- Oracle CDC Source connector
- Oracle Database Source (JDBC)
- PostgreSQL Change Data Capture (CDC) Source V2 (Debezium) connector
- PostgreSQL Source (JDBC)
- Salesforce Bulk API 2.0 Source connector
- Salesforce Change Data Capture (CDC) Source connector
- Salesforce Platform Event Source connector
- Salesforce PushTopic Source connector
- ServiceNow Source connector
- Zendesk Source connector
Sink connectors¶
- All fully-managed sink connectors support offset management. Confluent resends your topic data to the sink system, so you must ensure that the sink system and downstream services can handle duplicate and out-of-order data.
To manage sink connector offsets, see Sink connectors.
Manage connector offsets¶
Use this section to manage connector offsets.
Sink connectors¶
To manage sink connector offsets:
- Manage offsets using Confluent Cloud APIs. For more information, see Cluster API reference.
To get the current offset, make a GET
request that specifies the environment, Kafka cluster, and connector name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets
Host: https://api.confluent.cloud
Response:
Successful calls return HTTP 200
with a JSON payload that describes the offset.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"kafka_partition": 2,
"kafka_topic": "topic-2"
},
"offset": {
"kafka_offset": 3000
}
},
{
"partition": {
"kafka_partition": 1,
"kafka_topic": "topic-1"
},
"offset": {
"kafka_offset": 7000
}
},
{
"partition": {
"kafka_partition": 0,
"kafka_topic": "topic-0"
},
"offset": {
"kafka_offset": 2000
}
}
],
"metadata": {
"observed_at": "2024-03-06T16:41:46.445116967Z"
}
}
Responses include the following information:
- The position of all current offsets, along with a partition and topic
- The observed time of the offset in the metadata portion of the payload. The observed_at time indicates a snapshot in time for when the API retrieved the offset. A running connector is always updating its offsets. Use observed_at to get a sense for the gap between real time and the time at which the request was made. By default, offsets are observed every minute. Calling GET repeatedly fetches more recently observed offsets.
- Information about the connector.
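The following Python sketch shows one way to read such a response: it indexes the offsets by topic and partition and computes how old the `observed_at` snapshot is. The helper names are hypothetical, and the sample payload is an abbreviated copy of the response above.

```python
import json
from datetime import datetime, timezone


def index_offsets(payload: dict) -> dict:
    """Map (topic, partition) -> offset for a sink connector GET response."""
    return {
        (item["partition"]["kafka_topic"], item["partition"]["kafka_partition"]):
            item["offset"]["kafka_offset"]
        for item in payload["offsets"]
    }


def observed_age_seconds(payload: dict, now: datetime) -> float:
    """Seconds between the observed_at snapshot and `now`."""
    base, _, fraction = payload["metadata"]["observed_at"].rstrip("Z").partition(".")
    # The sample timestamps carry nanoseconds; datetime stores microseconds, so trim.
    micros = int(fraction[:6].ljust(6, "0")) if fraction else 0
    observed = datetime.strptime(base, "%Y-%m-%dT%H:%M:%S").replace(
        microsecond=micros, tzinfo=timezone.utc
    )
    return (now - observed).total_seconds()


sample = json.loads("""
{
  "id": "lcc-example123",
  "name": "{connector_name}",
  "offsets": [
    {"partition": {"kafka_partition": 2, "kafka_topic": "topic-2"}, "offset": {"kafka_offset": 3000}},
    {"partition": {"kafka_partition": 1, "kafka_topic": "topic-1"}, "offset": {"kafka_offset": 7000}}
  ],
  "metadata": {"observed_at": "2024-03-06T16:41:46.445116967Z"}
}
""")

current = index_offsets(sample)
age = observed_age_seconds(sample, datetime.now(timezone.utc))
print(current, f"observed {age:.0f}s ago")
```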
To update the offset, make a POST
request that specifies the environment, Kafka cluster, and connector
name. Include a JSON payload that specifies the new offset and a patch type.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type":"PATCH",
"offsets": [
{
"partition": {
"kafka_partition": 2,
"kafka_topic": "topic-2"
},
"offset": {
"kafka_offset": 1000
}
},
{
"partition": {
"kafka_partition": 1,
"kafka_topic": "topic-1"
},
"offset": {
"kafka_offset": 1000
}
},
{
"partition": {
"kafka_partition": 0,
"kafka_topic": "topic-0"
},
"offset": {
"kafka_offset": 1000
}
}
]
}
Considerations:
- You can only make one offset change at a time for a given connector.
- This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
- Patch requests move the offsets to the provided point in the topic partition. If the offset provided is not present in the topic partition, it resets to the earliest offset in the topic partition.
- You don’t need to provide partition and offset information for all the topic partitions that the connector is processing. A partial set also works.
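A PATCH body like the one above can be assembled programmatically. This is a minimal Python sketch, assuming the caller keeps the desired offsets in a dictionary keyed by topic and partition; `patch_request_body` is a hypothetical name. Because a partial set is accepted, the dictionary only needs the partitions you want to move.

```python
import json


def patch_request_body(new_offsets: dict) -> str:
    """Serialize a PATCH offsets request from {(topic, partition): offset}."""
    entries = [
        {
            "partition": {"kafka_partition": partition, "kafka_topic": topic},
            "offset": {"kafka_offset": offset},
        }
        # Sorting only makes the output deterministic; order is not required.
        for (topic, partition), offset in sorted(new_offsets.items())
    ]
    return json.dumps({"type": "PATCH", "offsets": entries})


# Move two of the three partitions from the example; the third is left untouched.
body = patch_request_body({("topic-2", 2): 1000, ("topic-0", 0): 1000})
print(body)
```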
Response:
Successful calls return HTTP 202 Accepted
with a JSON payload that describes the offset.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"kafka_partition": 2,
"kafka_topic": "topic-2"
},
"offset": {
"kafka_offset": 1000
}
},
{
"partition": {
"kafka_partition": 1,
"kafka_topic": "topic-1"
},
"offset": {
"kafka_offset": 1000
}
},
{
"partition": {
"kafka_partition": 0,
"kafka_topic": "topic-0"
},
"offset": {
"kafka_offset": 1000
}
}
],
"requested_at": "2024-03-06T16:45:07.221304236Z",
"type": "PATCH"
}
Responses include the following information:
- The requested position of the offsets, along with a partition and topic
- The time of the request to update the offset.
- Information about the connector.
To delete the offset, make a POST
request that specifies the environment, Kafka cluster, and connector
name. Include a JSON payload that specifies the delete type.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "DELETE"
}
Considerations:
- Delete requests delete the offset for the provided partition and reset to the base state. After a delete request, the connector behaves as if you had just created it.
- This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
- Do not issue delete and patch requests at the same time.
- Delete requests move the offset to the provided point in the topic partition. If the offset provided is not present in the topic partition, it resets to the earliest offset in the topic partition.
Response:
Successful calls return HTTP 202 Accepted
with a JSON payload that describes the result.
{
"id": "lcc-example123",
"name": "GcsSinkConnector_0",
"offsets": [],
"requested_at": "2024-03-06T20:11:56.005564011Z",
"type": "DELETE"
}
Responses include the following information:
- Empty offsets.
- The time of the request to delete the offset.
- Information about Kafka cluster and connector.
- The type of request.
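As a sketch of how the delete request could be prepared from Python, the following builds a `urllib.request.Request` without sending it. It assumes the API accepts HTTP Basic authentication with an API key and secret, which this page does not state; the function name and the `env-example` and `lkc-example` identifiers are placeholders.

```python
import base64
import json
import urllib.request


def delete_offsets_request(environment_id, kafka_cluster_id, connector_name,
                           api_key, api_secret):
    """Prepare (but do not send) a DELETE-type offsets request."""
    url = (
        "https://api.confluent.cloud/connect/v1/environments/"
        f"{environment_id}/clusters/{kafka_cluster_id}"
        f"/connectors/{connector_name}/offsets/request"
    )
    # Assumption: Basic auth with an API key and secret.
    token = base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()
    return urllib.request.Request(
        url,
        data=json.dumps({"type": "DELETE"}).encode(),
        headers={"Content-Type": "application/json",
                 "Authorization": f"Basic {token}"},
        method="POST",
    )


request = delete_offsets_request(
    "env-example", "lkc-example", "GcsSinkConnector_0", "key", "secret"
)
print(request.get_method(), request.full_url)
# Sending is left to the caller, for example: urllib.request.urlopen(request)
```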
To get the status of a previous offset request, make a GET
request that specifies the environment, Kafka cluster, and connector
name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request/status
Host: https://api.confluent.cloud
Considerations:
- The status endpoint always shows the status of the most recent PATCH/DELETE operation.
Response:
Successful calls return HTTP 200
with a JSON payload that describes the result. The following is an example
of an applied delete request.
{
"request": {
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [],
"requested_at": "2024-03-06T20:11:56.005564011Z",
"type": "DELETE"
},
"status": {
"phase": "APPLIED",
"message": "The Connect framework-managed offsets for this connector have been reset successfully. However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."
},
"previous_offsets": [
{
"partition": {
"kafka_partition": 2,
"kafka_topic": "topic-2"
},
"offset": {
"kafka_offset": 58000
}
},
{
"partition": {
"kafka_partition": 1,
"kafka_topic": "topic-1"
},
"offset": {
"kafka_offset": 147000
}
},
{
"partition": {
"kafka_partition": 0,
"kafka_topic": "topic-0"
},
"offset": {
"kafka_offset": 58000
}
}
],
"applied_at": "2024-03-06T20:11:57.766328908Z"
}
Responses include the following information:
- The original request, including the time it was made.
- The status of the request: applied, pending, or failed.
- The time you issued the status request.
- The previous offsets. These are the offsets the connector last recorded before the offset request was applied. Use these to try to restore the state of your connector if a patch update causes your connector to fail, or to return a connector to its previous state after rolling back.
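One way to use the previous offsets is to turn them into a new PATCH body. The Python sketch below does that for a status response shaped like the example above. `rollback_body` is a hypothetical helper, and only the `APPLIED` phase string is taken from this page.

```python
import json


def rollback_body(status_response: dict) -> str:
    """Build a PATCH body that restores previous_offsets from a status response."""
    phase = status_response["status"]["phase"]
    if phase != "APPLIED":
        # Only roll back once the earlier request has actually been applied.
        raise RuntimeError(f"offset request not applied (phase={phase})")
    return json.dumps(
        {"type": "PATCH", "offsets": status_response["previous_offsets"]}
    )


status = {
    "request": {"id": "lcc-example123", "offsets": [], "type": "DELETE"},
    "status": {"phase": "APPLIED"},
    "previous_offsets": [
        {"partition": {"kafka_partition": 2, "kafka_topic": "topic-2"},
         "offset": {"kafka_offset": 58000}}
    ],
}
restore = rollback_body(status)
print(restore)
```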
Source connectors¶
Use the documentation on the installation page of the supported source connector. For more information, see Source connectors and Cluster API reference.
Create connectors with offsets¶
Use Confluent Cloud APIs to create connectors that start processing data from a designated offset. For more information, see Cluster API reference.
To create connectors with a designated offset:
- To create a connector with an offset, make a POST request that specifies the environment and Kafka cluster. Include a JSON payload that specifies the name of the connector, the configuration for the connector, and the offset.
This is an example of a request to create a sink connector with a specific offset.
While the offset properties are the same for all sink connectors, the name of the connector and configuration properties are different for every connector. Use the documentation for the sink connector to determine the properties to use. For more information, see Supported connectors.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors
Host: https://api.confluent.cloud
{
"name": "{connector-name}",
"config": {
... // connector specific configuration
},
"offsets": [
{
"partition": {
"kafka_partition": 2,
"kafka_topic": "topic-2"
},
"offset": {
"kafka_offset": 1000
}
}
]
}
Response:
Successful calls return HTTP 200
with a JSON payload that describes the offset.
{
"name": "{connector-name}",
"config": {
... // connector specific configuration
},
"tasks": [],
"offsets": [
{
"partition": {
"kafka_partition": 2,
"kafka_topic": "topic-2"
},
"offset": {
"kafka_offset": 1000
}
}
]
}
Responses include the following information:
- The requested position of the offset, along with a partition and topic
- Information about the connector
This is an example of a request to create a source connector with a specific offset.
The name of the connector, configuration properties, and the payload that describes the offset are different for every source connector. Use the documentation for the source connector to determine the properties to use. For more information, see Source connectors and Supported connectors.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors
Host: https://api.confluent.cloud
{
"name": "{connector-name}",
"config": {
... // connector specific configuration
},
"offsets": [
{
"partition": {
... // connector specific configuration
},
"offset": {
... // connector specific configuration
}
}
]
}
Response:
Successful calls return HTTP 200
with a JSON payload that describes the new connector and the offset.
{
"name": "{connector-name}",
"config": {
... // connector specific configuration
},
"tasks": [],
"offsets": [
{
"partition": {
... // connector specific configuration
},
"offset": {
... // connector specific configuration
}
}
]
}
Responses include the following information:
- The requested position of the offset
- Information about the connector
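The create payload has the same three top-level keys for sink and source connectors, so it can be assembled by one small function. This Python sketch uses a hypothetical `create_connector_body` helper; the configuration shown is abbreviated and is not a complete connector configuration.

```python
import json


def create_connector_body(name: str, config: dict, offsets: list) -> str:
    """Serialize a create-connector request that starts from given offsets."""
    for entry in offsets:
        # Each entry carries exactly a "partition" and an "offset" object.
        if set(entry) != {"partition", "offset"}:
            raise ValueError(f"entry needs 'partition' and 'offset': {entry}")
    return json.dumps({"name": name, "config": config, "offsets": offsets})


body = create_connector_body(
    "confluent-s3-sink",
    {"connector.class": "S3_SINK", "topics": "topic-2"},  # abbreviated config
    [{"partition": {"kafka_partition": 2, "kafka_topic": "topic-2"},
      "offset": {"kafka_offset": 1000}}],
)
print(body)
```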
Migrate to fully-managed connectors¶
This section describes how to migrate from self-managed connectors to fully-managed connectors. Migration always involves downtime because you must stop the self-managed connector to migrate. In general, migration follows a three-step process:
- Stop the self-managed connector.
- Get the latest offset from the stopped self-managed connector.
- Create the fully-managed connector on Confluent Cloud, specifying the offset from the stopped self-managed connector.
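The three steps can be sketched in Python as follows. This is an illustration only: the HTTP transport is passed in as a `send` callable so the example runs without a network, and `migrate`, `fake_send`, the worker address `http://localhost:8083`, and the `env-example` and `lkc-example` identifiers are all assumptions. A real migration would also confirm that the connector reports STOPPED before reading offsets.

```python
from typing import Callable, Optional

# (method, url, json_body) -> decoded JSON response; supplied by the caller.
Send = Callable[[str, str, Optional[dict]], dict]


def migrate(send: Send, worker_url: str, cloud_connectors_url: str,
            name: str, cloud_config: dict) -> dict:
    # Step 1: stop (not pause) the self-managed connector.
    send("PUT", f"{worker_url}/connectors/{name}/stop", None)
    # Step 2: read the offsets from the stopped connector.
    offsets = send("GET", f"{worker_url}/connectors/{name}/offsets", None)["offsets"]
    # Step 3: create the fully-managed connector, seeded with those offsets.
    body = {"name": name, "config": cloud_config, "offsets": offsets}
    return send("POST", cloud_connectors_url, body)


calls = []


def fake_send(method, url, body):
    """Stand-in transport that records calls and returns canned data."""
    calls.append((method, url))
    if method == "GET":
        return {"offsets": [{"partition": {"kafka_partition": 0, "kafka_topic": "topic-0"},
                             "offset": {"kafka_offset": 2000}}]}
    return body or {}


created = migrate(
    fake_send,
    "http://localhost:8083",
    "https://api.confluent.cloud/connect/v1/environments/env-example/clusters/lkc-example/connectors",
    "my-sink",
    {"connector.class": "S3_SINK"},
)
print([method for method, _ in calls])
```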
Prerequisites¶
- Kafka version 3.6 or later is required. This corresponds to version 7.6.x of Confluent Platform.
- You must have REST API access to the Connect workers.
Stop self-managed connectors¶
Stop the self-managed connector.
Considerations:
- Use stop (do not use pause).
- Stopping ensures that you get the latest offset.
- Ensure the connector status shows STOPPED and the connector has no tasks.
To stop self-managed connectors, make a PUT
request that specifies the connector name:
PUT /connectors/{connector-name}/stop
To check the status of self-managed connectors, make a GET
request that specifies the connector name:
GET /connectors/{connector-name}/status
For more information, see Connectors in Kafka Connect Reference.
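The STOPPED check can be expressed as a small predicate over the status response. This Python sketch assumes the status payload has a `connector.state` field and a `tasks` array, as in the Kafka Connect REST API; verify the shape against your worker's actual response. `is_fully_stopped` is a hypothetical name.

```python
def is_fully_stopped(status: dict) -> bool:
    """True when the connector reports STOPPED and lists no tasks."""
    state = status.get("connector", {}).get("state")
    return state == "STOPPED" and not status.get("tasks")


# Illustrative payloads, not captured from a real worker.
stopped = {"name": "my-sink", "connector": {"state": "STOPPED"}, "tasks": []}
paused = {"name": "my-sink", "connector": {"state": "PAUSED"},
          "tasks": [{"id": 0, "state": "PAUSED"}]}

print(is_fully_stopped(stopped), is_fully_stopped(paused))
```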
Get offsets¶
Get offsets for self-managed connectors.
Considerations:
- Sink connectors use a JSON format for offsets that is different from source connectors, but is similar across sink connectors.
- Source connectors use a JSON format for offsets that can differ across source connectors.
To get the offset of self-managed connectors, make a GET
request that specifies the connector name:
GET /connectors/{connector}/offsets
Response (the first payload shows the sink connector format; the second shows the source connector format):
{
"offsets": [
{
"partition": {
"kafka_partition": // Kafka partition
"kafka_topic": // Kafka topic
},
"offset": {
"kafka_offset": // Kafka offset
}
}
]
}
{
"offsets": [
{
"partition": {
// Connector-defined source partition
},
"offset": {
// Connector-defined source offset
}
}
]
}
For more information, see Offsets in Kafka Connect Reference.
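Before reusing a retrieved payload, it can help to check which of the two formats it uses. The following Python sketch treats a response as sink-formatted when every entry has the `kafka_topic`, `kafka_partition`, and `kafka_offset` keys shown above. `is_sink_format` is a hypothetical helper, and an empty offsets list is reported as not sink-formatted because the format cannot be determined from it.

```python
def is_sink_format(offsets_response: dict) -> bool:
    """True when every entry uses the Kafka topic/partition/offset keys."""
    entries = offsets_response.get("offsets", [])
    return bool(entries) and all(
        {"kafka_topic", "kafka_partition"} <= set(entry.get("partition", {}))
        and "kafka_offset" in entry.get("offset", {})
        for entry in entries
    )


sink_response = {"offsets": [
    {"partition": {"kafka_partition": 0, "kafka_topic": "topic-0"},
     "offset": {"kafka_offset": 2000}}
]}
# Source entries use connector-defined keys; these are illustrative only.
source_response = {"offsets": [
    {"partition": {"ns": "example-namespace"}, "offset": {"_id": "example-token"}}
]}

print(is_sink_format(sink_response), is_sink_format(source_response))
```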
Create fully-managed connectors¶
Create fully-managed connectors to migrate self-managed connectors. Use the offset you obtained from the self-managed connector to prevent replaying old data.
Considerations:
- Before you migrate, familiarize yourself with the documentation for the fully-managed connector to which you are migrating. Some connectors require special processes to migrate.
- Confluent recommends that the configuration of your self-managed connector and the configuration of your fully-managed connector match as closely as possible.
To create connectors with a designated offset:
- To create a connector with an offset, make a POST request that specifies the environment and Kafka cluster. Include a JSON payload that specifies the name of the connector, the configuration for the connector, and the offset.
This is an example of a request to create a sink connector with a specific offset.
While the offset properties are the same for all sink connectors, the name of the connector and configuration properties are different for every connector. Use the documentation for the sink connector to determine the properties to use. For more information, see Supported connectors.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors
Host: https://api.confluent.cloud
{
"name": "{connector-name}",
"config": {
... // connector specific configuration
},
"offsets": [
{
"partition": {
"kafka_partition": 2,
"kafka_topic": "topic-2"
},
"offset": {
"kafka_offset": 1000
}
}
]
}
Response:
Successful calls return HTTP 200
with a JSON payload that describes the offset.
{
"name": "{connector-name}",
"config": {
... // connector specific configuration
},
"tasks": [],
"offsets": [
{
"partition": {
"kafka_partition": 2,
"kafka_topic": "topic-2"
},
"offset": {
"kafka_offset": 1000
}
}
]
}
Responses include the following information:
- The requested position of the offset, along with a partition and topic
- Information about the connector
This is an example of a request to create a source connector with a specific offset.
The name of the connector, configuration properties, and the payload that describes the offset are different for every source connector. Use the documentation for the source connector to determine the properties to use. For more information, see Source connectors and Supported connectors.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors
Host: https://api.confluent.cloud
{
"name": "{connector-name}",
"config": {
... // connector specific configuration
},
"offsets": [
{
"partition": {
... // connector specific configuration
},
"offset": {
... // connector specific configuration
}
}
]
}
Response:
Successful calls return HTTP 200
with a JSON payload that describes the new connector and the offset.
{
"name": "{connector-name}",
"config": {
... // connector specific configuration
},
"tasks": [],
"offsets": [
{
"partition": {
... // connector specific configuration
},
"offset": {
... // connector specific configuration
}
}
]
}
Responses include the following information:
- The requested position of the offset
- Information about the connector
Examples¶
The following examples show how to create fully-managed connectors with offsets.
This is an example of a request to create an Amazon S3 Sink connector with a specific offset.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors
Host: https://api.confluent.cloud
{
"name": "confluent-s3-sink",
"config": {
"name" : "confluent-s3-sink",
"connector.class": "S3_SINK",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret": "<my-kafka-api-secret>",
"aws.access.key.id" : "<my-aws-access-key>",
"aws.secret.access.key": "<my-aws-access-key-secret>",
"input.data.format": "JSON",
"output.data.format": "JSON",
"compression.codec": "JSON - gzip",
"s3.compression.level": "6",
"s3.bucket.name": "<my-bucket-name>",
"time.interval" : "HOURLY",
"flush.size": "1000",
"tasks.max" : "1",
"topics": "<topic-1>, <topic-2>"
},
"offsets": [
{
"partition": {
"kafka_partition": 2,
"kafka_topic": "topic-2"
},
"offset": {
"kafka_offset": 1000
}
},
{
"partition": {
"kafka_partition": 3,
"kafka_topic": "topic-1"
},
"offset": {
"kafka_offset": 280
}
}
]
}
Response:
Successful calls return HTTP 200
with a JSON payload that describes the offset.
{
"name": "confluent-s3-sink",
"config": {
"name" : "confluent-s3-sink",
"connector.class": "S3_SINK",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret": "<my-kafka-api-secret>",
"aws.access.key.id" : "<my-aws-access-key>",
"aws.secret.access.key": "<my-aws-access-key-secret>",
"input.data.format": "JSON",
"output.data.format": "JSON",
"compression.codec": "JSON - gzip",
"s3.compression.level": "6",
"s3.bucket.name": "<my-bucket-name>",
"time.interval" : "HOURLY",
"flush.size": "1000",
"tasks.max" : "1",
"topics": "<topic-1>, <topic-2>"
},
"tasks": [],
"offsets": [
{
"partition": {
"kafka_partition": 2,
"kafka_topic": "topic-2"
},
"offset": {
"kafka_offset": 1000
}
},
{
"partition": {
"kafka_partition": 3,
"kafka_topic": "topic-1"
},
"offset": {
"kafka_offset": 280
}
}
]
}
This is an example of a request to create a MongoDB Atlas Source connector with a specific offset.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors
Host: https://api.confluent.cloud
{
"name": "<my-connector-name>",
"config": {
"connector.class": "MongoDbAtlasSource",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret": "<my-kafka-api-secret>",
"topic.prefix": "<topic-prefix>",
"connection.host": "<database-host-address>",
"connection.user": "<database-username>",
"connection.password": "<database-password>",
"database": "<database-name>",
"collection": "<database-collection-name>",
"poll.await.time.ms": "5000",
"poll.max.batch.size": "1000",
"startup.mode": "copy_existing",
"output.data.format": "JSON",
"tasks.max": "1"
},
"offsets": [
{
"partition": {
"ns": "mongodb+srv://cluster0.2a5tnof.mongodb.net/"
},
"offset": {
"_id": "{\"_data\": \"82661F7DDE000000012B042C0100296E5A1004737030_TRUNCATED\"}"
}
}
]
}
Response:
Successful calls return HTTP 200
with a JSON payload that describes the new connector and the offset.
{
"name": "<my-connector-name>",
"config": {
"connector.class": "MongoDbAtlasSource",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret": "<my-kafka-api-secret>",
"topic.prefix": "<topic-prefix>",
"connection.host": "<database-host-address>",
"connection.user": "<database-username>",
"connection.password": "<database-password>",
"database": "<database-name>",
"collection": "<database-collection-name>",
"poll.await.time.ms": "5000",
"poll.max.batch.size": "1000",
"startup.mode": "copy_existing",
"output.data.format": "JSON",
"tasks.max": "1"
},
"tasks": [],
"offsets": [
{
"partition": {
"ns": "mongodb+srv://cluster0.2a5tnof.mongodb.net/"
},
"offset": {
"_id": "{\"_data\": \"82661F7DDE000000012B042C0100296E5A1004737030_TRUNCATED\"}"
}
}
]
}