Important

You are viewing documentation for an older version of Confluent Platform.

Salesforce Bulk API Source Connector for Confluent Platform

The Salesforce Bulk API Source Connector integrates Salesforce.com with Apache Kafka®.

The Salesforce Bulk API Source Connector pulls records and captures changes from Salesforce.com through the Salesforce Bulk Query API.

The SalesforceBulkApiSourceConnector pulls standard Salesforce objects (and captures changes to them) and writes them to Apache Kafka®. You can use this connector with either standalone or distributed Connect workers.

Prerequisites

The following are required to run the Kafka Connect Salesforce Bulk API Source Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
  • Java Version: 1.8

Install the Salesforce Bulk API Source Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
The Confluent Hub Client must be installed. It is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-salesforce-bulk-api:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-salesforce-bulk-api:1.0.0-preview

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Considerations

Note the following when using the Salesforce Bulk API source connector.

Restarting

While the connector runs, it periodically records the time of the last query execution in the Connect offset topic. When the connector is restarted, it fetches Salesforce objects with a LastModifiedDate later than the last recorded query time.
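The restart behavior can be pictured as a query filtered on LastModifiedDate. The following Python sketch is illustrative only: the helper name build_incremental_soql, the field list, and the stored offset value are assumptions, and the query the connector actually issues is not documented here.

```python
from datetime import datetime, timezone


def build_incremental_soql(sobject, fields, last_query_time):
    """Build a SOQL query for records modified after last_query_time.

    Hypothetical helper for illustration; not the connector's own code.
    """
    # SOQL datetime literals are unquoted ISO 8601 values in UTC.
    since = last_query_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return (
        f"SELECT {', '.join(fields)} FROM {sobject} "
        f"WHERE LastModifiedDate > {since}"
    )


# Offset as it might be restored from the Connect offset topic (assumed value).
last_offset = datetime(2019, 8, 1, 5, 8, 58, tzinfo=timezone.utc)
query = build_incremental_soql(
    "Contact", ["Id", "LastName", "LastModifiedDate"], last_offset
)
print(query)
```

Because the filter uses a strict "later than" comparison, records modified exactly at the stored time would not be fetched again in this sketch.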

API Limits

The Salesforce Bulk API connector is limited to non-compound fields. For example, Bulk Query doesn’t support compound address or geolocation fields, so the connector discards them.
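To make the limitation concrete, the sketch below drops compound fields from a list of field descriptions before a query would be built. The function name queryable_fields and the sample metadata are hypothetical, and treating the types "address" and "location" as the compound ones is an assumption drawn from the text above.

```python
# Field types treated as compound in this sketch (assumption).
COMPOUND_TYPES = {"address", "location"}


def queryable_fields(describe_fields):
    """Return the names of fields whose type is not compound."""
    return [f["name"] for f in describe_fields if f["type"] not in COMPOUND_TYPES]


# Hand-written sample resembling a subset of Contact field metadata.
contact_fields = [
    {"name": "Id", "type": "id"},
    {"name": "LastName", "type": "string"},
    {"name": "MailingAddress", "type": "address"},
    {"name": "MailingStreet", "type": "string"},
    {"name": "MailingLatitude", "type": "double"},
]

print(queryable_fields(contact_fields))
```

Only the compound field itself is dropped here; its scalar components, such as MailingStreet and MailingLatitude, remain queryable.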

Salesforce Object Support

The following Salesforce objects are supported in this version of the Kafka Connect Salesforce Bulk API Source connector:

  • Account
  • Campaign
  • CampaignMember
  • Case
  • Contact
  • Contract
  • Event
  • Group
  • Lead
  • Opportunity
  • OpportunityContactRole
  • OpportunityLineItem
  • Period
  • PricebookEntry
  • Product2
  • Task
  • TaskFeed
  • TaskRelation
  • User
  • UserRole

The Kafka Connect Salesforce Bulk API Source connector also supports custom objects with non-compound fields.

The following objects are not supported by the Salesforce Bulk API:

  • Feed (e.g. AccountFeed, AssetFeed, …)
  • Share (e.g. AccountBrandShare, ChannelProgramLevelShare, …)
  • History (e.g. AccountHistory, ActivityHistory, …)
  • EventRelation (e.g. AcceptedEventRelation, DeclinedEventRelation, …)
  • AggregateResult
  • AttachedContentDocument
  • CaseStatus
  • CaseTeamMember
  • CaseTeamRole
  • CaseTeamTemplate
  • CaseTeamTemplateMember
  • CaseTeamTemplateRecord
  • CombinedAttachment
  • ContentFolderItem
  • ContractStatus
  • EventWhoRelation
  • FolderedContentDocument
  • KnowledgeArticleViewStat
  • KnowledgeArticleVoteStat
  • LookedUpFromActivity
  • Name
  • NoteAndAttachment
  • OpenActivity
  • OwnedContentDocument
  • PartnerRole
  • RecentlyViewed
  • ServiceAppointmentStatus
  • SolutionStatus
  • TaskPriority
  • TaskStatus
  • TaskWhoRelation
  • UserRecordAccess
  • WorkOrderLineItemStatus
  • WorkOrderStatus

Quick Start

In this quick start, the Salesforce Bulk API source connector is used to import data from Salesforce to Kafka.

  1. Create a Salesforce developer account if you don’t already have one.
  2. Add records to the objects by clicking App Launcher and selecting the required Salesforce object.

Install the connector through the Confluent Hub Client.

# run from your CP installation directory
confluent-hub install confluentinc/kafka-connect-salesforce-bulk-api:latest

Tip

By default, the client installs the plugin into share/confluent-hub-components and adds the directory to the plugin path. If this is the first connector you have installed, you may need to restart the Connect worker for the plugin path change to take effect.

Start the services using the Confluent CLI.

confluent local start

Every service starts in order, printing a message with its status.

Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

Note

The SalesforceBulkApiSourceConnector supports a single task only.

Property-based example

Create a configuration file, salesforce-bulk-api.properties. This configuration is typically used with standalone workers.

name=SalesforceBulkApiSourceConnector
tasks.max=1
connector.class=io.confluent.connect.salesforce.SalesforceBulkApiSourceConnector
salesforce.username=< Required Configuration >
salesforce.password=< Required Configuration >
salesforce.password.token=< Required Configuration >
salesforce.object=< Required Configuration >
salesforce.since=< Required Configuration >
kafka.topic=< Required Configuration >
salesforce.instance=< Required Configuration >
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=Omit to enable trial mode

Before starting the connector, make sure that the configurations in salesforce-bulk-api.properties are properly set.
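One way to check the file before loading it is to scan for required keys that are missing or still hold the placeholder text. The Python sketch below is a minimal illustration: the helper names are hypothetical, the list of required keys is simply those marked "< Required Configuration >" in the example above, and the parser handles only plain key=value lines.

```python
REQUIRED_KEYS = [
    "salesforce.username", "salesforce.password", "salesforce.password.token",
    "salesforce.object", "salesforce.since", "kafka.topic", "salesforce.instance",
]
PLACEHOLDER = "< Required Configuration >"


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def unset_keys(props):
    """Return required keys that are missing, empty, or still a placeholder."""
    return [k for k in REQUIRED_KEYS if props.get(k, "") in ("", PLACEHOLDER)]


# Made-up sample file contents for illustration.
sample = """
name=SalesforceBulkApiSourceConnector
salesforce.username=user@example.com
salesforce.password=< Required Configuration >
kafka.topic=sfdc-contacts
"""
print(unset_keys(parse_properties(sample)))
```

An empty list from unset_keys means every key in this sketch's required list has a value; it does not validate the values themselves.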

Then start the Salesforce Bulk API source connector by loading its configuration with the following command.

Caution

You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local load salesforce-bulk-api-source -- -d salesforce-bulk-api.properties

Your output should resemble:

{
   "name" : "SalesforceBulkApiSourceConnector",
   "config" : {
     "connector.class" : "io.confluent.connect.salesforce.SalesforceBulkApiSourceConnector",
     "tasks.max" : "1",
     "kafka.topic" : "< Required Configuration >",
     "salesforce.password" : "< Required Configuration >",
     "salesforce.password.token" : "< Required Configuration >",
     "salesforce.object" : "< Required Configuration >",
     "salesforce.username" : "< Required Configuration >",
     "salesforce.since" : "< Required Configuration >",
     "confluent.topic.bootstrap.servers": "localhost:9092",
     "confluent.topic.replication.factor": "1",
     "confluent.license": ""
   },
  "tasks": []
 }

Check that the connector started successfully. Review the Connect worker’s log by entering the following:

confluent local log connect

Confirm that the connector is in a RUNNING state.

confluent local status SalesforceBulkApiSourceConnector

Confirm that the messages are being sent to Kafka.

kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic <topic-name> \
    --from-beginning | jq '.'

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. See Kafka Connect REST API for more information.

Connect Distributed REST example:

{
  "name" : "SalesforceBulkApiSourceConnector",
  "config" : {
    "connector.class" : "io.confluent.connect.salesforce.SalesforceBulkApiSourceConnector",
    "tasks.max" : "1",
    "kafka.topic" : "< Required Configuration >",
    "salesforce.password" : "< Required Configuration >",
    "salesforce.password.token" : "< Required Configuration >",
    "salesforce.object" : "< Required Configuration >",
    "salesforce.username" : "< Required Configuration >",
    "salesforce.since" : "< Required Configuration >",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": ""
  }
}

Note

Change the confluent.topic.bootstrap.servers property to include your broker address(es), and change the confluent.topic.replication.factor to 3 for staging or production use.

Use curl to post a configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

Create a new connector:

curl -sS -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

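Update an existing connector. This endpoint expects the request body to contain only the contents of the config object, without the outer name and config wrapper, so adjust the file accordingly before sending it:

curl -sS -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SalesforceBulkApiSourceConnector/config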
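The same two REST calls can be prepared from a script. The Python sketch below only builds the requests and does not send them; the function name build_connector_request and the trimmed-down config are hypothetical, and the worker URL is the localhost default used above.

```python
import json
import urllib.request


def build_connector_request(base_url, connector, update=False):
    """Build (but do not send) a create or update request for a connector."""
    base = base_url.rstrip("/")
    if update:
        # PUT /connectors/{name}/config takes only the config map as its body.
        url = f"{base}/connectors/{connector['name']}/config"
        body, method = connector["config"], "PUT"
    else:
        # POST /connectors takes the full {"name": ..., "config": ...} document.
        url = f"{base}/connectors"
        body, method = connector, "POST"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method=method,
    )


# Trimmed-down connector document for illustration only.
connector = {
    "name": "SalesforceBulkApiSourceConnector",
    "config": {
        "connector.class": "io.confluent.connect.salesforce.SalesforceBulkApiSourceConnector",
        "tasks.max": "1",
    },
}
create = build_connector_request("http://localhost:8083/", connector)
update = build_connector_request("http://localhost:8083/", connector, update=True)
print(create.get_method(), create.full_url)
print(update.get_method(), update.full_url)
```

Passing either request to urllib.request.urlopen would send it to the worker; that step is left out so the sketch runs without a Connect cluster.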

Sample data formats

The following examples show the JSON document structure of a Salesforce Bulk Query result (for the Contact object) as it is received by the Salesforce connector, converted to a Kafka record, and then stored in a topic.

Raw JSON received from Salesforce Bulk Query:

[
{
  "attributes" : {
      "type" : "Contact",
      "url" : "/services/data/v47.0/sobjects/Contact/0032v00002qXTBlAAO"
   },
  "Id" : "0032v00002qXTBlAAO",
  "IsDeleted" : false,
  "MasterRecordId" : null,
  "AccountId" : "0012v00002RkgUVAAZ",
  "LastName" : "Gonzalez",
  "FirstName" : "Rose",
  "Salutation" : "Ms.",
  "Name" : "Rose Gonzalez",
  "OtherStreet" : null,
  "OtherCity" : null,
  "OtherState" : null,
  "OtherPostalCode" : null,
  "OtherCountry" : null,
  "OtherLatitude" : null,
  "OtherLongitude" : null,
  "OtherGeocodeAccuracy" : null,
  "MailingStreet" : "313 Constitution Place\nAustin, TX 78767\nUSA",
  "MailingCity" : null,
  "MailingState" : null,
  "MailingPostalCode" : null,
  "MailingCountry" : null,
  "MailingLatitude" : null,
  "MailingLongitude" : null,
  "MailingGeocodeAccuracy" : null,
  "Phone" : "(512) 757-6000",
  "Fax" : "(512) 757-9000",
  "MobilePhone" : "(512) 757-9340",
  "HomePhone" : null,
  "OtherPhone" : null,
  "AssistantPhone" : null,
  "ReportsToId" : null,
  "Email" : "rose@edge.com",
  "Title" : "SVP, Procurement",
  "Department" : "Procurement",
  "AssistantName" : null,
  "LeadSource" : "Trade Show",
  "Birthdate" : "1967-07-14",
  "Description" : null,
  "OwnerId" : "0052v00000ajtG3AAI",
  "CreatedDate" : 1564636138000,
  "CreatedById" : "0052v00000ajtG3AAI",
  "LastModifiedDate" : 1564636138000,
  "LastModifiedById" : "0052v00000ajtG3AAI",
  "SystemModstamp" : 1564636138000,
  "LastActivityDate" : null,
  "LastCURequestDate" : null,
  "LastCUUpdateDate" : null,
  "LastViewedDate" : 1573528066000,
  "LastReferencedDate" : 1573528066000,
  "EmailBouncedReason" : null,
  "EmailBouncedDate" : null,
  "IsEmailBounced" : false,
  "PhotoUrl" : "/services/images/photo/0032v00002qXTBlAAO",
  "Jigsaw" : null,
  "JigsawContactId" : null,
  "CleanStatus" : "Pending",
  "IndividualId" : null,
  "Level__c" : "Primary",
  "Languages__c" : "English"
}
]

Kafka record value:

{
  "Id":"0032v00002qXTBlAAO",
  "IsDeleted":{
    "boolean":false
  },
  "MasterRecordId":null,
  "AccountId":{
    "string":"0012v00002RkgUVAAZ"
  },
  "LastName":{
    "string":"Gonzalez"
  },
  "FirstName":{
    "string":"Rose"
  },
  "Salutation":{
    "string":"Ms."
  },
  "Name":{
    "string":"Rose Gonzalez"
  },
  "OtherStreet":null,
  "OtherCity":null,
  "OtherState":null,
  "OtherPostalCode":null,
  "OtherCountry":null,
  "OtherLatitude":null,
  "OtherLongitude":null,
  "OtherGeocodeAccuracy":null,
  "MailingStreet":{
    "string":"313 Constitution Place\nAustin, TX 78767\nUSA"
  },
  "MailingCity":null,
  "MailingState":null,
  "MailingPostalCode":null,
  "MailingCountry":null,
  "MailingLatitude":null,
  "MailingLongitude":null,
  "MailingGeocodeAccuracy":null,
  "Phone":{
    "string":"(512) 757-6000"
  },
  "Fax":{
    "string":"(512) 757-9000"
  },
  "MobilePhone":{
    "string":"(512) 757-9340"
  },
  "HomePhone":null,
  "OtherPhone":null,
  "AssistantPhone":null,
  "ReportsToId":null,
  "Email":{
    "string":"rose@edge.com"
  },
  "Title":{
    "string":"SVP, Procurement"
  },
  "Department":{
    "string":"Procurement"
  },
  "AssistantName":null,
  "LeadSource":{
    "string":"Trade Show"
  },
  "Birthdate":{
    "int":-903
  },
  "Description":null,
  "OwnerId":{
    "string":"0052v00000ajtG3AAI"
  },
  "CreatedDate":{
    "long":1564636138000
  },
  "CreatedById":{
    "string":"0052v00000ajtG3AAI"
  },
  "LastModifiedDate":{
    "long":1564636138000
  },
  "LastModifiedById":{
    "string":"0052v00000ajtG3AAI"
  },
  "SystemModstamp":{
    "long":1564636138000
  },
  "LastActivityDate":null,
  "LastCURequestDate":null,
  "LastCUUpdateDate":null,
  "LastViewedDate":{
    "long":1573722558000
  },
  "LastReferencedDate":{
    "long":1573722558000
  },
  "EmailBouncedReason":null,
  "EmailBouncedDate":null,
  "IsEmailBounced":{
    "boolean":false
  },
  "PhotoUrl":{
    "string":"/services/images/photo/0032v00002qXTBlAAO"
  },
  "Jigsaw":null,
  "JigsawContactId":null,
  "CleanStatus":{
    "string":"Pending"
  },
  "IndividualId":null,
  "Level__c":{
    "string":"Primary"
  },
  "Languages__c":{
    "string":"English"
  }
}
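In the record value above, each non-null optional field is wrapped in a single-key object naming its Avro type, for example {"string": "Gonzalez"}. A consumer that wants plain values can strip those wrappers. The Python sketch below is one hedged way to do so; the function name unwrap and the set of type names it recognizes are assumptions, and it operates on the JSON text form shown above rather than on the binary Avro data.

```python
# Avro primitive type names recognized as union wrappers in this sketch.
AVRO_TYPE_NAMES = {"boolean", "int", "long", "float", "double", "string", "bytes"}


def unwrap(value):
    """Recursively strip single-key {"<avro type>": v} wrappers."""
    if isinstance(value, dict):
        if len(value) == 1:
            key, inner = next(iter(value.items()))
            if key in AVRO_TYPE_NAMES:
                return unwrap(inner)
        return {k: unwrap(v) for k, v in value.items()}
    if isinstance(value, list):
        return [unwrap(v) for v in value]
    return value


# A few fields copied from the Kafka record value above.
record = {
    "Id": "0032v00002qXTBlAAO",
    "IsDeleted": {"boolean": False},
    "MasterRecordId": None,
    "LastName": {"string": "Gonzalez"},
    "CreatedDate": {"long": 1564636138000},
}
print(unwrap(record))
```

A limitation of this approach: a genuine nested object with exactly one field named like an Avro type (for example, a field called "string") would also be unwrapped, so a schema-aware decoder is safer when such fields can occur.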

Additional Documentation