Salesforce Change Data Capture Source Connector for Confluent Platform

The Kafka Connect Salesforce Change Data Capture Source connector monitors Salesforce records. Salesforce sends a notification when a change to a Salesforce record occurs due to a create, update, delete, or undelete operation. The Kafka Connect Salesforce Change Data Capture Source connector can be used to capture these change events and write them to Kafka topic. This connector can be used with either standalone or distributed Connect workers.

Note

The connector’s salesforce.cdc.name property indicates the name of the change event. In case of standard objects in standard channels, the property indicates the name of the change event, must end with ChangeEvent. In case of custom objects in standard channels, the name of the change event must end with a __ChangeEvent. For example, salesforce.cdc.name=Employee__ChangeEvent. These change events must exist in Salesforce before running the connector. Refer the Salesforce Subscription Guide for more information.

Features

The Salesforce CDC Source Connector includes the following features:

At least once delivery

This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.

Supports one task

The Salesforce CDC Source Connector supports running only one task.

Limitations

The following limitations apply to this Salesforce connector:

  • The connector is limited to one task only.
  • The connector is limited by the allocations and limits of the Salesforce account. For example, free org accounts have a cap on the number of events that can occur within a 24-hour period.
  • The connector requires restarting before publishing a new schema.

Considerations

Note the following when using the Salesforce CDC Events source connector.

Replaying Messages

Change events are published to the event bus, where they are stored temporarily for 72 hours (3 days). When you create a connector, you can control how stored events are treated when the connector starts for the first time.

  1. Consume only new change events after the connector starts up, by setting salesforce.initial.start to latest. This is the default behavior, due largely to API limits.
  2. Consume all of the change events by setting salesforce.initial.start to all.

Restarting

When the connector operates, it periodically records the replay ID of the last record written to Kafka. When the connector is stopped and then restarted within 24 hours (for standard volume messages) or 72 hours (for high volume messages) the connector continues consuming the change events where it stopped, with no missed events. However, if the connector is stopped for more than 24 or 72 hours, some events are discarded in Salesforce before the connector can read them.

If the connector stops unexpectedly due to a failure, it may not record the replay ID of the last record successfully written to Kafka. When the connector restarts, it resumes from the last recorded replay ID. This means that some events may be duplicated in Kafka.

Retries

In case of failures caused by network issues, you can configure the connector to retry Salesforce requests (once authentication succeeds) using request.max.retries.time.ms parameters.

The backoff period for each retry attempt uses a randomization function that grows exponentially. But, if the total time spent retrying the request exceeds this duration (15 minutes by default), retries stop and the request fails. This will likely result in task failure.

Retries Example

  1. Create a sfcdc-source.properties file with the following contents:

    name=RetriesExample
    topics=sfcdc-messages
    tasks.max=1
    connector.class=io.confluent.salesforce.SalesforceCdcSourceConnector
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    salesforce.cdc.name=LeadChangeEvent
    request.max.retries.time.ms=20000
    
  2. Run and validate the connector.

  3. The Connector retries requests for an initial backoff duration of 20000 ms. Retries stop if a Salesforce request is successful.

Note that the default value for request.max.retries.time.ms is 900000 ms (15 minutes).

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.

Configuration Properties

For a complete list of configuration properties for this connector, see Salesforce Change Data Capture Source Connector Configuration Properties.

Install the Salesforce Change Data Capture Connector

You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.

Prerequisites

Important

You must install the connector on every machine where Connect will run.

  • Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.

  • Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.

  • Salesforce Streaming API version 45.0 or later.

  • The Salesforce user account configured for the connector must have permission to View All Data. For details, see Required Permissions for Change Events Received by CometD Subscribers.

  • Salesforce change notification subscription for Salesforce entities. For details, see Salesforce Change Data Capture Setup.

  • An installation of the Confluent Hub Client. This is installed by default with Confluent Enterprise.

  • An installation of the latest (latest) connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent-hub install confluentinc/kafka-connect-salesforce:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent-hub install confluentinc/kafka-connect-salesforce:2.0.7
    

Quick Start

This quick start uses the Salesforce Change Data Capture Source Connector to capture change events and write them to Kafka topic.

Salesforce Change Data Capture Setup

Following are the steps to enable change notification for Salesforce account.

  1. Use https://login.salesforce.com/ to log into an existing account or create a new one.

    Tip

    You can also use the https://test.salesforce.com/ test environment instead. Make sure to add Leads in the same environment and set salesforce.instance=https://test.salesforce.com in your connector configurations.

  2. Create a new Connected App.

    1. Select the gear icon in the upper right hand corner and choose Setup.
    2. Enter App in the Quick Find search box, and choose App Manager in the filtered results.
    3. Click the New Connected App button in the upper right corner of the Setup panel.
    4. Supply a Connected App Name, API Name, and Contact Email.
    5. Select Enable OAuth Settings checkbox and select the Enable for Device Flow checkbox. These selections enable the connector to use the Salesforce API.
    6. Under the Select OAuth Scopes field, select all of the items under Available OAuth scopes and add them to the Selected OAuth Scopes.
    7. Save the new app and press Continue at the prompt.
    8. Look for the Consumer Key and Consumer Secret in the displayed form. Save these so you can put them in the configuration properties file for the Salesforce connect worker.
  3. Find your Security Token (emailed to you from Salesforce.com). If you need to reset your token or view your profile on Salesforce.com, select Reset My Security Token and follow the instructions.

  4. Finally, subscribe to change event notifications for Salesforce entities. Subscribe the Leads entity.

Following are the steps to add a Lead to Salesforce:

  1. In the Salesforce UI, find and click on the App launcher icon in the upper left of the page.
  2. Find and start the Leads application.
  3. Click the New button, fill out the form, and save it. The Lead has been created.
  4. Alternatively, the leads can be created through simple a Java application, follow the salesforce documentation.

Start Confluent Platform

Start the Confluent services using the following Confluent CLI command:

confluent local services start

Important

Do not use the Confluent CLI in production environments.

Property-based example

Create a configuration file salesforce-cdc-source.properties with the following content. This file should be placed inside the Confluent Platform installation directory. This configuration is used typically along with standalone workers.

name=SalesforceCdcSourceConenctor

connector.class=io.confluent.salesforce.SalesforceCdcSourceConnector
tasks.max=1
kafka.topic=< Required Configuration >

salesforce.consumer.key=< Required Configuration >
salesforce.consumer.secret=< Required Configuration >
salesforce.username=< Required Configuration >
salesforce.password=< Required Configuration >
salesforce.password.token=< Required Configuration >

salesforce.cdc.name=< Required Configuration >
salesforce.initial.start=all

confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=

Note

Note the following:

  • salesforce.consumer.key and salesforce.consumer.secret are required properties used for OAuth2 secure authentication by saleforce.com. Additional information and tutorials are available at salesforce.com.
  • Change the confluent.topic.bootstrap.servers property to include your broker address(es) and change the confluent.topic.replication.factor to 3 for staging or production use.
  • Set the following connector configuration properties to enable OAuth JWT bearer token support:
    • salesforce.username
    • salesforce.consumer.key
    • salesforce.jwt.keystore.path
    • salesforce.jwt.keystore.password

Run the connector with this configuration.

confluent local services connect connector load SalesforceCdcSourceConenctor --config salesforce-cdc-source.properties

Confirm that the connector is in a RUNNING state.

confluent local services connect connector status SalesforceCdcSourceConenctor

REST-based example

Use this setting with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the following command to post the configuration to one of the distributed connect workers. Check here for more information about the Kafka Connect REST API

{
  "name" : "SalesforceCdcSourceConenctor",
  "config" : {
    "connector.class" : "io.confluent.salesforce.SalesforceCdcSourceConnector",
    "tasks.max" : "1",
    "kafka.topic" : "< Required Configuration >",
    "salesforce.cdc.name" : "< Required Configuration >",
    "salesforce.initial.start" : "all",
    "salesforce.consumer.key" : "< Required Configuration >",
    "salesforce.consumer.secret" : "< Required Configuration >",
    "salesforce.username" : "< Required Configuration >",
    "salesforce.password" : "< Required Configuration >",
    "salesforce.password.token" : "< Required Configuration >",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": " Omit to enable trial mode "
  }
}

Note

Change the confluent.topic.bootstrap.servers property to include your broker address(es), and change the confluent.topic.replication.factor to 3 for staging or production use.

Use curl to post a configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect worker(s).

curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors

Use the following command to update the configuration of existing connector.

curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/SalesforceCdcSourceConenctor/config

To consume records written by the connector to the configured Kafka topic, run the following command:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081  --topic cdc-topic --from-beginning

The leads that were inserted into Salesforce will be present in the Kafka topic.

Record Schema

Change Data Capture Record Format

The following example shows the JSON document structure of a Salesforce Change Data Capture Event as received by the Salesforce Change Data Capture Connector.

{
   "data":{
      "schema":"MbZFVarhwdRnx4cf-8DowQ",
      "payload":{
         "Status":"Open - Not Contacted",
         "LastModifiedDate":"2019-08-14T10:59:15.000Z",
         "Company":"Confluent1010",
         "CleanStatus":"Pending",
         "OwnerId":"0052v00000alfOXAAY",
         "CreatedById":"0052v00000alfOXAAY",
         "IsConverted":false,
         "ChangeEventHeader":{
            "commitNumber":10663250415860,
            "commitUser":"0052v00000alfOXAAY",
            "sequenceNumber":1,
            "entityName":"Lead",
            "changeType":"CREATE",
            "changeOrigin":"com/salesforce/api/rest/46.0",
            "transactionKey":"00025790-0623-3300-2724-bdf92c57a1e7",
            "commitTimestamp":1565780355000,
            "recordIds":[
               "00Q2v00001V4bqkEAB"
            ]
         },
         "IsUnreadByOwner":true,
         "CreatedDate":"2019-08-14T10:59:15.000Z",
         "LastModifiedById":"0052v00000alfOXAAY",
         "Name":{
            "LastName":"ConfluentLastName101"
         }
      },
      "event":{
         "replayId":1109257
      }
   },
   "channel":"/data/LeadChangeEvent"
}

Kafka Record Value

The following is a sample record written to Kafka topic for a Change Data Capture event.

{
    "Id":"00Q2v00001WOn3zEAD",
    "ReplayId":"1115950",
    "ChangeEventHeader":{
       "entityName":"Lead",
       "recordIds":["00Q2v00001WOn3zEAD"],
       "changeType":"CREATE",
       "changeOrigin":"com/salesforce/api/rest/46.0",
       "transactionKey":"00097908-d9b2-d840-5501-ae933a58ee4b",
       "sequenceNumber":1,
       "commitTimestamp":1568628454000,
       "commitUser":"0052v00000alfOXAAY",
       "commitNumber":10683824131775
    },
    "LastName":"ConfluentLastName89",
    "FirstName":null,
    "Salutation":null,
    "Name":null,
    "Title":null,
    "Company":"Confluent89",
    "City":null,
    "State":null,
    "PostalCode":null,
    "Country":null,
    "Latitude":null,
    "Longitude":null,
    "GeocodeAccuracy":null,
    "Address":null,
    "Phone":null,
    "MobilePhone":null,
    "Fax":null,
    "Email":null,
    "Website":null,
    "LeadSource":null,
    "Status":"Open - Not Contacted",
    "Industry":null,
    "Rating":null,
    "AnnualRevenue":null,
    "NumberOfEmployees":null,
    "OwnerId":"0052v00000alfOXAAY",
    "IsConverted":false,
    "ConvertedDate":null,
    "ConvertedAccountId":null,
    "ConvertedContactId":null,
    "ConvertedOpportunityId":null,
    "IsUnreadByOwner":true,
    "CreatedDate":1568628454000,
    "CreatedById":"0052v00000alfOXAAY",
    "LastModifiedDate":1568628454000,
    "LastModifiedById":"0052v00000alfOXAAY",
    "Jigsaw":null,
    "JigsawContactId":null,
    "CleanStatus":"Pending",
    "CompanyDunsNumber":null,
    "DandbCompanyId":null,
    "EmailBouncedReason":null,
    "EmailBouncedDate":null,
    "IndividualId":null,
    "SICCode__c":null,
    "ProductInterest__c":null,
    "Primary__c":null,
    "CurrentGenerators__c":null,
    "NumberofLocations__c":null,
    "_ObjectType":"LeadChangeEvent",
    "_EventType":"MbZFVarhwdRnx4cf-8DowQ"
}

Note

The Salesforce record fields unaltered in a Change Data Capture event are expected to have value as null when written to a Kafka topic.

Troubleshooting Connector and Task Failures

You can use the Kafka Connect REST API to check the status of the connectors and tasks. If a task or connector has failed, the trace field will include a reason and a stack trace.

Authorization Failures

The Salesforce connector must authenticate with Salesforce and establish a connection. If a connection fails because of authentication or authorization errors, the connector will stop immediately. These errors require changes in your Salesforce account which may include creating OAuth tokens. Try to rerun your connector after you make the account changes.

Salesforce Error Codes

Whenever the connector or task fails, it captures a message that includes the Salesforce error code and error message. The message may also include a failure reason or suggested fix. See the Salesforce error codes for more detail about each error code.

Repeated Connection Failures

The Salesforce connector uses a long-lived connection to the Salesforce service. Periodically this connection is lost due to network lag, service disruptions, or network issues. When the connector loses its connection, it automatically attempts to reconnect so that the connector can continue working. In some cases, however, the error reported by Salesforce indicates that the connector is unable to continue. In these cases, the connector task fails and includes the Salesforce error and reason in the task failure message.

If the connector stops frequently because of connection timeouts, consider changing the following connector configuration properties and restarting the connector:

  1. The ​connection.timeout defaults to 30 seconds (30000 milliseconds) and dictates the maximum amount of time that the connector should wait for a single connection attempt to succeed. If the initial connection takes longer than this, the connector task will fail.
  2. The ​request.max.retries.time.ms defaults to 15 minutes (900000 milliseconds) and specifies the maximum amount of time that the connector should continue to retry a failed request. The actual duration of the delay is random and it grows exponentially with each retry. If all retries take longer than this value, the connector task will fail. This does not apply to initial connection attempts, but it does apply to subsequent requests to reconnect. Any requests that result in authorization or authentication failures will not be retried.

If the failure rate is not affected by tuning these properties, it may be necessary to look at the Connect worker logs for errors, warnings, and exceptions. The log messages typically have a more detail than the connector status. The log messages preceding failure log messages can provide additional context and may provide you with information that identifies the failure cause.

Enabling Debug Logging

The Connect worker log configuration controls how much detail is included in the logs. By default, the worker logs include enough detail to identify basic functionality. Enable DEBUG logs in the Connect worker’s log configuration to include more details. This change must be made on each worker and only takes effect upon worker startup. After you change the log configuration as outlined below on each Connect worker, restart all of the Connect workers. A rolling restart can be used if necessary.

Note

Trace level logging is verbose and contains many more details, and may be useful to solve certain failures. Trace level logging is enabled like debug level logging is enabled, except TRACE is used instead of DEBUG.

On-Premises Installation

For local or on-premises installations of Confluent Platform, the etc/kafka/connect-log4j.properties file defines the logging configuration of the Connect worker process. To enable DEBUG on just the Salesforce connector, modify the etc/kafka/connect-log4j.properties file to include the following line:

log4j.logger.io.confluent.salesforce=DEBUG

To enable DEBUG on all of the Connect worker’s code, including all connectors, change the log4j.rootLogger= line to use DEBUG instead of INFO. For example, the default log configuration for Connect includes this line:

log4j.rootLogger=INFO, stdout

Change this line to the following to enable DEBUG on all of the Connect worker code:

log4j.rootLogger=DEBUG, stdout

Docker

For Docker containers, the logging configuration is set using environment variables. To enable DEBUG on just the Salesforce connector, use the following environment variable when starting your Confluent Platform Connect container:

CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG

To enable DEBUG log messages on all Connect worker code, including all connectors, use the following environment variable when starting your Confluent Platform Connect container:

CONNECT_LOG4J_LOGGERS="log4j.logger.io.confluent.salesforce=DEBUG"

The value of this environment variable is a comma-separated list of key-value pairs. For example, the following enables DEBUG on the Salesforce connector and the Connect framework:

CONNECT_LOG4J_LOGGERS="log4j.logger.io.confluent.salesforce=DEBUG,org.apache.kafka.connect=DEBUG"