Salesforce SObject Sink Connector for Confluent Platform

The Kafka Connect Salesforce SObject Sink connector performs create, update, upsert, and delete operations on Salesforce SObjects using data available on Apache Kafka® topics. This connector can be thought of as the inverse of the Salesforce PushTopic Source connector because it can operate on data created by the PushTopic connector.

For example, suppose there are two Salesforce.com organizations or instances (instance A and instance B), and a data engineer wants to synchronize Salesforce Lead objects from A to B. The data engineer can configure and deploy the PushTopic Source connector to stream Salesforce Leads from instance A into a single Kafka topic, while the sink connector can be configured to stream Leads from that topic into instance B. Depending upon the configuration, all changes to Lead SObjects may be synchronized across organizations. This connector can be used with either standalone or distributed Connect workers.

Note that the connector’s salesforce.object property indicates the name of the SObject to operate on. The structure and format of input messages for the sink connector are identical to the output format of the PushTopic Source connector.

Note

The connector supports Salesforce up to API version 65.0.

Features

The Salesforce SObject Sink connector includes the following features:

At least once delivery

This connector guarantees that records are delivered at least once from the Kafka topic.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
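For example, a minimal sketch of the standard Kafka Connect sink error-handling properties that route failed records to a DLQ topic (the topic name sfso-dlq is an arbitrary example):

    errors.tolerance=all
    errors.deadletterqueue.topic.name=sfso-dlq
    errors.deadletterqueue.topic.replication.factor=1
    errors.deadletterqueue.context.headers.enable=true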

Multiple tasks

The Salesforce SObject Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve performance when the connector consumes from topics with multiple partitions.
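For example, a minimal sketch (LeadsTopic is a hypothetical topic name; three tasks assume the topic has at least three partitions to share across them):

    topics=LeadsTopic
    tasks.max=3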

Input data formats

The Salesforce SObject Sink connector for Confluent Platform supports the following input data formats: Avro, JSON Schema, and Protobuf. Note that it does not support JSON (schemaless) format.
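For example, a sketch of the converter settings for Avro input (the Schema Registry URL assumes a local development setup):

    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081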

Client-side encryption

This connector supports Client-Side Field Level Encryption (CSFLE) and Client-Side Payload Encryption (CSPE). For more information, see Manage Client Side Encryption.

Salesforce relationship fields

The connector supports Salesforce relationship fields, which allow you to integrate external data fields within Salesforce SObjects, enhancing data aggregation and business processes. This is useful for populating Lookup relationships using values from external systems (External IDs).

Note

This connector does not currently support polymorphic fields.

Prerequisites

  • A Salesforce lookup field must already exist on your target sObject (for example, RelatedAccount__c pointing to the Account object).

  • The related object must have an indexed field or an External ID field (for example, External_Account_ID__c) to facilitate the relationship mapping.

Process relationship fields

Follow the steps below to process Salesforce relationship fields in your connector:

Enable relationship field

When you configure a sink connector, you must enable relationship field support. In your connector configuration, set the following parameters to false:

  • "skip.object1.relationship.fields": "false"

  • "salesforce.object1.ignore.reference.fields": "false"

Sample Configuration: Below is a sample configuration for a sink connector targeting a custom object (RelationshipDemo__c):

{
  "config": {
    "topics": "RelationshipDemoTopic",
    "schema.context.name": "default",
    "input.data.format": "AVRO",
    "connector.class": "SalesforceSObjectSink",
    "name": "SalesforceSObjectSinkConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "********************",
    "kafka.api.secret": "***********************",
    "salesforce.enable.batching": "false",
    "salesforce.grant.type": "CLIENT_CREDENTIALS",
    "salesforce.instance": "https://login.salesforce.com",
    "salesforce.consumer.key": "*************************************************************************************",
    "salesforce.consumer.secret": "****************************************************************",
    "salesforce.object.num": "1",
    "salesforce.object1": "RelationshipDemo__c",
    "salesforce.object1.override.event.type": "false",
    "salesforce.object1.sink.operation": "insert",
    "salesforce.object1.ignore.reference.fields": "false",
    "skip.object1.relationship.fields": "false",
    "salesforce.object1.use.custom.id.field": "false",
    "connection.timeout": "30000",
    "behavior.on.api.errors": "ignore",
    "request.max.retries.time.ms": "30000",
    "behavior.on.null.record": "fail",
    "max.poll.interval.ms": "300000",
    "max.poll.records": "500",
    "tasks.max": "1",
    "value.converter.value.schema.id.deserializer": "io.confluent.kafka.serializers.schema.id.DualSchemaIdDeserializer",
    "value.converter.reference.subject.name.strategy": "DefaultReferenceSubjectNameStrategy",
    "errors.tolerance": "all",
    "value.converter.ignore.default.for.nullables": "false",
    "key.converter.key.schema.id.deserializer": "io.confluent.kafka.serializers.schema.id.DualSchemaIdDeserializer",
    "value.converter.decimal.format": "BASE64",
    "value.converter.value.subject.name.strategy": "TopicNameStrategy",
    "key.converter.key.subject.name.strategy": "TopicNameStrategy",
    "auto.restart.on.user.error": "true"
  }
}
Associated Avro schema for sink topic

To use a relationship field, the Avro schema for the sink topic must define the relationship object (RelatedAccount__r) and the specific external field used for the lookup. In the schema below, RelatedAccount__r is used to resolve and populate the RelatedAccount__c field in Salesforce. The relationship field, RelatedAccount__r, is controlled by the external ID field, External_Account_ID__c.

Avro Schema: The following schema defines the structure for the RelationshipDemo__c sink topic:

{
  "connect.name": "io.confluent.salesforce.RelationshipDemo__c",
  "fields": [
    {
      "name": "Id",
      "type": {
        "connect.doc": "Unique identifier for the object.",
        "type": "string"
      }
    },
    {
      "default": null,
      "name": "OwnerId",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "IsDeleted",
      "type": [
        "null",
        "boolean"
      ]
    },
    {
      "default": null,
      "name": "Name",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "CreatedDate",
      "type": [
        "null",
        {
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "connect.version": 1,
          "logicalType": "timestamp-millis",
          "type": "long"
        }
      ]
    },
    {
      "default": null,
      "name": "CreatedById",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "LastModifiedDate",
      "type": [
        "null",
        {
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "connect.version": 1,
          "logicalType": "timestamp-millis",
          "type": "long"
        }
      ]
    },
    {
      "default": null,
      "name": "LastModifiedById",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "SystemModstamp",
      "type": [
        "null",
        {
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "connect.version": 1,
          "logicalType": "timestamp-millis",
          "type": "long"
        }
      ]
    },
    {
      "default": null,
      "name": "LastViewedDate",
      "type": [
        "null",
        {
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "connect.version": 1,
          "logicalType": "timestamp-millis",
          "type": "long"
        }
      ]
    },
    {
      "default": null,
      "name": "LastReferencedDate",
      "type": [
        "null",
        {
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "connect.version": 1,
          "logicalType": "timestamp-millis",
          "type": "long"
        }
      ]
    },
    {
      "name": "RelatedAccount__r",
      "type": {
        "connect.name": "RelatedAccount__r",
        "fields": [
          {
            "default": null,
            "name": "External_Account_ID__c",
            "type": [
              "null",
              "string"
            ]
          }
        ],
        "name": "RelatedAccount__r",
        "type": "record"
      }
    },
    {
      "default": null,
      "name": "_ObjectType",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "_EventType",
      "type": [
        "null",
        "string"
      ]
    }
  ],
  "name": "RelationshipDemo__c",
  "namespace": "io.confluent.salesforce",
  "type": "record"
}
Sample sink record

When sending records to the sink connector for RelationshipDemo__c, use the relationship field (RelatedAccount__r) to specify the external ID of the related record.

{
  "Id": "a00gL00000P85iUQAW",
  "OwnerId": {
    "string": "005gL000005tlt3QAA"
  },
  "IsDeleted": {
    "boolean": false
  },
  "Name": {
    "string": "DemoEntity"
  },
  "CreatedDate": {
    "long": 1761736099000
  },
  "CreatedById": {
    "string": "005gL000005tlt3QAA"
  },
  "LastModifiedDate": {
    "long": 1761736099000
  },
  "LastModifiedById": {
    "string": "005gL000005tlt3QAA"
  },
  "SystemModstamp": {
    "long": 1761736099000
  },
  "LastViewedDate": null,
  "LastReferencedDate": null,
  "RelatedAccount__r": {
    "External_Account_ID__c": {
      "string": "123456"
    }
  },
  "_ObjectType": {
    "string": "RelationshipDemo__c"
  },
  "_EventType": {
    "string": "created"
  }
}

Upon processing this record, the connector creates a new RelationshipDemo__c record named DemoEntity and automatically populates the RelatedAccount__c field with the Id of the Account whose External_Account_ID__c value is 123456.

Common Issues and Resolutions

Issue: Cannot specify both an external ID reference RelatedAccount__r and a salesforce id, RelatedAccount__c

Potential cause: The record or schema includes both the lookup field (__c) and the relationship reference (__r). Salesforce requires only one identifier for sink operations.

Resolution: Ensure the record schema uses only one field type. If source data is mixed, route different field types to separate topics for processing.

Issue: Field name provided, Name is not an External ID or indexed field for Account

Potential cause: The field used to define the relationship is not an indexed or External ID field.

Resolution: Use an indexed or External ID field when referencing relationships. Note that idLookup fields are only supported if the referenced field is the same object type as the parent; otherwise, use a field explicitly marked as an External ID.

Limitations

The Salesforce SObject Sink connector has the following limitations:

  • The connector does not currently support polymorphic fields.

Configuration properties

For a complete list of configuration properties for this connector, see Salesforce SObject Sink Connector Configuration Properties.

Considerations

Note the following when using the Salesforce SObject Sink Connector.

Unexpected errors

When the connector is performing operations on Salesforce SObjects, unexpected errors can occur that halt the connector and must be addressed. Several common causes are listed below, followed by a configuration sketch for tolerating these errors:

  1. Attempting to insert a record that is a duplicate. Rules for determining duplicates are configurable in Salesforce.

  2. Attempting to delete, update, or upsert a record that does not exist because the Id field does not match.

  3. Attempting an operation on a record whose Id field value matches a previously deleted Id field value.

  4. Attempting an operation using the salesforce.custom.id.field.name=<externalIdField> configuration property when the field does not exist in the schema for the SObject.
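If halting on such errors is undesirable, a sketch using error-handling properties shown in the earlier sample configuration tolerates conversion errors and ignores records rejected by the Salesforce API:

    errors.tolerance=all
    behavior.on.api.errors=ignore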

ID field semantics

SObject auto-generate ID

When the Salesforce SObject Sink connector consumes records on Kafka topics which originated from the Confluent PushTopic Source connector, an Id field is included as a sibling of the other fields in the body of the SObject. Users of the sink connector should understand that this Id is only valid within the Salesforce organization from which the record was streamed. For upsert, delete, and update operations, relying on the Id field causes failures when the operation targets a different Salesforce organization. Inserts always ignore the Id field because Id fields are internally managed by Salesforce. Upsert operations must be used with the external ID config options salesforce.use.custom.id.field=true and salesforce.custom.id.field.name=<externalIdField>.

Caution

For update and delete operations across Salesforce organizations, an external ID must be configured in Salesforce, and the custom ID field must be marked as an external ID in both organizations.

External ID

The Salesforce SObject Sink connector can be configured to operate using external ID fields. Under this configuration, the user specifies an external ID field name to use by setting salesforce.custom.id.field.name=<externalIdField>. This external ID field is then populated with the value of the SObject Id field in the Kafka record by the connector during an operation. The external ID field must exist in the SObject schema in Salesforce. If the external ID field does not exist, the connector fails upon trying to reference the external ID field during sink operations. The external ID field may be created using the Salesforce UI or other interfaces.
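For example, a minimal sketch of the external ID settings described above (CustomId__c is an example field name, consistent with the property-based example later in this document):

    salesforce.use.custom.id.field=true
    salesforce.custom.id.field.name=CustomId__c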

Reference Id

Id reference fields are fields that associate one SObject with another. These fields may also cause errors when written to an organization from which the Id field value did not originate. Setting the configuration option salesforce.ignore.reference.fields=true avoids this issue.

Input topic record format

The input topic record format is expected to be the same as the record format written to output topics by the Confluent Salesforce Pushtopic Source connector. The Kafka key value is not required. For an example, see the Sample Records section.

Read-only fields

Salesforce SObject fields may not be writable by insert, update, or upsert operations because the fields are set with creatable=false or updatable=false attributes within Salesforce. If a write is attempted on a field with these attributes set, the sink connector excludes the field from the operation rather than failing the entire operation. This behavior is not configurable.

Event type

The Confluent Salesforce SObject Sink connector Kafka record format contains an _EventType field. This field describes the type of PushTopic event that generated the record, if the record was created by the Confluent Salesforce PushTopic Source connector. The types are created, updated, and deleted. By default, when processing records, the sink connector maps the _EventType to an insert, update, or delete operation on the configured SObject. This behavior can be overridden using the override.event.type=true and salesforce.sink.object.operation=<sink operation> configuration properties. Overriding the event type ignores the _EventType field in the record and applies the configured salesforce.sink.object.operation to every record.
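For example, a sketch that ignores _EventType and treats every record as a delete, consistent with the REST-based example later in this document:

    override.event.type=true
    salesforce.sink.object.operation=delete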

Retries

In case of failures caused by network issues, you can configure the connector to retry Salesforce requests (once authentication succeeds) using the request.max.retries.time.ms parameter.

The backoff period for each retry attempt grows exponentially and includes a randomization factor. If the total time spent retrying a request exceeds the request.max.retries.time.ms value (15 minutes by default), retries stop and the request fails, which will likely result in task failure.

Retries example

  1. Create a sfso-sink.properties file with the following contents:

    name=RetriesExample
    topics=sfso-messages
    tasks.max=1
    connector.class=io.confluent.salesforce.SalesforceSObjectSinkConnector
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    salesforce.object=Lead
    salesforce.ignore.fields=CleanStatus
    request.max.retries.time.ms=20000
    
  2. Run and validate the connector.

  3. The connector retries failed requests for up to 20000 ms, as configured. Retries stop as soon as a Salesforce request succeeds.

Note that the default value for request.max.retries.time.ms is 900000 ms (15 minutes).

API Limits

Allocations and limits of the Salesforce account apply to the Salesforce SObject Sink connector. For example, free org accounts have a cap on the number of API calls that can occur within a 24-hour period. In addition, data storage limits apply by organization type.

Examples

This section includes the following examples:

  • Property-based example

  • REST-based example

Note that salesforce.consumer.key and salesforce.consumer.secret are required properties used for OAuth2 secure authentication by Salesforce.com. You can find more information and tutorials at Salesforce.com. To enable OAuth JWT bearer token support, set the following connector configuration properties (a sketch follows this list):

  • salesforce.username

  • salesforce.consumer.key

  • salesforce.jwt.keystore.path

  • salesforce.jwt.keystore.password
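A sketch of a JWT bearer configuration fragment using these properties (the keystore path is a placeholder):

    salesforce.username=< Required Configuration >
    salesforce.consumer.key=< Required Configuration >
    salesforce.jwt.keystore.path=/path/to/keystore.jks
    salesforce.jwt.keystore.password=< Required Configuration >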

Property-based example

This configuration is typically used with standalone workers. It overrides the record _EventType to perform upsert operations using an external ID field named CustomId__c, and it ignores the CleanStatus field in the Kafka source record.

 name=SalesforceSObjectSinkConnector1
 connector.class=io.confluent.salesforce.SalesforceSObjectSinkConnector
 tasks.max=1
 topics=< Required Configuration >
 salesforce.consumer.key=< Required Configuration >
 salesforce.consumer.secret=< Required Configuration >
 salesforce.object=< Required Configuration >
 salesforce.password=< Required Configuration >
 salesforce.password.token=< Required Configuration >
 salesforce.username=< Required Configuration >
 salesforce.ignore.fields=CleanStatus
 salesforce.ignore.reference.fields=true
 salesforce.custom.id.field.name=CustomId__c
 salesforce.use.custom.id.field=true
 salesforce.sink.object.operation=upsert
 override.event.type=true
 confluent.topic.bootstrap.servers=localhost:9092
 confluent.topic.replication.factor=1
 confluent.license=

To include your broker address(es), change the confluent.topic.bootstrap.servers property, and for staging or production use, change the confluent.topic.replication.factor to 3. When working on a downloaded Confluent development cluster, or any single broker cluster, use a confluent.topic.replication.factor of 1.

For details about using this connector with Kafka Connect Reporter, see Connect Reporter.

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. For more information, see Kafka Connect REST API.

 {
   "name" : "SalesforceSObjectSinkConnector1",
   "config" : {
     "connector.class" : "io.confluent.salesforce.SalesforceSObjectSinkConnector",
     "tasks.max" : "1",
     "topics" : "< Required Configuration >",
     "salesforce.consumer.key" : "< Required Configuration >",
     "salesforce.consumer.secret" : "< Required Configuration >",
     "salesforce.object" : "< Required Configuration >",
     "salesforce.password" : "< Required Configuration >",
     "salesforce.password.token" : "< Required Configuration >",
     "salesforce.username" : "< Required Configuration >",
     "confluent.topic.bootstrap.servers": "localhost:9092",
     "confluent.topic.replication.factor": "1",
     "salesforce.sink.object.operation": "delete",
     "override.event.type": "true",
     "confluent.license": " Omit to enable trial mode "
   }
 }

To include your broker address(es), change the confluent.topic.bootstrap.servers property. For staging or production use, change the confluent.topic.replication.factor to 3.

For details about using this connector with Kafka Connect Reporter, see Connect Reporter. Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SalesforceSObjectSinkConnector1/config
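
To verify the deployment, you can query the Connect REST API for the connector status. The connector name must match the name set in connector.json:

curl -s http://localhost:8083/connectors/SalesforceSObjectSinkConnector1/status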