Salesforce Platform Events Sink Connector for Confluent Platform

Salesforce Platform Events are user-defined publish/subscribe events. The Kafka Connect Salesforce Platform Events sink connector can be used to publish Platform Events from Apache Kafka® topics to Salesforce. This connector can be used with either standalone or distributed Connect workers. The Platform Events Sink Connector can be thought of as the inverse of the Platform Events Source Connector. It is designed to read Platform Event records from a Kafka topic with data streamed from the Source connector and publish this data as new Platform Events to the configured organization.

Note

The connector’s salesforce.platform.event.name property indicates the name of the event. Event names must end with __e. For example, salesforce.platform.event.name=UserLogout__e. Platform event definitions must exist in Salesforce before publishing events to Salesforce. See the Salesforce Developer Guide for more information.
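The naming rule above can be illustrated with a short sketch (illustrative only, not part of the connector) that checks an event API name before it is used in the `salesforce.platform.event.name` property:

```python
def is_valid_platform_event_name(name: str) -> bool:
    """Platform event API names must end with the __e suffix."""
    return name.endswith("__e") and len(name) > len("__e")

# Hypothetical names used only to demonstrate the rule.
print(is_valid_platform_event_name("UserLogout__e"))  # True
print(is_valid_platform_event_name("UserLogout"))     # False
```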

Configuration Properties

For a complete list of configuration properties for this connector, see Salesforce Platform Events Sink Connector Configuration Properties.

Considerations

Note the following when using the Salesforce Platform Events sink connector.

API Limits

The Salesforce connector is limited by the allocations and limits of the Salesforce account. For example, free org accounts have a cap on the number of events that can occur within a 24-hour period.

Retries

In case of failures caused by network issues, you can configure the connector to retry Salesforce requests (once authentication succeeds) using the request.max.retries.time.ms parameter.

The backoff period for each retry attempt is randomized and grows exponentially. If the total time spent retrying a request exceeds the value of request.max.retries.time.ms (15 minutes by default), retries stop and the request fails. This will likely result in task failure.
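The retry behavior described above can be sketched as follows. This is an illustrative Python sketch of randomized exponential backoff with a total-time cap mirroring request.max.retries.time.ms, not the connector's actual implementation; the function name and the use of ConnectionError are assumptions for the example:

```python
import random
import time


def retry_with_backoff(request, max_retries_time_ms=900_000,
                       initial_backoff_ms=500):
    """Retry a callable with randomized exponential backoff.

    Illustrative sketch of the documented behavior: retries stop once
    the total retry budget (max_retries_time_ms, mirroring
    request.max.retries.time.ms, default 900000 ms = 15 minutes) would
    be exceeded, and the last error is raised.
    """
    deadline = time.monotonic() + max_retries_time_ms / 1000.0
    attempt = 0
    while True:
        try:
            return request()
        except ConnectionError:
            attempt += 1
            # Randomized exponential backoff: the base doubles on each
            # attempt, and jitter spreads out concurrent retries.
            backoff_s = (initial_backoff_ms / 1000.0) * (2 ** (attempt - 1)) * random.random()
            if time.monotonic() + backoff_s > deadline:
                raise  # retry budget exhausted; the task would fail
            time.sleep(backoff_s)
```

Retries stop as soon as a request succeeds, so a transient network blip costs only a few short backoff sleeps.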

Retries Example

  1. Create a sfpe-sink.properties file with the following contents:

    name=RetriesExample
    topics=sfpe-messages
    tasks.max=1
    connector.class=io.confluent.salesforce.SalesforcePlatformEventSinkConnector
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    salesforce.platform.event.name=ConfluentEvent__e
    request.max.retries.time.ms=20000
    
  2. Run and validate the connector.

  3. The connector retries failed requests for up to 20000 ms (20 seconds), as set by request.max.retries.time.ms. Retries stop as soon as a Salesforce request succeeds.

Note that the default value for request.max.retries.time.ms is 900000 ms (15 minutes).

Data Fields

The Platform Event Sink Connector publishes only custom fields that are part of the Platform event definition. For example, see the following sample Kafka record in JSON format:

Kafka record value for a Salesforce Platform Event
{
  "ReplayId": {
    "string": "44479"
  },
  "CreatedDate": {
    "long": 1553542991336
  },
  "CreatedById": {
    "string": "0051U000004EE5wQAG"
  },
  "TextField__c": {
    "string": "This is a Text field"
  },
  "NumberField__c": {
    "double": 33
  },
  "BooleanField__c": {
    "boolean": true
  },
  "_ObjectType": {
    "string": "ConfluentEvent__e"
  },
  "_EventType": {
    "string": "VrdbV-4eqHB3hllFiCjIrw"
  }
}

The custom fields defined in the Platform Event definition in Salesforce are the fields ending in __c. The connector does not publish other fields, such as ReplayId or CreatedDate.
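The field selection described above can be sketched as follows. This is an illustrative Python sketch using the sample record from this section, not the connector's actual code; the helper name is an assumption for the example:

```python
# Sample Kafka record value from this section.
SAMPLE_RECORD = {
    "ReplayId": {"string": "44479"},
    "CreatedDate": {"long": 1553542991336},
    "CreatedById": {"string": "0051U000004EE5wQAG"},
    "TextField__c": {"string": "This is a Text field"},
    "NumberField__c": {"double": 33},
    "BooleanField__c": {"boolean": True},
    "_ObjectType": {"string": "ConfluentEvent__e"},
    "_EventType": {"string": "VrdbV-4eqHB3hllFiCjIrw"},
}


def publishable_fields(record):
    """Keep only custom fields (API names ending in __c).

    Mirrors the documented behavior: system fields such as ReplayId
    and CreatedDate are not published to Salesforce.
    """
    return {k: v for k, v in record.items() if k.endswith("__c")}


print(sorted(publishable_fields(SAMPLE_RECORD)))
# ['BooleanField__c', 'NumberField__c', 'TextField__c']
```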

Examples

Note

salesforce.consumer.key and salesforce.consumer.secret are required properties used for OAuth2 secure authentication by Salesforce.com. Additional information and tutorials are available at Salesforce.com.

Property-based example

This configuration is typically used with standalone workers.

 name=SFDCPlatformEventsSink1
 connector.class=io.confluent.salesforce.SalesforcePlatformEventSinkConnector
 tasks.max=1
 topics=< Required Configuration >
 salesforce.consumer.key=< Required Configuration >
 salesforce.consumer.secret=< Required Configuration >
 salesforce.password=< Required Configuration >
 salesforce.password.token=< Required Configuration >
 salesforce.platform.event.name=< Required Configuration >
 salesforce.username=< Required Configuration >
 confluent.topic.bootstrap.servers=localhost:9092
 confluent.topic.replication.factor=1
 confluent.license=

Note

Change the confluent.topic.bootstrap.servers property to include your broker address(es) and change the confluent.topic.replication.factor to 3 for staging or production use.

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. See Kafka Connect REST API for more information.

Connect Distributed REST example with Platform Event

 {
   "name" : "SFDCPlatformEventsSink1",
   "config" : {
     "connector.class": "io.confluent.salesforce.SalesforcePlatformEventSinkConnector",
     "tasks.max" : "1",
     "topics" : "< Required Configuration >",
     "salesforce.consumer.key" : "< Required Configuration >",
     "salesforce.consumer.secret" : "< Required Configuration >",
     "salesforce.password" : "< Required Configuration >",
     "salesforce.password.token" : "< Required Configuration >",
     "salesforce.platform.event.name" : "< Required Configuration >",
     "salesforce.username" : "< Required Configuration >",
     "confluent.topic.bootstrap.servers": "localhost:9092",
     "confluent.topic.replication.factor": "1",
     "confluent.license": " Omit to enable trial mode "
   }
 }

Note

Change the confluent.topic.bootstrap.servers property to include your broker address(es), and change the confluent.topic.replication.factor to 3 for staging or production use.

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

Create a new connector:

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

Update an existing connector:

curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SFDCPlatformEventsSink1/config