.. _salesforce-bulk-api-source-connector: Salesforce Bulk API Source Connector for |cp| ============================================= The Salesforce Bulk API Source Connector integrates `Salesforce.com `_ with |ak-tm|. The Salesforce Bulk API Source Connector provides the capability to pull records and capture changes from Salesforce.com via `Salesforce Bulk Query API `_. Salesforce `Objects `_ are standard salesforce objects. The ``SalesforceBulkApiSourceConnector`` can be used to pull objects/capture changes and write them to |ak-tm|. This connector can be used with either standalone or distributed Connect workers. Prerequisites ------------- The following are required to run the |kconnect-long| Salesforce Bulk API Source Connector: * |ak| Broker: |cp| 3.3.0 or above, or |ak| 0.11.0 or above * |kconnect|: |cp| 4.0.0 or above, or |ak| 1.0.0 or above * Java Version: 1.8 Install the Salesforce Bulk API Source Connector ------------------------------------------------ .. include:: ../includes/connector-install.rst .. include:: ../includes/connector-install-hub.rst .. codewithvars:: bash confluent-hub install confluentinc/kafka-connect-salesforce-bulk-api:latest .. include:: ../includes/connector-install-version.rst .. codewithvars:: bash confluent-hub install confluentinc/kafka-connect-salesforce-bulk-api:1.0.0-preview Install Connector Manually -------------------------- `Download and extract the ZIP file `_ for your connector and then follow the manual connector installation :ref:`instructions `. .. _salesforce_bulk_api_connector_license: License ------- .. include:: ../includes/enterprise-license.rst See :ref:`salesforce-bulk-api-connector-license-config` for license properties and :ref:`salesforce-bulk-api-license-topic-configuration` for information about the license topic. .. _salesforce-bulk-api-source-connector-considerations: Considerations -------------- Note the following when using the Salesforce Bulk API source connector. ---------- Restarting ---------- When the connector operates, it periodically records the *last query time* execution in the |kconnect| offset topic. When the connector is restarted, it fetches Salesforce objects with a ``LastModifiedDate`` that is later than last queried time. ---------- API Limits ---------- The Salesforce Bulk API connector is limited by non compound fields. For Example, Bulk Query doesn't support address, location fields. The connector will discard the address and geolocation fields. Salesforce Object Support ------------------------- The following objects from Salesforce are supported in this version of Kafka Connect Salesforce Bulk API Source connector: * Account * Campaign * CampaignMember * Case * Contact * Contract * Event * Group * Lead * Opportunity * OpportunityContactRole * OpportunityLineItem * Period * PricebookEntry * Product2 * Task * TaskFeed * TaskRelation * User * UserRole The Kafka Connect Salesforce Bulk API Source connector also supports custom objects with noncompound fields. The following objects are not supported by Salesforce Bulk API: * Feed (e.g. AccountFeed, AssetFeed, ...) * Share (e.g. AccountBrandShare, ChannelProgramLevelShare, ...) * History (e.g. AccountHistory, ActivityHistory, ...) * EventRelation (e.g. AcceptedEventRelation, DeclinedEventRelation, ...) * AggregateResult * AttachedContentDocument * CaseStatus * CaseTeamMember * CaseTeamRole * CaseTeamTemplate * CaseTeamTemplateMember * CaseTeamTemplateRecord * CombinedAttachment * ContentFolderItem * ContractStatus * EventWhoRelation * FolderedContentDocument * KnowledgeArticleViewStat * KnowledgeArticleVoteStat * LookedUpFromActivity * Name * NoteAndAttachment * OpenActivity * OwnedContentDocument * PartnerRole * RecentlyViewed * ServiceAppointmentStatus * SolutionStatus * TaskPriority * TaskStatus * TaskWhoRelation * UserRecordAccess * WorkOrderLineItemStatus * WorkOrderStatus Quick Start ----------- In this quick start, the Salesforce Bulk API source connector is used to import data from Salesforce to |ak|. #. Create Salesforce developer account using this `link `_ if you don't have it. #. Add records to the objects by clicking on App Launcher and selecting the required Salesforce object. Install the connector through the :ref:`confluent_hub_client`. .. codewithvars:: bash # run from your CP installation directory confluent-hub install confluentinc/kafka-connect-salesforce-bulk-api:latest .. tip:: By default, it will install the plugin into ``share/confluent-hub-components`` and add the directory to the plugin path. If this is the first connector you have installed, you may need to restart the connect server for the plugin path change to take effect. Start the services using the Confluent CLI. .. codewithvars:: bash |confluent_start| Every service starts in order, printing a message with its status. .. include:: ../../includes/cli.rst :start-after: CE_CLI_startup_output :end-before: COS_CP_CLI_startup_output .. note:: The ``SalesforceBulkApiSourceConnector`` supports a single task only. ---------------------- Property-based example ---------------------- Create a configuration file, ``salesforce-bulk-api.properties``.This configuration is used typically along with :ref:`standalone workers `. .. codewithvars:: properties name=SalesforceBulkApiSourceConnector tasks.max=1 connector.class=io.confluent.connect.salesforce.SalesforceBulkApiSourceConnector salesforce.username=< Required Configuration > salesforce.password=< Required Configuration > salesforce.password.token=< Required Configuration > salesforce.object=< Required Configuration > salesforce.since=< Required Configuration > kafka.topic=< Required Configuration > salesforce.instance=< Required Configuration > confluent.topic.bootstrap.servers=localhost:9092 confluent.topic.replication.factor=1 confluent.license=Omit to enable trial mode Before starting the connector, make sure that the configurations in ``salesforce-bulk-api.properties`` are properly set. Then start the Salesforce Bulk API source connector by loading its configuration with the following command. .. include:: ../../includes/confluent-local-connector-limit.rst .. codewithvars:: bash |confluent_load| salesforce-bulk-api-source|dash| -d salesforce-bulk-api.properties { "name" : "SalesforceBulkApiSourceConnector", "config" : { "connector.class", "io.confluent.connect.salesforce.SalesforceBulkApiSourceConnector", "tasks.max" : "1", "kafka.topic" : "< Required Configuration >", "salesforce.password" : "< Required Configuration >", "salesforce.password.token" : "< Required Configuration >", "salesforce.object" : "< Required Configuration >", "salesforce.username" : "< Required Configuration >", "salesforce.since" : "< Required Configuration >", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "confluent.license": "" }, "tasks": [] } Check that the connector started successfully. Review the |kconnect| worker's log by entering the following: .. codewithvars:: bash |confluent_log| connect Confirm that the connector is in a ``RUNNING`` state. .. codewithvars:: bash |confluent_status| SalesforceBulkApiSourceConnector Confirm that the messages are being sent to |ak|. .. codewithvars:: bash kafka-avro-console-consumer \ --bootstrap-server localhost:9092 \ --property schema.registry.url=http://localhost:8081 \ --topic \ --from-beginning | jq '.' ------------------ REST-based example ------------------ This configuration is used typically along with :ref:`distributed workers `. Write the following JSON to ``connector.json``, configure all of the required values, and use the command below to post the configuration to one of the distributed connect worker(s). See |kconnect-long| :ref:`REST API ` for more information. **Connect Distributed REST example:** .. codewithvars:: json { "name" : "SalesforceBulkApiSourceConnector", "config" : { "connector.class", "io.confluent.connect.salesforce.SalesforceBulkApiSourceConnector", "tasks.max" : "1", "kafka.topic" : "< Required Configuration >", "salesforce.password" : "< Required Configuration >", "salesforce.password.token" : "< Required Configuration >", "salesforce.object" : "< Required Configuration >", "salesforce.username" : "< Required Configuration >", "salesforce.since" : "< Required Configuration >", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "confluent.license": " Omit to enable trial mode " } } .. note:: Change the ``confluent.topic.bootstrap.servers`` property to include your broker address(es), and change the ``confluent.topic.replication.factor`` to 3 for staging or production use. Use curl to post a configuration to one of the |kconnect-long| Workers. Change ``http://localhost:8083/`` to the endpoint of one of your |kconnect-long| worker(s). **Create a new connector:** .. codewithvars:: bash curl -sS -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors **Update an existing connector:** .. codewithvars:: bash curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SalesforceBulkApiSourceConnector/config Sample data formats ------------------- The following examples show the JSON document structure of a Salesforce Bulk Query results (for Contact object) as it received by the Saleforce Connector, converted to a Kafka record, and then stored in a topic. **Raw JSON received from Salesforce Bulk Query:** .. codewithvars:: json [ { "attributes" : { "type" : "Contact", "url" : "/services/data/v47.0/sobjects/Contact/0032v00002qXTBlAAO" }, "Id" : "0032v00002qXTBlAAO", "IsDeleted" : false, "MasterRecordId" : null, "AccountId" : "0012v00002RkgUVAAZ", "LastName" : "Gonzalez", "FirstName" : "Rose", "Salutation" : "Ms.", "Name" : "Rose Gonzalez", "OtherStreet" : null, "OtherCity" : null, "OtherState" : null, "OtherPostalCode" : null, "OtherCountry" : null, "OtherLatitude" : null, "OtherLongitude" : null, "OtherGeocodeAccuracy" : null, "MailingStreet" : "313 Constitution Place\nAustin, TX 78767\nUSA", "MailingCity" : null, "MailingState" : null, "MailingPostalCode" : null, "MailingCountry" : null, "MailingLatitude" : null, "MailingLongitude" : null, "MailingGeocodeAccuracy" : null, "Phone" : "(512) 757-6000", "Fax" : "(512) 757-9000", "MobilePhone" : "(512) 757-9340", "HomePhone" : null, "OtherPhone" : null, "AssistantPhone" : null, "ReportsToId" : null, "Email" : "rose@edge.com", "Title" : "SVP, Procurement", "Department" : "Procurement", "AssistantName" : null, "LeadSource" : "Trade Show", "Birthdate" : "1967-07-14", "Description" : null, "OwnerId" : "0052v00000ajtG3AAI", "CreatedDate" : 1564636138000, "CreatedById" : "0052v00000ajtG3AAI", "LastModifiedDate" : 1564636138000, "LastModifiedById" : "0052v00000ajtG3AAI", "SystemModstamp" : 1564636138000, "LastActivityDate" : null, "LastCURequestDate" : null, "LastCUUpdateDate" : null, "LastViewedDate" : 1573528066000, "LastReferencedDate" : 1573528066000, "EmailBouncedReason" : null, "EmailBouncedDate" : null, "IsEmailBounced" : false, "PhotoUrl" : "/services/images/photo/0032v00002qXTBlAAO", "Jigsaw" : null, "JigsawContactId" : null, "CleanStatus" : "Pending", "IndividualId" : null, "Level__c" : "Primary", "Languages__c" : "English" } ] **Kafka record value:** .. codewithvars:: json { "Id":"0032v00002qXTBlAAO", "IsDeleted":{ "boolean":false }, "MasterRecordId":null, "AccountId":{ "string":"0012v00002RkgUVAAZ" }, "LastName":{ "string":"Gonzalez" }, "FirstName":{ "string":"Rose" }, "Salutation":{ "string":"Ms." }, "Name":{ "string":"Rose Gonzalez" }, "OtherStreet":null, "OtherCity":null, "OtherState":null, "OtherPostalCode":null, "OtherCountry":null, "OtherLatitude":null, "OtherLongitude":null, "OtherGeocodeAccuracy":null, "MailingStreet":{ "string":"313 Constitution Place\nAustin, TX 78767\nUSA" }, "MailingCity":null, "MailingState":null, "MailingPostalCode":null, "MailingCountry":null, "MailingLatitude":null, "MailingLongitude":null, "MailingGeocodeAccuracy":null, "Phone":{ "string":"(512) 757-6000" }, "Fax":{ "string":"(512) 757-9000" }, "MobilePhone":{ "string":"(512) 757-9340" }, "HomePhone":null, "OtherPhone":null, "AssistantPhone":null, "ReportsToId":null, "Email":{ "string":"rose@edge.com" }, "Title":{ "string":"SVP, Procurement" }, "Department":{ "string":"Procurement" }, "AssistantName":null, "LeadSource":{ "string":"Trade Show" }, "Birthdate":{ "int":-903 }, "Description":null, "OwnerId":{ "string":"0052v00000ajtG3AAI" }, "CreatedDate":{ "long":1564636138000 }, "CreatedById":{ "string":"0052v00000ajtG3AAI" }, "LastModifiedDate":{ "long":1564636138000 }, "LastModifiedById":{ "string":"0052v00000ajtG3AAI" }, "SystemModstamp":{ "long":1564636138000 }, "LastActivityDate":null, "LastCURequestDate":null, "LastCUUpdateDate":null, "LastViewedDate":{ "long":1573722558000 }, "LastReferencedDate":{ "long":1573722558000 }, "EmailBouncedReason":null, "EmailBouncedDate":null, "IsEmailBounced":{ "boolean":false }, "PhotoUrl":{ "string":"/services/images/photo/0032v00002qXTBlAAO" }, "Jigsaw":null, "JigsawContactId":null, "CleanStatus":{ "string":"Pending" }, "IndividualId":null, "Level__c":{ "string":"Primary" }, "Languages__c":{ "string":"English" } } Additional Documentation ------------------------ .. toctree:: :maxdepth: 1 configuration_options