Salesforce Platform Events Sink Connector for Confluent Platform¶
Salesforce Platform Events are user-defined publish/subscribe events. The Kafka Connect Salesforce Platform Events Sink connector can be used to publish Platform Events from Apache Kafka® topics to Salesforce. This connector can be used with either standalone or distributed Kafka Connect workers.
The Platform Events Sink connector can be thought of as the inverse of the Platform Events Source connector. It is designed to read Platform Event records from a Kafka topic, such as data streamed by the source connector, and then publish this data as new Platform Events to the configured organization.
Note that the connector’s salesforce.platform.event.name property indicates the name of the event. Event names must end with __e. For example, salesforce.platform.event.name=UserLogout__e. Platform event definitions must exist in Salesforce before you publish events to Salesforce. For more information, see the Salesforce Developer Guide.
Features¶
The Salesforce Platform Events Sink connector includes the following features:
At least once delivery¶
This connector guarantees that records are delivered at least once from the Kafka topic.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
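For example, a minimal DLQ setup adds the standard Kafka Connect error-handling properties to the connector configuration; the DLQ topic name below is illustrative:
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-sfpe-messages
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true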
Multiple tasks¶
The Platform Events Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve throughput when the input topic has multiple partitions.
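For example, the following illustrative snippet runs three tasks in parallel. Because each task is assigned a subset of the topic's partitions, this assumes the input topic has at least three partitions:
topics=sfpe-messages
tasks.max=3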
Input data formats¶
The Platform Events Sink connector for Confluent Platform supports the following input data formats:
- Avro
- JSON Schema
- Protobuf
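For example, to consume Avro input you can configure the Avro converter, assuming a Schema Registry running at localhost:8081:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081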
Configuration Properties¶
For a complete list of configuration properties for this connector, see Salesforce Platform Events Sink Connector Configuration Properties.
Considerations¶
Note the following when using the Salesforce Platform Events Sink connector.
API limits¶
The Salesforce connector is limited by the allocations and limits of the Salesforce account. For example, free organization accounts have a cap on the number of events that can occur within a 24-hour period.
Retries¶
In case of failures caused by network issues, you can configure the connector to retry Salesforce requests (once authentication succeeds) using the request.max.retries.time.ms parameter.
The backoff period for each retry attempt grows exponentially and includes a randomized jitter component. If the total time spent retrying the request exceeds this duration (15 minutes by default), retries stop and the request fails, which will likely result in task failure.
Retries example¶
Create a sfpe-sink.properties file with the following contents:
name=RetriesExample
topics=sfpe-messages
tasks.max=1
connector.class=io.confluent.salesforce.SalesforcePlatformEventSinkConnector
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
salesforce.platform.event.name=ConfluentEvent__e
request.max.retries.time.ms=20000
Run and validate the connector.
With this configuration, the connector retries failed requests for up to 20000 milliseconds in total. Retries stop as soon as a Salesforce request succeeds.
Note that the default value for request.max.retries.time.ms is 900000 milliseconds, or 15 minutes.
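As a sketch, you can run the example with a standalone worker and then check the connector and task status through the Kafka Connect REST API. The worker properties file name below is illustrative:
connect-standalone worker.properties sfpe-sink.properties
curl -s http://localhost:8083/connectors/RetriesExample/status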
Data fields¶
The Platform Event Sink connector publishes only custom fields that are part of the Platform event definition. For example, see the following sample Kafka record in JSON format:
{
"ReplayId": {
"string": "44479"
},
"CreatedDate": {
"long": 1553542991336
},
"CreatedById": {
"string": "0051U000004EE5wQAG"
},
"TextField__c": {
"string": "This is a Text field"
},
"NumberField__c": {
"double": 33
},
"BooleanField__c": {
"boolean": true
},
"_ObjectType": {
"string": "ConfluentEvent__e"
},
"_EventType": {
"string": "VrdbV-4eqHB3hllFiCjIrw"
}
}
The fields defined in the Platform Event definition in Salesforce are the fields ending in __c. The connector doesn’t publish other fields, such as ReplayId or CreatedDate.
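To publish a test record that matches this event definition, you can use kafka-avro-console-producer with a value schema derived from the custom fields. The topic name and schema below are assumptions based on the sample record above:
kafka-avro-console-producer --broker-list localhost:9092 --topic sfpe-messages \
  --property value.schema='{"type":"record","name":"ConfluentEvent","fields":[{"name":"TextField__c","type":"string"},{"name":"NumberField__c","type":"double"},{"name":"BooleanField__c","type":"boolean"}]}'
Then enter each test record as a single JSON line:
{"TextField__c":"This is a Text field","NumberField__c":33.0,"BooleanField__c":true}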
Examples¶
Note that the salesforce.consumer.key and salesforce.consumer.secret properties are required for OAuth2 secure authentication with Salesforce.com. Additional information and tutorials are available at Salesforce.com.
You can set the following connector configuration properties to enable OAuth JWT bearer token support:
- salesforce.username
- salesforce.consumer.key
- salesforce.jwt.keystore.path
- salesforce.jwt.keystore.password
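For example, a JWT bearer token configuration might look like the following (the keystore path is illustrative, and the keystore must contain the certificate registered with your Salesforce connected app):
salesforce.username=< Required Configuration >
salesforce.consumer.key=< Required Configuration >
salesforce.jwt.keystore.path=/path/to/keystore.jks
salesforce.jwt.keystore.password=< Required Configuration >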
Property-based example¶
This configuration is typically used with standalone workers.
name=SFDCPlatformEventsSink1
connector.class=io.confluent.salesforce.SalesforcePlatformEventSinkConnector
tasks.max=1
topics=< Required Configuration >
salesforce.consumer.key=< Required Configuration >
salesforce.consumer.secret=< Required Configuration >
salesforce.password=< Required Configuration >
salesforce.password.token=< Required Configuration >
salesforce.platform.event.name=< Required Configuration >
salesforce.username=< Required Configuration >
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=
To include your broker address(es), change the confluent.topic.bootstrap.servers property, and change the confluent.topic.replication.factor to 3 for staging or production use.
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
REST-based example¶
This configuration is typically used with distributed workers.
Write the following JSON to connector.json and configure all the required values:

Connect Distributed REST example with Platform Event:

{
  "name": "SFDCPlatformEventsSink1",
  "config": {
    "connector.class": "io.confluent.salesforce.SalesforcePlatformEventSinkConnector",
    "tasks.max": "1",
    "topics": "< Required Configuration >",
    "salesforce.consumer.key": "< Required Configuration >",
    "salesforce.consumer.secret": "< Required Configuration >",
    "salesforce.password": "< Required Configuration >",
    "salesforce.password.token": "< Required Configuration >",
    "salesforce.platform.event.name": "< Required Configuration >",
    "salesforce.username": "< Required Configuration >",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": " Omit to enable trial mode "
  }
}
Note

- Change the confluent.topic.bootstrap.servers property to include your broker address(es), and change the confluent.topic.replication.factor to 3 for staging or production use.
- For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
Use curl to post a configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect worker(s). For more information, see Kafka Connect REST API.
Create a new connector:
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
Update an existing connector:
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SFDCPlatformEventsSink1/config