Manage Offsets for Fully-Managed Connectors in Confluent Cloud

You can manage offsets for fully-managed connectors. Offsets provide information on the point in the source system from which the connector accesses data. The source system is different for sink and source connectors. For sink connectors, the source system is Apache Kafka®, where data is organized into topic partitions and each topic partition has an associated offset. For source connectors, the source system is an external system from which the connector pulls data. For example, a database where data is organized into tables and a timestamp can be used as the offset, or a CDC system where data is organized into a single binlog (binary log) and the position in the binlog can be used as the offset.

To set an offset, fully-managed connectors use a JSON format in the API request. Because the source systems are different, sink and source connectors use different formats. The format is consistent across all sink connectors, while the format for source connectors can vary. This is because sink connectors collect data from Kafka, while source connectors interact with various external systems, and each external system can require a different format to manage the offset.

Considerations:

  • When you reset an offset for a connector that is running, the connector stops working and the status changes to PAUSED. After the offset is altered, the connector automatically resumes. During this time, the connector does not process data.
  • The JSON format for sink connectors appears below. Use this format for all sink connectors.
  • If you are using a service account, you need to assign RBAC permissions to manage offsets for sink connectors. For more information, see Sink connector offset management.
  • For source connectors, use the format specified on the supported connector’s installation page.

Limitations

  • Custom connectors do not support offset management.
  • If a fully-managed source connector has multiple versions available, earlier versions do not support offset management. Upgrade to the latest version to get offset management capabilities.

Manage connector offsets

Use this section to manage connector offsets.

Sink connectors

Use the following steps to manage sink connector offsets.

To get the current offset, make a GET request that specifies the environment, Kafka cluster, and connector name.

GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets
Host: https://api.confluent.cloud

Response:

Successful calls return HTTP 200 with a JSON payload that describes the offset.

{
    "id": "lcc-example123",
    "name": "{connector_name}",
    "offsets": [
        {
            "partition": {
                "kafka_partition": 2,
                "kafka_topic": "topic-2"
            },
            "offset": {
                "kafka_offset": 3000
            }
        },
        {
            "partition": {
                "kafka_partition": 1,
                "kafka_topic": "topic-1"
            },
            "offset": {
                "kafka_offset": 7000
            }
        },
        {
            "partition": {
                "kafka_partition": 0,
                "kafka_topic": "topic-0"
            },
            "offset": {
                "kafka_offset": 2000
            }
        }
    ],
    "metadata": {
        "observed_at": "2024-03-06T16:41:46.445116967Z"
    }
}

Responses include the following information:

  • The position of all current offsets, along with a partition and topic
  • The observed time of the offset in the metadata portion of the payload. The observed_at time is a snapshot of when the API retrieved the offset. A running connector is always updating its offsets. Use observed_at to get a sense of the gap between real time and the time at which the offset was observed. By default, offsets are observed every minute. Calling the endpoint repeatedly fetches more recently observed offsets.
  • Information about the connector.
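
The request above can be issued with any HTTP client. The following is a minimal Python sketch, not an official client: it assumes a Confluent Cloud API key and secret used as HTTP Basic credentials, plus placeholder environment, cluster, and connector values that you must replace with your own.

import requests  # third-party HTTP client: pip install requests

# Placeholder values -- substitute your own IDs and credentials.
ENVIRONMENT_ID = "env-abc123"
KAFKA_CLUSTER_ID = "lkc-abc123"
CONNECTOR_NAME = "my-sink-connector"
CLOUD_API_KEY = "<cloud-api-key>"        # assumed: Cloud API key for Basic auth
CLOUD_API_SECRET = "<cloud-api-secret>"

url = (
    "https://api.confluent.cloud/connect/v1"
    f"/environments/{ENVIRONMENT_ID}"
    f"/clusters/{KAFKA_CLUSTER_ID}"
    f"/connectors/{CONNECTOR_NAME}/offsets"
)

response = requests.get(url, auth=(CLOUD_API_KEY, CLOUD_API_SECRET))
response.raise_for_status()  # fail fast on non-2xx responses
payload = response.json()

# Print each topic partition with its current offset, plus the observation time.
for entry in payload["offsets"]:
    partition = entry["partition"]
    print(
        f"{partition['kafka_topic']}[{partition['kafka_partition']}] -> "
        f"offset {entry['offset']['kafka_offset']}"
    )
print("observed_at:", payload["metadata"]["observed_at"])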

Source connectors

Use the documentation on the installation page of the supported source connector. For more information, see Source connectors and Cluster API reference.

Create connectors with offsets

Use Confluent Cloud APIs to create connectors that start processing data from a designated offset. For more information, see Cluster API reference.

To create connectors with a designated offset:

  • To create a connector with an offset, make a POST request that specifies the environment and Kafka cluster. Include a JSON payload that specifies the name of the connector, the configuration for the connector, and the offset.

This is an example of a request to create a sink connector with a specific offset.

While the offset properties are the same for all sink connectors, the name of the connector and configuration properties are different for every connector. Use the documentation for the sink connector to determine the properties to use. For more information, see Supported connectors.

POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors
Host: https://api.confluent.cloud

  {
      "name": "{connector-name}",
      "config": {
          ... // connector specific configuration
      },
      "offsets": [
          {
              "partition": {
                "kafka_partition": 2,
                "kafka_topic": "topic-2"
              },
              "offset": {
                "kafka_offset": 1000
              }
          }
      ]
  }

Response:

Successful calls return HTTP 200 with a JSON payload that describes the offset.

{
  "name": "{connector-name}",
  "config": {
      ... // connector specific configuration
  },
  "tasks": [],
  "offsets": [
      {
          "partition": {
            "kafka_partition": 2,
            "kafka_topic": "topic-2"
          },
          "offset": {
            "kafka_offset": 1000
          }
      }
  ]
}

Responses include the following information:

  • The requested position of the offset, along with a partition and topic
  • Information about the connector
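
The same create request can be scripted. Below is a minimal Python sketch under the same assumptions as the earlier example (a Cloud API key and secret as HTTP Basic credentials and placeholder IDs); the config block is a placeholder that must be filled in with the real configuration for your sink connector.

import requests  # pip install requests

ENVIRONMENT_ID = "env-abc123"            # placeholder
KAFKA_CLUSTER_ID = "lkc-abc123"          # placeholder
CLOUD_API_KEY = "<cloud-api-key>"
CLOUD_API_SECRET = "<cloud-api-secret>"

url = (
    "https://api.confluent.cloud/connect/v1"
    f"/environments/{ENVIRONMENT_ID}/clusters/{KAFKA_CLUSTER_ID}/connectors"
)

# The "config" block is connector specific; see the documentation for your
# sink connector. The "offsets" block uses the common sink connector format.
body = {
    "name": "my-sink-connector",
    "config": {
        # ... connector-specific configuration goes here
    },
    "offsets": [
        {
            "partition": {"kafka_partition": 2, "kafka_topic": "topic-2"},
            "offset": {"kafka_offset": 1000},
        }
    ],
}

response = requests.post(url, json=body, auth=(CLOUD_API_KEY, CLOUD_API_SECRET))
response.raise_for_status()
print(response.json())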

Migrate to fully-managed connectors

This section describes how to migrate from self-managed connectors to fully-managed connectors. Migration always involves downtime because you must stop the self-managed connector to migrate. In general, migration follows a three-step process:

  1. Stop the self-managed connector.
  2. Get the latest offset from the stopped self-managed connector.
  3. Create the fully-managed connector on Confluent Cloud, specifying the offset from the stopped self-managed connector.

Prerequisites

  • Kafka version 3.6 or later is required. This corresponds to Confluent Platform version 7.6.x.
  • You must have REST API access to the Connect workers.

Stop self-managed connectors

Stop the self-managed connector.

Considerations:

  • Use stop (do not use pause).
  • Stopping ensures that you get the latest offset.
  • Ensure the connector status shows STOPPED and has no tasks.

To stop self-managed connectors, make a PUT request that specifies the connector name:

PUT /connectors/{connector-name}/stop

To check the status of self-managed connectors, make a GET request that specifies the connector name:

GET /connectors/{connector-name}/status

For more information, see Connectors in Kafka Connect Reference.
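
As an illustration, the following Python sketch stops a self-managed connector and waits until the status reports STOPPED with no tasks. It assumes the Connect worker's REST endpoint is reachable at a placeholder URL with no authentication; add credentials if your workers require them.

import time
import requests  # pip install requests

CONNECT_URL = "http://localhost:8083"    # placeholder Connect worker REST endpoint
CONNECTOR_NAME = "my-self-managed-sink"  # placeholder connector name

# Stop (do not pause) the connector so that the latest offsets are committed.
resp = requests.put(f"{CONNECT_URL}/connectors/{CONNECTOR_NAME}/stop")
resp.raise_for_status()

# Poll the status until the connector is STOPPED and has no running tasks.
while True:
    status = requests.get(f"{CONNECT_URL}/connectors/{CONNECTOR_NAME}/status").json()
    if status["connector"]["state"] == "STOPPED" and not status["tasks"]:
        print(f"{CONNECTOR_NAME} is stopped")
        break
    time.sleep(5)  # wait before checking again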

Get offsets

Get offsets for self-managed connectors.

Considerations:

  • Sink connectors use a JSON format for offsets that is different from source connectors but consistent across all sink connectors.
  • Source connectors use a JSON format for offsets that can differ across source connectors.

To get the offset of self-managed connectors, make a GET request that specifies the connector name:

GET /connectors/{connector-name}/offsets

Response:

{
  "offsets": [
      {
          "partition": {
              "kafka_partition": // Kafka partition
              "kafka_topic": // Kafka topic
          },
          "offset": {
              "kafka_offset": // Kafka offset
          }
      }
  ]
}

For more information, see Offsets in Kafka Connect Reference.
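
Under the same assumptions as the previous sketch, the offsets of the stopped self-managed connector can be fetched and saved for the next step:

import json
import requests  # pip install requests

CONNECT_URL = "http://localhost:8083"    # placeholder Connect worker REST endpoint
CONNECTOR_NAME = "my-self-managed-sink"  # placeholder connector name

# Fetch the committed offsets of the stopped self-managed connector.
resp = requests.get(f"{CONNECT_URL}/connectors/{CONNECTOR_NAME}/offsets")
resp.raise_for_status()
offsets = resp.json()["offsets"]

# For sink connectors these entries already use the kafka_topic,
# kafka_partition, and kafka_offset keys expected by Confluent Cloud.
print(json.dumps(offsets, indent=2))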

Create fully-managed connectors

Create fully-managed connectors to migrate self-managed connectors. Use the offset you obtained from the self-managed connector to prevent replaying old data.

Considerations:

  • Before you migrate, familiarize yourself with the documentation for the fully-managed connector to which you are migrating. Some connectors require special processes to migrate.
  • Confluent recommends that the configuration of your self-managed connector and the configuration of your fully-managed connector match as closely as possible.

To create connectors with a designated offset:

  • To create a connector with an offset, make a POST request that specifies the environment and Kafka cluster. Include a JSON payload that specifies the name of the connector, the configuration for the connector, and the offset.

This is an example of a request to create a sink connector with a specific offset.

While the offset properties are the same for all sink connectors, the name of the connector and configuration properties are different for every connector. Use the documentation for the sink connector to determine the properties to use. For more information, see Supported connectors.

POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors
Host: https://api.confluent.cloud

  {
      "name": "{connector-name}",
      "config": {
          ... // connector specific configuration
      },
      "offsets": [
          {
              "partition": {
                "kafka_partition": 2,
                "kafka_topic": "topic-2"
              },
              "offset": {
                "kafka_offset": 1000
              }
          }
      ]
  }

Response:

Successful calls return HTTP 200 with a JSON payload that describes the offset.

{
  "name": "{connector-name}",
  "config": {
      ... // connector specific configuration
  },
  "tasks": [],
  "offsets": [
      {
          "partition": {
            "kafka_partition": 2,
            "kafka_topic": "topic-2"
          },
          "offset": {
            "kafka_offset": 1000
          }
      }
  ]
}

Responses include the following information:

  • The requested position of the offset, along with a partition and topic
  • Information about the connector
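
Putting the steps together, the following Python sketch reads the offsets from the stopped self-managed sink connector and creates the fully-managed connector on Confluent Cloud starting from those offsets. It combines the assumptions of the earlier sketches (a reachable Connect worker REST endpoint and a Cloud API key and secret for HTTP Basic authentication); the config block is a placeholder that must match your target fully-managed connector.

import requests  # pip install requests

# Self-managed Connect worker (placeholders).
CONNECT_URL = "http://localhost:8083"
SELF_MANAGED_NAME = "my-self-managed-sink"

# Confluent Cloud (placeholders).
ENVIRONMENT_ID = "env-abc123"
KAFKA_CLUSTER_ID = "lkc-abc123"
CLOUD_API_KEY = "<cloud-api-key>"
CLOUD_API_SECRET = "<cloud-api-secret>"

# 1. Read the offsets of the stopped self-managed sink connector.
resp = requests.get(f"{CONNECT_URL}/connectors/{SELF_MANAGED_NAME}/offsets")
resp.raise_for_status()
offsets = resp.json()["offsets"]

# 2. Create the fully-managed connector, starting from those offsets.
create_url = (
    "https://api.confluent.cloud/connect/v1"
    f"/environments/{ENVIRONMENT_ID}/clusters/{KAFKA_CLUSTER_ID}/connectors"
)
body = {
    "name": "my-fully-managed-sink",
    "config": {
        # ... connector-specific configuration; keep it as close as possible
        # to the configuration of the self-managed connector
    },
    "offsets": offsets,
}

resp = requests.post(create_url, json=body, auth=(CLOUD_API_KEY, CLOUD_API_SECRET))
resp.raise_for_status()
print(resp.json())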

Examples

The following examples show how to create fully-managed connectors with offsets.

This is an example of a request to create an Amazon S3 Sink connector with a specific offset.

POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors
Host: https://api.confluent.cloud

  {
      "name": "confluent-s3-sink",
      "config": {
        "name" : "confluent-s3-sink",
        "connector.class": "S3_SINK",
        "kafka.auth.mode": "KAFKA_API_KEY",
        "kafka.api.key": "<my-kafka-api-key>",
        "kafka.api.secret": "<my-kafka-api-secret>",
        "aws.access.key.id" : "<my-aws-access-key>",
        "aws.secret.access.key": "<my-aws-access-key-secret>",
        "input.data.format": "JSON",
        "output.data.format": "JSON",
        "compression.codec": "JSON - gzip",
        "s3.compression.level": "6",
        "s3.bucket.name": "<my-bucket-name>",
        "time.interval" : "HOURLY",
        "flush.size": "1000",
        "tasks.max" : "1",
        "topics": "<topic-1>, <topic-2>"
      },
      "offsets": [
          {
              "partition": {
                "kafka_partition": 2,
                "kafka_topic": "topic-2"
              },
              "offset": {
                "kafka_offset": 1000
              }
          },
          {
              "partition": {
                "kafka_partition": 3,
                "kafka_topic": "topic-1"
              },
              "offset": {
                "kafka_offset": 280
              }
          }
      ]
  }

Response:

Successful calls return HTTP 200 with a JSON payload that describes the offset.

{
  "name": "confluent-s3-sink",
  "config": {
    "name" : "confluent-s3-sink",
    "connector.class": "S3_SINK",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "aws.access.key.id" : "<my-aws-access-key>",
    "aws.secret.access.key": "<my-aws-access-key-secret>",
    "input.data.format": "JSON",
    "output.data.format": "JSON",
    "compression.codec": "JSON - gzip",
    "s3.compression.level": "6",
    "s3.bucket.name": "<my-bucket-name>",
    "time.interval" : "HOURLY",
    "flush.size": "1000",
    "tasks.max" : "1",
    "topics": "<topic-1>, <topic-2>"
  },
  "tasks": [],
  "offsets": [
      {
          "partition": {
            "kafka_partition": 2,
            "kafka_topic": "topic-2"
          },
          "offset": {
            "kafka_offset": 1000
          }
      },
      {
          "partition": {
            "kafka_partition": 3,
            "kafka_topic": "topic-1"
          },
          "offset": {
            "kafka_offset": 280
          }
      }
  ]
}