Manage Offsets for Fully-Managed Connectors

Custom offsets for managed connectors are an Early Access feature

Confluent uses Early Access releases to gather feedback. This service should be used only for evaluation and non-production testing purposes, or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions.

Early Access is intended for evaluation use in development and testing environments only and not for production use. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Early Access. Confluent considers Early Access to be a Proof of Concept as defined in the Confluent Cloud Terms of Service. Confluent may discontinue providing preview releases of the Early Access releases at any time at the sole discretion of Confluent.

You can manage offsets for fully-managed connectors. Offsets indicate the point in the source system from which the connector accesses data. The source system differs for sink and source connectors. For sink connectors, the source system is Apache Kafka®, where data is organized into topic partitions and each topic partition has an associated offset. For source connectors, the source system is the external system from which the connector pulls data. For example, in a database where data is organized into tables, a timestamp can be used as the offset; in a CDC system where data is organized into a single binlog (binary log), the position in the binlog can be used as the offset.

To set an offset, fully-managed connectors use a JSON format in the API request. Because the source systems differ, sink and source connectors use different formats. The format for sink connectors is consistent across all sink connectors, while the format for source connectors varies: sink connectors always collect data from Kafka, whereas source connectors interact with a variety of external systems, each of which can require its own format for managing the offset.
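As a sketch, the sink connector offset format pairs each Kafka topic partition with a target offset. The snippet below builds such a list of offsets in Python; the topic names and offset values are illustrative, and the full request envelope for altering offsets is not shown in this section.

```python
import json

# Build offsets in the sink connector format: each entry pairs a
# Kafka topic partition with the offset to resume from.
# Topic names and values here are illustrative placeholders.
offsets = [
    {
        "partition": {"kafka_topic": "orders", "kafka_partition": 0},
        "offset": {"kafka_offset": 2000},
    },
    {
        "partition": {"kafka_topic": "orders", "kafka_partition": 1},
        "offset": {"kafka_offset": 7000},
    },
]
body = json.dumps({"offsets": offsets}, indent=2)
print(body)
```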

Considerations:

  • When you reset an offset for a running connector, the connector stops working and its status changes to PAUSED. After the offset is altered, the connector automatically resumes. During this time, the connector does not process data.
  • The JSON format for sink connectors appears below. Use this format for all sink connectors.
  • If you are using a service account, you need to assign RBAC permissions to manage offsets for sink connectors. For more information, see Sink connector offset management.
  • For source connectors, use the format specified on the supported connector’s installation page.

Supported connectors

Confluent supports the following connectors:

Source connectors

To manage source connector offsets, use the documentation on the supported connector's installation page.

Sink connectors

  • Confluent supports all fully-managed sink connectors. Confluent resends your topic data to the sink system. You must ensure that the sink system and downstream services can handle the possibility of duplicate and out-of-order data.

To manage sink connector offsets, see Manage sink connector offsets.

Manage sink connector offsets

To manage sink connector offsets:

To get the current offset, make a GET request that specifies the environment, Kafka cluster, and connector name.

GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets
Host: https://api.confluent.cloud
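
The endpoint URL can be composed programmatically. As a minimal sketch, the helper below fills in the path parameters; the IDs shown are placeholders, and the actual request must be authenticated with a Confluent Cloud API key (HTTP basic auth).

```python
BASE = "https://api.confluent.cloud"

def offsets_url(environment_id: str, kafka_cluster_id: str, connector_name: str) -> str:
    """Build the GET offsets endpoint URL for a fully-managed connector."""
    return (
        f"{BASE}/connect/v1/environments/{environment_id}"
        f"/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets"
    )

# Placeholder IDs for illustration only:
print(offsets_url("env-abc123", "lkc-xyz789", "my-sink-connector"))
```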

Response:

Successful calls return HTTP 200 with a JSON payload that describes the offset.

{
    "id": "lcc-example123",
    "name": "{connector_name}",
    "offsets": [
        {
            "partition": {
                "kafka_partition": 2,
                "kafka_topic": "topic-2"
            },
            "offset": {
                "kafka_offset": 3000
            }
        },
        {
            "partition": {
                "kafka_partition": 1,
                "kafka_topic": "topic-1"
            },
            "offset": {
                "kafka_offset": 7000
            }
        },
        {
            "partition": {
                "kafka_partition": 0,
                "kafka_topic": "topic-0"
            },
            "offset": {
                "kafka_offset": 2000
            }
        }
    ],
    "metadata": {
        "observed_at": "2024-03-06T16:41:46.445116967Z"
    }
}
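
Assuming the JSON above has been fetched into a string, a sketch of extracting the per-partition positions (the payload below abbreviates the sample response to two partitions):

```python
import json

# Sample response payload in the shape shown above.
response_text = """
{
  "id": "lcc-example123",
  "name": "my-sink-connector",
  "offsets": [
    {"partition": {"kafka_partition": 2, "kafka_topic": "topic-2"},
     "offset": {"kafka_offset": 3000}},
    {"partition": {"kafka_partition": 1, "kafka_topic": "topic-1"},
     "offset": {"kafka_offset": 7000}}
  ],
  "metadata": {"observed_at": "2024-03-06T16:41:46.445116967Z"}
}
"""

doc = json.loads(response_text)
# Map each (topic, partition) pair to its current offset.
positions = {
    (o["partition"]["kafka_topic"], o["partition"]["kafka_partition"]):
        o["offset"]["kafka_offset"]
    for o in doc["offsets"]
}
print(positions)
```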

Responses include the following information:

  • The position of each current offset, along with its partition and topic
  • The observed time of the offset in the metadata portion of the payload. The observed_at time indicates the snapshot in time at which the API retrieved the offset. A running connector continually updates its offsets. Use observed_at to gauge the gap between real time and the time at which the request was made. By default, offsets are observed every minute. Calling GET repeatedly fetches more recently observed offsets.
  • Information about the connector.
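
Because observed_at is an RFC 3339 timestamp, the staleness of a snapshot can be computed. A sketch, assuming only the standard library; note that Python's datetime parsing accepts at most microsecond precision on older versions, so the nanosecond digits are truncated first:

```python
from datetime import datetime, timezone
import re

def parse_observed_at(ts: str) -> datetime:
    """Parse an observed_at value such as 2024-03-06T16:41:46.445116967Z.
    Truncates fractional seconds to 6 digits, since datetime.fromisoformat
    rejects nanosecond precision on Python versions before 3.11."""
    ts = ts.replace("Z", "+00:00")
    ts = re.sub(r"(\.\d{6})\d+", r"\1", ts)  # keep at most 6 fractional digits
    return datetime.fromisoformat(ts)

observed = parse_observed_at("2024-03-06T16:41:46.445116967Z")
lag = datetime.now(timezone.utc) - observed  # staleness of this snapshot
print(observed.isoformat())
```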