Salesforce Bulk API Source Connector for Confluent Cloud

The Salesforce Bulk API Source connector for Confluent Cloud integrates Salesforce.com with Apache Kafka®. This connector pulls records and captures changes from Salesforce.com using the Salesforce Bulk Query API.

Important

If you are still on Confluent Cloud Enterprise, please contact your Confluent Account Executive for more information about using this connector.

Features

The Salesforce Bulk API Source connector provides the following features:

  • At least once delivery: The connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there could be duplicate records in the Kafka topic.
  • Supported data formats: The connector supports Avro, JSON Schema, Protobuf, or JSON (schemaless) output data. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  • Tasks per connector: Organizations can run multiple connectors with a limit of one task per connector (that is, "tasks.max": "1").
  • Supported SObjects: See the following list of Salesforce objects supported by this connector. See Confluent Cloud connector limitations for additional information.

The following Salesforce objects are supported by this connector:

  • Account
  • Campaign
  • CampaignMember
  • Case
  • Contact
  • Contract
  • Event
  • Group
  • Lead
  • Opportunity
  • OpportunityContactRole
  • OpportunityLineItem
  • Period
  • PricebookEntry
  • Product2
  • Task
  • TaskFeed
  • TaskRelation
  • User
  • UserRole

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

See Configuration Properties for configuration property values and descriptions. See Confluent Cloud connector limitations for additional information.

Quick Start

Use this quick start to get up and running with the Salesforce Bulk API Source connector. The quick start provides the basics of selecting the connector and configuring it to capture records and record changes from Salesforce.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud Platform (GCP).
  • Salesforce account credentials.
  • The Confluent Cloud CLI installed and configured for the cluster. See Install the Confluent Cloud CLI.
  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  • At least one topic must exist in your Confluent Cloud cluster before creating the connector. (A CLI sketch for creating a topic and an API key follows this list.)
  • For networking considerations, see Internet access to resources. To use static egress IPs, see Static Egress IP Addresses.
  • Kafka cluster credentials. You can use one of the following ways to get credentials:
    • Create a Confluent Cloud API key and secret. To create a key and secret, you can use the Confluent Cloud CLI or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.
    • Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
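
If you use the Confluent Cloud CLI for these prerequisites, the following commands are one way to create the topic and an API key. The topic name TestBulkAPI and the cluster ID lkc-xxxxxx are illustrative placeholders, and the commands assume the CLI is already logged in and targeting your Kafka cluster:

ccloud kafka topic create TestBulkAPI
ccloud api-key create --resource lkc-xxxxxx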

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster.

See the Quick Start for Apache Kafka using Confluent Cloud for installation instructions.

Step 2: Add a connector.

In the left navigation menu, click Data integration, and then click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector.

Click the Salesforce Bulk API Source connector icon.

Salesforce Bulk API Source Connector Icon

Important

At least one topic must exist in your Confluent Cloud cluster before creating the connector.

Step 4: Set up the connection.

Complete the following steps and click Continue.

Note

  • Make sure you have all your prerequisites completed.
  • An asterisk ( * ) designates a required entry.

  1. Enter a connector name.
  2. Enter your Kafka Cluster credentials. The credentials are either the API key and secret or the service account API key and secret.
  3. Add your Salesforce connection details. (A sketch of the corresponding configuration properties follows this list.)
    • Salesforce instance is optional. If not entered, this property defaults to https://login.salesforce.com. The connector uses the endpoint specified in the authentication response from Salesforce. The remaining Salesforce credentials are required.
    • Salesforce Object is the SObject that the connector polls for new and changed records.
    • Poll interval (ms) is optional. Enter a time in milliseconds (ms) that the connector waits before polling the SObject again. The default is 30000 ms (30 seconds). Allowed values range from 8700 ms to 300000 ms (5 minutes).
    • Enable Batching is optional. Enable batching to use PK Chunking for batching records. The default value is false.
    • Enter a Salesforce since date. The connector pulls data starting from this date. The required format is yyyy-MM-dd (for example, 2021-01-01).
  4. Select an Output message format (data coming from the connector): AVRO, JSON (schemaless), JSON_SR (JSON Schema), or PROTOBUF. A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  5. Enter the number of tasks in use by the connector. Organizations can run multiple connectors with a limit of one task per connector (that is, "tasks.max": "1").
  6. Transforms and Predicates: See the Single Message Transforms (SMT) documentation for details.
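
For reference, the Salesforce connection fields above map to the connector configuration properties shown below. The values are illustrative examples only; Lead is one of the supported SObjects listed earlier, and the poll interval value must fall within the allowed range:

"salesforce.instance": "https://login.salesforce.com",
"salesforce.object": "Lead",
"salesforce.since": "2021-01-01",
"poll.interval.ms": "60000",
"batch.enable": "true"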

See Configuration Properties for configuration property values and descriptions.

Step 5: Launch the connector.

Verify the connection details by previewing the running configuration. Once you’ve validated that the properties are configured to your satisfaction, click Launch.

Tip

For information about previewing your connector output, see Connector Data Previews.

Launch the connector

Step 6: Check the connector status.

The status for the connector should go from Provisioning to Running. It may take a few minutes.

Check the connector status

Step 7: Check the Kafka topic.

After the connector is running, verify that messages are populating your Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use the Confluent Cloud CLI to manage your resources in Confluent Cloud.


Using the Confluent Cloud CLI

Complete the following steps to set up and run the connector using the Confluent Cloud CLI.

Important

  • Make sure you have all your prerequisites completed.
  • At least one topic must exist in your Confluent Cloud cluster before creating the connector.

Step 1: List the available connectors.

Enter the following command to list available connectors:

ccloud connector-catalog list

Step 2: Show the required connector configuration properties.

Enter the following command to show the required connector properties:

ccloud connector-catalog describe <connector-catalog-name>

For example:

ccloud connector-catalog describe SalesforceBulkApiSource

Example output:

Following are the required configs:
connector.class: SalesforceBulkApiSource
name
kafka.api.key
kafka.api.secret
kafka.topic
salesforce.username
salesforce.password
salesforce.password.token
salesforce.object
output.data.format
tasks.max

Step 3: Create the connector configuration file.

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "connector.class": "SalesforceBulkApiSource,
  "name": "SalesforceBulkApiSource_0",
  "kafka.api.key": "****************",
  "kafka.api.secret": "**********************************************",
  "kafka.topic": "TestBulkAPI",
  "salesforce.username": "<my-username>",
  "salesforce.password": "**************",
  "salesforce.password.token": "************************",
  "salesforce.object": "<SObject-name>","
  "output.data.format": "JSON",
  "tasks.max": "1"
}

Note the following property definitions:

  • "connector.class": Identifies the connector plugin name.
  • "name": Sets a name for your new connector.
  • ""kafka.topic": Enter a Kafka topic name. A topic must exist before launching the connector.
  • ""salesforce.<...>"": Enter the required Salesforce connection details.
  • ""salesforce.object"": The SObject that the connector polls for new and changed records.
  • "output.data.format": Sets the output message format (data coming from the connector). Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. You must have Confluent Cloud Schema Registry configured if using a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
  • "tasks.max": Enter the number of tasks in use by the connector. Organizations can run multiple connectors with a limit of one task per connector (that is, "tasks.max": "1").

Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI.
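
As an illustration only, the following lines show how a standard Kafka Connect SMT (ValueToKey) could be added to the connector configuration file to set each record key from a field in the record value. The transform alias setKey and the field name Id are assumptions for this example:

"transforms": "setKey",
"transforms.setKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.setKey.fields": "Id"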

See Configuration Properties for configuration property values and descriptions.

Step 4: Load the properties file and create the connector.

Enter the following command to load the configuration and start the connector:

ccloud connector create --config <file-name>.json

For example:

ccloud connector create --config salesforce-bulk-api-source.json

Example output:

Created connector SalesforceBulkApiSource_0 lcc-aj3qr

Step 5: Check the connector status.

Enter the following command to check the connector status:

ccloud connector list

Example output:

ID          |            Name              | Status  |  Type
+-----------+------------------------------+---------+-------+
lcc-aj3qr   | SalesforceBulkApiSource_0    | RUNNING | source
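
To view more detail for a specific connector, you can describe it by ID. The ID lcc-aj3qr comes from the example output above:

ccloud connector describe lcc-aj3qr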

Step 6: Check the Kafka topic.

After the connector is running, verify that messages are populating your Kafka topic.
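
One way to verify from the CLI is to consume records from the topic. The following example assumes the TestBulkAPI topic from the configuration above and JSON (schemaless) output; schema-based formats such as Avro require additional flags:

ccloud kafka topic consume -b TestBulkAPI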

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect section.

Configuration Properties

The following entries provide configuration property values and descriptions.

salesforce.instance

The URL of the Salesforce endpoint to use. This directs the connector to use the endpoint specified in the authentication response. If left blank, this defaults to https://login.salesforce.com.

  • Type: string
  • Default: https://login.salesforce.com
  • Valid Values: Valid URL with a scheme of https or http
  • Importance: high
salesforce.username

The Salesforce username the connector should use.

  • Type: string
  • Importance: high
salesforce.password

The Salesforce password the connector should use.

  • Type: password
  • Importance: high
salesforce.password.token

The Salesforce security token associated with the username.

  • Type: password
  • Importance: high
salesforce.object

The SObject that the connector polls for new and changed records.

  • Type: string
  • Importance: high
salesforce.since

The connector pulls data starting from this date. The required format is yyyy-MM-dd.

  • Type: string
  • Importance: high
poll.interval.ms

How often the connector queries Salesforce for new and changed records.

  • Type: int
  • Default: 30000 (30 seconds)
  • Valid Values: [min: 8700, max: 300000]
  • Importance: low
batch.enable

Enable batching by applying PK Chunking.

  • Type: boolean
  • Default: false
  • Importance: low

Next Steps

See also

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use the Confluent Cloud CLI to manage your resources in Confluent Cloud.
