.. _salesforce-sobject-sink-connector:

Salesforce SObject Sink Connector for |cp|
==========================================

The |kconnect-long| Salesforce SObject Sink Connector performs CRUD operations (create,
update, delete, and insert) on Salesforce SObjects using data available on |ak-tm| topics.
This connector can be thought of as the inverse of the Salesforce PushTopic Source
Connector, because it can operate on data created by the PushTopic connector.

For example, consider two Salesforce.com organizations or instances, Instance A and
Instance B, and suppose a data engineer wants to synchronize Salesforce ``Lead`` objects
from A to B. The data engineer can configure and deploy the PushTopic Source connector to
stream ``Lead`` objects from Instance A into a single |ak| topic, and configure the sink
connector to stream ``Lead`` objects from that topic into Instance B. Depending on the
configuration, all changes to ``Lead`` SObjects may be synchronized across organizations.

This connector can be used with either standalone or distributed Connect workers.

.. note:: The connector's ``salesforce.object`` property indicates the name of the SObject
          to operate on. The structure and format of input messages for the sink connector
          are identical to the output format of the PushTopic Source connector.

.. _salesforce-SObject-sink-connector-considerations:

Considerations
--------------

Note the following when using the Salesforce SObject Sink Connector.

.. _salesforce-sobject-sink-connector-considerations-errors:

Unexpected errors
^^^^^^^^^^^^^^^^^

When the connector is performing operations on Salesforce SObjects, unexpected errors can
occur that halt the connector and must be addressed. The following lists several causes:

#. Attempting to insert a record that is a duplicate. Rules for determining duplicates are
   `configurable in Salesforce `_.
#. Attempting to delete, update, or upsert a record that does not exist because the ``Id``
   field does not match.
#. Attempting an operation on a field where the ``Id`` field value matches a previously
   deleted ``Id`` field value.
#. Attempting an operation using the ``salesforce.custom.id.field.name=`` configuration
   property when the field does not exist in the schema for the SObject.

.. _salesforce-sojbect-sink-connector-considerations-id-semantics:

Id field semantics
^^^^^^^^^^^^^^^^^^

SObject auto-generated Id
"""""""""""""""""""""""""

When the Salesforce SObject Sink Connector consumes records on |ak| topics which
originated from the Confluent PushTopic Source Connector, an ``Id`` field is included that
is a sibling of the other fields in the body of the SObject. Users of the sink connector
should understand that this ``Id`` is only valid within the Salesforce organization from
which the record was streamed. For upsert, delete, and update operations, relying on the
``Id`` field causes failures when the records are written to a different Salesforce
organization. Inserts always ignore the ``Id`` field because ``Id`` fields are internally
managed by Salesforce. Upsert operations must be used with the external id configuration
options ``salesforce.use.custom.id.field=true`` and ``salesforce.custom.id.field.name=``.
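For example, the following fragment is a minimal sketch of the sink properties for
upserting on an external id instead of the Salesforce-generated ``Id``. It assumes a
custom external id field named ``CustomId__c`` (the same field name used in the
property-based example later on this page) already exists on the target SObject.

.. sourcecode:: text

   # Minimal sketch: upsert every record, matching on a custom external id field.
   # CustomId__c is assumed to already exist on the target SObject in Salesforce.
   salesforce.sink.object.operation=upsert
   override.event.type=true
   salesforce.use.custom.id.field=true
   salesforce.custom.id.field.name=CustomId__c

With these properties, upserts match on ``CustomId__c`` rather than on the target
organization's own ``Id`` values.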
External Id
"""""""""""

The Salesforce SObject Sink Connector can be configured to operate using external id
fields. Under this configuration, the user specifies the external id field name to use via
the ``salesforce.custom.id.field.name=`` configuration property. This external id field is
then populated by the connector with the value of the SObject ``Id`` field from the |ak|
record during an operation.

The external id field must exist in the SObject schema in Salesforce. If the external id
field does not exist, the connector fails when it tries to reference the external id field
during sink operations. The field may be created via the Salesforce UI or other
interfaces.

Reference Id
""""""""""""

``Id`` reference fields are fields that associate one SObject with another. These fields
may also cause errors when written to an organization from which the ``Id`` field value
did not originate. Setting the configuration option
``salesforce.ignore.reference.fields=true`` avoids this issue.

.. _salesforce-sobject-sink-connector-considerations-format:

Input topic record format
^^^^^^^^^^^^^^^^^^^^^^^^^

The input topic record format is expected to be the same as the record format written to
output topics by the Confluent Salesforce PushTopic Source connector. The |ak| key value
is not required. Refer to the :ref:`Sample Records section` for a precise example.

.. _salesforce-sobject-sink-connector-considerations-ro:

Read-Only fields
^^^^^^^^^^^^^^^^

Salesforce SObject fields may not be writable by an insert, update, or upsert operation
because the fields are set with the ``creatable=false`` or ``updatable=false`` attributes
within Salesforce. If a write is attempted on a field with these attributes set, the sink
connector excludes the field from the operation rather than failing the entire operation.
This behavior is **not configurable**.

.. _salesforce-sobject-sink-connector-considerations-event-type:

Event Type
^^^^^^^^^^

The Confluent Salesforce SObject Sink Connector |ak| record format contains an
``_EventType`` field. If the record was created by the Confluent Salesforce PushTopic
Source Connector, this field describes the type of PushTopic event that generated the
record. The types are ``create``, ``update``, and ``delete``. By default, when processing
records, the sink connector maps the ``_EventType`` to an ``insert``, ``update``, or
``delete`` operation on the configured SObject.

This behavior can be overridden using the ``override.event.type=true`` and
``salesforce.sink.object.operation=`` configuration properties. Overriding the event type
ignores the ``_EventType`` field in the record and applies the configured
``salesforce.sink.object.operation`` to every record.
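For example, the following fragment is a minimal sketch, using only properties shown
elsewhere on this page, that forces the connector to treat every record as a delete
regardless of the record's ``_EventType``:

.. sourcecode:: text

   # Ignore _EventType and apply the same operation to every record.
   override.event.type=true
   salesforce.sink.object.operation=delete

The REST-based example below pairs these properties with ``delete``; the property-based
example pairs them with ``upsert``.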
.. _salesforce-sobject-sink-connector-considerations-api-limits:

API Limits
^^^^^^^^^^

`Allocations and limits `_ of the Salesforce account apply to the Salesforce SObject Sink
Connector. For example, free org accounts have a cap on the number of API calls that can
be made within a 24-hour period. In addition, `storage data limits `_ apply by
organization type.

Examples
--------

.. note:: ``salesforce.consumer.key`` and ``salesforce.consumer.secret`` are required
          properties used for OAuth2 secure authentication by Salesforce.com. Additional
          information and tutorials are available at `Salesforce.com `_.

Property-based example
^^^^^^^^^^^^^^^^^^^^^^

This configuration is typically used with :ref:`standalone workers `. It overrides the
record ``_EventType`` to perform upsert operations using an external id field named
``CustomId__c``, and ignores the field ``CleanStatus`` in the |ak| source record.

.. sourcecode:: text
   :emphasize-lines: 4,5,6,7,8,9,10,11,12,13,14,15

   name=SalesforceSObjectSinkConnector1
   connector.class=io.confluent.salesforce.SalesforceSObjectSinkConnector
   tasks.max=1
   topics=LeadsTopic
   salesforce.consumer.key=< Required Configuration >
   salesforce.consumer.secret=< Required Configuration >
   salesforce.object=< Required Configuration >
   salesforce.password=< Required Configuration >
   salesforce.password.token=< Required Configuration >
   salesforce.push.topic.name=< Required Configuration >
   salesforce.username=< Required Configuration >
   salesforce.ignore.fields=CleanStatus
   salesforce.ignore.reference.fields=true
   salesforce.custom.id.field.name=CustomId__c
   salesforce.use.custom.id.field=true
   salesforce.sink.object.operation=upsert
   override.event.type=true
   confluent.topic.bootstrap.servers=localhost:9092
   confluent.topic.replication.factor=1
   confluent.license=

.. note:: Change the ``confluent.topic.bootstrap.servers`` property to include your broker
          address(es), and change the ``confluent.topic.replication.factor`` to 3 for
          staging or production use. When working on a downloaded Confluent development
          cluster, or any other single-broker cluster, use a
          ``confluent.topic.replication.factor`` of 1.

REST-based example
^^^^^^^^^^^^^^^^^^

This configuration is typically used with :ref:`distributed workers `. Write the following
JSON to ``connector.json``, configure all of the required values, and use one of the
commands below to post the configuration to one of the distributed Connect workers. See
the |kconnect-long| :ref:`REST API ` for more information.

.. sourcecode:: text
   :emphasize-lines: 6,7,8,9,10,11,12,13,14,15,16,17

   {
     "config" : {
       "name" : "SalesforceSObjectSinkConnector1",
       "connector.class" : "io.confluent.salesforce.SalesforceSObjectSinkConnector",
       "tasks.max" : "1",
       "topics" : "< Required Configuration >",
       "salesforce.consumer.key" : "< Required Configuration >",
       "salesforce.consumer.secret" : "< Required Configuration >",
       "salesforce.object" : "< Required Configuration >",
       "salesforce.password" : "< Required Configuration >",
       "salesforce.password.token" : "< Required Configuration >",
       "salesforce.username" : "< Required Configuration >",
       "confluent.topic.bootstrap.servers": "localhost:9092",
       "confluent.topic.replication.factor": "1",
       "salesforce.sink.object.operation": "delete",
       "override.event.type": "true",
       "confluent.license": " Omit to enable trial mode "
     }
   }

.. note:: Change the ``confluent.topic.bootstrap.servers`` property to include your broker
          address(es), and change the ``confluent.topic.replication.factor`` to 3 for
          staging or production use.

Use curl to post the configuration to one of the |kconnect-long| workers. Change
``http://localhost:8083/`` to the endpoint of one of your |kconnect-long| workers.

::

   curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

::

   curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SalesforceSObjectSinkConnector1/config

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   salesforce_sobject_sink_connector_config
   salesforce_sobject_sink_connector_quickstart