.. _salesforce-push-topic-source-connector:

Salesforce PushTopic Source Connector for |cp|
==============================================

The |kconnect-long| Salesforce PushTopic source connector provides the ability to subscribe to create, update, delete, and undelete events related to Salesforce Objects (SObjects). The ``SalesforcePushTopicSourceConnector`` can be used to capture these PushTopic events and write them to |ak-tm| in the same order. This connector can be used with either standalone or distributed Connect workers.

.. note:: The connector's ``salesforce.push.topic.name`` property indicates the name of the PushTopic. If it does not exist when the connector is started, a PushTopic with that name is created. See the Salesforce Developer Guide for more information.

.. _salesforce-push-topic-source-connector-considerations:

Considerations
--------------

Note the following when using the Salesforce PushTopic source connector.

.. _salesforce-push-topic-source-connector-considerations-replay:

Replaying Messages
^^^^^^^^^^^^^^^^^^

PushTopics currently retain events for 24 hours. When you create a connector, you can control how stored events are treated when the connector starts for the first time.

#. Consume only new events that arrive in the PushTopic after the connector starts up, via ``salesforce.initial.start=latest``. This is the default behavior, due largely to :ref:`API limits <salesforce-push-topic-source-connector-considerations-limits>`.
#. Consume all of the events in the PushTopic, via ``salesforce.initial.start=all``.

.. _salesforce-push-topic-source-connector-considerations-restart:

Restarting
^^^^^^^^^^

When the connector operates, it periodically records the *replay ID* of the last record written to |ak|. When the connector is stopped and then restarted within 24 hours, the connector continues consuming the PushTopic where it stopped, with no missed events. However, if the connector stops for more than 24 hours, some events are discarded in Salesforce before the connector can read them.

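
The start and restart rules described so far can be sketched in a few lines of Python. This is only an illustration and not the connector's actual implementation: the function names are hypothetical, and the mapping of ``latest`` and ``all`` to the Salesforce Streaming API replay options ``-1`` (new events only) and ``-2`` (all retained events) is an assumption about how such a client would typically subscribe.

```python
from datetime import datetime, timedelta
from typing import Optional

RETENTION = timedelta(hours=24)  # retention window stated in this section
REPLAY_NEW_ONLY = -1             # Streaming API replay option: new events only
REPLAY_ALL_RETAINED = -2         # Streaming API replay option: all retained events


def starting_replay_id(initial_start: str, last_recorded: Optional[int]) -> int:
    """Pick the replay position to subscribe from (hypothetical helper)."""
    if last_recorded is not None:
        # Restart: resume from the last replay ID that was recorded.
        return last_recorded
    if initial_start == "all":
        return REPLAY_ALL_RETAINED
    if initial_start == "latest":
        return REPLAY_NEW_ONLY
    raise ValueError(f"unsupported salesforce.initial.start: {initial_start!r}")


def may_have_missed_events(stopped_at: datetime, restarted_at: datetime) -> bool:
    """Return True when the downtime exceeds the retention window."""
    return restarted_at - stopped_at > RETENTION
```

For example, ``starting_replay_id("all", None)`` models a first start that reads every retained event, while ``starting_replay_id("all", 798)`` models a restart that ignores ``salesforce.initial.start`` and resumes from the recorded replay ID.
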
If the connector stops unexpectedly due to a failure, it may not record the *replay ID* of the last record successfully written to |ak|. When the connector restarts, it resumes from the last recorded *replay ID*. This means that some events may be duplicated in |ak|.

.. _salesforce-push-topic-source-connector-considerations-messageorder:

Message Order
^^^^^^^^^^^^^

Changes to data in your organization happen in a sequential manner. However, Salesforce clients (like the connector) may receive out-of-order event notifications. The connector writes events in the same order that it receives them. If required, you can set a ``createdDate`` property for topic consumers that puts the notification messages in event order.

.. _salesforce-push-topic-source-connector-considerations-visibility:

Visibility
^^^^^^^^^^

Only the events captured by a PushTopic are visible to the connector. These events are retained for 24 hours.

.. _salesforce-push-topic-source-connector-considerations-limits:

API Limits
^^^^^^^^^^

The Salesforce connector is limited by the allocations and limits of the Salesforce account. For example, free org accounts have a cap on the number of events that can occur within a 24-hour period.

.. _salesforce-push-topic-source-connector-creating-push-topics:

Creating Push Topics
--------------------

The connector can use an existing PushTopic. If no PushTopic with the configured name exists, the connector creates one when it starts, using the following syntax:

.. codewithvars:: sql

   SELECT ID, <fields> FROM <sobject>

where ``<fields>`` is the comma-separated list of non-textarea fields, and ``<sobject>`` is the name of the SObject (e.g., ``Lead``, ``Case``, ``Account``, etc.).

If you want the PushTopic to be defined differently, create it using the Salesforce UI before running the connector.

.. _salesforce-push-topic-source-connector-examples:

Examples
--------

.. note:: ``salesforce.consumer.key`` and ``salesforce.consumer.secret`` are required properties used for OAuth2 secure authentication by Salesforce.com. Additional information and tutorials are available at Salesforce.com.

Property-based example
^^^^^^^^^^^^^^^^^^^^^^

This configuration is typically used with standalone workers.

.. codewithvars:: properties
   :emphasize-lines: 4,5,6,7,8,9,10,11,12,13,14,15

   name=SalesforcePushTopicSourceConnector1
   connector.class=io.confluent.salesforce.SalesforcePushTopicSourceConnector
   tasks.max=1
   kafka.topic=< Required Configuration >
   salesforce.consumer.key=< Required Configuration >
   salesforce.consumer.secret=< Required Configuration >
   salesforce.object=< Required Configuration >
   salesforce.password=< Required Configuration >
   salesforce.password.token=< Required Configuration >
   salesforce.push.topic.name=< Required Configuration >
   salesforce.username=< Required Configuration >
   salesforce.initial.start=all
   confluent.topic.bootstrap.servers=localhost:9092
   confluent.topic.replication.factor=1
   confluent.license=

.. note:: Change the ``confluent.topic.bootstrap.servers`` property to include your broker address(es), and change the ``confluent.topic.replication.factor`` to 3 for staging or production use.

REST-based example
^^^^^^^^^^^^^^^^^^

This configuration is typically used with distributed workers. Write the following JSON to ``connector.json``, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. See the |kconnect| REST API for more information.

**Connect Distributed REST example with Push Topic**:

.. codewithvars:: json
   :emphasize-lines: 6,7,8,9,10,11,12,13,14,15,16,17

   {
     "name" : "SalesforcePushTopicSourceConnector1",
     "config" : {
       "connector.class" : "io.confluent.salesforce.SalesforcePushTopicSourceConnector",
       "tasks.max" : "1",
       "kafka.topic" : "< Required Configuration >",
       "salesforce.consumer.key" : "< Required Configuration >",
       "salesforce.consumer.secret" : "< Required Configuration >",
       "salesforce.object" : "< Required Configuration >",
       "salesforce.password" : "< Required Configuration >",
       "salesforce.password.token" : "< Required Configuration >",
       "salesforce.push.topic.name" : "< Required Configuration >",
       "salesforce.username" : "< Required Configuration >",
       "salesforce.initial.start" : "all",
       "confluent.topic.bootstrap.servers": "localhost:9092",
       "confluent.topic.replication.factor": "1",
       "confluent.license": " Omit to enable trial mode "
     }
   }

.. note:: Change the ``confluent.topic.bootstrap.servers`` property to include your broker address(es), and change the ``confluent.topic.replication.factor`` to 3 for staging or production use.

Use curl to post a configuration to one of the |kconnect| workers. Change ``http://localhost:8083/`` to the endpoint of one of your |kconnect| worker(s).

**Create a new connector:**

.. codewithvars:: bash

   curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

**Update an existing connector:**

.. codewithvars:: bash

   curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SalesforcePushTopicSourceConnector1/config

.. _salesforce-push-topic-source-connector-record-format:

Sample data formats
-------------------

The following examples show a Salesforce ``Lead`` SObject as it is received by the connector in the original Salesforce JSON format, and then as it is converted to a |ak| record and stored in a topic in JSON format.

**Raw PushTopic Event received from Salesforce:**

.. codewithvars:: json

   {
     "data": {
       "event": {
         "createdDate": "2019-03-25T22:58:26.749Z",
         "replayId": 798,
         "type": "created"
       },
       "sobject": {
         "LastModifiedDate": "2019-03-25T22:58:26.000Z",
         "Company": "Acme Ind",
         "Email": "jill.doe@gmail.com",
         "Address": {
           "GeocodeAccuracy": null,
           "State": "Fl",
           "Street": "123 Elm Street",
           "PostalCode": "33133",
           "Country": null,
           "Latitude": null,
           "City": "Centerville",
           "Longitude": null
         },
         "Latitude": null,
         "ProductInterest__c": null,
         "MobilePhone": null,
         "Name": "Jill Doe",
         "Industry": "Communications",
         "CreatedById": "0051U000004EE5wQAG",
         "SICCode__c": null,
         "DandbCompanyId": null,
         "PhotoUrl": null,
         "ConvertedOpportunityId": null,
         "MasterRecordId": null,
         "Status": "Open - Not Contacted",
         "ConvertedAccountId": null,
         "IsDeleted": false,
         "IsConverted": false,
         "LastViewedDate": null,
         "City": "Centerville",
         "Longitude": null,
         "CleanStatus": "5",
         "LeadSource": "Phone Inquiry",
         "GeocodeAccuracy": null,
         "Primary__c": null,
         "State": "Fl",
         "CreatedDate": "2019-03-25T22:58:26.000Z",
         "Country": null,
         "LastName": "Doe",
         "Id": "00Q1U000006f5Q9UAI",
         "AnnualRevenue": null,
         "Jigsaw": null,
         "EmailBouncedDate": null,
         "ConvertedDate": null,
         "Rating": "Warm",
         "Website": null,
         "PostalCode": "33133",
         "LastReferencedDate": null,
         "NumberOfEmployees": null,
         "CompanyDunsNumber": null,
         "Salutation": null,
         "ConvertedContactId": null,
         "OwnerId": "0051U000004EE5wQAG",
         "Phone": "555-1212",
         "NumberofLocations__c": null,
         "EmailBouncedReason": null,
         "FirstName": "Jill",
         "IsUnreadByOwner": true,
         "Title": null,
         "SystemModstamp": "2019-03-25T22:58:26.000Z",
         "CurrentGenerators__c": null,
         "LastActivityDate": null,
         "Fax": null,
         "LastModifiedById": "0051U000004EE5wQAG",
         "JigsawContactId": null
       }
     },
     "channel": "/topic/LeadsPushTopic"
   }

**Kafka record key for the Salesforce PushTopic Event:**

.. codewithvars:: json

   {
     "Id": "00Q1U000006f5Q9UAI"
   }

**Kafka record value for the Salesforce PushTopic Event:**

.. codewithvars:: json

   {
     "Id": "00Q1U000006f5Q9UAI",
     "IsDeleted": { "boolean": false },
     "MasterRecordId": null,
     "LastName": { "string": "Doe" },
     "FirstName": { "string": "Jill" },
     "Salutation": null,
     "Name": { "string": "Jill Doe" },
     "Title": null,
     "Company": { "string": "Acme Ind" },
     "City": { "string": "Centerville" },
     "State": { "string": "Fl" },
     "PostalCode": { "string": "33133" },
     "Country": null,
     "Latitude": null,
     "Longitude": null,
     "GeocodeAccuracy": null,
     "Address": {
       "io.confluent.salesforce.Address": {
         "GeocodeAccuracy": null,
         "State": { "string": "Fl" },
         "Street": { "string": "123 Elm Street" },
         "PostalCode": { "string": "33133" },
         "Country": null,
         "Latitude": null,
         "City": { "string": "Centerville" },
         "Longitude": null
       }
     },
     "Phone": { "string": "555-1212" },
     "MobilePhone": null,
     "Fax": null,
     "Email": { "string": "jill.doe@gmail.com" },
     "Website": null,
     "PhotoUrl": null,
     "LeadSource": { "string": "Phone Inquiry" },
     "Status": { "string": "Open - Not Contacted" },
     "Industry": { "string": "Communications" },
     "Rating": { "string": "Warm" },
     "AnnualRevenue": null,
     "NumberOfEmployees": null,
     "OwnerId": { "string": "0051U000004EE5wQAG" },
     "IsConverted": { "boolean": false },
     "ConvertedDate": null,
     "ConvertedAccountId": null,
     "ConvertedContactId": null,
     "ConvertedOpportunityId": null,
     "IsUnreadByOwner": { "boolean": true },
     "CreatedDate": { "long": 1553554706000 },
     "CreatedById": { "string": "0051U000004EE5wQAG" },
     "LastModifiedDate": { "long": 1553554706000 },
     "LastModifiedById": { "string": "0051U000004EE5wQAG" },
     "SystemModstamp": { "long": 1553554706000 },
     "LastActivityDate": null,
     "LastViewedDate": null,
     "LastReferencedDate": null,
     "Jigsaw": null,
     "JigsawContactId": null,
     "CleanStatus": { "string": "5" },
     "CompanyDunsNumber": null,
     "DandbCompanyId": null,
     "EmailBouncedReason": null,
     "EmailBouncedDate": null,
     "SICCode__c": null,
     "ProductInterest__c": null,
     "Primary__c": null,
     "CurrentGenerators__c": null,
     "NumberofLocations__c": null,
     "_ObjectType": { "string": "Lead" },
     "_EventType": { "string": "created" }
   }

.. _salesforce-pushtopic-source-connector-troubleshooting:

Troubleshooting Connector and Task Failures
-------------------------------------------

.. include:: ../includes/salesforce_troubleshooting.rst

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   salesforce_pushtopic_source_connector_config
   salesforce_pushtopic_source_connector_quickstart