Example: Configure Salesforce PushTopic Source Connector¶
The Salesforce PushTopic source connector is used to subscribe to PushTopics and capture the create, update, delete, and undelete events related to Salesforce Objects (SObjects).
Salesforce Account¶
Use https://login.salesforce.com/ to log into an existing account or create a new one.
Tip
You can also use the https://test.salesforce.com/ test environment instead. Make sure to add Leads in the same environment and set salesforce.instance=https://test.salesforce.com in your connector configurations.
Create a new Connected App:
- Select the gear icon in the upper right hand corner and choose Setup.
- Enter App in the Quick Find search box, and choose App Manager in the filtered results.
- Click the New Connected App button in the upper right corner of the Setup panel.
- Supply a Connected App Name, API Name, and Contact Email.
- Select the Enable OAuth Settings checkbox and the Enable for Device Flow checkbox. These selections enable the connector to use the Salesforce API.
- Under the Select OAuth Scopes field, select all of the items under Available OAuth scopes and add them to the Selected OAuth Scopes.
- Save the new app and press Continue at the prompt.
- Look for the Consumer Key and Consumer Secret in the displayed form. Save these values so you can enter them in the configuration file for the Salesforce connector.
Find your Security Token (emailed to you from Salesforce.com). If you need to reset your token or view your profile on Salesforce.com, select Reset My Security Token and follow the instructions.
Complete the following steps to use the OAuth JWT bearer token flow:
- Log into the account at https://login.salesforce.com/.
- Go to the setup area (the gear icon, top right).
- Go to Apps > App Manager (in the side navigation area).
- Click New Connected App.
- Populate the required Basic Information fields.
- In the API (Enable OAuth Settings) section:
  - Check Enable OAuth Settings.
  - Callback URL is unused in the JWT flow, but it still requires a value. Use “http://localhost/” or any other sample host URL.
  - Check Use digital signatures. Upload the salesforce.crt that was generated earlier.
  - For Selected OAuth Scopes, add Access and manage your data (api) and Perform requests on your behalf at any time (refresh_token, offline_access).
- Click Save. If there are any errors, upload salesforce.crt again.
- On the resulting app page, click Manage.
- Click Edit Policies.
- In the OAuth policies section, change Permitted Users to Admin approved users are pre-authorized.
- Click Save.
- In the Profiles section (on the app page), click Manage Profiles.
- On the Application Profile Assignment page, assign the user profiles that may access the app.
Complete the following steps to get the Salesforce app Consumer Key:
- Log into the account at https://login.salesforce.com/.
- Go to the setup area (the gear icon, top right).
- Go to Apps > App Manager (in the side navigation area).
- In the list, find the app that you created in the previous steps.
- Click View from the app row drop-down selections. The Consumer Key is in the API (Enable OAuth Settings) section.
Install the Salesforce Connector Plugin¶
- Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
Navigate to your Confluent Platform installation directory and run this command to install the latest version of the Salesforce connector.
confluent-hub install confluentinc/kafka-connect-salesforce:latest
Tip
To install a specific version of the connector, replace latest with a version number (for example, 1.1.0-preview) in the command above.
Restart Connect to pick up the new plugin.
confluent local services connect stop && confluent local services connect start
Check if the Salesforce plugin has been installed correctly and picked up by the plugin loader.
curl -sS localhost:8083/connector-plugins | jq -c '.[] | select( .class | contains("Salesforce") )'
This command should print the new Salesforce connector plugins available to the worker.
{"class": "io.confluent.salesforce.SalesforcePlatformEventSourceConnector","type": "source","version": "1.1.0-preview"}
{"class": "io.confluent.salesforce.SalesforcePushTopicSourceConnector","type": "source","version": "1.1.0-preview"}
{"class": "io.confluent.salesforce.SalesforceSourceConnector","type": "source","version": "1.1.0-preview"}
Tip
The SalesforceSourceConnector connector is deprecated. SalesforcePushTopicSourceConnector is identical to the deprecated connector and should be used for new connectors. When it’s convenient, change your existing configurations to use it.
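The plugin check above can also be scripted. The following is a minimal Python sketch, not part of the connector tooling: it assumes the Connect worker listens on localhost:8083 (the address used in the curl command), and the function names are hypothetical.

```python
import json
from urllib.request import urlopen

# Class name taken from the plugin listing shown above.
PUSHTOPIC_CLASS = "io.confluent.salesforce.SalesforcePushTopicSourceConnector"


def salesforce_plugins(plugins):
    """Return the plugin entries whose class name contains "Salesforce"."""
    return [p for p in plugins if "Salesforce" in p.get("class", "")]


def pushtopic_installed(plugins):
    """Return True if the PushTopic source connector is among the plugins."""
    return any(p.get("class") == PUSHTOPIC_CLASS for p in plugins)


def fetch_plugins(base_url="http://localhost:8083"):
    """Fetch the plugin list from the Connect REST API (assumed worker address)."""
    with urlopen(f"{base_url}/connector-plugins") as response:
        return json.load(response)
```

With a running worker, `pushtopic_installed(fetch_plugins())` should return `True` once the plugin has been loaded.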
Add a Lead to Salesforce¶
- In the Salesforce UI, find and click on the App launcher icon in the upper left of the page.
- Find and start the “Leads” app.
- Click the New button, fill out the form, and save it.
Configure Source Connector¶
- Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
Note
Set the following connector configuration properties to enable OAuth JWT bearer token support:
salesforce.username
salesforce.consumer.key
salesforce.jwt.keystore.path
salesforce.jwt.keystore.password
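As a rough illustration, a configuration intended for the JWT flow could be checked for these four properties before it is submitted. This Python sketch is an assumption-laden helper, not connector behavior: the function name is hypothetical, and the check only looks for missing or empty values.

```python
# Property names copied from the note above.
JWT_PROPERTIES = (
    "salesforce.username",
    "salesforce.consumer.key",
    "salesforce.jwt.keystore.path",
    "salesforce.jwt.keystore.password",
)


def missing_jwt_properties(config):
    """Return the JWT-related properties that are absent or empty in config."""
    return [name for name in JWT_PROPERTIES if not config.get(name)]
```

Here `config` is the inner "config" object of a connector definition, parsed into a dictionary. An empty result means all four properties carry a value; it does not confirm that the values are valid.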
Create a configuration file named salesforce-pushtopic-lead-source-config.json with the following contents. Make sure to enter a real username, password, security token, consumer key, and consumer secret. See Salesforce PushTopic Source Connector Configuration Properties for more information on these and the other configuration properties.
{
  "name": "lead-pushtopic",
  "config": {
    "connector.class" : "io.confluent.salesforce.SalesforcePushTopicSourceConnector",
    "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "tasks.max" : "1",
    "kafka.topic" : "sfdc-pushtopic-leads",
    "salesforce.object" : "Lead",
    "salesforce.push.topic.name" : "LeadsPushTopic",
    "salesforce.username" : "<Required>",
    "salesforce.password" : "<Required>",
    "salesforce.password.token" : "<Required>",
    "salesforce.consumer.key" : "<Required>",
    "salesforce.consumer.secret" : "<Required>",
    "salesforce.initial.start" : "all",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": " Omit to enable trial mode "
  }
}
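Before loading the connector, it can help to confirm that no property still holds the `<Required>` placeholder. The Python sketch below is one possible check; the function names are hypothetical, and it assumes the file layout shown above (a top-level "config" object).

```python
import json

# Placeholder string used in the sample configuration above.
PLACEHOLDER = "<Required>"


def unfilled_properties(connector):
    """Return the sorted names of config properties still set to the placeholder."""
    config = connector.get("config", {})
    return sorted(name for name, value in config.items() if value == PLACEHOLDER)


def load_connector(path):
    """Read a connector definition from a JSON file."""
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)
```

For example, `unfilled_properties(load_connector("salesforce-pushtopic-lead-source-config.json"))` returns an empty list once every credential has been filled in.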
Enter the Confluent CLI confluent local services connect connector load command to start the Salesforce source connector.
Caution
You must include a double dash (--) between the topic name and your flag. For more information, see this post.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local services connect connector load lead-pushtopic --config salesforce-pushtopic-lead-source-config.json
Your output should resemble:
{
  "name": "lead-pushtopic",
  "config": {
    "connector.class" : "io.confluent.salesforce.SalesforcePushTopicSourceConnector",
    "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "tasks.max" : "1",
    "kafka.topic" : "sfdc-pushtopic-leads",
    "salesforce.object" : "Lead",
    "salesforce.push.topic.name" : "LeadsPushTopic",
    "salesforce.username" : "<Required>",
    "salesforce.password" : "<Required>",
    "salesforce.password.token" : "<Required>",
    "salesforce.consumer.key" : "<Required>",
    "salesforce.consumer.secret" : "<Required>",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": " Omit to enable trial mode "
  },
  "tasks": [ ... ],
  "type": null
}
Tip
The tasks field may include information about the one started task.
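Once the connector is loaded, its state can be read from the Kafka Connect REST API status endpoint. The sketch below is a minimal Python example under stated assumptions: the worker is reachable at localhost:8083, and the helper names are hypothetical.

```python
import json
from urllib.request import urlopen


def fetch_status(name, base_url="http://localhost:8083"):
    """Fetch connector status from the Connect REST API (assumed worker address)."""
    with urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


def is_healthy(status):
    """Return True when the connector and every one of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    # Require at least one task so a connector with no started task is not reported healthy.
    return connector_ok and bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)
```

Calling `is_healthy(fetch_status("lead-pushtopic"))` gives a quick yes/no answer. A task in the FAILED state usually carries a `trace` field in the same response that explains the failure.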
View Leads in Kafka Topic¶
Navigate to your Confluent Platform installation directory and enter the following command to consume the Lead records that the connector wrote to the Kafka topic.
kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic sfdc-pushtopic-leads
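Because the connector configuration uses JsonConverter, each record value is a JSON document. Assuming the converter default of schemas.enable=true, the record is wrapped in a schema/payload envelope. The following Python sketch unwraps one consumed line; the function name is hypothetical.

```python
import json


def record_payload(line):
    """Parse one consumed line and return the record body.

    With schemas enabled, JsonConverter wraps each record as
    {"schema": ..., "payload": ...}; otherwise the line is the body itself.
    """
    record = json.loads(line)
    if isinstance(record, dict) and "schema" in record and "payload" in record:
        return record["payload"]
    return record
```

The fields inside the payload depend on the Lead object and the PushTopic definition, so inspect a real record before relying on any particular field name.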
Add More Leads to Salesforce¶
Add and change leads as necessary. The connector captures your changes and writes them to the same topic.