Salesforce Bulk API Sink Connector for Confluent Platform¶
The Kafka Connect Salesforce Bulk API Sink connector integrates Salesforce with Apache Kafka®. The connector performs CRUD operations (insert, update, delete) on Salesforce SObjects using records available in Kafka topics and writes them to Salesforce. These records can be written into a Kafka topic by the Salesforce PushTopic Source connector.
For example, consider two Salesforce.com organizations (instances), Instance A and Instance B. If a data engineer wants to synchronize Salesforce Lead objects from A to B, they can configure and deploy the PushTopic Source connector to stream Lead records from Instance A into a single Kafka topic, while the sink connector is configured to stream Lead records from that topic into Instance B. Depending upon the configuration, all changes to Lead SObjects may be synchronized across organizations. This connector can be used with standalone and distributed Connect workers.
Note
The connector’s salesforce.object property indicates the name of the SObject to operate on. The structure and format of input messages for the sink connector are identical to the output format of the Salesforce PushTopic Source connector for Confluent Platform.
Considerations¶
Note the following when using the Salesforce Bulk API Sink connector.
Unexpected errors¶
When the connector is performing operations on Salesforce SObjects, unexpected errors can occur that will be reported. The following lists several reasons why errors may occur:
- Attempting to insert a duplicate record. Rules for determining duplicates are configurable in Salesforce.
- Attempting to delete, update, or upsert a record that does not exist because the Id field does not match.
- Attempting an operation on a record where the Id field value matches a previously deleted Id field value.
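Such errors are reported; with Connect Reporter enabled, the failed operation responses can be captured on an error topic. The following is a minimal sketch of the reporter properties (the topic names here are illustrative; see Connect Reporter for the full property list):
"reporter.bootstrap.servers": "localhost:9092",
"reporter.error.topic.name": "salesforce-bulk-api-error-responses",
"reporter.error.topic.replication.factor": "1",
"reporter.result.topic.name": "salesforce-bulk-api-success-responses",
"reporter.result.topic.replication.factor": "1"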
ID field semantics¶
When the Salesforce Bulk API Sink connector consumes records on Kafka topics which originated from the Salesforce PushTopic Source connector for Confluent Platform, an Id field is included that is a sibling of the other fields in the body of the SObject. Note that the Id is only valid within the Salesforce organization from which the record was streamed. For upsert, delete, and update operations, attempting to rely on the Id field causes failures when used on different Salesforce organizations. Inserts always ignore the Id field because Id fields are internally managed in Salesforce. Upsert operations must be used with the external ID configuration options salesforce.use.custom.id.field=true and salesforce.custom.id.field.name=<externalIdField>.
Caution
For update and delete operations across Salesforce organizations, an external ID must be configured in Salesforce. Also, a custom ID must always be marked as an external ID across both organizations.
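For example, to key upserts on a custom external ID field rather than the Salesforce-managed Id, the relevant sink connector properties might look like the following sketch (MyExternalId__c is a hypothetical field name; use the external ID field defined in both organizations):
"salesforce.use.custom.id.field": "true",
"salesforce.custom.id.field.name": "MyExternalId__c"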
Input topic record format¶
The input topic record format is expected to be the same as the record format written to output topics by the Salesforce PushTopic Source connector for Confluent Platform. The Kafka key value is not required. For an example, see the Salesforce PushTopic Source connector for Confluent Platform.
Read-Only fields¶
Salesforce SObject fields may not be writable by an insert, update, or upsert operation because the fields are set with creatable=false or updatable=false attributes within Salesforce. If a write is attempted to a field with these attributes set, the sink connector excludes the field from the operation rather than failing the entire operation. This behavior is not configurable.
Event Type¶
The Salesforce Bulk API Sink connector Kafka record format contains an _EventType field. This field describes the type of PushTopic event that generated the record, if the record was created by the Salesforce PushTopic Source connector for Confluent Platform.
The event types are created, updated, and deleted. When processing records, the sink connector (by default) maps the _EventType to either an insert, update, or delete operation on the configured SObject. This behavior can be overridden using the override.event.type=true and salesforce.sink.object.operation=<sink operation> properties. Overriding the event type ignores the _EventType field in the record and applies the salesforce.sink.object.operation to every record.
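As a sketch, the following properties force every record to be treated as a delete regardless of its _EventType (the delete operation here is only an example; any supported sink operation can be used):
"override.event.type": "true",
"salesforce.sink.object.operation": "delete"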
API Limits¶
- The Salesforce Bulk API Sink connector is limited by the number of batches it can execute, the number of records per batch, and the length of each batch. For detailed limitations, see Bulk API Limits.
- The Salesforce Bulk API supports the upsert operation only with an external ID field.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.
Configuration Properties¶
For a complete list of configuration properties for the sink connector, see Salesforce Bulk API Sink Connector Configuration Properties.
Install the Salesforce Bulk API Sink Connector¶
You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.
Prerequisites¶
Important
You must install the connector on every machine where Connect will run.
Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.
Java Version: 1.8.
Confluent CLI to run Quick Start (requires separate installation).
An installation of the Confluent Hub Client. This is installed by default with Confluent Enterprise.
An installation of the latest (latest) connector version.
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-salesforce-bulk-api:latest
You can install a specific version by replacing latest with a version number as shown in the following example:
confluent-hub install confluentinc/kafka-connect-salesforce-bulk-api:2.0.7
Quick Start¶
In this quick start, the Salesforce PushTopic Source connector is used to get data into Kafka and the Salesforce Bulk API Sink connector is used to export data from Kafka to Salesforce.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Populate a Kafka topic¶
Complete the steps in the Example: Configure Salesforce PushTopic Source connector to get data into Kafka. Note the following:
- After creating the Lead and before checking the topic for the Lead object, edit the name of the Lead. Two records should be present when you check the contents of the Kafka topic: a record with _EventType created and another with _EventType updated (a console-consumer sketch for checking the topic follows this list).
- Complete the Salesforce Account steps to create a second account in a separate Salesforce organization for the sink connector. This account is used to configure the sink connector.
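To confirm both records arrived, one option is to read the topic with the Avro console consumer. The following is a sketch that assumes the source connector wrote Avro records to the topic sfdc-pushtopic-lead and that Schema Registry is running on localhost:8081 (adjust the topic name to match your kafka.topic setting):
kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic sfdc-pushtopic-lead --from-beginning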
Configure the connector¶
- Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
To configure the connector, complete the following steps:
Note
You can add the following Single Message Transform (SMT) to the connector configuration to process records generated by the Salesforce Bulk API Source connector.
"transforms" : "InsertField",
"transforms.InsertField.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field" : "_EventType",
"transforms.InsertField.static.value" : "created"
Create a configuration file named salesforce-bulk-api-leads-sink-config.json with the following contents. Ensure you enter a real username, password, security token, consumer key, and consumer secret. For details about configuration properties, see Configuration Properties.
{
  "name" : "SalesforceBulkApiSinkConnector",
  "config" : {
    "connector.class" : "io.confluent.connect.salesforce.SalesforceBulkApiSinkConnector",
    "tasks.max" : "1",
    "topics" : "sfdc-pushtopic-lead",
    "salesforce.object" : "Lead",
    "salesforce.password" : "< Required Configuration >",
    "salesforce.password.token" : "< Required Configuration >",
    "salesforce.username" : "< Required Configuration: secondary organization username >",
    "reporter.result.topic.replication.factor" : "1",
    "reporter.error.topic.replication.factor" : "1",
    "reporter.bootstrap.servers" : "localhost:9092",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": " Omit to enable trial mode "
  }
}
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
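If you prefer to manage the connector through the Kafka Connect REST API rather than the Confluent CLI, the following sketch assumes a Connect worker listening on the default REST port at localhost:8083:
curl -X POST -H "Content-Type: application/json" \
  --data @salesforce-bulk-api-leads-sink-config.json \
  http://localhost:8083/connectors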
Enter the Confluent CLI command to start the Salesforce Bulk API Sink connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load SalesforceBulkApiSinkConnector -- -d salesforce-bulk-api-leads-sink-config.json
Your output should resemble:
{
  "name": "SalesforceBulkApiSinkConnector",
  "config": {
    "connector.class" : "io.confluent.connect.salesforce.SalesforceBulkApiSinkConnector",
    "tasks.max" : "1",
    "topics" : "sfdc-pushtopic-lead",
    "salesforce.object" : "Lead",
    "salesforce.username" : "<Required>",
    "salesforce.password" : "<Required>",
    "salesforce.password.token" : "<Required>",
    "reporter.result.topic.replication.factor" : "1",
    "reporter.error.topic.replication.factor" : "1",
    "reporter.bootstrap.servers" : "localhost:9092",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": " Omit to enable trial mode "
  },
  "tasks": [ ... ],
  "type": null
}
Tip
The tasks field may include information about the one started task.
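One way to confirm that the connector and its task are running, assuming the same Confluent CLI local environment used above, is:
confluent local services connect connector status SalesforceBulkApiSinkConnector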
View Leads in Salesforce¶
Log in to the secondary Salesforce organization and verify that the Lead object exists with the correct name. It should match the primary Salesforce organization.
Running Both Connectors Concurrently¶
Running both the sink connector and the source connector in separate workers allows for synchronizing SObject changes in near real-time. If running in standalone mode, add a custom port to one of the workers using the listeners configuration property.
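For example, the second standalone worker's properties file might set a non-default REST port, as in the following sketch (the file name worker-b.properties and port 8084 are arbitrary choices):
# worker-b.properties: give the second standalone worker its own REST port
listeners=http://localhost:8084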
Add and change leads as necessary in the primary organization. The source connector captures your changes and writes them to the same topic that the sink connector reads from and uses as a source for operations.