Salesforce PushTopic Source Connector for Confluent Platform¶
The Kafka Connect Salesforce PushTopic Source connector provides the ability to subscribe to create, update, delete, and undelete events related to Salesforce Objects (SObjects). It writes these events as records in an Apache Kafka® topic. This connector can be used with either standalone or distributed Connect workers.
Note that the connector’s salesforce.push.topic.name
property indicates the name of
the PushTopic. If it does not exist when the connector is started, a PushTopic
with that name is created. For more information, see the Salesforce Developer
Guide.
Features¶
The Salesforce PushTopic Source connector includes the following features:
At least once delivery¶
This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
Supports one task¶
The Salesforce PushTopic Source Connector supports running only one task.
Limitations¶
The Salesforce PushTopic Source connector has the following limitations:
- The connector is limited to one task only.
- The Salesforce connector is limited by the allocations and limits of the Salesforce account. For example, free org accounts have a cap on the number of events that can occur within a 24-hour period.
- The connector omits long text area fields from the value schema to prevent issues such as MessageTooLargeExceptions. These fields can contain arbitrarily large values, which may cause the Kafka messages to become too large.
Configuration properties¶
For a complete list of configuration properties for this connector, see Salesforce PushTopic Source Connector Configuration Properties.
Considerations¶
Note the following when using the Salesforce PushTopic Source connector.
Message replay¶
PushTopics currently retain events for 24 hours. When you create a connector, you can control how stored events are treated when the connector starts for the first time.
- Consume only new events that arrive in the PushTopic after the connector starts up by setting salesforce.initial.start to latest. This is the default behavior, due largely to API limits.
- Consume all of the events in the PushTopic by setting salesforce.initial.start to all.
Restarts¶
When the connector operates, it periodically records the replay ID of the last record written to Kafka. When the connector is stopped and then restarted within 24 hours, it continues consuming the PushTopic where it stopped, with no missed events. However, if the connector stops for more than 24 hours, some events are discarded in Salesforce before the connector can read them.
If the connector stops unexpectedly due to a failure, it may not record the replay ID of the last record successfully written to Kafka. When the connector restarts, it resumes from the last recorded replay ID. This means that some events may be duplicated in Kafka.
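The following minimal Python sketch illustrates why a failure can produce duplicates. It is a simulation under stated assumptions, not the connector's implementation: the event list, the function name replay_after_restart, and the chosen replay IDs are all hypothetical.

```python
def replay_after_restart(events, last_recorded_replay_id):
    """Return the events that are read again after resuming from a replay ID."""
    return [e for e in events if e["replayId"] > last_recorded_replay_id]

# Hypothetical PushTopic contents: replay IDs 1 through 10.
events = [{"replayId": i} for i in range(1, 11)]

# Assumption: replay IDs 1..9 reached Kafka before the failure,
# but the last replay ID that was recorded is 7.
written_before_failure = {e["replayId"] for e in events[:9]}
last_recorded = 7

replayed = replay_after_restart(events, last_recorded)
duplicates = [e["replayId"] for e in replayed
              if e["replayId"] in written_before_failure]
print(duplicates)  # [8, 9]
```

In this simulation, replay IDs 8 and 9 are written to Kafka twice, so downstream consumers that cannot tolerate duplicates need to handle them.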
Retries¶
In case of failures caused by network issues, you can configure the connector to
retry Salesforce requests (once authentication succeeds) using the
request.max.retries.time.ms property.
The backoff period for each retry attempt uses a randomization function that grows exponentially. However, if the total time spent retrying the request exceeds the request.max.retries.time.ms value (15 minutes by default), retries stop and the request fails. This will likely result in task failure.
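As a rough illustration of randomized exponential backoff under a total time budget, consider the Python sketch below. The connector's actual randomization function is not documented here, so the initial delay of 100 ms, the doubling factor, and the name backoff_delays are assumptions made only for this example.

```python
import random

def backoff_delays(max_retry_time_ms, initial_ms=100, rng=None):
    """Yield randomized, exponentially growing delays within a total budget."""
    rng = rng or random.Random()
    elapsed = 0
    ceiling = initial_ms
    while elapsed < max_retry_time_ms:
        # Pick a random delay between half the ceiling and the ceiling.
        delay = rng.randint(ceiling // 2, ceiling)
        # Never wait past the overall retry budget.
        delay = min(delay, max_retry_time_ms - elapsed)
        yield delay
        elapsed += delay
        # Double the ceiling for the next attempt (assumed growth factor).
        ceiling *= 2

# Example: a 20000 ms budget, seeded so the run is repeatable.
delays = list(backoff_delays(20000, rng=random.Random(1)))
print(len(delays), "retries, total wait", sum(delays), "ms")
```

Once the budget is used up, the generator stops, which corresponds to the point where the request is reported as failed.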
Retries example¶
Create a sfpt-source.properties file with the following contents:
name=RetriesExample
topics=sfpt-messages
tasks.max=1
connector.class=io.confluent.salesforce.SalesforcePushTopicSourceConnector
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
salesforce.object=Lead
salesforce.push.topic.name=LeadsPushTopic
salesforce.push.topic.notify.create=true
salesforce.push.topic.notify.delete=false
salesforce.push.topic.notify.update=false
salesforce.push.topic.notify.undelete=false
request.max.retries.time.ms=20000
Run and validate the connector.
With this configuration, the connector retries a failed Salesforce request for up to 20000 ms (20 seconds) in total. Retries stop as soon as a Salesforce request succeeds.
Note that the default value for request.max.retries.time.ms
is 900000 ms (15 minutes).
Message order¶
Changes to data in your organization happen in a sequential manner. However,
Salesforce clients (like the connector) may receive out-of-order event
notifications. The connector writes events in the same order that it receives
them. If required, you can set a createdDate
property for topic consumers
that puts the notification messages in event order.
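A consumer-side reordering step could look like the Python sketch below. It assumes the consumer has access to each event's createdDate and replayId in the raw PushTopic event shape; the helper names and the sample messages are hypothetical.

```python
from datetime import datetime

def event_sort_key(message):
    """Sort key: the event's createdDate, with replayId as a tie-breaker."""
    event = message["data"]["event"]
    created = datetime.strptime(event["createdDate"], "%Y-%m-%dT%H:%M:%S.%fZ")
    return (created, event["replayId"])

def in_event_order(messages):
    """Return the messages sorted by when the change happened."""
    return sorted(messages, key=event_sort_key)

# Hypothetical batch that arrived out of order.
received = [
    {"data": {"event": {"createdDate": "2019-03-25T22:58:27.100Z", "replayId": 799}}},
    {"data": {"event": {"createdDate": "2019-03-25T22:58:26.749Z", "replayId": 798}}},
]
ordered = in_event_order(received)
print([m["data"]["event"]["replayId"] for m in ordered])  # [798, 799]
```

Sorting only works within the batch a consumer holds in memory, so how long to buffer before emitting is a design choice left to the consumer.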
Visibility¶
Only the events captured by a PushTopic are visible to the connector. These events are retained for 24 hours.
Create PushTopics¶
The connector can use an existing PushTopic. If no PushTopic with the configured name exists, the connector creates one when it starts, using the following query:
SELECT ID, <fieldNames> FROM <objectName>
where <fieldNames> is the comma-separated list of non-textarea fields, and
<objectName> is the name of the SObject (for example, Lead, Case, Account, and
so forth). If you want the PushTopic to be defined differently, you can create
it using the Salesforce UI before running the connector. Note that the
connector discards textarea fields while creating a PushTopic.
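The query template above can be sketched in Python as follows. This is only an illustration of the template, not the connector's code: the function name build_pushtopic_query and the shape of the field descriptions (dictionaries with name and type keys) are assumptions for this example.

```python
def build_pushtopic_query(object_name, fields):
    """Build 'SELECT ID, <fieldNames> FROM <objectName>' without textarea fields."""
    columns = ["ID"]
    for field in fields:
        if field["type"] == "textarea":
            continue  # textarea fields are discarded
        if field["name"].lower() == "id":
            continue  # ID is already the first column
        columns.append(field["name"])
    return "SELECT " + ", ".join(columns) + " FROM " + object_name

# Hypothetical field descriptions for the Lead SObject.
lead_fields = [
    {"name": "Id", "type": "id"},
    {"name": "LastName", "type": "string"},
    {"name": "Company", "type": "string"},
    {"name": "Description", "type": "textarea"},
]
print(build_pushtopic_query("Lead", lead_fields))
# SELECT ID, LastName, Company FROM Lead
```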
Examples¶
Note that salesforce.consumer.key
and salesforce.consumer.secret
are
required properties used for OAuth2 secure authentication by Salesforce.com.
Additional information and tutorials are available at Salesforce.com.
Tip
You can set the following connector configuration properties to enable OAuth JWT bearer token support:
salesforce.username
salesforce.consumer.key
salesforce.jwt.keystore.path
salesforce.jwt.keystore.password
Property-based example¶
This configuration is typically used with standalone workers.
name=SalesforcePushTopicSourceConnector1
connector.class=io.confluent.salesforce.SalesforcePushTopicSourceConnector
tasks.max=1
kafka.topic=< Required Configuration >
salesforce.consumer.key=< Required Configuration >
salesforce.consumer.secret=< Required Configuration >
salesforce.object=< Required Configuration >
salesforce.password=< Required Configuration >
salesforce.password.token=< Required Configuration >
salesforce.push.topic.name=< Required Configuration >
salesforce.username=< Required Configuration >
salesforce.initial.start=all
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=
To include your broker address(es), change the
confluent.topic.bootstrap.servers
property, and you can change the
confluent.topic.replication.factor
to 3 for staging or production use.
REST-based example¶
This configuration is typically used with distributed workers.
Create a file named connector.json using the following JSON configuration example:
Connect Distributed REST example with Push Topic:
{
  "name" : "SalesforcePushTopicSourceConnector1",
  "config" : {
    "connector.class" : "io.confluent.salesforce.SalesforcePushTopicSourceConnector",
    "tasks.max" : "1",
    "kafka.topic" : "< Required Configuration >",
    "salesforce.consumer.key" : "< Required Configuration >",
    "salesforce.consumer.secret" : "< Required Configuration >",
    "salesforce.object" : "< Required Configuration >",
    "salesforce.password" : "< Required Configuration >",
    "salesforce.password.token" : "< Required Configuration >",
    "salesforce.push.topic.name" : "< Required Configuration >",
    "salesforce.username" : "< Required Configuration >",
    "salesforce.initial.start" : "all",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": " Omit to enable trial mode "
  }
}
To include your broker address(es), change the
confluent.topic.bootstrap.servers property. You can change the
confluent.topic.replication.factor to 3 for staging or production use.
Use curl to post a configuration to one of the Connect workers. Change
http://localhost:8083/ to the endpoint of one of your Connect worker(s). For
more information, see Connect REST API.
Create a new connector:
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
Update an existing connector:
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/SalesforcePushTopicSourceConnector1/config
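Before posting connector.json, it can help to confirm that no placeholder values remain. The Python sketch below shows one way to check; the helper name unfilled_properties is hypothetical, and the placeholder text is taken from the example configuration above.

```python
import json

PLACEHOLDER = "< Required Configuration >"

def unfilled_properties(connector_json):
    """Return the config keys whose value is still the placeholder text."""
    config = json.loads(connector_json)["config"]
    return sorted(
        key for key, value in config.items()
        if isinstance(value, str) and value.strip() == PLACEHOLDER
    )

# Hypothetical, partially filled-in configuration.
example = json.dumps({
    "name": "SalesforcePushTopicSourceConnector1",
    "config": {
        "tasks.max": "1",
        "kafka.topic": "sfpt-messages",
        "salesforce.object": "Lead",
        "salesforce.username": "< Required Configuration >",
        "salesforce.password": "< Required Configuration >",
    },
})
print(unfilled_properties(example))
# ['salesforce.password', 'salesforce.username']
```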
Sample data formats¶
The following are examples of a Salesforce Lead
SObject that is received by
the connector in the original Salesforce JSON format, then converted to a Kafka
record, and stored in a topic in JSON Format.
Raw PushTopic Event received from Salesforce:
{
"data": {
"event": {
"createdDate": "2019-03-25T22:58:26.749Z",
"replayId": 798,
"type": "created"
},
"sobject": {
"LastModifiedDate": "2019-03-25T22:58:26.000Z",
"Company": "Acme Ind",
"Email": "jill.doe@gmail.com",
"Address": {
"GeocodeAccuracy": null,
"State": "Fl",
"Street": "123 Elm Street",
"PostalCode": "33133",
"Country": null,
"Latitude": null,
"City": "Centerville",
"Longitude": null
},
"Latitude": null,
"ProductInterest__c": null,
"MobilePhone": null,
"Name": "Jill Doe",
"Industry": "Communications",
"CreatedById": "0051U000004EE5wQAG",
"SICCode__c": null,
"DandbCompanyId": null,
"PhotoUrl": null,
"ConvertedOpportunityId": null,
"MasterRecordId": null,
"Status": "Open - Not Contacted",
"ConvertedAccountId": null,
"IsDeleted": false,
"IsConverted": false,
"LastViewedDate": null,
"City": "Centerville",
"Longitude": null,
"CleanStatus": "5",
"LeadSource": "Phone Inquiry",
"GeocodeAccuracy": null,
"Primary__c": null,
"State": "Fl",
"CreatedDate": "2019-03-25T22:58:26.000Z",
"Country": null,
"LastName": "Doe",
"Id": "00Q1U000006f5Q9UAI",
"AnnualRevenue": null,
"Jigsaw": null,
"EmailBouncedDate": null,
"ConvertedDate": null,
"Rating": "Warm",
"Website": null,
"PostalCode": "33133",
"LastReferencedDate": null,
"NumberOfEmployees": null,
"CompanyDunsNumber": null,
"Salutation": null,
"ConvertedContactId": null,
"OwnerId": "0051U000004EE5wQAG",
"Phone": "555-1212",
"NumberofLocations__c": null,
"EmailBouncedReason": null,
"FirstName": "Jill",
"IsUnreadByOwner": true,
"Title": null,
"SystemModstamp": "2019-03-25T22:58:26.000Z",
"CurrentGenerators__c": null,
"LastActivityDate": null,
"Fax": null,
"LastModifiedById": "0051U000004EE5wQAG",
"JigsawContactId": null
}
},
"channel": "/topic/LeadsPushTopic"
}
Kafka record key for the Salesforce PushTopic Event:
{
"Id": "00Q1U000006f5Q9UAI"
}
Kafka record value for the Salesforce PushTopic Event:
{
"Id": "00Q1U000006f5Q9UAI",
"IsDeleted": {
"boolean": false
},
"MasterRecordId": null,
"LastName": {
"string": "Doe"
},
"FirstName": {
"string": "Jill"
},
"Salutation": null,
"Name": {
"string": "Jill Doe"
},
"Title": null,
"Company": {
"string": "Acme Ind"
},
"City": {
"string": "Centerville"
},
"State": {
"string": "Fl"
},
"PostalCode": {
"string": "33133"
},
"Country": null,
"Latitude": null,
"Longitude": null,
"GeocodeAccuracy": null,
"Address": {
"io.confluent.salesforce.Address": {
"GeocodeAccuracy": null,
"State": {
"string": "Fl"
},
"Street": {
"string": "123 Elm Street"
},
"PostalCode": {
"string": "33133"
},
"Country": null,
"Latitude": null,
"City": {
"string": "Centerville"
},
"Longitude": null
}
},
"Phone": {
"string": "555-1212"
},
"MobilePhone": null,
"Fax": null,
"Email": {
"string": "jill.doe@gmail.com"
},
"Website": null,
"PhotoUrl": null,
"LeadSource": {
"string": "Phone Inquiry"
},
"Status": {
"string": "Open - Not Contacted"
},
"Industry": {
"string": "Communications"
},
"Rating": {
"string": "Warm"
},
"AnnualRevenue": null,
"NumberOfEmployees": null,
"OwnerId": {
"string": "0051U000004EE5wQAG"
},
"IsConverted": {
"boolean": false
},
"ConvertedDate": null,
"ConvertedAccountId": null,
"ConvertedContactId": null,
"ConvertedOpportunityId": null,
"IsUnreadByOwner": {
"boolean": true
},
"CreatedDate": {
"long": 1553554706000
},
"CreatedById": {
"string": "0051U000004EE5wQAG"
},
"LastModifiedDate": {
"long": 1553554706000
},
"LastModifiedById": {
"string": "0051U000004EE5wQAG"
},
"SystemModstamp": {
"long": 1553554706000
},
"LastActivityDate": null,
"LastViewedDate": null,
"LastReferencedDate": null,
"Jigsaw": null,
"JigsawContactId": null,
"CleanStatus": {
"string": "5"
},
"CompanyDunsNumber": null,
"DandbCompanyId": null,
"EmailBouncedReason": null,
"EmailBouncedDate": null,
"SICCode__c": null,
"ProductInterest__c": null,
"Primary__c": null,
"CurrentGenerators__c": null,
"NumberofLocations__c": null,
"_ObjectType": {
"string": "Lead"
},
"_EventType": {
"string": "created"
}
}
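Comparing the raw event with the record value shows two visible transformations: timestamps become epoch milliseconds, and nullable values are wrapped with their type name. The Python sketch below reproduces both for the sample values. It is inferred from the sample output only, and the helper names to_epoch_millis and wrap_optional are hypothetical.

```python
import calendar
from datetime import datetime

def to_epoch_millis(salesforce_timestamp):
    """Convert a UTC timestamp such as 2019-03-25T22:58:26.000Z to epoch millis."""
    parsed = datetime.strptime(salesforce_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
    # timegm interprets the time tuple as UTC and returns whole seconds.
    return calendar.timegm(parsed.timetuple()) * 1000 + parsed.microsecond // 1000

def wrap_optional(value):
    """Wrap a nullable value the way the sample record value shows it."""
    if value is None:
        return None
    if isinstance(value, bool):
        return {"boolean": value}
    if isinstance(value, str):
        return {"string": value}
    raise TypeError("type not covered by this sketch: %r" % type(value))

print(to_epoch_millis("2019-03-25T22:58:26.000Z"))  # 1553554706000
print(wrap_optional("Doe"))                         # {'string': 'Doe'}
print(wrap_optional(False))                         # {'boolean': False}
```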
Troubleshoot connector and task failures¶
You can use the Kafka Connect REST API to check
the status of the connectors and tasks. If a task or connector has failed, the
trace
field will include a reason and a stack trace.
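A small Python sketch of such a status check is shown below. It uses the Connect REST API status endpoint (GET /connectors/<name>/status); the helper names, the worker URL you pass in, and the sample payload are assumptions made for illustration.

```python
import json
from urllib.request import urlopen

def fetch_status(base_url, connector_name):
    """Fetch /connectors/<name>/status from a Connect worker."""
    with urlopen(f"{base_url}/connectors/{connector_name}/status") as response:
        return json.load(response)

def failed_parts(status):
    """Return (label, trace) pairs for the connector and tasks in FAILED state."""
    failures = []
    connector = status.get("connector", {})
    if connector.get("state") == "FAILED":
        failures.append(("connector", connector.get("trace", "")))
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            failures.append((f"task {task['id']}", task.get("trace", "")))
    return failures

# Hypothetical payload shaped like a Connect status response.
sample_status = {
    "name": "SalesforcePushTopicSourceConnector1",
    "connector": {"state": "RUNNING", "worker_id": "localhost:8083"},
    "tasks": [
        {"id": 0, "state": "FAILED", "worker_id": "localhost:8083",
         "trace": "org.apache.kafka.connect.errors.ConnectException: example"},
    ],
}
for label, trace in failed_parts(sample_status):
    first_line = trace.splitlines()[0] if trace else "(no trace)"
    print(label, "->", first_line)
```

Against a running worker, the same check would be failed_parts(fetch_status("http://localhost:8083", "SalesforcePushTopicSourceConnector1")).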
Authorization Failures
The Salesforce connector must authenticate with Salesforce and establish a connection. If a connection fails because of authentication or authorization errors, the connector will stop immediately. These errors require changes in your Salesforce account which may include creating OAuth tokens. Try to rerun your connector after you make the account changes.
Salesforce Error Codes
Whenever the connector or task fails, it captures a message that includes the Salesforce error code and error message. The message may also include a failure reason or suggested fix. See the Salesforce error codes for more detail about each error code.
Repeated Connection Failures
The Salesforce connector uses a long-lived connection to the Salesforce service. Periodically this connection is lost due to network lag, service disruptions, or network issues. When the connector loses its connection, it automatically attempts to reconnect so that the connector can continue working. In some cases, however, the error reported by Salesforce indicates that the connector is unable to continue. In these cases, the connector task fails and includes the Salesforce error and reason in the task failure message.
If the connector stops frequently because of connection timeouts, consider changing the following connector configuration properties and restarting the connector:
- The connection.timeout defaults to 30 seconds (30000 milliseconds) and dictates the maximum amount of time that the connector should wait for a single connection attempt to succeed. If the initial connection takes longer than this, the connector task will fail.
- The request.max.retries.time.ms defaults to 15 minutes (900000 milliseconds) and specifies the maximum amount of time that the connector should continue to retry a failed request. The actual duration of the delay is random and it grows exponentially with each retry. If all retries take longer than this value, the connector task will fail. This does not apply to initial connection attempts, but it does apply to subsequent requests to reconnect. Any requests that result in authorization or authentication failures will not be retried.
If the failure rate is not affected by tuning these properties, it may be necessary to look at the Connect worker logs for errors, warnings, and exceptions. The log messages typically have more detail than the connector status. The log messages preceding failure log messages can provide additional context and may help you identify the cause of the failure.
Enabling Debug Logging¶
The Connect worker log configuration controls how much detail is included in the logs. By default, the worker logs include enough detail to identify basic functionality. Enable DEBUG logs in the Connect worker’s log configuration to include more details. This change must be made on each worker and only takes effect upon worker startup. After you change the log configuration as outlined below on each Connect worker, restart all of the Connect workers. A rolling restart can be used if necessary.
Note
Trace level logging is verbose and contains many more details, and may be
useful to solve certain failures. Trace level logging is enabled the same way
as debug level logging, except TRACE is used instead of DEBUG.
On-Premises Installation
For local or on-premises installations of Confluent Platform, the etc/kafka/connect-log4j.properties
file defines the logging configuration of the Connect worker process. To enable DEBUG on just the Salesforce connector,
modify the etc/kafka/connect-log4j.properties
file to include the following line:
log4j.logger.io.confluent.salesforce=DEBUG
To enable DEBUG on all of the Connect worker’s code, including all connectors, change the log4j.rootLogger= line to use DEBUG instead of INFO. For example, the default log configuration for Connect includes this line:
log4j.rootLogger=INFO, stdout
Change this line to the following to enable DEBUG on all of the Connect worker code:
log4j.rootLogger=DEBUG, stdout
Docker
For Docker containers, the logging configuration is set using environment variables. To enable DEBUG on just the Salesforce connector, use the following environment variable when starting your Confluent Platform Connect container:
CONNECT_LOG4J_LOGGERS="io.confluent.salesforce=DEBUG"
To enable DEBUG log messages on all Connect worker code, including all connectors, use the following environment variable when starting your Confluent Platform Connect container:
CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG
The value of the CONNECT_LOG4J_LOGGERS environment variable is a comma-separated list of key-value pairs, where each key is a logger name and each value is a log level. For example, the following enables DEBUG on the Salesforce connector and the Connect framework:
CONNECT_LOG4J_LOGGERS="io.confluent.salesforce=DEBUG,org.apache.kafka.connect=DEBUG"