Important
You are viewing documentation for an older version of Confluent Platform.
Salesforce Bulk API Sink Connector for Confluent Platform¶
The Kafka Connect Salesforce Bulk API Sink Connector integrates Salesforce with Apache Kafka®. The connector reads records from Kafka topics and uses them to perform insert, update, and delete operations on Salesforce SObjects. These records can be written to a Kafka topic by the Salesforce PushTopic Source Connector.
For example, consider two Salesforce.com organizations (instances), Instance A and Instance B, and suppose a data engineer wants to synchronize Salesforce Lead objects from A to B. The data engineer can configure and deploy the PushTopic Source connector to stream Salesforce Lead objects from Instance A into a single Kafka topic, and configure the sink connector to stream Lead objects from that topic into Instance B. Depending on the configuration, all changes to Lead SObjects may be synchronized across organizations. This connector can be used with standalone and distributed Connect workers.
Note
The connector’s salesforce.object property indicates the name of the SObject to operate on. The structure and format of input messages for the sink connector are identical to the output format of the Salesforce PushTopic Source connector.
Configuration Properties¶
For a complete list of configuration properties for the sink connector, see Salesforce Bulk API Sink Connector Configuration Properties.
Considerations¶
Note the following when using the Salesforce Bulk API Sink Connector.
Unexpected errors¶
When the connector performs operations on Salesforce SObjects, unexpected errors can occur, and the connector reports them. The following are several reasons why errors may occur:
- Attempting to insert a duplicate record. Rules for determining duplicates are configurable in Salesforce.
- Attempting to delete, update, or upsert a record that does not exist because the Id field does not match.
- Attempting an operation on a record where the Id field value matches a previously deleted Id field value.
ID field semantics¶
When the Salesforce Bulk API Sink Connector consumes records from Kafka topics that originated from the Salesforce PushTopic Source connector, each record includes an Id field that is a sibling of the other fields in the body of the SObject. Note that the Id is only valid within the Salesforce organization from which the record was streamed. For upsert, delete, and update operations, relying on the Id field causes failures when the records are applied to a different Salesforce organization. Inserts always ignore the Id field because Id fields are managed internally by Salesforce. Upsert operations must be used with the external ID configuration properties salesforce.use.custom.id.field=true and salesforce.custom.id.field.name=<externalIdField>.
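As a rough illustration of the upsert settings above, the following Python sketch assembles the relevant connector properties and prints them as JSON. The property names come from this page. The helper name `upsert_config` and the field name `External_Lead_Id__c` are hypothetical, and using `upsert` as the value of salesforce.sink.object.operation is an assumption based on the operations this page describes.

```python
import json


def upsert_config(external_id_field: str) -> dict:
    """Return the connector properties this page associates with upserts.

    Hypothetical helper for illustration; not part of the connector.
    """
    if not external_id_field:
        raise ValueError("an external ID field name is required for upserts")
    return {
        # Ignore _EventType and apply one operation to every record.
        "override.event.type": "true",
        # Assumption: "upsert" is an accepted value for this property.
        "salesforce.sink.object.operation": "upsert",
        # Match records on an external ID instead of the org-specific Id.
        "salesforce.use.custom.id.field": "true",
        "salesforce.custom.id.field.name": external_id_field,
    }


if __name__ == "__main__":
    # "External_Lead_Id__c" is a made-up custom field name.
    print(json.dumps(upsert_config("External_Lead_Id__c"), indent=2))
```

The printed properties could be merged into the "config" section of a connector configuration file, once the field name is replaced with a real external ID field that exists in both organizations.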
Caution
For update and delete operations across Salesforce organizations, an external ID must be configured in Salesforce. Also, a custom ID must always be marked as an external ID in both organizations.
Input topic record format¶
The input topic record format is expected to be the same as the record format written to output topics by the Salesforce PushTopic Source connector. The Kafka key value is not required. Please refer to the Sample Records section for an example.
Read-Only fields¶
Some Salesforce SObject fields cannot be written by an insert, update, or upsert operation because they are set with the creatable=false or updatable=false attributes within Salesforce. If a write is attempted to a field with these attributes set, the sink connector excludes the field from the operation rather than failing the entire operation. This behavior is not configurable.
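The exclusion behavior described above can be pictured with a small Python sketch. It is a conceptual model only, not the connector's code: the function name `drop_read_only`, the shape of the `describe` metadata, the rule that an upsert needs both flags, and the treatment of fields without metadata are all assumptions. The record's identifier is assumed to be handled separately and is not part of `fields`.

```python
# Which Salesforce field attribute each operation depends on.
# Assumption: an upsert can create or update, so it needs both flags.
REQUIRED_FLAGS = {
    "insert": ("creatable",),
    "update": ("updatable",),
    "upsert": ("creatable", "updatable"),
}


def drop_read_only(fields: dict, describe: dict, operation: str) -> dict:
    """Return only the fields that the operation is allowed to write.

    `describe` maps a field name to its creatable/updatable flags.
    Conceptual sketch only; the connector's actual logic may differ.
    """
    flags_needed = REQUIRED_FLAGS[operation]
    writable = {}
    for name, value in fields.items():
        flags = describe.get(name, {})
        # Assumption: a field with no metadata is treated as writable.
        if all(flags.get(flag, True) for flag in flags_needed):
            writable[name] = value
    return writable


if __name__ == "__main__":
    # Made-up metadata; CreatedDate stands in for a read-only field.
    describe = {
        "LastName": {"creatable": True, "updatable": True},
        "CreatedDate": {"creatable": False, "updatable": False},
    }
    lead = {"LastName": "Doe", "CreatedDate": "2020-01-01T00:00:00Z"}
    print(drop_read_only(lead, describe, "update"))
```

In this sample the update keeps LastName and silently drops CreatedDate, which mirrors the idea that a read-only field is left out instead of failing the whole operation.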
Event Type¶
The Kafka record format used by the Salesforce Bulk API sink connector contains an _EventType field. If the record was created by the Salesforce PushTopic Source connector, this field describes the type of PushTopic event that generated the record. The types are created, updated, and deleted. When processing records, the sink connector (by default) maps the _EventType to an insert, update, or delete operation on the configured SObject. This behavior can be overridden using the override.event.type=true and salesforce.sink.object.operation=<sink operation> properties. Overriding the event type ignores the _EventType field in the record and applies the salesforce.sink.object.operation to every record.
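The default mapping and the override described above can be sketched in Python as follows. This is a conceptual model, not the connector's implementation: the function name `resolve_operation` and its parameters are hypothetical stand-ins for override.event.type and salesforce.sink.object.operation, the pairing of created, updated, and deleted with insert, update, and delete follows the order given on this page, and raising an error for an unknown event type is an assumption.

```python
from typing import Optional

# Default mapping described above: PushTopic event type -> sink operation.
EVENT_TYPE_TO_OPERATION = {
    "created": "insert",
    "updated": "update",
    "deleted": "delete",
}


def resolve_operation(record: dict, override_event_type: bool = False,
                      sink_operation: Optional[str] = None) -> str:
    """Pick the operation for one record value.

    Conceptual sketch; the parameters mirror override.event.type and
    salesforce.sink.object.operation, but this is not connector code.
    """
    if override_event_type:
        if sink_operation is None:
            raise ValueError("an override requires a sink operation")
        # The record's _EventType is ignored entirely.
        return sink_operation
    event_type = record.get("_EventType")
    if event_type not in EVENT_TYPE_TO_OPERATION:
        # Assumption: unknown event types are rejected.
        raise ValueError(f"unsupported _EventType: {event_type!r}")
    return EVENT_TYPE_TO_OPERATION[event_type]


if __name__ == "__main__":
    record = {"_EventType": "updated", "LastName": "Doe"}
    print(resolve_operation(record))
    print(resolve_operation(record, override_event_type=True,
                            sink_operation="delete"))
```

With the override enabled, every record resolves to the configured operation regardless of its _EventType value.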
API Limits¶
- The Salesforce Bulk API sink connector is limited by the number of batches it can execute, the number of records per batch, and the length of each batch. For detailed limitations, see Bulk API Limits.
- The Salesforce Bulk API supports upsert operations only with an external ID field.
Quick Start¶
In this quick start, the Salesforce PushTopic source connector is used to get data into Kafka and the Salesforce Bulk API sink connector is used to export data from Kafka to Salesforce.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
Populate a Kafka topic¶
Complete the steps in the Salesforce PushTopic Source Connector Example to get data into Kafka. Note the following:
- After creating the Lead and before checking the topic for the Lead object, edit the name of the Lead. Two records should be present when you check the contents of the Kafka topic: a record with an _EventType of created and another with an _EventType of updated.
- Complete the Salesforce Account steps to create a second account in a separate Salesforce organization. This account is used to configure the sink connector.
Configure the Connector¶
Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
Note
Add the following Single Message Transform (SMT) to the connector configuration to process records generated by the Salesforce Bulk API Source connector. The SMT adds an _EventType field with the static value created to each record value.
"transforms" : "InsertField",
"transforms.InsertField.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field" : "_EventType",
"transforms.InsertField.static.value" : "created"
Create a configuration file named salesforce-bulk-api-leads-sink-config.json with the following contents. Make sure to enter a real username, password, security token, consumer key, and consumer secret. For details about configuration properties, see Configuration Properties.
{
  "name" : "SalesforceBulkApiSinkConnector",
  "config" : {
    "connector.class" : "io.confluent.connect.salesforce.SalesforceBulkApiSinkConnector",
    "tasks.max" : "1",
    "topics" : "sfdc-pushtopic-lead",
    "salesforce.object" : "Lead",
    "salesforce.password" : "< Required Configuration >",
    "salesforce.password.token" : "< Required Configuration >",
    "salesforce.username" : "< Required Configuration: secondary organization username >",
    "reporter.result.topic.replication.factor" : "1",
    "reporter.error.topic.replication.factor" : "1",
    "reporter.bootstrap.servers" : "localhost:9092",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": " Omit to enable trial mode "
  }
}
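Before loading the file, it can help to confirm that no placeholder values remain. The following Python sketch shows one way to check this. The function name `unfilled_properties` is hypothetical, and the check assumes that placeholders are the angle-bracket values used in the sample file on this page; the inline sample stands in for salesforce-bulk-api-leads-sink-config.json.

```python
import json


def unfilled_properties(connector: dict) -> list:
    """Return config keys whose values still look like placeholders.

    Assumption: placeholders are the angle-bracket values used in the
    sample file on this page, such as "< Required Configuration >".
    """
    config = connector.get("config", {})
    return sorted(
        key for key, value in config.items()
        if isinstance(value, str) and value.strip().startswith("<")
    )


if __name__ == "__main__":
    # Inline stand-in for salesforce-bulk-api-leads-sink-config.json.
    sample = json.loads("""
    {
      "name": "SalesforceBulkApiSinkConnector",
      "config": {
        "salesforce.object": "Lead",
        "salesforce.username": "< Required Configuration: secondary organization username >",
        "salesforce.password": "< Required Configuration >"
      }
    }
    """)
    for key in unfilled_properties(sample):
        print(f"still a placeholder: {key}")
```

To check a real file, the inline sample could be replaced with `json.load` on the opened configuration file.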
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
Enter the Confluent CLI command to start the Salesforce sink connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.
confluent local load SalesforceBulkApiSinkConnector -- -d salesforce-bulk-api-leads-sink-config.json
Your output should resemble:
{
  "name": "SalesforceBulkApiSinkConnector",
  "config": {
    "connector.class" : "io.confluent.connect.salesforce.SalesforceBulkApiSinkConnector",
    "tasks.max" : "1",
    "topics" : "sfdc-pushtopic-leads",
    "salesforce.object" : "Lead",
    "salesforce.username" : "<Required>",
    "salesforce.password" : "<Required>",
    "salesforce.password.token" : "<Required>",
    "reporter.result.topic.replication.factor" : "1",
    "reporter.error.topic.replication.factor" : "1",
    "reporter.bootstrap.servers" : "localhost:9092",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": " Omit to enable trial mode "
  },
  "tasks": [ ... ],
  "type": null
}
Tip
The tasks field may include information about the one started task.
View Leads in Salesforce¶
Log in to the secondary Salesforce organization and verify that the Lead object exists with the correct name. It should match the Lead in the primary Salesforce organization.
Running both connectors concurrently¶
- Running the sink connector and the source connector in separate workers allows SObject changes to be synchronized in near real time. If running in standalone mode, add a custom port to one of the workers using the rest.port configuration property.
- Add and change leads as necessary in the primary organization. The source connector captures your changes and writes them to the topic that the sink connector reads from and uses as the source for its operations.