Salesforce SObject Sink Connector for Confluent Platform¶
The Kafka Connect Salesforce SObject Sink Connector performs CRUD operations (create, update, delete, and upsert) on Salesforce SObjects using data available on Apache Kafka® topics. This connector can be thought of as the inverse of the Salesforce PushTopic Source Connector because it can operate on data created by the PushTopic connector.
For example, consider two Salesforce.com organizations (instances): Instance A and Instance B. Suppose a data engineer wants to synchronize Salesforce Lead objects from A to B. The data engineer can configure and deploy the PushTopic Source connector to stream Salesforce Lead records from Instance A into a single Kafka topic, while the sink connector is configured to stream Lead records from that topic into Instance B. Depending upon the configuration, all changes to Lead SObjects may be synchronized across organizations. This connector can be used with either standalone or distributed Connect workers.
Note
The connector’s salesforce.object property indicates the name of the SObject to operate on. The structure and format of input messages for the sink connector is identical to the output format of the PushTopic Source connector.
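As a hedged illustration of that input format, the value of a sink input record for a Lead might look like the following. The Id and _EventType fields are described in this document; the remaining Lead fields and all values are made up for illustration, and the exact field set depends on the SObject and the PushTopic configuration:

```json
{
  "Id": "00Q000000000001AAA",
  "FirstName": "Jane",
  "LastName": "Doe",
  "Company": "Example Co",
  "_EventType": "created"
}
```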
Configuration Properties¶
For a complete list of configuration properties for this connector, see Salesforce SObject Sink Connector Configuration Properties.
Considerations¶
Note the following when using the Salesforce SObject Sink Connector.
Unexpected errors¶
When the connector is performing operations on Salesforce SObjects, unexpected errors can occur that halt the connector and must be addressed. Common causes include:
- Attempting to insert a record that is a duplicate. Rules for determining duplicates are configurable in Salesforce.
- Attempting to delete, update, or upsert a record that does not exist because the Id field does not match.
- Attempting an operation on a record whose Id field value matches a previously deleted Id field value.
- Attempting an operation using the salesforce.custom.id.field.name=<externalIdField> configuration property when the field does not exist in the schema for the SObject.
ID field semantics¶
SObject auto-generate ID¶
When the Salesforce SObject Sink Connector consumes records on Kafka topics which originated from the Confluent PushTopic Source Connector, an Id field is included as a sibling of the other fields in the body of the SObject. Users of the sink connector should understand that this Id is only valid within the Salesforce organization from which the record was streamed. For upsert, delete, and update operations, relying on the Id field causes failures when used against a different Salesforce organization. Inserts always ignore the Id field because Id fields are managed internally by Salesforce. Upsert operations must be used with the external ID configuration options salesforce.use.custom.id.field=true and salesforce.custom.id.field.name=<externalIdField>.
Caution
For update and delete operations across Salesforce organizations, an external ID must be configured in Salesforce. Also, a custom ID must always be marked as an external ID across both organizations.
External ID¶
The Salesforce SObject Sink Connector can be configured to operate using external ID fields. Under this configuration, the user specifies the external ID field name to use via the salesforce.custom.id.field.name=<externalIdField> configuration property. The connector then populates this external ID field with the value of the SObject Id field from the Kafka record during an operation. The external ID field must exist in the SObject schema in Salesforce; if it does not, the connector fails when it tries to reference the field during sink operations. The external ID field may be created via the Salesforce UI or other interfaces.
Reference Id¶
Id reference fields associate one SObject with another. These fields may also cause errors when written to an organization from which the Id field value did not originate. Setting the configuration option salesforce.ignore.reference.fields=true avoids this issue.
Input topic record format¶
The input topic record format is expected to be the same as the record format written to output topics by the Confluent Salesforce Pushtopic Source connector. The Kafka key value is not required. Please refer to the Sample Records section for a precise example.
Read-Only fields¶
Salesforce SObject fields may not be writable by insert, update, or upsert operations because the fields are marked with creatable=false or updatable=false attributes within Salesforce. If a write is attempted to a field with these attributes set, the sink connector excludes the field from the operation rather than failing the entire operation. This behavior is not configurable.
Event Type¶
The Confluent Salesforce SObject Sink Connector Kafka record format contains an _EventType field. This field describes the type of PushTopic event that generated the record, if the record was created by the Confluent Salesforce PushTopic Source Connector. The types are created, updated, and deleted. By default, when processing records, the sink connector maps the _EventType to an insert, update, or delete operation on the configured SObject. This behavior can be overridden using the override.event.type=true and salesforce.sink.object.operation=<sink operation> configuration properties. Overriding the event type ignores the _EventType field in the record and applies the salesforce.sink.object.operation to every record.
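The default mapping and its override can be sketched as follows. This is a minimal illustration of the behavior described above, not the connector's actual source code:

```python
# Default _EventType -> sink operation mapping (as described in the docs).
EVENT_TO_OPERATION = {
    "created": "insert",
    "updated": "update",
    "deleted": "delete",
}

def resolve_operation(event_type, override_event_type=False, sink_operation=None):
    """Return the Salesforce operation applied for a record.

    When override_event_type is True (mirroring override.event.type=true),
    the configured sink_operation is applied to every record and the
    record's _EventType is ignored.
    """
    if override_event_type:
        return sink_operation
    return EVENT_TO_OPERATION[event_type]
```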
Retries¶
In case of failures caused by network issues, you can configure the connector to retry Salesforce requests (once authentication succeeds) using the request.max.retries.time.ms parameter.
The backoff period for each retry attempt uses a randomization function that grows exponentially. If the total time spent retrying a request exceeds this duration (15 minutes by default), retries stop and the request fails. This will likely result in task failure.
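The retry timing can be sketched as randomized exponential backoff bounded by a total budget. This is an illustrative model of the behavior described above (the base delay and growth factor are assumptions, not documented connector values):

```python
import random

def backoff_delays_ms(max_total_ms, base_ms=100, factor=2.0):
    """Yield randomized, exponentially growing retry delays until the
    total retry budget (max_total_ms) would be exceeded."""
    elapsed = 0.0
    attempt = 0
    while True:
        ceiling = base_ms * (factor ** attempt)  # exponential growth
        delay = random.uniform(0, ceiling)       # randomized ("jittered") wait
        if elapsed + delay > max_total_ms:
            return  # budget spent: retries stop and the request fails
        elapsed += delay
        attempt += 1
        yield delay

# With request.max.retries.time.ms=20000, total retry time stays under 20 s.
delays = list(backoff_delays_ms(20000))
```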
Retries Example¶
Create a sfso-sink.properties file with the following contents:

name=RetriesExample
topics=sfso-messages
tasks.max=1
connector.class=io.confluent.salesforce.SalesforceSObjectSinkConnector
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
salesforce.object=Lead
salesforce.ignore.fields=CleanStatus
request.max.retries.time.ms=20000
Run and validate the connector.
The connector retries failed requests for up to 20000 ms in total. Retries stop if a Salesforce request is successful. Note that the default value for request.max.retries.time.ms is 900000 ms (15 minutes).
API Limits¶
Allocations and limits of the Salesforce account apply to the Salesforce SObject Sink Connector. For example, free org accounts have a cap on the number of API calls that can occur within a 24-hour period. In addition, storage data limits apply by organization type.
Examples¶
Note
salesforce.consumer.key and salesforce.consumer.secret are required properties used for OAuth2 secure authentication by Salesforce.com. Additional information and tutorials are available at Salesforce.com.
Property-based example¶
This configuration is typically used with standalone workers. It overrides the record _EventType to perform upsert operations using an external ID field named CustomId__c, and ignores the field CleanStatus in the Kafka source record.
name=SalesforceSObjectSinkConnector1
connector.class=io.confluent.salesforce.SalesforceSObjectSinkConnector
tasks.max=1
topics=LeadsTopic
salesforce.consumer.key=< Required Configuration >
salesforce.consumer.secret=< Required Configuration >
salesforce.object=< Required Configuration >
salesforce.password=< Required Configuration >
salesforce.password.token=< Required Configuration >
salesforce.push.topic.name=< Required Configuration >
salesforce.username=< Required Configuration >
salesforce.ignore.fields=CleanStatus
salesforce.ignore.reference.fields=true
salesforce.custom.id.field.name=CustomId__c
salesforce.use.custom.id.field=true
salesforce.sink.object.operation=upsert
override.event.type=true
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=
Note
Change the confluent.topic.bootstrap.servers property to include your broker address(es), and change the confluent.topic.replication.factor to 3 for staging or production use. When working on a downloaded Confluent development cluster, or any single broker cluster, use a confluent.topic.replication.factor of 1.
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
REST-based example¶
This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed connect workers.
See Kafka Connect REST API for more information.
{
"name" : "SalesforceSObjectSinkConnector1",
"config" : {
"connector.class" : "io.confluent.salesforce.SalesforceSObjectSinkConnector",
"tasks.max" : "1",
"topics" : "< Required Configuration >",
"salesforce.consumer.key" : "< Required Configuration >",
"salesforce.consumer.secret" : "< Required Configuration >",
"salesforce.object" : "< Required Configuration >",
"salesforce.password" : "< Required Configuration >",
"salesforce.password.token" : "< Required Configuration >",
"salesforce.username" : "< Required Configuration >",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"salesforce.sink.object.operation": "delete",
"override.event.type": "true",
"confluent.license": " Omit to enable trial mode "
}
}
Note
Change the confluent.topic.bootstrap.servers property to include your broker address(es), and change the confluent.topic.replication.factor to 3 for staging or production use.
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
Use curl to post a configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SalesforceSObjectSinkConnector1/config