Salesforce CDC Source Connector for Confluent Cloud

The fully-managed Salesforce Change Data Capture (CDC) Source connector for Confluent Cloud provides a way to monitor Salesforce records. Salesforce sends a notification when a change to a Salesforce record occurs as part of a create, update, delete, or undelete operation. The Salesforce CDC Source connector can be used to capture these change events and write them to an Apache Kafka® topic.

Features

The Salesforce CDC Source connector provides the following features:

  • Salesforce Streaming API: This connector uses the Salesforce Streaming API (Change Data Capture). Changes captured include new records, updates to existing records, record deletions, and record undeletions.

  • Support for single entity channels: The connector supports single entity channels like the LeadChangeEvent channel.

  • Support for multiple entity channels: The connector supports multiple entity channels like the ChangeEvents Standard Channel or a Custom Channel like LeadCustom__chn.

  • Initial start: Captures the latest changes or all changes over the last 72 hours.

  • Data formats: The connector supports Avro, JSON Schema, Protobuf, JSON (schemaless), or SF_API output data. In SF_API format the record is formatted identically to the Salesforce message received by the connector and the messages are ingested as raw bytes without any schema. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). See Schema Registry Enabled Environments for additional information.

  • Topics created automatically: The connector can automatically create Kafka topics. When using multiple entity channels with the connector, you can add ${_ObjectType} to the topic name to create different topic names based on the entity name.

  • Tasks per connector: Organizations can run multiple connectors with a limit of one task per connector (that is, "tasks.max": "1").

  • Offset management capabilities: Supports offset management. For more information, see Manage custom offsets.

  • Client-side encryption (CSFLE and CSPE) support: The connector supports CSFLE and CSPE for sensitive data. For more information about CSFLE or CSPE setup, see the connector configuration.

  • Supports Salesforce enriched events: The connector supports enriching Salesforce CDC events with additional fields on both custom channels (for example, /data/SalesEvents__chn) and the standard /data/ChangeEvents channel, provided event enrichment is configured in Salesforce for the associated entities. For more information, see Salesforce’s CDC event enrichment documentation.

  • Supports Client Credentials flow: The connector supports authentication using the Client Credentials flow that enables connecting to Salesforce without exposing the user credentials. To use CLIENT_CREDENTIALS grant type, you must enable the Client Credentials flow in your connected Salesforce application and assign an integration user.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Limitations

Be sure to review the following information.

Manage custom offsets

You can manage the offsets for this connector. Offsets provide information on the point in the system from which the connector is accessing data. For more information, see Manage Offsets for Fully-Managed Connectors in Confluent Cloud.

To manage offsets:

To get the current offset, make a GET request that specifies the environment, Kafka cluster, and connector name.

GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets
Host: https://api.confluent.cloud

Response:

Successful calls return HTTP 200 with a JSON payload that describes the offset.

{
    "id": "lcc-example123",
    "name": "{connector_name}",
    "offsets": [
      {
        "partition": {},
        "offset": {
          "replayId": 75314157
        }
      }
    ],
    "metadata": {
        "observed_at": "2024-03-28T17:57:48.139635200Z"
    }
}

Responses include the following information:

  • The position of the latest offset, represented by replayId.

  • The observed time of the offset in the metadata portion of the payload. The observed_at time indicates a snapshot in time for when the API retrieved the offset. A running connector is always updating its offsets. Use observed_at to get a sense for the gap between real time and the time at which the request was made. By default, offsets are observed every minute. Calling get repeatedly will fetch more recently observed offsets.

  • Information about the connector.
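As a sketch, the GET request above can be issued with curl. The environment, cluster, and connector IDs below are placeholders; substitute your own and authenticate with a Confluent Cloud API key:

```shell
# Placeholder IDs -- substitute your own environment, cluster, and connector name.
ENV_ID="env-abc123"
CLUSTER_ID="lkc-xyz789"
CONNECTOR="SalesforceCdcSourceConnector_0"

OFFSETS_URL="https://api.confluent.cloud/connect/v1/environments/${ENV_ID}/clusters/${CLUSTER_ID}/connectors/${CONNECTOR}/offsets"
echo "${OFFSETS_URL}"

# Fetch the current offset (requires a Cloud API key and secret):
#   curl -s -u "<cloud-api-key>:<cloud-api-secret>" "${OFFSETS_URL}"
```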

To update the offset, make a POST request that specifies the environment, Kafka cluster, and connector name. Include a JSON payload that specifies the new offset and a patch type.

POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud

 {
     "type": "PATCH",
     "offsets": [
         {
             "partition": {},
             "offset": {
                 "replayId": 75314147
             }
         }
     ]
 }
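As a curl sketch, the payload above can be submitted as follows (placeholder IDs; substitute your own):

```shell
# Placeholder IDs -- substitute your own environment, cluster, and connector name.
ENV_ID="env-abc123"
CLUSTER_ID="lkc-xyz789"
CONNECTOR="SalesforceCdcSourceConnector_0"

REQUEST_URL="https://api.confluent.cloud/connect/v1/environments/${ENV_ID}/clusters/${CLUSTER_ID}/connectors/${CONNECTOR}/offsets/request"

# The connector resumes with the first event *after* this replayId.
PAYLOAD='{"type": "PATCH", "offsets": [{"partition": {}, "offset": {"replayId": 75314147}}]}'
echo "${PAYLOAD}"

# Submit the asynchronous update (requires a Cloud API key and secret):
#   curl -s -X POST -u "<cloud-api-key>:<cloud-api-secret>" \
#        -H "Content-Type: application/json" \
#        -d "${PAYLOAD}" "${REQUEST_URL}"
```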

Considerations:

  • You can only make one offset change at a time for a given connector.

  • This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.

  • For source connectors, the connector attempts to read from the position defined by the requested offsets.

  • replayId is equal to the ReplayId value for an event. For example, assume the ReplayId for an event is 1234. The connector then starts consuming all events after the event with ReplayId 1234.

  • replayId is available in each Kafka record.

  • To consume all events within the retention window, set replayId to -2. Salesforce describes this option as follows: subscribers receive all events, including past events that are within the retention window and new events. This means the connector resets the offsets to pick up all events within the 72-hour Salesforce retention window, plus new events. For more information, see Message Durability in the Salesforce documentation.

  • Events outside the Salesforce retention period (72 hours) are discarded.
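For example, to replay every event that Salesforce still retains, the PATCH body from above changes only in the replayId value:

```json
{
    "type": "PATCH",
    "offsets": [
        {
            "partition": {},
            "offset": {
                "replayId": -2
            }
        }
    ]
}
```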

Response:

Successful calls return HTTP 202 Accepted with a JSON payload that describes the offset.

{
    "id": "lcc-example123",
    "name": "{connector_name}",
    "offsets": [
      {
        "partition": {},
        "offset": {
          "replayId": 75314147
        }
      }
    ],
    "requested_at": "2024-03-28T17:58:45.606796307Z",
    "type": "PATCH"
}

Responses include the following information:

  • The requested position of the offsets in the source.

  • The time of the request to update the offset.

  • Information about the connector.

To delete the offset, make a POST request that specifies the environment, Kafka cluster, and connector name. Include a JSON payload that specifies the delete type.

 POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
 Host: https://api.confluent.cloud

{
  "type": "DELETE"
}

Considerations:

  • Delete requests delete the offset for the provided partition and reset it to the base state. A delete request effectively returns the connector to the state of a newly created connector.

  • This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.

  • Do not issue delete and patch requests at the same time.

  • For source connectors, the connector attempts to read from the position defined in the base state.

Response:

Successful calls return HTTP 202 Accepted with a JSON payload that describes the result.

{
  "id": "lcc-example123",
  "name": "{connector_name}",
  "offsets": [],
  "requested_at": "2024-03-28T17:59:45.606796307Z",
  "type": "DELETE"
}

Responses include the following information:

  • Empty offsets.

  • The time of the request to delete the offset.

  • Information about Kafka cluster and connector.

  • The type of request.

To get the status of a previous offset request, make a GET request that specifies the environment, Kafka cluster, and connector name.

GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request/status
Host: https://api.confluent.cloud

Considerations:

  • The status endpoint always shows the status of the most recent PATCH/DELETE operation.

Response:

Successful calls return HTTP 200 with a JSON payload that describes the result. The following is an example of an applied patch.

{
   "request": {
      "id": "lcc-example123",
      "name": "{connector_name}",
      "offsets": [
        {
          "partition": {},
          "offset": {
            "replayId": 75314150
          }
        }
      ],
      "requested_at": "2024-03-28T17:58:45.606796307Z",
      "type": "PATCH"
   },
   "status": {
      "phase": "APPLIED",
      "message": "The Connect framework-managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."
   },
   "previous_offsets": [
     {
       "partition": {},
       "offset": {
         "replayId": 75314147
       }
     }
   ],
   "applied_at": "2024-03-28T17:58:48.079141883Z"
}

Responses include the following information:

  • The original request, including the time it was made.

  • The status of the request: applied, pending, or failed.

  • The time the offset request was applied (applied_at).

  • The previous offsets. These are the offsets the connector last recorded before the update. Use them to restore the connector to its previous state if a patch update causes the connector to fail, or to roll back an offset change.

JSON payload

The following describes the unique field in the JSON payload for managing offsets of the Salesforce CDC Source connector.

Field: replayId

Definition: The ReplayId field value, populated by the Salesforce system, refers to the position of the event in the event stream. Replay ID values are not guaranteed to be contiguous for consecutive events. For more information, see Message Durability in the Salesforce documentation.

Required/Optional: Required

Quick Start

Use this quick start to get up and running with the Salesforce CDC Source connector. The quick start provides the basics of selecting the connector and configuring it to monitor changes.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud.

  • The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.

  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). See Schema Registry Enabled Environments for additional information.

  • Salesforce must be configured for CDC. See the Salesforce Change Data Capture Developer Guide.

  • The Salesforce user account configured for the connector must have permission to View All Data, in addition to the permissions listed below. For details, see Required Permissions for Change Events Received by CometD Subscribers.

  • For networking considerations, see Networking and DNS. To use a set of public egress IP addresses, see Public Egress IP Addresses for Confluent Cloud Connectors.

  • The connector uses the Salesforce SOAP client for multiple entity channels. Before using multiple entity channels, you must enable Salesforce SOAP APIs using the API Enabled option in the Salesforce account. Be sure your organization’s firewall rules (if applicable) allow the connector to communicate with the SOAP client.

  • Before configuring the connector, ensure your Salesforce environment meets the following requirements:

    • User permissions: The Salesforce user account must have the following permissions: View All Data, API Enabled, View Setup and Configuration, and Read access to the target objects.

    • CDC configuration: Explicitly enable CDC for all objects you intend to capture.

  • Kafka cluster credentials. The following lists the different ways you can provide credentials.

    • Enter an existing service account resource ID.

    • Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.

    • Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster

To create and launch a Kafka cluster in Confluent Cloud, see Create a Kafka cluster in Confluent Cloud.

Step 2: Add a connector

In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector

Click the Salesforce CDC Source connector card.

Salesforce CDC Source Connector Card

Step 4: Enter the connector details

Note

  • Make sure you have all your prerequisites completed.

  • An asterisk ( * ) designates a required entry.

At the Add Salesforce CDC Source Connector screen, complete the following:

Enter a topic name.

The connector can automatically create Kafka topics. When using multiple entity channels (MULTI) with the connector, you can add ${_ObjectType} to the topic name to create different topic names based on the entity name.
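For example, with a multiple entity channel, a templated topic name like the following (the sfdc.cdc prefix is illustrative) causes the connector to write each entity's change events to its own topic:

```text
sfdc.cdc.${_ObjectType}
```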

  1. Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:

    • My account: This setting allows your connector to globally access everything that you have access to. With a user account, the connector uses an API key and secret to access the Kafka cluster. This option is not recommended for production.

    • Service account: This setting limits the access for your connector by using a service account. This option is recommended for production.

    • Use an existing API key: This setting allows you to specify an API key and a secret pair. You can use an existing pair or create a new one. This method is not recommended for production environments.

    Note

    Freight clusters support only service accounts for Kafka authentication.

  2. Click Continue.

  1. Configure the authentication properties:

    • Salesforce grant type: Sets the authentication grant type to PASSWORD, JWT_BEARER (Salesforce JSON Web Token (JWT)), or CLIENT_CREDENTIALS. Defaults to PASSWORD.

    Salesforce details

    • Salesforce instance: The URL of the Salesforce endpoint to use. The default is https://login.salesforce.com. This directs the connector to use the endpoint specified in the authentication response.

    • Salesforce username: The Salesforce username for the connector to use.

    • Salesforce password: The Salesforce password for the connector to use.

    • Salesforce password token: The Salesforce security token associated with the username.

    • Salesforce consumer key: The consumer key for the OAuth application.

    • Salesforce consumer secret: The consumer secret for the OAuth application.

    • Salesforce JWT keystore file: If using the grant type JWT_BEARER, upload the JWT keystore file.

    • Salesforce JWT keystore password: The password used to access the JWT keystore file.

    Note

    The following properties are used based on the Salesforce grant type you choose.

    • JWT_BEARER: Requires username, consumer key, JWT keystore file, and JWT keystore password.

    • PASSWORD: Requires username, password, password token, consumer key, and consumer secret.

    • CLIENT_CREDENTIALS: Requires consumer key, consumer secret (client ID and client secret of a Salesforce connected application) and Salesforce domain URL in Salesforce instance option. The default value https://login.salesforce.com does not work for this option. To use CLIENT_CREDENTIALS, you must enable the Client Credentials flow in your connected Salesforce application and assign an integration user.

  2. Click Continue.

  • Select output record value format: Sets the output Kafka record value format. Valid values are AVRO, JSON_SR, PROTOBUF, JSON, or SF_API. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF. When SF_API is selected, the record is identical in format to the Salesforce message as received by the connector. Note that in SF_API, messages are ingested as raw bytes without any schema.

  • Salesforce Channel Type: The type of Salesforce CDC channel from which the connector consumes events. The value can be SINGLE or MULTI. Use SINGLE for a single entity channel like LeadChangeEvent. Use MULTI for the Standard (ChangeEvents) channel or a Custom channel like LeadCustom__chn.

  • Salesforce Channel Entities List: The comma-separated list of entities in the standard or custom channel. For example, LeadChangeEvent, AccountChangeEvent.

Salesforce details

  • Salesforce CDC name: The Salesforce Change Data Capture event name to subscribe to.

Data encryption

  • Enable Client-Side Field Level Encryption for data encryption. Specify a Service Account to access the Schema Registry and associated encryption rules or keys with that schema. For more information on CSFLE or CSPE setup, see Manage encryption for connectors.

Show advanced configurations
  • Schema context: Select a schema context to use for this connector, if using a schema-based data format. This property defaults to the Default context, which configures the connector to use the default schema set up for Schema Registry in your Confluent Cloud environment. A schema context allows you to use separate schemas (like schema sub-registries) tied to topics in different Kafka clusters that share the same Schema Registry environment. For example, if you select a non-default context, a Source connector uses only that schema context to register a schema and a Sink connector uses only that schema context to read from. For more information about setting up a schema context, see What are schema contexts and when should you use them?.

Additional Configs

  • Value Converter Decimal Format: Specify the JSON/JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals: BASE64 to serialize DECIMAL logical types as base64 encoded binary data and NUMERIC to serialize Connect DECIMAL logical type values in JSON/JSON_SR as a number representing the decimal value.

  • value.converter.replace.null.with.default: Whether to replace fields that have a default value and are null with the default value. When set to true, the default value is used; otherwise, null is used. Applicable for the JSON Converter.

  • Key Converter Schema ID Serializer: The class name of the schema ID serializer for keys. This is used to serialize schema IDs in the message headers.

  • Value Converter Reference Subject Name Strategy: Set the subject reference name strategy for value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.

  • value.converter.schemas.enable: Include schemas within each of the serialized values. Input messages must contain schema and payload fields and may not contain additional fields. For plain JSON data, set this to false. Applicable for JSON Converter.

  • errors.tolerance: Use this property to configure the connector’s error handling behavior. WARNING: Use this property with caution for source connectors, as it may lead to data loss. If you set this property to all, the connector does not fail on errant records, but instead logs them (and sends them to the DLQ for sink connectors) and continues processing. If you set this property to none, the connector task fails on errant records.

  • Value Converter Connect Meta Data: Allow the Connect converter to add its metadata to the output schema. Applicable for Avro Converters.

  • Value Converter Value Subject Name Strategy: Determines how to construct the subject name under which the value schema is registered with Schema Registry.

  • Key Converter Key Subject Name Strategy: How to construct the subject name for key schema registration.

  • value.converter.ignore.default.for.nullables: When set to true, this property ensures that the corresponding record in Kafka is NULL, instead of showing the default column value. Applicable for AVRO, PROTOBUF, and JSON_SR converters.

  • Value Converter Schema ID Serializer: The class name of the schema ID serializer for values. This is used to serialize schema IDs in the message headers.

  • invalid.replay.id.behaviour: Determines whether the connector should fall back to fetching all or latest events if an invalid or expired replayId is provided.

    NOTE: Salesforce only retains events for 24 hours (standard-volume) or 72 hours (high-volume or CDC), and events outside this window are unrecoverable regardless of fallback mode.

Auto-restart policy

  • Enable Connector Auto-restart: Control the auto-restart behavior of the connector and its task in the event of user-actionable errors. Defaults to true, enabling the connector to automatically restart in case of user-actionable errors. Set this property to false to disable auto-restart for failed connectors. In such cases, you would need to manually restart the connector.

Connection details

  • Salesforce initial start: Specifies the initial starting point for the connector. Allowed values are latest and all. The default value is latest.

  • Max Retry Time in Milliseconds: In case of error when making a request to Salesforce, the connector retries until this time (in milliseconds) elapses.

  • Connection Max Message Size: The maximum message size in bytes that is accepted during a poll on the Salesforce streaming endpoint.

Transforms

Processing position

  • Set offsets: Click Set offsets to define a specific offset for this connector to begin processing data from. For more information on managing offsets, see Manage custom offsets.

For all property values and definitions, see Configuration Properties.

  • Click Continue.

Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.

  1. To change the number of tasks, use the Range Slider to select the desired number of tasks.

  2. Click Continue.

  1. Verify the connection details by previewing the running configuration.

    Tip

    For information about previewing your connector output, see Data Previews for Confluent Cloud Connectors.

  2. After you’ve validated that the properties are configured to your satisfaction, click Launch.

    The status for the connector should go from Provisioning to Running.

Step 5: Check the Kafka topic

After the connector is running, verify that messages are populating your Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Using the Confluent CLI

Complete the following steps to set up and run the connector using the Confluent CLI.

Note

Make sure you have all your prerequisites completed.

Step 1: List the available connectors

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: List the connector configuration properties

Enter the following command to show the connector configuration properties:

confluent connect plugin describe <connector-plugin-name>

The command output shows the required and optional configuration properties.

Step 3: Create the connector configuration file

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "connector.class": "SalesforceCdcSource",
  "name": "SalesforceCdcSourceConnector_0",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "****************",
  "kafka.api.secret": "****************************************************************",
  "kafka.topic": "AccountChangeEvent",
  "salesforce.grant.type": "PASSWORD",
  "salesforce.instance": "https://login.salesforce.com",
  "salesforce.username": "<my-username>",
  "salesforce.password": "**************",
  "salesforce.password.token": "************************",
  "salesforce.consumer.key": "*************************************************************************************",
  "salesforce.consumer.secret": "****************************************************************",
  "salesforce.cdc.name": "AccountChangeEvent",
  "output.data.format": "JSON",
  "tasks.max": "1"
}

Note the following property definitions:

  • "connector.class": Identifies the connector plugin name.

  • "name": Sets a name for your new connector.

  • "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:

    confluent iam service-account list
    

    For example:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "kafka.topic": Enter a Kafka topic name. When using multiple entity channels with the connector, you can add ${_ObjectType} to the topic name to create different topic names based on the entity name.

  • "salesforce.grant.type": Sets the authentication grant type to PASSWORD (username and password), JWT_BEARER (Salesforce JSON Web Token (JWT)), or CLIENT_CREDENTIALS. Defaults to PASSWORD.

    Note

    The following properties are used based on the Salesforce grant type you choose.

    • JWT_BEARER: Requires username, consumer key, JWT keystore file, and JWT keystore password.

    • PASSWORD: Requires username, password, password token, consumer key, and consumer secret.

    • CLIENT_CREDENTIALS: Requires consumer key, consumer secret (client ID and client secret of a Salesforce connected application) and Salesforce domain URL in Salesforce instance option. The default value https://login.salesforce.com does not work for this option. To use CLIENT_CREDENTIALS, you must enable the Client Credentials flow in your connected Salesforce application and assign an integration user.

  • "salesforce.username": The Salesforce username for the connector to use.

  • "salesforce.password": The password for the Salesforce username.

  • "salesforce.password.token": The Salesforce security token associated with the username.

  • "salesforce.consumer.key": The consumer key for the OAuth application.

  • "salesforce.consumer.secret": The consumer secret for the OAuth application.

  • "salesforce.jwt.keystore.file": The Salesforce JWT keystore file. The JWT keystore file is a binary file, so you supply its contents in this property encoded in Base64. To use the salesforce.jwt.keystore.file property, encode the keystore contents in Base64, add the data:text/plain;base64, prefix to the encoded string, and then use the entire string as the property entry. For example:

    "salesforce.jwt.keystore.file" : "data:text/plain;base64,/u3+7QAAAAIAAAACAAAAGY2xpZ...==",
    "salesforce.jwt.keystore.password" : "<password>",
    

    Note

    You only need to convert to Base64 when deploying the connector using the Confluent REST API and Confluent CLI. If you update the JKS file directly from the Confluent Cloud Console, Base64 conversion is not required.
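As a sketch, the property value can be assembled from a keystore file like this (the printf stands in for reading your actual keystore, for example base64 < salesforce.jks):

```shell
# Stand-in for the real keystore bytes; in practice use:
#   KEYSTORE_B64=$(base64 < salesforce.jks | tr -d '\n')
KEYSTORE_B64=$(printf 'example-keystore-bytes' | base64 | tr -d '\n')

# Prepend the required prefix to form the property value.
VALUE="data:text/plain;base64,${KEYSTORE_B64}"
echo "\"salesforce.jwt.keystore.file\": \"${VALUE}\""
```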

  • "salesforce.jwt.keystore.password": Enter the password used to access the JWT keystore file.

  • "salesforce.cdc.name": The Salesforce Change Data Capture event name to subscribe to.

  • "output.data.format": Sets the output Kafka record value format (data coming from the connector). Valid entries are AVRO, JSON_SR, PROTOBUF, JSON, or SF_API. You must have Confluent Cloud Schema Registry configured if using a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). Note that if you select SF_API, records are ingested as raw bytes and the record format is identical to the Salesforce message format. For additional information, see Schema Registry Enabled Environments.

  • "tasks.max": Enter the number of tasks in use by the connector. Organizations can run multiple connectors with a limit of one task per connector (that is, "tasks.max": "1").

Note

To enable CSFLE or CSPE for data encryption, specify the following properties:

  • csfle.enabled: Flag to indicate whether the connector honors CSFLE or CSPE rules.

  • sr.service.account.id: A Service Account to access the Schema Registry and associated encryption rules or keys with that schema.

For more information on CSFLE or CSPE setup, see Manage encryption for connectors.

Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI.

See Configuration Properties for all property values and definitions.

Step 4: Load the properties file and create the connector

Enter the following command to load the configuration and start the connector:

confluent connect cluster create --config-file <file-name>.json

For example:

confluent connect cluster create --config-file salesforce-cdc-source.json

Example output:

Created connector SalesforceCdcSourceConnector_0 lcc-ix4dl

Step 5: Check the connector status

Enter the following command to check the connector status:

confluent connect cluster list

Example output:

ID          |            Name                  | Status  |  Type
+-----------+----------------------------------+---------+-------+
lcc-ix4dl   | SalesforceCdcSourceConnector_0   | RUNNING | source

Step 6: Check the Kafka topic

After the connector is running, verify that messages are populating your Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Configuration Properties

Use the following configuration properties with the fully-managed connector. For self-managed connector property definitions and other details, see the connector docs in Self-managed connectors for Confluent Platform.

How should we connect to your data?

name

Sets a name for your connector.

  • Type: string

  • Valid Values: A string at most 64 characters long

  • Importance: high

Kafka Cluster credentials

kafka.auth.mode

Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. Defaults to KAFKA_API_KEY mode whenever possible.

  • Type: string

  • Valid Values: SERVICE_ACCOUNT, KAFKA_API_KEY

  • Importance: high

kafka.api.key

Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password

  • Importance: high

kafka.service.account.id

The Service Account that will be used to generate the API keys to communicate with Kafka Cluster.

  • Type: string

  • Importance: high

kafka.api.secret

Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password

  • Importance: high

Which topic do you want to send data to?

kafka.topic

Identifies the topic name to write the data to.

  • Type: string

  • Importance: high

Schema Config

schema.context.name

Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.

  • Type: string

  • Default: default

  • Importance: medium

How should we connect to Salesforce?

salesforce.grant.type

Salesforce grant type. Valid options are PASSWORD, CLIENT_CREDENTIALS, and JWT_BEARER.

  • Type: string

  • Default: PASSWORD

  • Importance: high

salesforce.instance

The URL of the Salesforce endpoint to use. When using the CLIENT_CREDENTIALS grant type, provide your Salesforce My Domain URL. The default is https://login.salesforce.com, which directs the connector to use the endpoint specified in the authentication response.

  • Type: string

  • Default: https://login.salesforce.com

  • Importance: high

salesforce.username

The Salesforce username the connector should use.

  • Type: string

  • Importance: high

salesforce.channel.type

Indicates the type of Salesforce CDC channel from which the connector consumes events. The value can be SINGLE or MULTI. Use SINGLE for a single entity channel like LeadChangeEvent. Use MULTI for the Standard (ChangeEvents) channel or a custom channel like LeadCustom__chn.

  • Type: string

  • Importance: high

salesforce.password

The Salesforce password the connector should use.

  • Type: password

  • Importance: high

salesforce.cdc.name

The Salesforce Change Data Capture event name to subscribe to.

  • Type: string

  • Importance: high

salesforce.password.token

The Salesforce security token associated with the username.

  • Type: password

  • Importance: high

salesforce.consumer.key

The client ID (consumer key) for the Salesforce connected app.

  • Type: password

  • Importance: high

salesforce.channel.entities

Comma-separated list of entities in the standard or custom channel. For example: LeadChangeEvent,AccountChangeEvent.

  • Type: list

  • Importance: medium

salesforce.consumer.secret

The client secret (consumer secret) for the Salesforce connected app.

  • Type: password

  • Importance: medium

salesforce.jwt.keystore.file

The Salesforce JWT keystore file that contains the private key.

  • Type: password

  • Default: [hidden]

  • Importance: medium

salesforce.jwt.keystore.password

The password used to access the JWT keystore file.

  • Type: password

  • Importance: medium

Connection details

salesforce.initial.start

Specifies the initial starting point for the connector when replaying events: latest captures only new changes, and all captures all changes from the last 72 hours.

  • Type: string

  • Default: latest

  • Importance: high

connection.timeout

The amount of time to wait in milliseconds while connecting to the Salesforce streaming endpoint.

  • Type: long

  • Default: 30000

  • Importance: low

request.max.retries.time.ms

If a request to Salesforce fails, the connector retries until this amount of time (in milliseconds) elapses. The default value is 30000 (30 seconds). The minimum value is 1000 (1 second).

  • Type: long

  • Default: 30000 (30 seconds)

  • Valid Values: [1000,…,250000]

  • Importance: low

connection.max.message.size

The maximum message size in bytes that is accepted during a long poll on the Salesforce streaming endpoint.

  • Type: int

  • Default: 1048576

  • Valid Values: [1048576,…,104857600]

  • Importance: low

Output messages

output.data.format

Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, JSON, or SF_API. Note that you need Confluent Cloud Schema Registry configured to use a schema-based message format such as AVRO, JSON_SR, or PROTOBUF. When SF_API is selected, the record format is identical to the Salesforce message as received by the connector, and messages are ingested as raw bytes without any schema.

  • Type: string

  • Default: JSON

  • Importance: high

convert.changed.fields

Whether to convert field names within the changed fields section of the ChangeEventHeader to match the field names present on the Kafka record.

  • Type: boolean

  • Default: false

  • Importance: low

Number of tasks for this connector

tasks.max

Maximum number of tasks for the connector.

  • Type: int

  • Valid Values: [1,…,1]

  • Importance: high

Additional Configs

header.converter

The converter class for the headers. This is used to serialize and deserialize the headers of the messages.

  • Type: string

  • Importance: low

producer.override.compression.type

The compression type for all data generated by the producer. Valid values are none, gzip, snappy, lz4, and zstd.

  • Type: string

  • Importance: low

producer.override.linger.ms

The producer groups together any records that arrive in between request transmissions into a single batched request. More details can be found in the documentation: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#linger-ms.

  • Type: long

  • Valid Values: [100,…,1000]

  • Importance: low

value.converter.allow.optional.map.keys

Allow optional string map key when converting from Connect Schema to Avro Schema. Applicable for Avro Converters.

  • Type: boolean

  • Importance: low

value.converter.auto.register.schemas

Specify if the Serializer should attempt to register the Schema.

  • Type: boolean

  • Importance: low

value.converter.connect.meta.data

Allow the Connect converter to add its metadata to the output schema. Applicable for Avro Converters.

  • Type: boolean

  • Importance: low

value.converter.enhanced.avro.schema.support

Enable enhanced schema support to preserve package information and Enums. Applicable for Avro Converters.

  • Type: boolean

  • Importance: low

value.converter.enhanced.protobuf.schema.support

Enable enhanced schema support to preserve package information. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.flatten.unions

Whether to flatten unions (oneofs). Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.generate.index.for.unions

Whether to generate an index suffix for unions. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.generate.struct.for.nulls

Whether to generate a struct variable for null values. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.int.for.enums

Whether to represent enums as integers. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.latest.compatibility.strict

Verify latest subject version is backward compatible when use.latest.version is true.

  • Type: boolean

  • Importance: low

value.converter.object.additional.properties

Whether to allow additional properties for object schemas. Applicable for JSON_SR Converters.

  • Type: boolean

  • Importance: low

value.converter.optional.for.nullables

Whether nullable fields should be specified with an optional label. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.optional.for.proto2

Whether proto2 optionals are supported. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.scrub.invalid.names

Whether to scrub invalid names by replacing invalid characters with valid characters. Applicable for Avro and Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.use.latest.version

Use latest version of schema in subject for serialization when auto.register.schemas is false.

  • Type: boolean

  • Importance: low

value.converter.use.optional.for.nonrequired

Whether to set non-required properties to be optional. Applicable for JSON_SR Converters.

  • Type: boolean

  • Importance: low

value.converter.wrapper.for.nullables

Whether nullable fields should use primitive wrapper messages. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.wrapper.for.raw.primitives

Whether a wrapper message should be interpreted as a raw primitive at root level. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

errors.tolerance

Use this property to configure the connector’s error handling behavior. WARNING: Use this property with caution for source connectors, as it may lead to data loss. If you set this property to all, the connector does not fail on errant records, but instead logs them (and, for sink connectors, sends them to the DLQ) and continues processing. If you set this property to none, the connector task fails on errant records.

  • Type: string

  • Default: none

  • Importance: low

key.converter.key.schema.id.serializer

The class name of the schema ID serializer for keys. This is used to serialize schema IDs in the message headers.

  • Type: string

  • Default: io.confluent.kafka.serializers.schema.id.PrefixSchemaIdSerializer

  • Importance: low

key.converter.key.subject.name.strategy

How to construct the subject name for key schema registration.

  • Type: string

  • Default: TopicNameStrategy

  • Importance: low

value.converter.decimal.format

Specify the JSON/JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals:

  • BASE64 to serialize DECIMAL logical types as base64-encoded binary data

  • NUMERIC to serialize Connect DECIMAL logical type values in JSON/JSON_SR as a number representing the decimal value

  • Type: string

  • Default: BASE64

  • Importance: low

value.converter.flatten.singleton.unions

Whether to flatten singleton unions. Applicable for Avro and JSON_SR Converters.

  • Type: boolean

  • Default: false

  • Importance: low

value.converter.ignore.default.for.nullables

When set to true, this property ensures that the corresponding record in Kafka is NULL, instead of showing the default column value. Applicable for Avro, Protobuf, and JSON_SR Converters.

  • Type: boolean

  • Default: false

  • Importance: low

value.converter.reference.subject.name.strategy

Set the subject reference name strategy for value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.

  • Type: string

  • Default: DefaultReferenceSubjectNameStrategy

  • Importance: low

value.converter.replace.null.with.default

Whether to replace null fields that have a default value with the default value. When set to true, the default value is used; otherwise, null is used. Applicable for JSON Converter.

  • Type: boolean

  • Default: true

  • Importance: low

value.converter.schemas.enable

Include schemas within each of the serialized values. Input messages must contain schema and payload fields and may not contain additional fields. For plain JSON data, set this to false. Applicable for JSON Converter.

  • Type: boolean

  • Default: false

  • Importance: low

value.converter.value.schema.id.serializer

The class name of the schema ID serializer for values. This is used to serialize schema IDs in the message headers.

  • Type: string

  • Default: io.confluent.kafka.serializers.schema.id.PrefixSchemaIdSerializer

  • Importance: low

value.converter.value.subject.name.strategy

Determines how to construct the subject name under which the value schema is registered with Schema Registry.

  • Type: string

  • Default: TopicNameStrategy

  • Importance: low

invalid.replay.id.behaviour

Determines whether the connector falls back to fetching all events or only the latest events when an invalid or expired replayId is provided.

NOTE: Salesforce only retains events for 24 hours (standard-volume) or 72 hours (high-volume/CDC), and events outside this window are unrecoverable regardless of fallback mode.

  • Type: string

  • Default: all

  • Importance: medium

Auto-restart policy

auto.restart.on.user.error

Enable connector to automatically restart on user-actionable errors.

  • Type: boolean

  • Default: true

  • Importance: medium

Frequently asked questions

Find answers to frequently asked questions about the Salesforce CDC Source connector for Confluent Cloud.

Authentication and connection

How do I configure CLIENT_CREDENTIALS authentication for the connector?

To use the CLIENT_CREDENTIALS grant type, you must:

  1. Enable the Client Credentials flow in your Salesforce connected application.

  2. Assign an integration user as the Run As user in the connected application policies.

  3. Provide your Salesforce domain URL (My Domain URL) in the salesforce.instance configuration. For example, https://sample.my.salesforce.com. The default value https://login.salesforce.com does not work for CLIENT_CREDENTIALS.

  4. Configure the connector with:

    • salesforce.grant.type: CLIENT_CREDENTIALS

    • salesforce.consumer.key: Client ID of the connected application

    • salesforce.consumer.secret: Client secret of the connected application

    • salesforce.instance: Your Salesforce My Domain URL

If you encounter invalid_grant or no client credentials user enabled errors, verify that the Run As user is properly configured in your connected application’s OAuth policies.

For more information, see Salesforce OAuth 2.0 Client Credentials Flow.
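Putting the steps above together, the CLIENT_CREDENTIALS-specific portion of a connector configuration might look like the following fragment (the domain URL and credential values are placeholders):

```json
{
  "salesforce.grant.type": "CLIENT_CREDENTIALS",
  "salesforce.instance": "https://sample.my.salesforce.com",
  "salesforce.consumer.key": "<connected-app-client-id>",
  "salesforce.consumer.secret": "<connected-app-client-secret>"
}
```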

Why am I getting INVALID_SESSION_ID errors with multiple connectors?

This error occurs when multiple Salesforce CDC Source connectors (version 2.x or later) use the same salesforce.username and one connector logs out.

To resolve this issue:

Option 1 (Recommended): Use a unique Salesforce user account (salesforce.username) for each connector. This prevents session conflicts.

Option 2: If you cannot use separate user accounts, consider using the CLIENT_CREDENTIALS grant type, which uses application credentials instead of user sessions.

Configuration

How do I configure the connector for multiple entity channels?

To consume from the standard ChangeEvents channel or a custom channel with multiple entities:

  1. Set salesforce.channel.type to MULTI.

  2. Set salesforce.cdc.name to the channel name:

    • For the standard channel: ChangeEvents

    • For a custom channel: Use the FullName (for example, MyCustomChannel__chn)

  3. Set salesforce.channel.entities to a comma-separated list of change event entities. For example: LeadChangeEvent,AccountChangeEvent,ContactChangeEvent.

  4. Enable change data capture for the corresponding objects in Salesforce (Setup > Change Data Capture > select objects).

  5. Optionally, add ${_ObjectType} to the topic name to create separate topics for each entity.

Note

For custom channels, pre-configure the channel in Salesforce and reference it by its FullName in the connector configuration.
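Combining the steps above, the channel-specific portion of a multiple entity channel configuration might look like the following fragment (the entity list and topic name are illustrative):

```json
{
  "salesforce.channel.type": "MULTI",
  "salesforce.cdc.name": "ChangeEvents",
  "salesforce.channel.entities": "LeadChangeEvent,AccountChangeEvent,ContactChangeEvent",
  "kafka.topic": "sfdc.cdc.${_ObjectType}"
}
```

With `${_ObjectType}` in the topic name, events for each entity land in a separate topic based on the entity name.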

How do I configure the connector to use custom channels?

To use a custom channel for change data capture:

  1. In Salesforce, create a custom channel and add the desired change event entities. Note the channel’s FullName (for example, MyCustomChannel__chn).

  2. In the connector configuration:

    • Set salesforce.channel.type to MULTI.

    • Set salesforce.cdc.name to the custom channel’s FullName (for example, MyCustomChannel__chn).

    • Set salesforce.channel.entities to a comma-separated list of change event entities in the channel (for example, LeadChangeEvent,AccountChangeEvent).

  3. Ensure your Salesforce user has the required permissions:

    • API Enabled

    • View Setup and Configuration

    • View All Data

    • Read access on the objects

For more information, see Salesforce Custom Channels.

What permissions does the Salesforce user need for the connector?

The Salesforce user configured for the connector must have the following permissions:

For single entity channels:
  • API Enabled: Required for all connector operations.

  • View All Data: Required to receive Change Data Capture events. See Required Permissions for Change Events.

  • Read access on the objects being tracked.

For multiple entity channels (standard or custom):
  • API Enabled: Required for all connector operations.

  • View Setup and Configuration: Required for the connector to validate channel configuration using the Tooling API.

  • View All Data: Required to receive Change Data Capture events.

  • Read access on the objects being tracked.

For creating or modifying custom channels:
  • Customize Application: Required to create custom channels or add/remove entities in a custom channel.

Ensure that Change Data Capture is enabled for the objects in Salesforce (Setup > Change Data Capture).

Data ingestion and ReplayID management

What does the replayID was invalid or no longer available warning mean?

This warning appears when the connector attempts to consume events using a replayID that Salesforce no longer recognizes. Common causes include:

  • The replayID is older than the Salesforce retention window (72 hours for Change Data Capture events).

  • The connector was stopped for longer than the retention period.

  • The replayID was manually set to an invalid value.

When this occurs, the connector automatically falls back based on the invalid.replay.id.behaviour configuration:

  • all (default): Resets to replayID -2 to consume all events within the retention window.

  • latest: Resets to replayID -1 to consume only new events from the current point forward.

No action is required unless you want to change this fallback behavior. To customize it, set the invalid.replay.id.behaviour property in your connector configuration.
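The fallback behavior described above can be illustrated with a small sketch. This models the documented semantics only; it is not the connector's actual implementation, and the replayID value is the illustrative one from this page:

```python
# Sentinel replay IDs in the Salesforce streaming protocol:
#   -2 replays all events within the retention window
#   -1 delivers only new events from the current point forward
REPLAY_ALL = -2
REPLAY_LATEST = -1

def resolve_replay_id(requested_id: int, is_valid: bool, behaviour: str = "all") -> int:
    """Return the replay ID the connector would effectively use.

    `behaviour` mirrors the invalid.replay.id.behaviour property
    ("all", the default, or "latest").
    """
    if is_valid:
        return requested_id
    return REPLAY_ALL if behaviour == "all" else REPLAY_LATEST

# A valid replay ID is used as-is; an invalid or expired one falls back.
print(resolve_replay_id(75314147, is_valid=True))                        # 75314147
print(resolve_replay_id(75314147, is_valid=False))                       # -2
print(resolve_replay_id(75314147, is_valid=False, behaviour="latest"))   # -1
```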

How do I manage connector offsets using the replayID?

You can manage offsets for the Salesforce CDC Source connector using the Confluent Cloud APIs:

To get the current offset:

GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets

To update the offset:

POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
{
  "type": "PATCH",
  "offsets": [
    {
      "partition": {},
      "offset": {
        "replayId": 75314147
      }
    }
  ]
}

Key considerations:

  • The connector consumes all events after the specified replayID. For example, if replayID is 1234, the connector starts from the next event.

  • To consume all events within the retention window, set replayID to -2.

  • To consume only new events, set replayID to -1.

  • Events outside the 72-hour Salesforce retention window are discarded.

  • The replayID value is available in each Kafka record.

For more information, see Manage custom offsets.

What happens if the connector stops for more than 72 hours?

Salesforce retains Change Data Capture events for 72 hours (3 days). If the connector is stopped for longer than this retention period:

  1. Events older than 72 hours are discarded by Salesforce and cannot be recovered.

  2. When the connector restarts, it attempts to resume from the last recorded replayID.

  3. If the replayID is outside the retention window, the connector issues a warning: “The replayID was invalid or no longer available.”

  4. The connector automatically falls back based on the invalid.replay.id.behaviour configuration:

    • all (default): Consumes all events within the current 72-hour retention window (replayID -2).

    • latest: Consumes only new events from the current point forward (replayID -1).

To avoid data loss, ensure the connector runs continuously or is restarted within the 72-hour retention window.

Note

The connector periodically records the replayID of the last event written to Kafka. If the connector stops unexpectedly, some events may be duplicated when it restarts, as it resumes from the last recorded replayID.

Can the connector handle enriched Change Data Capture events?

Yes, the connector supports Salesforce enriched Change Data Capture events. You can enrich CDC events with additional fields on both:

  • Custom channels (for example, /data/SalesEvents__chn)

  • The standard /data/ChangeEvents channel

To use enriched events:

  1. Configure event enrichment in Salesforce for the entities you want to monitor. See Salesforce’s CDC event enrichment documentation.

  2. Configure the connector to consume from the appropriate channel.

  3. The connector automatically includes the enriched fields in the Kafka records.

The enriched fields are added to the change event payload and are available in the Kafka messages produced by the connector.

Troubleshooting

What does Exception while validating change event channel mean?

This error typically appears in one of these forms:

  • 400 Bad Request: No such column FullName on entity PlatformEventChannel.

  • sObject type PlatformEventChannel is not supported.

Common causes and resolutions:

  1. Using the wrong entity names: For Change Data Capture, you must specify change event entities (for example, AccountChangeEvent, ContactChangeEvent), not object names (for example, Account, Contact). Verify that salesforce.channel.entities contains the correct change event entity names.

  2. CDC not enabled for objects: Ensure that Change Data Capture is enabled for the objects in Salesforce (Setup > Change Data Capture > select objects).

  3. Missing permissions: Verify that your Salesforce user has View Setup and Configuration and API Enabled permissions when using multiple entity channels or custom channels.

  4. Custom channel not properly configured: If using a custom channel, verify that it is properly created in Salesforce with the correct FullName and that the specified entities are added to the channel.

How does the connector handle Salesforce API errors?

The connector has different retry behaviors based on the error code:

Errors that trigger retries with exponential backoff:
  • 403: “Organization concurrent user limit exceeded”

  • 403: “Organization total events daily limit exceeded”

  • 403: “To protect all customers”

  • 503: “Server is too busy”

Errors that cause the connector to fail immediately:
  • 400: Bad Request (validation errors)

  • 413: Request Entity Too Large

  • 403: “Unable to create channel dynamically”

  • 404: “channel names may not vary only by case”

  • 404: “Unknown channel”

If you encounter persistent API errors, verify your Salesforce API limits and quotas. Contact Salesforce support if you need to increase limits.

By default, the connector retries failed requests for up to 15 minutes using exponential backoff. You can adjust this timeout, but if the retry window is exhausted before a request succeeds, the task fails.
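The retry classification above can be sketched as follows. The error lists mirror the bullets above, but the backoff parameters are illustrative assumptions, not the connector's actual values:

```python
# Status/message pairs the connector retries with exponential backoff,
# per the list above.
RETRYABLE = {
    (403, "Organization concurrent user limit exceeded"),
    (403, "Organization total events daily limit exceeded"),
    (403, "To protect all customers"),
    (503, "Server is too busy"),
}

# Errors that fail the connector immediately, per the list above.
FATAL = {
    (400, "Bad Request"),
    (413, "Request Entity Too Large"),
    (403, "Unable to create channel dynamically"),
    (404, "channel names may not vary only by case"),
    (404, "Unknown channel"),
}

def should_retry(status: int, message: str) -> bool:
    """Return True if the error is retryable. Note that the same status
    code (for example, 403) can be retryable or fatal depending on the
    message."""
    return (status, message) in RETRYABLE

def backoff_delays_ms(initial_ms: int = 1000, factor: int = 2, retries: int = 5):
    """Illustrative exponential backoff schedule (parameters are assumed)."""
    return [initial_ms * factor ** i for i in range(retries)]

print(should_retry(503, "Server is too busy"))   # True
print(should_retry(404, "Unknown channel"))      # False
print(backoff_delays_ms())                       # [1000, 2000, 4000, 8000, 16000]
```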

Next Steps

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud for Apache Flink, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
