Example: Configure Salesforce SObject Sink Connector¶
The Salesforce SObject sink connector is used to perform CRUD operations (create, update, delete, and upsert) on Salesforce Objects (SObjects). This example uses both Salesforce source (PushTopics) and sink connectors in tandem to move SObjects from one Salesforce organization to a secondary Salesforce organization. The example shows the source connector streaming a Salesforce Lead
object creation, updating an event into an Kafka topic, and then using the sink connector to duplicate those changes into a secondary Salesforce organization.
Populate source topic using Source Connector¶
- See Example: Configure Salesforce Push Topic Source Connector and follow those steps to completion in order to populate Kafka with a
Lead
object. Before checking the topic for theLead
object record and after creating theLead
, edit the name of theLead
. Two records should be present when you check the contents of the source connector topic: a record of _EventTypecreated
and another ofupdated
. - Go back to Example: Configure Salesforce Push Topic Source Connector and follow the steps for Salesforce Account to create a secondary account in a separate Salesforce organization from the sink connector which is used to configure the sink connector.
- Log in to the secondary Salesforce organization and edit the Lead object to add the custom
Id
field. Follow these steps.- Goto Salesforce lightning UI and select Setup->Platform Tools->Object Manager->Lead->Fields & Relationships->New->Text Field
- Name the field
CustomId
and check theExternal Id
check box. - Exit.
Note
The CustomId
field is actually named CustomId__c
while the label is CustomId
. Use the name when referencing the external id field in the salesforce.custom.id.field.name
property of the sink connector.
Configure Sink Connector¶
- Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
Create a configuration file named
salesforce-sobject-leads-sink-config.json
with the following contents. Make sure to enter a real username, password, security token, consumer key, and consumer secret. See Configuration Properties for more information on these and the other configuration properties.{ "name" : "SalesforceSObjectSinkConnector1", "config" : { "connector.class" : "io.confluent.salesforce.SalesforceSObjectSinkConnector", "tasks.max" : "1", "topics" : "sfdc-pushtopic-lead", "salesforce.consumer.key" : "< Required Configuration >", "salesforce.consumer.secret" : "< Required Configuration >", "salesforce.object" : "Lead", "salesforce.password" : "< Required Configuration >", "salesforce.password.token" : "< Required Configuration >", "salesforce.username" : "< Required Configuration: secondary organization username >", "salesforce.ignore.fields" : "CleanStatus", "salesforce.ignore.reference.fields" : "true", "salesforce.use.custom.id.field" : "true", "salesforce.custom.id.field.name" : "CustomId__c", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "confluent.license": " Omit to enable trial mode " } }
Enter the Confluent CLI confluent local load command to start the Salesforce source connector.
Caution
You must include a double dash (
--
) between the topic name and your flag. For more information, see this post.Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to
confluent local
. For example, the syntax forconfluent start
is nowconfluent local start
. For more information, see confluent local.confluent local load SalesforceSObjectSinkConnector1 -- -d salesforce-sobject-leads-sink-config.json
Your output should resemble:
{ "name": "SalesforceSObjectSinkConnector1", "config": { "connector.class" : "io.confluent.salesforce.SalesforceSObjectSinkConnector", "tasks.max" : "1", "topics" : "sfdc-pushtopic-leads", "salesforce.object" : "Lead", "salesforce.username" : "<Required>" "salesforce.password" : "<Required>", "salesforce.password.token" : "<Required>", "salesforce.consumer.key" : "<Required>", "salesforce.consumer.secret" : "<Required>", "salesforce.ignore.fields" : "CleanStatus", "salesforce.ignore.reference.fields" : "true", "salesforce.use.custom.id.field" : "true", "salesforce.custom.id.field.name" : "CustomId__c", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "confluent.license": " Omit to enable trial mode " }, "tasks": [ ... ], "type": null } .. tip:: The ``tasks`` field may include information about the one started task.
View Leads in Salesforce¶
- Login to the secondary Salesforce organization and verify that the
Leads
object exists with the correct name. It should match the primary Salesforce organization.
Running both connectors concurrently¶
- Running both the sink connector and the source connector as separate workers allows for synchronizing SOjbect changes in near real time. If running in stand alone mode, add a custom port to one of the workers using the
rest.port
configuration property. - Add and change leads as necessary in the primary organization. The source connector captures your changes and writes them to same topic that the sink connector reads from and uses as a source for operations.