Example: Configure Salesforce SObject Sink Connector

The Salesforce SObject sink connector is used to perform CRUD operations (create, update, delete, and upsert) on Salesforce Objects (SObjects). This example uses both Salesforce source (PushTopics) and sink connectors in tandem to move SObjects from one Salesforce organization to a secondary Salesforce organization. The example shows the source connector streaming a Salesforce Lead object creation, updating an event into an Kafka topic, and then using the sink connector to duplicate those changes into a secondary Salesforce organization.

Populate source topic using Source Connector

  1. See Example: Configure Salesforce Push Topic Source Connector and follow those steps to completion in order to populate Kafka with a Lead object. Before checking the topic for the Lead object record and after creating the Lead, edit the name of the Lead. Two records should be present when you check the contents of the source connector topic: a record of _EventType created and another of updated.
  2. Go back to Example: Configure Salesforce Push Topic Source Connector and follow the steps for Salesforce Account to create a secondary account in a separate Salesforce organization from the sink connector which is used to configure the sink connector.
  3. Log in to the secondary Salesforce organization and edit the Lead object to add the custom Id field. Follow these steps.
    1. Goto Salesforce lightning UI and select Setup->Platform Tools->Object Manager->Lead->Fields & Relationships->New->Text Field
    2. Name the field CustomId and check the External Id check box.
    3. Exit.

Note

The CustomId field is actually named CustomId__c while the label is CustomId. Use the name when referencing the external id field in the salesforce.custom.id.field.name property of the sink connector.

Configure Sink Connector

Prerequisites
  1. Create a configuration file named salesforce-sobject-leads-sink-config.json with the following contents. Make sure to enter a real username, password, security token, consumer key, and consumer secret. See Configuration Properties for more information on these and the other configuration properties.

    {
       "name" : "SalesforceSObjectSinkConnector1",
       "config" : {
    
         "connector.class" : "io.confluent.salesforce.SalesforceSObjectSinkConnector",
         "tasks.max" : "1",
         "topics" : "sfdc-pushtopic-lead",
         "salesforce.consumer.key" : "< Required Configuration >",
         "salesforce.consumer.secret" : "< Required Configuration >",
         "salesforce.object" : "Lead",
         "salesforce.password" : "< Required Configuration >",
         "salesforce.password.token" : "< Required Configuration >",
         "salesforce.username" : "< Required Configuration: secondary organization username >",
         "salesforce.ignore.fields" : "CleanStatus",
         "salesforce.ignore.reference.fields" : "true",
         "salesforce.use.custom.id.field" : "true",
         "salesforce.custom.id.field.name" : "CustomId__c",
         "confluent.topic.bootstrap.servers": "localhost:9092",
         "confluent.topic.replication.factor": "1",
         "confluent.license": " Omit to enable trial mode "
       }
     }
    
  2. Enter the Confluent CLI confluent local load command to start the Salesforce source connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.

    confluent local load SalesforceSObjectSinkConnector1 -- -d salesforce-sobject-leads-sink-config.json
    

    Your output should resemble:

      {
         "name": "SalesforceSObjectSinkConnector1",
          "config": {
              "connector.class" : "io.confluent.salesforce.SalesforceSObjectSinkConnector",
              "tasks.max" : "1",
              "topics" : "sfdc-pushtopic-leads",
              "salesforce.object" : "Lead",
              "salesforce.username" : "<Required>"
              "salesforce.password" : "<Required>",
              "salesforce.password.token" : "<Required>",
              "salesforce.consumer.key" : "<Required>",
              "salesforce.consumer.secret" : "<Required>",
              "salesforce.ignore.fields" : "CleanStatus",
              "salesforce.ignore.reference.fields" : "true",
              "salesforce.use.custom.id.field" : "true",
              "salesforce.custom.id.field.name" : "CustomId__c",
              "confluent.topic.bootstrap.servers": "localhost:9092",
              "confluent.topic.replication.factor": "1",
              "confluent.license": " Omit to enable trial mode "
          },
          "tasks": [
              ...
          ],
          "type": null
      }
    
    .. tip:: The ``tasks`` field may include information about the one started task.
    

View Leads in Salesforce

  1. Login to the secondary Salesforce organization and verify that the Leads object exists with the correct name. It should match the primary Salesforce organization.

Running both connectors concurrently

  1. Running both the sink connector and the source connector as separate workers allows for synchronizing SOjbect changes in near real time. If running in stand alone mode, add a custom port to one of the workers using the rest.port configuration property.
  2. Add and change leads as necessary in the primary organization. The source connector captures your changes and writes them to same topic that the sink connector reads from and uses as a source for operations.