Salesforce Bulk API Source Connector for Confluent Platform¶
The Salesforce Bulk API Source connector integrates Salesforce.com with Apache Kafka® and provides the capability to pull records and capture changes from Salesforce.com through the Salesforce Bulk Query API.
Salesforce Objects (SObjects) include the standard Salesforce objects, such as Account and Contact. The SalesforceBulkApiSourceConnector can be used to pull these objects, capture changes to them, and write them to Kafka. You can use this connector with either standalone or distributed Connect workers.
Features¶
The Salesforce Bulk API Source connector includes the following features:
At least once delivery¶
This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
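Because a restart can replay records, downstream consumers may need to tolerate duplicates. The following is a minimal consumer-side sketch, not part of the connector. It assumes each record value has already been deserialized into a plain dict that carries the Salesforce `Id` and `LastModifiedDate` fields, as in the Contact sample later on this page. It also assumes that a repeated (`Id`, `LastModifiedDate`) pair marks a duplicate. The `Deduplicator` class is hypothetical.

```python
from collections import OrderedDict


class Deduplicator:
    """Hypothetical consumer-side filter: drops records whose
    (Id, LastModifiedDate) pair has already been seen.

    Memory is bounded by max_entries; the least recently seen key is evicted first.
    """

    def __init__(self, max_entries: int = 100_000) -> None:
        self._seen: "OrderedDict[tuple, None]" = OrderedDict()
        self._max_entries = max_entries

    def is_new(self, record: dict) -> bool:
        # Assumes a deserialized record value with plain (not union-wrapped) fields.
        key = (record["Id"], record.get("LastModifiedDate"))
        if key in self._seen:
            self._seen.move_to_end(key)  # mark as recently seen
            return False
        self._seen[key] = None
        if len(self._seen) > self._max_entries:
            self._seen.popitem(last=False)  # evict the least recently seen key
        return True


dedup = Deduplicator()
batch = [
    {"Id": "0032v00002qXTBlAAO", "LastModifiedDate": 1564636138000},
    {"Id": "0032v00002qXTBlAAO", "LastModifiedDate": 1564636138000},  # replayed after a restart
    {"Id": "0032v00002qXTBlAAO", "LastModifiedDate": 1564640000000},  # a later, genuine update
]
unique = [r for r in batch if dedup.is_new(r)]
print(len(unique))  # 2
```

Keying on `LastModifiedDate` keeps genuine updates to the same record while dropping exact replays. The bounded cache keeps memory predictable, but a replay older than the cache window would pass through.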
Supports one task¶
The Salesforce Bulk API Source connector supports running only one task.
Limitations¶
The following Salesforce object (SObject) error message may be displayed when you are using the Salesforce Bulk API Source connector:
Entity 'Order' is not supported to use PKChunking.
If this error appears, set the configuration property batch.enable=false for the SObjects that do not support PK chunking. This property is available in Salesforce Bulk API Source connector version 1.7.0 or later.
Considerations¶
Note the following when using the Salesforce Bulk API Source connector.
Restarting¶
While the connector runs, it periodically records the time of the last query execution in the Connect offset topic. When the connector restarts, it fetches Salesforce objects whose LastModifiedDate is later than the last queried time.
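The restart behavior amounts to an incremental query keyed on LastModifiedDate. The following is a minimal sketch of that filter. It assumes a hypothetical `build_incremental_query` helper and an explicit field list, and it is not the query the connector actually issues. SOQL dateTime literals are written unquoted, in ISO 8601 form and in UTC.

```python
from datetime import datetime, timezone


def soql_datetime(ts: datetime) -> str:
    """Format an aware datetime as an unquoted SOQL dateTime literal in UTC."""
    if ts.tzinfo is None:
        raise ValueError("timestamp must be timezone-aware")
    return ts.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def build_incremental_query(sobject: str, fields: list[str], last_queried: datetime) -> str:
    """Hypothetical helper: select only records modified after the stored offset time."""
    return (
        f"SELECT {', '.join(fields)} FROM {sobject} "
        f"WHERE LastModifiedDate > {soql_datetime(last_queried)}"
    )


# Illustrative offset value, as if restored from the Connect offset topic.
last_queried = datetime.fromtimestamp(1564636138000 / 1000, tz=timezone.utc)
query = build_incremental_query("Contact", ["Id", "LastName", "LastModifiedDate"], last_queried)
print(query)
# SELECT Id, LastName, LastModifiedDate FROM Contact WHERE LastModifiedDate > 2019-08-01T05:08:58Z
```

With a strict `>`, this sketch can miss a record modified in the same second as the stored time. Using `>=` instead trades that gap for duplicates, which at-least-once consumers must tolerate anyway.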
API Limits¶
The Salesforce Bulk API connector supports only non-compound fields. For example, Bulk Query does not support compound address and location fields, so the connector discards address and geolocation fields.
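One way to drop compound fields before building a Bulk query is to filter on each field's type. The following is a minimal sketch, not the connector's code. It assumes the object's describe metadata has already been fetched and reduced to (name, type) pairs, and that compound fields report the types `address` and `location`. The field list below is an illustrative subset of Contact.

```python
# Describe types assumed to mark compound fields.
COMPOUND_TYPES = {"address", "location"}


def queryable_fields(fields: list[tuple[str, str]]) -> list[str]:
    """Return the names of fields whose describe type is not compound."""
    return [name for name, ftype in fields if ftype not in COMPOUND_TYPES]


# Illustrative subset of a Contact describe result.
contact_fields = [
    ("Id", "id"),
    ("LastName", "string"),
    ("MailingStreet", "textarea"),
    ("MailingAddress", "address"),  # compound: discarded
    ("MailingLatitude", "double"),
    ("OtherAddress", "address"),    # compound: discarded
]
print(queryable_fields(contact_fields))
```

The individual component fields, such as MailingStreet and MailingLatitude, remain queryable. This matches the sample Contact record later on this page, which contains those components but no compound address field.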
Salesforce Object Support¶
The following objects from Salesforce are supported in this version of the Kafka Connect Salesforce Bulk API Source connector:
- Account
- Campaign
- CampaignMember
- Case
- Contact
- Contract
- Event
- Group
- Lead
- Opportunity
- OpportunityContactRole
- OpportunityLineItem
- Period
- PricebookEntry
- Product2
- Task
- TaskFeed
- TaskRelation
- User
- UserRole
The Kafka Connect Salesforce Bulk API Source connector also supports custom objects with non-compound fields. The following objects are not supported by Salesforce Bulk API:
- Feed (for example, AccountFeed, AssetFeed, etc.)
- Share (for example, AccountBrandShare, ChannelProgramLevelShare, etc.)
- History (for example, AccountHistory, ActivityHistory, etc.)
- EventRelation (for example, AcceptedEventRelation, DeclinedEventRelation, etc.)
- AggregateResult
- AttachedContentDocument
- CaseStatus
- CaseTeamMember
- CaseTeamRole
- CaseTeamTemplate
- CaseTeamTemplateMember
- CaseTeamTemplateRecord
- CombinedAttachment
- ContentFolderItem
- ContractStatus
- EventWhoRelation
- FolderedContentDocument
- KnowledgeArticleViewStat
- KnowledgeArticleVoteStat
- LookedUpFromActivity
- Name
- NoteAndAttachment
- OpenActivity
- OwnedContentDocument
- PartnerRole
- RecentlyViewed
- ServiceAppointmentStatus
- SolutionStatus
- TaskPriority
- TaskStatus
- TaskWhoRelation
- UserRecordAccess
- WorkOrderLineItemStatus
- WorkOrderStatus
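To catch an unsupported salesforce.object value before starting the connector, you can encode the lists above as a simple pre-flight check. The following is a minimal sketch built on a hypothetical `is_supported_by_bulk_api` helper, and it rests on three assumptions:

- The Feed, Share, History, and EventRelation families are matched by name suffix.
- The explicit supported list is checked first, because TaskFeed appears in it even though it ends in Feed.
- Objects that match no rule are treated as supported, for example custom objects with non-compound fields.

```python
# Encodes the lists above. The explicit supported list is checked first because
# TaskFeed is listed as supported even though other *Feed objects are not.
SUPPORTED = {
    "Account", "Campaign", "CampaignMember", "Case", "Contact", "Contract",
    "Event", "Group", "Lead", "Opportunity", "OpportunityContactRole",
    "OpportunityLineItem", "Period", "PricebookEntry", "Product2", "Task",
    "TaskFeed", "TaskRelation", "User", "UserRole",
}
UNSUPPORTED = {
    "AggregateResult", "AttachedContentDocument", "CaseStatus", "CaseTeamMember",
    "CaseTeamRole", "CaseTeamTemplate", "CaseTeamTemplateMember",
    "CaseTeamTemplateRecord", "CombinedAttachment", "ContentFolderItem",
    "ContractStatus", "EventWhoRelation", "FolderedContentDocument",
    "KnowledgeArticleViewStat", "KnowledgeArticleVoteStat", "LookedUpFromActivity",
    "Name", "NoteAndAttachment", "OpenActivity", "OwnedContentDocument",
    "PartnerRole", "RecentlyViewed", "ServiceAppointmentStatus", "SolutionStatus",
    "TaskPriority", "TaskStatus", "TaskWhoRelation", "UserRecordAccess",
    "WorkOrderLineItemStatus", "WorkOrderStatus",
}
# Object families from the list above, matched by name suffix (an assumption).
UNSUPPORTED_SUFFIXES = ("Feed", "Share", "History", "EventRelation")


def is_supported_by_bulk_api(sobject: str) -> bool:
    """Hypothetical pre-flight check for the salesforce.object setting."""
    if sobject in SUPPORTED:
        return True
    if sobject in UNSUPPORTED:
        return False
    # Anything else (for example, a custom object such as Invoice__c) is assumed
    # supported, provided its fields are non-compound.
    return not sobject.endswith(UNSUPPORTED_SUFFIXES)


for name in ("Contact", "TaskFeed", "AccountFeed", "CaseStatus", "Invoice__c"):
    print(name, is_supported_by_bulk_api(name))
```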
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which provides subscribers with Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Salesforce Bulk API Source Connector Configuration Properties.
Install the Salesforce Bulk API Source Connector¶
You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.
Prerequisites¶
Important
You must install the connector on every machine where Connect will run.
Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.
Java Version: 1.8.
Confluent CLI to run Quick Start (requires separate installation).
An installation of the Confluent Hub Client. This is installed by default with Confluent Enterprise.
An installation of the latest connector version. To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-salesforce-bulk-api:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent-hub install confluentinc/kafka-connect-salesforce-bulk-api:2.0.7
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Quick Start¶
In this quick start, the Salesforce Bulk API Source connector is used to import data from Salesforce to Kafka. Use the following steps to get started:
Create a Salesforce developer account if you don't already have one.
Add records to the objects you want to import: click App Launcher and select the required Salesforce object.
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory
confluent-hub install confluentinc/kafka-connect-salesforce-bulk-api:latest
By default, the Confluent Hub Client installs the plugin into the share/confluent-hub-components directory and adds the directory to the plugin path. For the plugin path change to take effect, you must restart the Connect worker.
Start the services using the Confluent CLI.
confluent local services start
Every service starts in order, printing a message with its status:
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
Note that the SalesforceBulkApiSourceConnector supports a single task only.
Property-based example¶
The following steps provide a property-based example.
Create a configuration file, salesforce-bulk-api.properties. This configuration is typically used with standalone workers.
name=SalesforceBulkApiSourceConnector
tasks.max=1
connector.class=io.confluent.connect.salesforce.SalesforceBulkApiSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
salesforce.username=< Required Configuration >
salesforce.password=< Required Configuration >
salesforce.password.token=< Required Configuration >
salesforce.object=< Required Configuration >
salesforce.since=< Required Configuration >
kafka.topic=< Required Configuration >
salesforce.instance=< Required Configuration >
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
# Omit confluent.license (or leave it empty) to enable trial mode.
confluent.license=
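Before loading the file, you can confirm that no `< Required Configuration >` placeholder remains. The following is a minimal sketch. It assumes a simple `key=value` file with `#` or `!` comment lines and no line continuations, which is a subset of the full Java properties format. The helper names are hypothetical.

```python
PLACEHOLDER = "< Required Configuration >"


def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")  # split on the first '=' only
        if not sep:
            raise ValueError(f"not a key=value line: {raw!r}")
        props[key.strip()] = value.strip()
    return props


def unset_properties(props: dict[str, str]) -> list[str]:
    """Return the keys that still hold the placeholder value."""
    return sorted(k for k, v in props.items() if v == PLACEHOLDER)


# In practice: parse_properties(open("salesforce-bulk-api.properties").read())
sample = """
name=SalesforceBulkApiSourceConnector
salesforce.object=Contact
kafka.topic=< Required Configuration >
salesforce.since=< Required Configuration >
"""
print(unset_properties(parse_properties(sample)))  # ['kafka.topic', 'salesforce.since']
```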
Ensure the configurations in salesforce-bulk-api.properties are properly set.
Start the Salesforce Bulk API Source connector by loading its configuration with the following command:
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load salesforce-bulk-api-source -- -d salesforce-bulk-api.properties
{
"name" : "SalesforceBulkApiSourceConnector",
"config" : {
"connector.class" : "io.confluent.connect.salesforce.SalesforceBulkApiSourceConnector",
"tasks.max" : "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"kafka.topic" : "< Required Configuration >",
"salesforce.password" : "< Required Configuration >",
"salesforce.password.token" : "< Required Configuration >",
"salesforce.object" : "< Required Configuration >",
"salesforce.username" : "< Required Configuration >",
"salesforce.since" : "< Required Configuration >",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license": ""
},
"tasks": []
}
Verify the connector starts successfully and review the Connect worker’s log by entering the following:
confluent local services connect log connect
Confirm the connector is in a RUNNING state.
confluent local services connect connector status SalesforceBulkApiSourceConnector
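The same status information is available from the Kafka Connect REST API at `GET /connectors/<name>/status`. The following is a minimal sketch that fetches and checks that document. It assumes the standard response shape, with a top-level `connector.state` and a `tasks` list. The sample document is illustrative, not captured output.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen


def fetch_status(base_url: str, name: str) -> str:
    """Fetch the raw status JSON for one connector from a Connect worker."""
    with urlopen(f"{base_url}/connectors/{quote(name, safe='')}/status", timeout=10) as resp:
        return resp.read().decode("utf-8")


def is_running(status_json: str) -> bool:
    """True if the connector and every task report RUNNING."""
    status = json.loads(status_json)
    if status["connector"]["state"] != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A freshly started connector may briefly report no tasks yet.
    return bool(tasks) and all(t["state"] == "RUNNING" for t in tasks)


sample = """{
  "name": "SalesforceBulkApiSourceConnector",
  "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
  "type": "source"
}"""
print(is_running(sample))  # True
# Against a live worker: is_running(fetch_status("http://localhost:8083", "SalesforceBulkApiSourceConnector"))
```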
Confirm messages are being sent to Kafka.
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic <topic-name> \
  --from-beginning | jq '.'
REST-based example¶
This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the commands below to post the configuration to one of the distributed Connect workers. See Kafka Connect REST Interface for more information.
Connect Distributed REST example:
{
"name" : "SalesforceBulkApiSourceConnector",
"config" : {
"connector.class": "io.confluent.connect.salesforce.SalesforceBulkApiSourceConnector",
"tasks.max" : "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"kafka.topic" : "< Required Configuration >",
"salesforce.password" : "< Required Configuration >",
"salesforce.password.token" : "< Required Configuration >",
"salesforce.object" : "< Required Configuration >",
"salesforce.username" : "< Required Configuration >",
"salesforce.since" : "< Required Configuration >",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license": ""
}
}
Note
Change the confluent.topic.bootstrap.servers property to include your broker address(es), and change confluent.topic.replication.factor to 3 for staging or production use. Leave confluent.license empty (or omit it) to enable trial mode.
Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
Create a new connector:
curl -sS -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
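Update an existing connector. The PUT /connectors/<name>/config endpoint expects only the inner config map, so extract it from connector.json with jq:
jq '.config' connector.json | curl -s -X PUT -H 'Content-Type: application/json' --data @- http://localhost:8083/connectors/SalesforceBulkApiSourceConnector/config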
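You can make the same two calls without curl. The following is a minimal sketch that uses only the Python standard library and assumes connector.json has the name/config layout shown above. The create call posts the whole document. The update call sends only the inner config map, which is what `PUT /connectors/<name>/config` expects. The helper names are hypothetical. The sketch only builds the requests, so nothing contacts a worker until you pass a request to `urlopen`.

```python
import json
from urllib.parse import quote
from urllib.request import Request


def create_request(base_url: str, connector: dict) -> Request:
    """POST /connectors with the full {name, config} document."""
    return Request(
        f"{base_url}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def update_request(base_url: str, connector: dict) -> Request:
    """PUT /connectors/<name>/config with only the config map."""
    name = quote(connector["name"], safe="")
    return Request(
        f"{base_url}/connectors/{name}/config",
        data=json.dumps(connector["config"]).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# In practice: connector = json.load(open("connector.json"))
connector = {
    "name": "SalesforceBulkApiSourceConnector",
    "config": {
        "connector.class": "io.confluent.connect.salesforce.SalesforceBulkApiSourceConnector",
        "tasks.max": "1",
    },
}
req = update_request("http://localhost:8083", connector)
print(req.get_method(), req.full_url)
# To send: urllib.request.urlopen(req, timeout=10)
```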
Sample data formats¶
The following examples show the JSON document structure of Salesforce Bulk Query results (for the Contact object) as received by the Salesforce connector, converted to a Kafka record, and stored in a topic.
Raw JSON received from Salesforce Bulk Query:
[
{
"attributes" : {
"type" : "Contact",
"url" : "/services/data/v47.0/sobjects/Contact/0032v00002qXTBlAAO"
},
"Id" : "0032v00002qXTBlAAO",
"IsDeleted" : false,
"MasterRecordId" : null,
"AccountId" : "0012v00002RkgUVAAZ",
"LastName" : "Gonzalez",
"FirstName" : "Rose",
"Salutation" : "Ms.",
"Name" : "Rose Gonzalez",
"OtherStreet" : null,
"OtherCity" : null,
"OtherState" : null,
"OtherPostalCode" : null,
"OtherCountry" : null,
"OtherLatitude" : null,
"OtherLongitude" : null,
"OtherGeocodeAccuracy" : null,
"MailingStreet" : "313 Constitution Place\nAustin, TX 78767\nUSA",
"MailingCity" : null,
"MailingState" : null,
"MailingPostalCode" : null,
"MailingCountry" : null,
"MailingLatitude" : null,
"MailingLongitude" : null,
"MailingGeocodeAccuracy" : null,
"Phone" : "(512) 757-6000",
"Fax" : "(512) 757-9000",
"MobilePhone" : "(512) 757-9340",
"HomePhone" : null,
"OtherPhone" : null,
"AssistantPhone" : null,
"ReportsToId" : null,
"Email" : "rose@edge.com",
"Title" : "SVP, Procurement",
"Department" : "Procurement",
"AssistantName" : null,
"LeadSource" : "Trade Show",
"Birthdate" : "1967-07-14",
"Description" : null,
"OwnerId" : "0052v00000ajtG3AAI",
"CreatedDate" : 1564636138000,
"CreatedById" : "0052v00000ajtG3AAI",
"LastModifiedDate" : 1564636138000,
"LastModifiedById" : "0052v00000ajtG3AAI",
"SystemModstamp" : 1564636138000,
"LastActivityDate" : null,
"LastCURequestDate" : null,
"LastCUUpdateDate" : null,
"LastViewedDate" : 1573528066000,
"LastReferencedDate" : 1573528066000,
"EmailBouncedReason" : null,
"EmailBouncedDate" : null,
"IsEmailBounced" : false,
"PhotoUrl" : "/services/images/photo/0032v00002qXTBlAAO",
"Jigsaw" : null,
"JigsawContactId" : null,
"CleanStatus" : "Pending",
"IndividualId" : null,
"Level__c" : "Primary",
"Languages__c" : "English"
}
]
Kafka record value:
{
"Id":"0032v00002qXTBlAAO",
"IsDeleted":{
"boolean":false
},
"MasterRecordId":null,
"AccountId":{
"string":"0012v00002RkgUVAAZ"
},
"LastName":{
"string":"Gonzalez"
},
"FirstName":{
"string":"Rose"
},
"Salutation":{
"string":"Ms."
},
"Name":{
"string":"Rose Gonzalez"
},
"OtherStreet":null,
"OtherCity":null,
"OtherState":null,
"OtherPostalCode":null,
"OtherCountry":null,
"OtherLatitude":null,
"OtherLongitude":null,
"OtherGeocodeAccuracy":null,
"MailingStreet":{
"string":"313 Constitution Place\nAustin, TX 78767\nUSA"
},
"MailingCity":null,
"MailingState":null,
"MailingPostalCode":null,
"MailingCountry":null,
"MailingLatitude":null,
"MailingLongitude":null,
"MailingGeocodeAccuracy":null,
"Phone":{
"string":"(512) 757-6000"
},
"Fax":{
"string":"(512) 757-9000"
},
"MobilePhone":{
"string":"(512) 757-9340"
},
"HomePhone":null,
"OtherPhone":null,
"AssistantPhone":null,
"ReportsToId":null,
"Email":{
"string":"rose@edge.com"
},
"Title":{
"string":"SVP, Procurement"
},
"Department":{
"string":"Procurement"
},
"AssistantName":null,
"LeadSource":{
"string":"Trade Show"
},
"Birthdate":{
"int":-903
},
"Description":null,
"OwnerId":{
"string":"0052v00000ajtG3AAI"
},
"CreatedDate":{
"long":1564636138000
},
"CreatedById":{
"string":"0052v00000ajtG3AAI"
},
"LastModifiedDate":{
"long":1564636138000
},
"LastModifiedById":{
"string":"0052v00000ajtG3AAI"
},
"SystemModstamp":{
"long":1564636138000
},
"LastActivityDate":null,
"LastCURequestDate":null,
"LastCUUpdateDate":null,
"LastViewedDate":{
"long":1573722558000
},
"LastReferencedDate":{
"long":1573722558000
},
"EmailBouncedReason":null,
"EmailBouncedDate":null,
"IsEmailBounced":{
"boolean":false
},
"PhotoUrl":{
"string":"/services/images/photo/0032v00002qXTBlAAO"
},
"Jigsaw":null,
"JigsawContactId":null,
"CleanStatus":{
"string":"Pending"
},
"IndividualId":null,
"Level__c":{
"string":"Primary"
},
"Languages__c":{
"string":"English"
}
}
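In the Kafka record value above, nullable fields appear in Avro's JSON encoding. A non-null union value is wrapped in an object keyed by its type name, for example `{"string": "Gonzalez"}`. The following is a minimal sketch that flattens such a record into plain values. It assumes that every single-key object whose key is an Avro primitive type name is a union wrapper. That holds for this flat Contact record, but the sketch would misread a genuine nested field with such a name, and it does not handle unions of named types. The timestamp fields appear to be epoch milliseconds, and the sketch converts one on that assumption.

```python
import json
from datetime import datetime, timezone

AVRO_PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}


def unwrap_unions(value):
    """Recursively replace {"<primitive type>": v} union wrappers with v."""
    if isinstance(value, dict):
        if len(value) == 1:
            key, inner = next(iter(value.items()))
            if key in AVRO_PRIMITIVES:
                return unwrap_unions(inner)
        return {k: unwrap_unions(v) for k, v in value.items()}
    if isinstance(value, list):
        return [unwrap_unions(v) for v in value]
    return value


# A fragment of the Kafka record value above.
fragment = json.loads("""{
  "Id": "0032v00002qXTBlAAO",
  "IsDeleted": {"boolean": false},
  "MasterRecordId": null,
  "LastName": {"string": "Gonzalez"},
  "CreatedDate": {"long": 1564636138000}
}""")
flat = unwrap_unions(fragment)
print(flat["LastName"], flat["IsDeleted"], flat["MasterRecordId"])  # Gonzalez False None

# Assumes CreatedDate holds epoch milliseconds; convert to an aware UTC datetime.
created = datetime.fromtimestamp(flat["CreatedDate"] / 1000, tz=timezone.utc)
print(created.isoformat())  # 2019-08-01T05:08:58+00:00
```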