Manage Offsets for Fully-Managed Connectors in Confluent Cloud

An offset defines the point in the source system from which the connector accesses data. The definition of the source system varies by connector type:

  • Sink connectors: The source system is the Apache Kafka® cluster, where the offset tracks the position within a topic partition.

  • Source connectors: The source system is the external system from which the connector pulls data. For example, in a database where data is organized into tables, a timestamp can serve as the offset; in a CDC system where data is organized into a single binlog (binary log), the position in the binlog can serve as the offset.

You can manage offsets for fully-managed connectors in Confluent Cloud. This feature is essential for tasks such as:

  • Migrating the operational state of an existing Confluent Platform connector to a fully-managed Confluent Cloud connector.

  • Transferring an offset from one fully-managed connector to another (for example, during an upgrade).

  • Resetting a connector’s reading position to reprocess or skip data.

To set an offset, fully-managed connectors use a JSON format in the API request. Because the source systems differ, sink and source connectors use different formats. The format is the same for all sink connectors, because every sink connector collects data from Kafka. The format for source connectors can vary, because each source connector interacts with a different external system, and each external system can require its own format to manage the offset.

Considerations:

  • When you reset an offset for a running connector, the connector stops and its status changes to PAUSED. After the offset is altered, the connector resumes automatically. While paused, the connector does not process data.

  • The JSON format for sink connectors appears below. Use this format for all sink connectors.

  • If you are using a service account, you need to assign RBAC permissions to manage offsets for sink connectors. For more information, see Sink connector offset management.

  • For source connectors, use the format specified on the supported connector’s installation page.

  • In scenarios (such as disaster events) where the connector data plane is unavailable, the offsets API returns the last known offset for the connector, as long as you have not deleted the connector.

Limitations

  • Custom connectors do not support offset management.

  • If a fully-managed source connector has multiple versions available, earlier versions do not support offset management. Upgrade to the latest version to get offset management capabilities.

Supported connectors

The following connectors support offset management:

Source connectors

The following fully-managed source connectors support offset management:

Sink connectors

All fully-managed sink connectors support offset management. When you reset an offset, Confluent resends your topic data to the sink system. You must ensure that the sink system and downstream services can tolerate possible data duplication and out-of-order data.

To manage sink connector offsets, see Sink connectors.

Manage connector offsets

Use this section to manage connector offsets.

Sink connectors

To manage sink connector offsets, use the following steps.

To get the current offset, make a GET request that specifies the environment, Kafka cluster, and connector name.

GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets
Host: https://api.confluent.cloud
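For illustration, the following is a minimal Python sketch of this call using the requests library. The environment ID, cluster ID, connector name, and the Cloud API key and secret (used as HTTP Basic credentials) are placeholder assumptions; later sketches on this page reuse the BASE, AUTH, and CONNECTOR_NAME values defined here.

import requests

# Placeholder values -- substitute your own environment, cluster, connector, and credentials.
ENV_ID = "env-abc123"
CLUSTER_ID = "lkc-abc123"
CONNECTOR_NAME = "my-sink-connector"
AUTH = ("<cloud-api-key>", "<cloud-api-secret>")  # Cloud API key and secret as HTTP Basic credentials

BASE = f"https://api.confluent.cloud/connect/v1/environments/{ENV_ID}/clusters/{CLUSTER_ID}"

# Fetch the current offsets for the connector.
resp = requests.get(f"{BASE}/connectors/{CONNECTOR_NAME}/offsets", auth=AUTH)
resp.raise_for_status()
for entry in resp.json()["offsets"]:
    print(entry["partition"], entry["offset"])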

Response:

Successful calls return HTTP 200 with a JSON payload that describes the offset.

{
    "id": "lcc-example123",
    "name": "{connector_name}",
    "offsets": [
        {
            "partition": {
                "kafka_partition": 2,
                "kafka_topic": "topic-2"
            },
            "offset": {
                "kafka_offset": 3000
            }
        },
        {
            "partition": {
                "kafka_partition": 1,
                "kafka_topic": "topic-1"
            },
            "offset": {
                "kafka_offset": 7000
            }
        },
        {
            "partition": {
                "kafka_partition": 0,
                "kafka_topic": "topic-0"
            },
            "offset": {
                "kafka_offset": 2000
            }
        }
    ],
    "metadata": {
        "observed_at": "2024-03-06T16:41:46.445116967Z"
    }
}

Responses include the following information:

  • The position of all current offsets, along with a partition and topic

  • The observed time of the offset in the metadata portion of the payload. The observed_at time indicates a snapshot in time for when the API retrieved the offset. A running connector is always updating its offsets. Use observed_at to get a sense of the gap between real time and the time at which the request was made. By default, offsets are observed every minute. Calling GET repeatedly fetches more recently observed offsets.

  • Information about the connector.

To update the offset, make a POST request that specifies the environment, Kafka cluster, and connector name. Include a JSON payload that specifies the new offset and a patch type.

POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud

   {
       "type":"PATCH",
       "offsets": [
           {
               "partition": {
                   "kafka_partition": 2,
                   "kafka_topic": "topic-2"
               },
               "offset": {
                   "kafka_offset": 1000
               }
           },
           {
               "partition": {
                   "kafka_partition": 1,
                   "kafka_topic": "topic-1"
               },
               "offset": {
                   "kafka_offset": 1000
               }
           },
           {
               "partition": {
                   "kafka_partition": 0,
                   "kafka_topic": "topic-0"
               },
               "offset": {
                   "kafka_offset": 1000
               }
           }
       ]
   }
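For illustration, a minimal Python sketch of this patch request, reusing the BASE, AUTH, and CONNECTOR_NAME placeholders from the GET sketch above.

# Reuses BASE, AUTH, and CONNECTOR_NAME from the GET sketch above.
patch_body = {
    "type": "PATCH",
    "offsets": [
        {
            "partition": {"kafka_partition": 2, "kafka_topic": "topic-2"},
            "offset": {"kafka_offset": 1000},
        },
    ],
}
resp = requests.post(
    f"{BASE}/connectors/{CONNECTOR_NAME}/offsets/request", auth=AUTH, json=patch_body
)
resp.raise_for_status()  # expect HTTP 202 Accepted
print(resp.json()["requested_at"])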

Considerations:

  • You can only make one offset change at a time for a given connector.

  • This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.

  • Patch requests move the offsets to the provided point in the topic partition. If the offset provided is not present in the topic partition, it resets to the earliest offset in the topic partition.

  • You don’t need to provide partition and offset information for all the topic partitions that the connector is processing. A partial set also works.

Response:

Successful calls return HTTP 202 Accepted with a JSON payload that describes the offset.

{
    "id": "lcc-example123",
    "name": "{connector_name}",
    "offsets": [
        {
            "partition": {
                "kafka_partition": 2,
                "kafka_topic": "topic-2"
            },
            "offset": {
                "kafka_offset": 1000
            }
        },
        {
            "partition": {
                "kafka_partition": 1,
                "kafka_topic": "topic-1"
            },
            "offset": {
                "kafka_offset": 1000
            }
        },
        {
            "partition": {
                "kafka_partition": 0,
                "kafka_topic": "topic-0"
            },
            "offset": {
                "kafka_offset": 1000
            }
        }
    ],
    "requested_at": "2024-03-06T16:45:07.221304236Z",
    "type": "PATCH"
}

Responses include the following information:

  • The requested position of the offsets, along with a partition and topic

  • The time of the request to update the offset.

  • Information about the connector.

To delete the offset, make a POST request that specifies the environment, Kafka cluster, and connector name. Include a JSON payload that specifies the delete type.

 POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
 Host: https://api.confluent.cloud

{
  "type": "DELETE"
}
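For illustration, a minimal Python sketch of this delete request, again reusing the placeholders from the GET sketch above.

# Reuses BASE, AUTH, and CONNECTOR_NAME from the GET sketch above.
resp = requests.post(
    f"{BASE}/connectors/{CONNECTOR_NAME}/offsets/request", auth=AUTH, json={"type": "DELETE"}
)
resp.raise_for_status()  # expect HTTP 202 Accepted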

Considerations:

  • Delete requests delete the connector's offsets and reset the connector to its base state. The effect is as if you created a fresh connector.

  • This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.

  • Do not issue delete and patch requests at the same time.


Response:

Successful calls return HTTP 202 Accepted with a JSON payload that describes the result.

{
    "id": "lcc-example123",
    "name": "GcsSinkConnector_0",
    "offsets": [],
    "requested_at": "2024-03-06T20:11:56.005564011Z",
    "type": "DELETE"
}

Responses include the following information:

  • Empty offsets.

  • The time of the request to delete the offset.

  • Information about Kafka cluster and connector.

  • The type of request.

To get the status of a previous offset request, make a GET request that specifies the environment, Kafka cluster, and connector name.

GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request/status
Host: https://api.confluent.cloud
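For illustration, a minimal Python sketch that polls this endpoint until the most recent request is no longer pending, reusing the placeholders from the GET sketch above and assuming the phase values are the uppercase forms shown in the example response (APPLIED, PENDING, FAILED).

import time

# Reuses BASE, AUTH, and CONNECTOR_NAME from the GET sketch above.
# Poll until the most recent PATCH or DELETE request is no longer pending.
while True:
    resp = requests.get(f"{BASE}/connectors/{CONNECTOR_NAME}/offsets/request/status", auth=AUTH)
    resp.raise_for_status()
    status = resp.json()["status"]
    if status["phase"] != "PENDING":  # assumed phase values: PENDING, APPLIED, FAILED
        break
    time.sleep(5)
print(status["phase"], status.get("message", ""))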

Considerations:

  • The status endpoint always shows the status of the most recent PATCH/DELETE operation.

Response:

Successful calls return HTTP 200 with a JSON payload that describes the result. The following is an example of an applied DELETE request.

{
    "request": {
        "id": "lcc-example123",
        "name": "{connector_name}",
        "offsets": [],
        "requested_at": "2024-03-06T20:11:56.005564011Z",
        "type": "DELETE"
    },
    "status": {
        "phase": "APPLIED",
        "message": "The Connect framework-managed offsets for this connector have been reset successfully. However, if this connector manages offsets externally, they will need to be manually reset in the system that the connector uses."
    },
    "previous_offsets": [
        {
            "partition": {
                "kafka_partition": 2,
                "kafka_topic": "topic-2"
            },
            "offset": {
                "kafka_offset": 58000
            }
        },
        {
            "partition": {
                "kafka_partition": 1,
                "kafka_topic": "topic-1"
            },
            "offset": {
                "kafka_offset": 147000
            }
        },
        {
            "partition": {
                "kafka_partition": 0,
                "kafka_topic": "topic-0"
            },
            "offset": {
                "kafka_offset": 58000
            }
        }
    ],
    "applied_at": "2024-03-06T20:11:57.766328908Z"
}

Responses include the following information:

  • The original request, including the time it was made.

  • The status of the request: applied, pending, or failed.

  • The time the request was applied (applied_at).

  • The previous offsets. These are the offsets the connector had committed before the request was applied. Use them to restore your connector to its previous state, for example if a patch causes the connector to fail or you need to roll back an update, as shown in the sketch after this list.
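For illustration, a minimal Python sketch of this restore flow, reusing the placeholders from the GET sketch above: it reads previous_offsets from the status response and resubmits them as a PATCH request.

# Reuses BASE, AUTH, and CONNECTOR_NAME from the GET sketch above.
# Read previous_offsets from the status response and resubmit them as a PATCH.
status_resp = requests.get(f"{BASE}/connectors/{CONNECTOR_NAME}/offsets/request/status", auth=AUTH)
status_resp.raise_for_status()
previous = status_resp.json().get("previous_offsets", [])
if previous:
    restore = requests.post(
        f"{BASE}/connectors/{CONNECTOR_NAME}/offsets/request",
        auth=AUTH,
        json={"type": "PATCH", "offsets": previous},
    )
    restore.raise_for_status()  # expect HTTP 202 Accepted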

Source connectors

Use the documentation on the installation page of the supported source connector. For more information, see Source connectors and Cluster API reference.

Create connectors with offsets

Use Confluent Cloud APIs to create connectors that start processing data from a designated offset. For more information, see Cluster API reference.

To create connectors with a designated offset:

  • To create a connector with an offset, make a POST request that specifies the environment and Kafka cluster. Include a JSON payload that specifies the name of the connector, the configuration for the connector, and the offset.

This is an example of a request to create a sink connector with a specific offset.

While the offset properties are the same for all sink connectors, the name of the connector and configuration properties are different for every connector. Use the documentation for the sink connector to determine the properties to use. For more information, see Supported connectors.

POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors
Host: https://api.confluent.cloud

  {
      "name": "{connector-name}",
      "config": {
          ... // connector specific configuration
      },
      "offsets": [
          {
              "partition": {
                "kafka_partition": 2,
                "kafka_topic": "topic-2"
              },
              "offset": {
                "kafka_offset": 1000
              }
          }
      ]
  }
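For illustration, a minimal Python sketch of submitting this create request, reusing the BASE and AUTH placeholders from the GET sketch above; the connector name and configuration values are placeholders to fill in from the sink connector's documentation.

# Reuses BASE and AUTH from the GET sketch above.
create_body = {
    "name": "my-sink-connector",  # placeholder connector name
    "config": {},                 # fill in the connector-specific configuration
    "offsets": [
        {
            "partition": {"kafka_partition": 2, "kafka_topic": "topic-2"},
            "offset": {"kafka_offset": 1000},
        },
    ],
}
resp = requests.post(f"{BASE}/connectors", auth=AUTH, json=create_body)
resp.raise_for_status()
print(resp.json()["offsets"])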

Response:

Successful calls return HTTP 200 with a JSON payload that describes the offset.

{
  "name": "{connector-name}",
  "config": {
      ... // connector specific configuration
  },
  "tasks": [],
  "offsets": [
      {
          "partition": {
            "kafka_partition": 2,
            "kafka_topic": "topic-2"
          },
          "offset": {
            "kafka_offset": 1000
          }
      }
  ]
}

Responses include the following information:

  • The requested position of the offset, along with a partition and topic

  • Information about the connector

This is an example of a request to create a source connector with a specific offset.

The name of the connector, configuration properties, and the payload that describes the offset are different for every source connector. Use the documentation for the source connector to determine the properties to use. For more information, see Source connectors and Supported connectors.

POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors
Host: https://api.confluent.cloud

  {
      "name": "{connector-name}",
      "config": {
          ... // connector specific configuration
      },
      "offsets": [
          {
              "partition": {
                ... // connector specific configuration
              },
              "offset": {
                ... // connector specific configuration
              }
          }
      ]
  }

Response:

Successful calls return HTTP 200 with a JSON payload that describes the new connector and the offset.

{
  "name": "{connector-name}",
  "config": {
      ... // connector specific configuration
  },
  "tasks": [],
  "offsets": [
      {
          "partition": {
              ... // connector specific configuration
          },
          "offset": {
              ... // connector specific configuration
          }
      }
  ]
}

Responses include the following information:

  • The requested position of the offset

  • Information about the connector

Migrate to fully-managed connectors

This section describes how to migrate from self-managed connectors to fully-managed connectors. Migration always involves downtime because you must stop the self-managed connector to migrate. In general, migration follows a three-step process:

  1. Stop the self-managed connector.

  2. Get the latest offset from the stopped self-managed connector.

  3. Create the fully-managed connector on Confluent Cloud, specifying the offset from the stopped self-managed connector (see the sketch after these steps).
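For illustration, the following Python sketch strings these three steps together. It assumes a self-managed Connect worker at http://localhost:8083 with no authentication, plus placeholder Confluent Cloud IDs, connector names, and Cloud API credentials; adjust these for your deployment.

import time
import requests

# Assumptions for illustration only: a self-managed Connect worker at
# localhost:8083 with no authentication, and placeholder Confluent Cloud IDs,
# connector names, and Cloud API credentials.
WORKER = "http://localhost:8083"
SELF_MANAGED_NAME = "my-self-managed-connector"
BASE = "https://api.confluent.cloud/connect/v1/environments/env-abc123/clusters/lkc-abc123"
AUTH = ("<cloud-api-key>", "<cloud-api-secret>")

# 1. Stop the self-managed connector (stop, not pause) and wait for STOPPED.
requests.put(f"{WORKER}/connectors/{SELF_MANAGED_NAME}/stop").raise_for_status()
while True:
    status = requests.get(f"{WORKER}/connectors/{SELF_MANAGED_NAME}/status").json()
    if status["connector"]["state"] == "STOPPED":
        break
    time.sleep(2)

# 2. Get the latest offsets from the stopped connector.
offsets = requests.get(f"{WORKER}/connectors/{SELF_MANAGED_NAME}/offsets").json()["offsets"]

# 3. Create the fully-managed connector with those offsets. For source
#    connectors, the offset format may need adjusting; see the fully-managed
#    connector's documentation.
create_body = {
    "name": "my-fully-managed-connector",
    "config": {},  # fill in the connector-specific configuration
    "offsets": offsets,
}
resp = requests.post(f"{BASE}/connectors", auth=AUTH, json=create_body)
resp.raise_for_status()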

Prerequisites

  • Kafka version 3.6 or later is required. This corresponds to Confluent Platform version 7.6.x.

  • You must have REST API access to the Connect workers.

Stop self-managed connectors

Stop the self-managed connector.

Considerations:

  • Use stop (do not use pause).

  • Stopping ensures that you get the latest offset.

  • Ensure the connector status shows STOPPED and has no tasks.

To stop self-managed connectors, make a PUT request that specifies the connector name:

PUT /connectors/{connector-name}/stop

To check the status of self-managed connectors, make a GET request that specifies the connector name:

GET /connectors/{connector-name}/status

For more information, see Connectors in Kafka Connect Reference.

Get offsets

Get offsets for self-managed connectors.

Considerations:

  • Sink connectors use a JSON format for offsets that differs from the source connector format but is consistent across all sink connectors.

  • Source connectors use a JSON format for offsets that can differ across source connectors.

To get the offset of self-managed connectors, make a GET request that specifies the connector name:

GET /connectors/{connector}/offsets

Response (sink connector offset format):

{
  "offsets": [
      {
          "partition": {
              "kafka_partition": ..., // Kafka partition
              "kafka_topic": ...      // Kafka topic
          },
          "offset": {
              "kafka_offset": ...     // Kafka offset
          }
      }
  ]
}

Response (source connector offset format):

{
  "offsets": [
      {
          "partition": {
              ... // Connector-defined source partition
          },
          "offset": {
              ... // Connector-defined source offset
          }
      }
  ]
}

For more information, see Offsets in Kafka Connect Reference.

Create fully-managed connectors

Create fully-managed connectors to migrate self-managed connectors. Use the offset you obtained from the self-managed connector to prevent replaying old data.

Considerations:

  • Before you migrate, familiarize yourself with the documentation for the fully-managed connector to which you are migrating. Some connectors require special processes to migrate.

  • Confluent recommends that the configuration of your self-managed connector and the configuration of your fully-managed connector match as closely as possible.

To create connectors with a designated offset:

  • To create a connector with an offset, make a POST request that specifies the environment and Kafka cluster. Include a JSON payload that specifies the name of the connector, the configuration for the connector, and the offset.

This is an example of a request to create a sink connector with a specific offset.

While the offset properties are the same for all sink connectors, the name of the connector and configuration properties are different for every connector. Use the documentation for the sink connector to determine the properties to use. For more information, see Supported connectors.

POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors
Host: https://api.confluent.cloud

  {
      "name": "{connector-name}",
      "config": {
          ... // connector specific configuration
      },
      "offsets": [
          {
              "partition": {
                "kafka_partition": 2,
                "kafka_topic": "topic-2"
              },
              "offset": {
                "kafka_offset": 1000
              }
          }
      ]
  }

Response:

Successful calls return HTTP 200 with a JSON payload that describes the offset.

{
  "name": "{connector-name}",
  "config": {
      ... // connector specific configuration
  },
  "tasks": [],
  "offsets": [
      {
          "partition": {
            "kafka_partition": 2,
            "kafka_topic": "topic-2"
          },
          "offset": {
            "kafka_offset": 1000
          }
      }
  ]
}

Responses include the following information:

  • The requested position of the offset, along with a partition and topic

  • Information about the connector

This is an example of a request to create a source connector with a specific offset.

The name of the connector, configuration properties, and the payload that describes the offset are different for every source connector. Use the documentation for the source connector to determine the properties to use. For more information, see Source connectors and Supported connectors.

POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors
Host: https://api.confluent.cloud

  {
      "name": "{connector-name}",
      "config": {
          ... // connector specific configuration
      },
      "offsets": [
          {
              "partition": {
                ... // connector specific configuration
              },
              "offset": {
                ... // connector specific configuration
              }
          }
      ]
  }

Response:

Successful calls return HTTP 200 with a JSON payload that describes the new connector and the offset.

{
  "name": "{connector-name}",
  "config": {
      ... // connector specific configuration
  },
  "tasks": [],
  "offsets": [
      {
          "partition": {
              ... // connector specific configuration
          },
          "offset": {
              ... // connector specific configuration
          }
      }
  ]
}

Responses include the following information:

  • The requested position of the offset

  • Information about the connector

Examples

The following examples show how to create fully-managed connectors with offsets.

This is an example of a request to create an Amazon S3 Sink connector with a specific offset.

POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors
Host: https://api.confluent.cloud

  {
      "name": "confluent-s3-sink",
      "config": {
        "name" : "confluent-s3-sink",
        "connector.class": "S3_SINK",
        "kafka.auth.mode": "KAFKA_API_KEY",
        "kafka.api.key": "<my-kafka-api-key>",
        "kafka.api.secret": "<my-kafka-api-secret>",
        "aws.access.key.id" : "<my-aws-access-key>",
        "aws.secret.access.key": "<my-aws-access-key-secret>",
        "input.data.format": "JSON",
        "output.data.format": "JSON",
        "compression.codec": "JSON - gzip",
        "s3.compression.level": "6",
        "s3.bucket.name": "<my-bucket-name>",
        "time.interval" : "HOURLY",
        "flush.size": "1000",
        "tasks.max" : "1",
        "topics": "<topic-1>, <topic-2>"
      },
      "offsets": [
          {
              "partition": {
                "kafka_partition": 2,
                "kafka_topic": "topic-2"
              },
              "offset": {
                "kafka_offset": 1000
              }
          },
          {
              "partition": {
                "kafka_partition": 3,
                "kafka_topic": "topic-1"
              },
              "offset": {
                "kafka_offset": 280
              }
          }
      ]
  }

Response:

Successful calls return HTTP 200 with a JSON payload that describes the offset.

{
  "name": "confluent-s3-sink",
  "config": {
    "name" : "confluent-s3-sink",
    "connector.class": "S3_SINK",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "aws.access.key.id" : "<my-aws-access-key>",
    "aws.secret.access.key": "<my-aws-access-key-secret>",
    "input.data.format": "JSON",
    "output.data.format": "JSON",
    "compression.codec": "JSON - gzip",
    "s3.compression.level": "6",
    "s3.bucket.name": "<my-bucket-name>",
    "time.interval" : "HOURLY",
    "flush.size": "1000",
    "tasks.max" : "1",
    "topics": "<topic-1>, <topic-2>"
  },
  "tasks": [],
  "offsets": [
      {
          "partition": {
            "kafka_partition": 2,
            "kafka_topic": "topic-2"
          },
          "offset": {
            "kafka_offset": 1000
          }
      },
      {
          "partition": {
            "kafka_partition": 3,
            "kafka_topic": "topic-1"
          },
          "offset": {
            "kafka_offset": 280
          }
      }
  ]
}

This is an example of a request to create a MongoDB Atlas Source connector with a specific offset.

POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors
Host: https://api.confluent.cloud

  {
      "name": "<my-connector-name>",
      "config": {
        "connector.class": "MongoDbAtlasSource",
        "kafka.auth.mode": "KAFKA_API_KEY",
        "kafka.api.key": "<my-kafka-api-key>",
        "kafka.api.secret": "<my-kafka-api-secret>",
        "topic.prefix": "<topic-prefix>",
        "connection.host": "<database-host-address>",
        "connection.user": "<database-username>",
        "connection.password": "<database-password>",
        "database": "<database-name>",
        "collection": "<database-collection-name>",
        "poll.await.time.ms": "5000",
        "poll.max.batch.size": "1000",
        "startup.mode": "copy_existing",
        "output.data.format": "JSON"
        "tasks.max": "1"
      },
      "offsets": [
          {
              "partition": {
                  "ns": "mongodb+srv://cluster0.2a5tnof.mongodb.net/"
              },
              "offset": {
                  "_id": "{\"_data\": \"82661F7DDE000000012B042C0100296E5A1004737030_TRUNCATED\"}"
              }
          }
      ]
  }

Response:

Successful calls return HTTP 200 with a JSON payload that describes the new connector and the offset.

{
  "name": "<my-connector-name>",
  "config": {
    "connector.class": "MongoDbAtlasSource",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "topic.prefix": "<topic-prefix>",
    "connection.host": "<database-host-address>",
    "connection.user": "<database-username>",
    "connection.password": "<database-password>",
    "database": "<database-name>",
    "collection": "<database-collection-name>",
    "poll.await.time.ms": "5000",
    "poll.max.batch.size": "1000",
    "startup.mode": "copy_existing",
    "output.data.format": "JSON"
    "tasks.max": "1"
  },
  "tasks": [],
  "offsets": [
      {
          "partition": {
              "ns": "mongodb+srv://cluster0.2a5tnof.mongodb.net/"
          },
          "offset": {
              "_id": "{\"_data\": \"82661F7DDE000000012B042C0100296E5A1004737030_TRUNCATED\"}"
          }
      }
  ]
}