Migrate Self-Managed Connectors to Fully-Managed Connectors

Migrating self-managed connectors to Confluent Cloud eliminates infrastructure management and standardizes your data pipelines on a high-availability platform. To streamline this transition, Confluent has developed the Connect Migration Utility, an open-source tool designed to automate the migration process.

The utility:

  • Discovers and translates self-managed connector configurations to a fully-managed format.

  • Handles offset migration to ensure data continuity.

  • Provides detailed error reporting and mapping analysis.

  • Supports multiple migration strategies to balance consistency and downtime.

  • Validates configurations and identifies unsupported features.

Pre-migration requirements

Before running the utility, complete the following tasks to ensure a smooth migration:

  • Validate network and security: Establish a network path, such as PrivateLink or Transit Gateway, between your source and sink systems and Confluent Cloud.

  • Verify schema availability: Verify that the required schemas are migrated to Confluent Cloud Schema Registry to maintain data serialization compatibility.

  • Choose a migration strategy: Choose an approach that best fits your availability requirements.
    • Downtime approach: Prevents data loss and duplication but requires a maintenance window.

    • Parallel approach: Ensures zero downtime but requires downstream deduplication.

    • Fresh start approach: Deploys a new connector without migrating existing offsets.

Prerequisites

Ensure your environment meets these requirements to run the utility:

System requirements

  • Python 3.8 or later.

  • Access to your self-managed Kafka Connect worker URLs.

  • Kafka version 3.6 or later (Confluent Platform version 7.6.x or later).

Confluent Cloud requirements

  • An active Confluent Cloud account.

  • A Confluent Cloud environment and Apache Kafka® cluster.

  • API keys for Confluent Cloud authentication:

    • Cloud API key and secret.

    • Kafka cluster API key and secret.

  • Appropriate permissions to create connectors.

For information about creating API keys, see Use API Keys to Authenticate to Confluent Cloud.

Install the utility

To install the Connect Migration Utility:

  1. Clone the Git repository:

    git clone https://github.com/confluentinc/connect-migration-utility.git
    cd connect-migration-utility
    
  2. Install the required dependencies:

    pip install -r requirements.txt
    
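Optionally, you can install the dependencies into a virtual environment so they do not affect other Python projects. This is standard Python tooling rather than a requirement of the utility, and the environment name below is arbitrary:

```shell
# Optional: isolate the utility's dependencies in a virtual environment
python3 -m venv connect-migration-venv
. connect-migration-venv/bin/activate
# A subsequent "pip install -r requirements.txt" now installs into the venv
```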

Migrate your connectors to Confluent Cloud

Follow these steps to migrate your connectors to Confluent Cloud:

  1. Discover configurations: Run the discovery_script.py script from the Connect Migration Utility to fetch your self-managed connector configurations and translate them into the Confluent Cloud format.

  2. Troubleshoot and fix mapping errors: Analyze the translated configurations for compatibility. Fix any mapping errors and replace unsupported features.

  3. Migrate connectors: Run the migrate_connector_script.py script from the Connect Migration Utility to create your new, fully-managed connectors in Confluent Cloud using the validated configurations.

Step 1: Discover configurations

Use the Connect Migration Utility to fetch your existing configurations and translate them into Confluent Cloud format.

The discovery script translates the self-managed connector settings into the fully-managed format and reports errors to help you identify unsupported features before migration.

Select the method that matches how you currently manage your connectors.

Option 1: Fetch configurations directly from running Kafka Connect workers:

python discovery_script.py \
--worker-urls http://worker1:8083,http://worker2:8083 \
--output-dir ./migration-output
  • If your workers require authentication, add --worker-username <your_username> --worker-password <your_password> or --bearer-token <your_OAuth_token>.

  • If you use self-signed certificates or internal services in a developer environment, add --disable-ssl-verify to disable SSL certificate verification.

Option 2: Use a single JSON file containing connector configurations:

python discovery_script.py \
--config-file ./connectors-config.json \
--output-dir ./migration-output

The JSON file can contain:

  • A single connector object: {"name": "...", "config": {...}}

  • An array of connector objects: [{"name": "c1", ...}, {"name": "c2", ...}]

  • Nested objects with connectors wrapped in an Info object.

For example JSON files, refer to the GitHub repository.
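The accepted shapes can be normalized with a few lines of Python. This is an illustrative sketch, not the utility's internal loader, and the handling of the Info wrapper key is an assumption:

```python
import json

def normalize_connectors(text):
    """Normalize the accepted JSON shapes into a list of connector objects.

    Accepts a single connector object, an array of connector objects, or a
    nested object that wraps each connector in an "info"-style entry
    (the wrapper key name is assumed here).
    """
    data = json.loads(text)
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        if "name" in data and "config" in data:
            return [data]  # single connector object
        out = []
        for value in data.values():
            if isinstance(value, dict) and "info" in value:
                out.append(value["info"])   # nested Info wrapper
            elif isinstance(value, dict) and "config" in value:
                out.append(value)
        return out
    raise ValueError("unsupported JSON shape")

single = '{"name": "c1", "config": {"tasks.max": "1"}}'
many = '[{"name": "c1", "config": {}}, {"name": "c2", "config": {}}]'
print(len(normalize_connectors(single)), len(normalize_connectors(many)))  # → 1 2
```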

Option 3: Store each connector configuration in a separate JSON file within a directory to fetch multiple connectors at once:

python discovery_script.py \
--config-dir ./connector-configs \
--output-dir ./migration-output

For the command-line options supported by the discovery script, see the Discovery script CLI options section.

Note

  • If you are migrating Debezium self-managed connectors, use the --debezium-version option with the discovery script to specify your current connector version. The utility defaults to v2.

  • If you are running v1 connectors, specify --debezium-version v1 to ensure accurate configuration mapping. The utility automatically converts v1 configurations to v2 before mapping to the fully-managed format.

Generate Terraform files

Use the --terraform option with the discovery script to generate a Terraform file for each successful connector configuration in the output directory.

Discovery output

When you run the discovery script, the utility creates a directory structure in your specified output directory with translated configurations and diagnostic information.

Directory structure

The output directory contains the following structure:

output/
├── summary.txt                     # Migration summary report
├── discovered_configs/
│   ├── successful_configs/
│   │   ├── fm_configs/             # Successful fully-managed connector configurations
│   │   └── *.json                  # Combined configurations with warnings (if any)
│   └── unsuccessful_configs/
│       ├── fm_configs/             # Partially translated configurations with errors
│       └── *.json                  # Combined configurations with mapping errors and warnings
└── terraform/                      # Terraform files created when using the --terraform option

Migration summary (summary.txt)

The utility also generates a comprehensive migration summary report that provides:

  • Success and failure statistics.

  • Connector type analysis.

  • Error categorization and counts.

  • Per-cluster breakdown.

  • Mapping error details with occurrence counts.

The following example shows a sample summary output:

===============================================================================
 Overall Summary
===============================================================================
Number of Connector clusters scanned: 2
Total Connector configurations scanned: 25
Total Connectors that can be successfully migrated: 22
Total Connectors that have errors in migration: 3

===============================================================================
 Summary By Connector Type
===============================================================================
✅ Connector types (successful across all clusters):
  - io.confluent.connect.jdbc.JdbcSinkConnector: 15
  - io.confluent.connect.elasticsearch.ElasticsearchSinkConnector: 7

❌ Connector types (with errors across all clusters):
  - io.confluent.connect.jdbc.JdbcSourceConnector: 3

===============================================================================
 Per-Cluster Summary (sorted by successful configurations for migration)
===============================================================================
Cluster Details: cluster-1
  Total Connector configurations scanned: 15
  Total Connectors that can be successfully migrated: 13
    ✅ Connector types (successful):
      - io.confluent.connect.jdbc.JdbcSinkConnector: 10
      - io.confluent.connect.elasticsearch.ElasticsearchSinkConnector: 3
  Total Connectors that have errors in migration: 2
    ❌ Connector types (with errors):
      - io.confluent.connect.jdbc.JdbcSourceConnector: 2
    ⚠️ Mapping errors:
      - 'Transform 'complex_transform' is not supported': found in 2 file(s)

If there are no mapping errors in the connector configuration, proceed with migration using the translated configuration in the successful_configs directory.
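When you scan many clusters, the headline numbers can be pulled out of summary.txt with a short script. This is illustrative: the label wording below is taken from the sample report above and may differ between utility versions:

```python
import re

# Sample lines in the format produced by the utility's overall summary
SAMPLE = """\
Number of Connector clusters scanned: 2
Total Connector configurations scanned: 25
Total Connectors that can be successfully migrated: 22
Total Connectors that have errors in migration: 3
"""

def summary_counts(text):
    """Extract 'label: number' pairs from summary report text."""
    return {label.strip(): int(n)
            for label, n in re.findall(r"^([^:\n]+):\s+(\d+)$", text, re.M)}

counts = summary_counts(SAMPLE)
print(counts["Total Connectors that have errors in migration"])  # → 3
```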

Step 2: Troubleshoot and fix mapping errors

If the discovery step identifies mapping errors, review and fix the error details in the partially translated configuration files within the unsuccessful_configs directory before proceeding with migration.

The utility provides detailed error messages to help you understand the issues and potential solutions.

The following example shows a configuration file with mapping errors:

{
  "name": "my-connector",
  "sm_config": { /* original self-managed config */ },
  "config": { /* successfully mapped configuration */ },
  "mapping_errors": [
    "Transform 'unwrap' of type 'io.debezium.transforms.ExtractNewRecordState' is not supported in fully-managed Connector. Potentially Custom SMT can be used.",
    "Predicate 'predicate_0' is filtered out because it's associated with an unsupported transform."
  ]
}

For resolutions to common mapping errors and warnings, refer to the GitHub repository.
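When many connectors fail discovery, it can help to aggregate the mapping_errors arrays across the unsuccessful_configs directory so you can fix the most common errors first. A minimal sketch, assuming the file layout shown above:

```python
import json
from collections import Counter
from pathlib import Path
from tempfile import TemporaryDirectory

def collect_mapping_errors(config_dir):
    """Count each distinct mapping error across all *.json files in a directory."""
    errors = Counter()
    for path in Path(config_dir).glob("*.json"):
        doc = json.loads(path.read_text())
        errors.update(doc.get("mapping_errors", []))
    return errors

# Demonstrate against a throwaway directory with one failing config.
with TemporaryDirectory() as d:
    Path(d, "my-connector.json").write_text(json.dumps({
        "name": "my-connector",
        "mapping_errors": ["Transform 'unwrap' is not supported"],
    }))
    print(collect_mapping_errors(d))
```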

Step 3: Migrate connectors

After fixing any mapping errors, use the migrate_connector_script.py script to migrate your connectors to Confluent Cloud using the translated configurations. Choose one of the following approaches:

Downtime approach (stop_create_latest_offset)

Choose this approach if your application can tolerate brief downtime. This approach ensures no data loss or duplication during migration. It works as follows:

  1. Stop the self-managed connector.

  2. Fetch its latest offsets from the Connect REST API.

  3. Create the fully-managed connector, passing those offsets as the starting position.

Run the following command:

python src/migrate_connector_script.py --worker-urls "<WORKER_URL>" --cluster-id "<CLUSTER_ID>" --environment-id "<ENVIRONMENT_ID>" \
  --migration-mode "stop_create_latest_offset" --bearer-token "<BEARER_TOKEN>" --fm-config-dir "<INPUT_FM_CONFIGS_DIR>" \
  --kafka-api-key "<KAFKA-API-KEY>" --kafka-api-secret "<KAFKA-API-SECRET>" --migration-output-dir "<CREATE_CONNECTOR_OUTPUT_DIRECTORY>"

Parallel approach (create_latest_offset)

Choose this approach to maintain continuous data flow by running both connectors in parallel. Use this approach when uptime is critical and you have downstream deduplication capabilities. It works as follows:

  1. Keep the self-managed connector running.

  2. Read its offsets and create the fully-managed connector with those offsets.

  3. Run both in parallel and handle data duplication downstream.

Run the following command:

python src/migrate_connector_script.py --worker-urls "<WORKER_URL>" --cluster-id "<CLUSTER_ID>" --environment-id "<ENVIRONMENT_ID>" \
  --migration-mode "create_latest_offset" --bearer-token "<BEARER_TOKEN>" --fm-config-dir "<INPUT_FM_CONFIGS_DIR>" \
  --kafka-api-key "<KAFKA-API-KEY>" --kafka-api-secret "<KAFKA-API-SECRET>" --migration-output-dir "<CREATE_CONNECTOR_OUTPUT_DIRECTORY>"
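With the parallel approach, both connectors produce overlapping records until cutover, so downstream consumers need an idempotent or deduplicating step. A minimal sketch, assuming each record carries a stable identifier (for example, a primary key or source offset):

```python
def deduplicate(records, key=lambda r: r["id"]):
    """Yield each record once, keyed by a stable identifier."""
    seen = set()
    for record in records:
        k = key(record)
        if k not in seen:
            seen.add(k)
            yield record

# Records delivered by both connectors during the overlap window
overlap = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 1, "v": "a"}]
print([r["id"] for r in deduplicate(overlap)])  # → [1, 2]
```

In practice the same effect is often achieved with idempotent writes (upserts keyed on the record's primary key) rather than in-memory state.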

Fresh start approach (create)

Choose this approach to create a fully-managed connector without stopping the self-managed connector and without migrating offsets. The new connector starts from the beginning or from its default position.

Run the following command:

python src/migrate_connector_script.py --worker-urls "<WORKER_URL>" --cluster-id "<CLUSTER_ID>" --environment-id "<ENVIRONMENT_ID>" \
  --migration-mode "create" --bearer-token "<BEARER_TOKEN>" --fm-config-dir "<INPUT_FM_CONFIGS_DIR>" \
  --kafka-api-key "<KAFKA-API-KEY>" --kafka-api-secret "<KAFKA-API-SECRET>" --migration-output-dir "<CREATE_CONNECTOR_OUTPUT_DIRECTORY>"

For the command-line options supported by the migration script, see the Migration script CLI options section.

Discovery script CLI options

The following command-line options are available for discovery_script.py:

  • --config-file: Path to a JSON file containing connector configurations. Required: No*

  • --config-dir: Path to a directory with multiple JSON connector configuration files. Required: No*

  • --worker-urls: Comma-separated list of worker URLs. Required: No*

  • --worker-urls-file: Path to a file containing worker URLs. Required: No*

  • --worker-username: Username for basic authentication with the Connect worker REST API. Required: No

  • --worker-password: Password for basic authentication with the Connect worker REST API. Required: No

  • --output-dir: Output directory for all files (default: output). Required: No

  • --environment-id: Confluent Cloud environment ID. Required: No

  • --cluster-id: Confluent Cloud Kafka cluster ID. Required: No

  • --bearer-token: Confluent Cloud bearer token (api_key:api_secret). Required: No

  • --prompt-bearer-token: Prompt for the bearer token securely. Required: No

  • --redact: Redact sensitive configurations. Required: No

  • --sensitive-file: Path to a file containing sensitive configuration keys. Required: No

  • --worker-config-file: Path to a file containing additional worker configurations. Required: No

  • --disable-ssl-verify: Disable SSL certificate verification for HTTPS requests. Required: No

  • --debezium-version {v1,v2}: Debezium version for CDC template selection (default: v2). Required: No

  • --terraform: Generate Terraform files for successful connector configurations. Required: No

*Either --config-file/--config-dir or --worker-urls/--worker-urls-file is required.

Migration script CLI options

The following command-line options are available for migrate_connector_script.py:

  • --fm-config-dir: Path to a directory with multiple fully-managed connector configuration files. Required: Yes

  • --worker-urls: Comma-separated list of worker URLs. Required: No [1]

  • --worker-username: Username for basic authentication with the Connect worker REST API. Required: No

  • --worker-password: Password for basic authentication with the Connect worker REST API. Required: No

  • --migration-output-dir: Output directory for migration output files (default: migration_output). Required: No

  • --environment-id: Confluent Cloud environment ID. Required: Yes

  • --cluster-id: Confluent Cloud Kafka cluster (LKC) ID. Required: Yes

  • --bearer-token: Confluent Cloud bearer token (api_key:api_secret). Required: Yes [2]

  • --prompt-bearer-token: Prompt for the bearer token securely. Required: Yes [2]

  • --disable-ssl-verify: Disable SSL certificate verification for HTTPS requests. Required: No

  • --migration-mode: Connector migration mode. Options: stop_create_latest_offset, create, create_latest_offset. Required: Yes

  • --kafka-auth-mode: Authentication mode between the Kafka client (connector) and the Kafka broker. Options: SERVICE_ACCOUNT, KAFKA_API_KEY (default: KAFKA_API_KEY). Required: No

  • --kafka-api-key: Kafka API key for authentication. Required: Yes [3]

  • --kafka-api-secret: Kafka API secret for authentication. Required: Yes [3]

  • --kafka-service-account-id: Service account ID for authentication. Required: Yes [3]

[1] Required when the migration mode is stop_create_latest_offset or create_latest_offset.

[2] Provide exactly one of --bearer-token or --prompt-bearer-token.

[3] Depends on --kafka-auth-mode: for KAFKA_API_KEY, provide the Kafka API key and secret; for SERVICE_ACCOUNT, provide the Kafka service account ID.
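The footnote rules combine into a small set of checks. The following sketch expresses the same validation in Python against a dict of parsed options; it is illustrative and not the utility's actual argument parsing:

```python
def validate_args(args):
    """Apply the migration-script footnote rules to a dict of parsed options."""
    mode = args.get("migration_mode")
    # [1] worker URLs are needed whenever offsets must be read
    if mode in ("stop_create_latest_offset", "create_latest_offset") and not args.get("worker_urls"):
        raise ValueError("--worker-urls is required for offset-based migration modes")
    # [2] exactly one bearer-token source
    if bool(args.get("bearer_token")) == bool(args.get("prompt_bearer_token")):
        raise ValueError("provide exactly one of --bearer-token or --prompt-bearer-token")
    # [3] credentials must match the chosen Kafka auth mode
    if args.get("kafka_auth_mode", "KAFKA_API_KEY") == "KAFKA_API_KEY":
        if not (args.get("kafka_api_key") and args.get("kafka_api_secret")):
            raise ValueError("KAFKA_API_KEY mode needs --kafka-api-key and --kafka-api-secret")
    elif not args.get("kafka_service_account_id"):
        raise ValueError("SERVICE_ACCOUNT mode needs --kafka-service-account-id")
    return True

print(validate_args({
    "migration_mode": "create",
    "bearer_token": "key:secret",
    "kafka_api_key": "AK",
    "kafka_api_secret": "SK",
}))  # → True
```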