Salesforce Bulk API Sink Connector for Confluent Platform

The Kafka Connect Salesforce Bulk API Sink connector integrates Salesforce with Apache Kafka®. The connector performs CRUD operations (insert, update, delete) on Salesforce SObjects using records available in Kafka topics and writes them to Salesforce. These records can be written into Kafka topic using Salesforce PushTopic Source connector.

For example, given two Salesforce.com organizations or Instances: Instance A and Instance B. If a Data Engineer wants to synchronize Salesforce Lead objects from A to B, the Data Engineer can configure and deploy the PushTopic Source connector to stream a Salesforce Lead from Instance A into a single Kafka topic, while the sink connector may be configured to stream a Lead from that topic into Instance B. Depending upon the configuration, all changes to Lead SObjects may be synchronized across organizations. This connector can be used with standalone and distributed Connect workers.

Note

The connector’s salesforce.object property indicates the name of the SObject to operate on. The structure and format of input messages for the sink connector is identical to the output format of the Salesforce PushTopic Source connector for Confluent Platform.

Considerations

Note the following when using the Salesforce Bulk API Sink connector.

Unexpected errors

When the connector is performing operations on Salesforce SObjects, unexpected errors can occur that will be reported. The following lists several reasons why errors may occur:

  • Attempting to insert a duplicate record. Rules for determining duplicates are configurable in Salesforce.
  • Attempting to delete, update, or upsert a record that does not exist because the Id field does not match.
  • Attempting an operation on a field where the Id field value matches a previously deleted Id field value.

ID field semantics

When the Salesforce Bulk API Sink connector consumes records on Kafka topics which originated from the Salesforce PushTopic Source connector for Confluent Platform, an Id field is included that is a sibling of the other fields in the body of the SObject. Note that the Id is only valid within the Salesforce organization from which the record was streamed. For upsert, delete, and update operations, attempting to rely on the Id field causes failures when used on different Salesforce organizations. Inserts always ignore the Id field because Id fields are internally managed in Salesforce. Upsert operations must be used with the external id config options salesforce.use.custom.id.field=true and salesforce.custom.id.field.name=<externalIdField>.

Caution

For update and delete operations across Salesforce organizations, an external ID must be configured in Salesforce. Also, a custom ID must always be marked as an external ID across both organizations.

Input topic record format

The input topic record format is expected to be the same as the record format written to output topics by the Salesforce PushTopic Source connector for Confluent Platform. The Kafka key value is not required. For an example, see the Salesforce PushTopic Source connector for Confluent Platform.

Read-Only fields

Salesforce SObject fields may not be writable by insert, update, or upsert operation because the fields are set with creatable=false or updatable=false attributes within Salesforce. If a write is attempted to a field with these attributes set, the sink connector excludes the field in the operation rather than fail the entire operation. This behavior is not configurable.

Event Type

The Salesforce Bulk API Sink connector Kafka record format contains an _EventType field. This field describes the type of PushTopic event that generated the record, if the record was created by the Salesforce PushTopic Source connector for Confluent Platform. Types are created, updated, and deleted. When processing records, the sink connector (by default) maps the _EventType to either an insert, update, or delete operation on the configured SObject. This behavior can be overridden using the override.event.type=true and salesforce.sink.object.operation=<sink operation> fields. Overriding the event type ignores the _EventType field in the record and obeys the salesforce.sink.object.operation for every record.

API Limits

  • The Salesforce Bulk API Sink connector is limited by number of batches to execute, records per batch, and length of the batch. For detailed limitations, see Bulk API Limits.
  • The Salesforce Bulk API supports only upsert operation with External Id field.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.

Configuration Properties

For a complete list of configuration properties for the sink connector, see Salesforce Bulk API Sink Connector Configuration Properties.

Install the Salesforce Bulk API Sink Connector

You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.

Prerequisites

Important

You must install the connector on every machine where Connect will run.

  • Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.

  • Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.

  • Java Version: 1.8.

  • Confluent CLI to run Quick Start (requires separate installation).

  • An installation of the Confluent Hub Client. This is installed by default with Confluent Enterprise.

  • An installation of the latest (latest) connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent-hub install confluentinc/kafka-connect-salesforce-bulk-api:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent-hub install confluentinc/kafka-connect-salesforce-bulk-api:2.0.7
    

Quick Start

In this quick start, the Salesforce PushTopic Source connector is used to get data into Kafka and the Salesforce Bulk API Sink connector is used to export data from Kafka to Salesforce.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Populate a Kafka topic

Complete the steps in the Example: Configure Salesforce PushTopic Source connector to get data into Kafka. Note the following:

  • After creating the Lead and before checking the topic for the Lead object, edit the name of the Lead. Two records should be present when you check the contents of the Kafka topic: a record of _EventType created and another of updated.
  • Complete the Salesforce Account steps to create a second account in a separate Salesforce organization from the sink connector. This account is used to configure the sink connector.

Configure the connector

Prerequisites

To configure the connector, complete the following steps:

Note

You can add the following Single Message Transform (SMT) to the connector configuration to process records generated by the Salesforce Bulk API Source connector.

"transforms" : "InsertField",
"transforms.InsertField.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field" : "_EventType",
"transforms.InsertField.static.value" : "created"
  1. Create a configuration file named salesforce-bulk-api-leads-sink-config.json with the following contents. Ensure you enter a real username, password, security token, consumer key, and consumer secret. For details about configuration properties, see Configuration Properties.

    {
       "name" : "SalesforceBulkApiSinkConnector",
       "config" : {
    
         "connector.class" : "io.confluent.connect.salesforce.SalesforceBulkApiSinkConnector",
         "tasks.max" : "1",
         "topics" : "sfdc-pushtopic-lead",
         "salesforce.object" : "Lead",
         "salesforce.password" : "< Required Configuration >",
         "salesforce.password.token" : "< Required Configuration >",
         "salesforce.username" : "< Required Configuration: secondary organization username >",
         "reporter.result.topic.replication.factor" : "1",
         "reporter.error.topic.replication.factor" : "1",
         "reporter.bootstrap.servers" : "localhost:9092",
         "confluent.topic.bootstrap.servers": "localhost:9092",
         "confluent.topic.replication.factor": "1",
         "confluent.license": " Omit to enable trial mode "
       }
     }
    

    For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

  2. Enter the Confluent CLI command to start the Salesforce Sink connector.

    Caution

    You must include a double dash (--) between the topic name and your flag. For more information, see this post.

    confluent local services connect connector load SalesforceBulkApiSinkConnector -- -d salesforce-bulk-api-leads-sink-config.json
    

    Your output should resemble:

    {
       "name": "SalesforceBulkApiSinkConnector",
        "config": {
            "connector.class" : "io.confluent.connect.salesforce.SalesforceBulkApiSinkConnector",
            "tasks.max" : "1",
            "topics" : "sfdc-pushtopic-leads",
            "salesforce.object" : "Lead",
            "salesforce.username" : "<Required>"
            "salesforce.password" : "<Required>",
            "salesforce.password.token" : "<Required>",
            "reporter.result.topic.replication.factor" : "1",
            "reporter.error.topic.replication.factor" : "1",
            "reporter.bootstrap.servers" : "localhost:9092",
            "confluent.topic.bootstrap.servers": "localhost:9092",
            "confluent.topic.replication.factor": "1",
            "confluent.license": " Omit to enable trial mode "
        },
        "tasks": [
            ...
        ],
        "type": null
    }
    

    Tip

    The tasks field may include information about the one started task.

View Leads in Salesforce

Log into the secondary Salesforce organization and verify the Leads object exists with the correct name. It should match the primary Salesforce organization.

Running Both Connectors Concurrently

Running both the sink connector and the source connector in separate workers allows for synchronizing SObject changes in near real-time. If running in standalone mode, add a custom port to one of the workers using the listeners configuration property.

Add and change leads as necessary in the primary organization. The source connector captures your changes and writes them to the same topic that the sink connector reads from, and uses as a source for operations.