Salesforce Source V2 Connector for Confluent Cloud

The fully-managed Salesforce Source V2 connector for Confluent Cloud optionally performs an initial historical load from Salesforce using Bulk API 2.0. The connector then either streams real-time Change Data Capture (CDC) events using the Pub/Sub API or periodically polls Salesforce for changes using Bulk API 2.0, and delivers data to the same Kafka topic per SObject with zero data loss. The connector supports Salesforce up to API version 65.0.

Features

The Salesforce Source V2 connector provides the following features:

  • Real-time ingestion modes: The connector supports an optional historical snapshot for initial data loads using Bulk API 2.0. For real-time ingestion, configure one of the following modes:

    • Event-Driven Sync: Streams real-time CDC events using the Pub/Sub API. This is the default real-time ingestion mode.

    • Periodic Polling: Periodically polls Salesforce for changes using Bulk API 2.0.

    When the historical snapshot is enabled, the connector automatically transitions from snapshot to real-time ingestion on the same Kafka topic with zero data loss.

  • Multi-SObject ingestion: The connector ingests data from up to five Salesforce SObjects per connector instance, each with its own Kafka topic and independent Pub/Sub channel. Topics follow the naming pattern {prefix}.{SObject}, for example, salesforce.Account. The connector processes each SObject with independent state — schema cache, offsets, bulk job, and Pub/Sub subscription — so each SObject’s progress and failures are isolated.

  • Modern Pub/Sub API: The connector uses Salesforce’s recommended Pub/Sub API, which uses gRPC, HTTP/2, and binary Avro encoding, instead of the legacy CometD-based Streaming API.

  • Full record retrieval on update: By default, CDC UPDATE events emit only the changed fields, following Salesforce’s partial-payload semantics. When cdc.full.record.on.update is set to true, the connector calls the Salesforce REST API once per UPDATE event to fetch the complete post-image of the record and emits all field values to Kafka. CDC CREATE events already carry the full new record, and DELETE events have no field payload, so this option affects only UPDATE events, and only when real.time.ingestion.mode is set to Event-Driven Sync (using Pub/Sub API). The option is suited for low-to-moderate update volumes because of the extra REST API call per UPDATE event. High-throughput organizations can exhaust their Salesforce daily API quota.

  • GAP event reconciliation: The connector automatically detects GAP events, which are notifications from the Pub/Sub API that it missed events. For more information, see Gap events in the Salesforce documentation. The connector handles these events according to the gap.event.recovery property:

    • Resync (the default): The connector switches the affected SObject to an incremental Bulk API 2.0 re-sync to recover missed events while other SObjects continue streaming without interruption. To determine the re-sync start time, the connector selects the later timestamp between the commit time of the last successful CDC event and the completion time of the historical snapshot, and then subtracts a safety buffer. For example, if the last event occurs at 10:15 AM, the snapshot finishes at 10:00 AM, and the safety buffer is 5 minutes, the connector selects 10:15 AM and subtracts the buffer to start the re-sync at 10:10 AM.

    • Latest: Skips the gap and resubscribes from the newest available events. Events that occur during the gap are lost.

    • Fail: Stops the task for manual intervention.

    If a GAP event arrives before the connector has any watermark to anchor against — for example, when the historical snapshot is disabled and no CDC events have been emitted yet — the Resync policy falls back to a full table re-fetch using Bulk API 2.0 to guarantee coverage. This prevents data loss but can be expensive on large SObjects.

  • Streaming initial start: The event.sync.start.point property controls the replay position when a Pub/Sub streaming subscription opens with no stored replay ID. This is the case on a cold start or after the offsets are reset or deleted. As soon as the connector emits an event, it stores that event’s replay ID in its offsets and, on any subsequent restart, resumes from the stored replay ID using the CUSTOM preset — event.sync.start.point is not consulted in that case. Note that resetting or deleting the connector’s offsets removes the stored replay ID, so the next start again falls back to event.sync.start.point.

    • latest: The default. Opens with the Salesforce LATEST preset, starting from new events only.

    • all: Opens with the Salesforce EARLIEST replay preset, replaying everything still in the 72-hour Pub/Sub retention buffer.

  • Changed and null field detection: The connector identifies changed and nulled fields by decoding Salesforce CDC hex bitmaps. It exposes the resolved field names as Kafka record headers:

    • source.changed.fields: comma-separated names of fields whose value changed.

    • cdc.nulled.fields: comma-separated names of fields explicitly set to null.

    Downstream consumers such as Flink SQL and ksqlDB can use these headers to distinguish “field unchanged” from “field set to null”.

  • CSFLE and CSPE support: The connector supports Client-Side Field Level Encryption (CSFLE) and Client-Side Payload Encryption (CSPE) for sensitive data. For more information about CSFLE or CSPE setup, see Manage encryption for connectors.

  • Supported data formats: The connector supports Avro, JSON Schema, and Protobuf output data. You must enable Schema Registry to use a Schema Registry-based format, for example, Avro, JSON_SR (JSON Schema), or Protobuf.

  • Configurable date and datetime representation: By default, the connector emits Salesforce date and datetime fields as epoch milliseconds. To keep them as ISO 8601 strings, list the field names in skip.epoch.conversion.fields. Changing this list alters the Connect schema type for those fields — int64 for epoch milliseconds, string for ISO 8601 — so ensure your Schema Registry compatibility settings allow the resulting schema evolution.

  • Tombstone records: When emit.tombstone.on.delete is set to true, DELETE CDC events produce tombstone records with a null value and non-null key for Kafka topic compaction. The default is false.

  • Soft-delete handling: When include.deleted.records is set to true, the connector uses the Bulk API queryAll call to include soft-deleted records in historical snapshot and periodic polling modes. In Event-Driven Sync mode, the Pub/Sub API surfaces deletes as DELETE events.

  • Automatic retries: The connector classifies each error by HTTP or gRPC status and retries transient failures with exponential backoff and jitter. Transient failures include:

    • HTTP 429 and 5xx responses.

    • Bulk API daily-quota responses such as HTTP 403 REQUEST_LIMIT_EXCEEDED.

    • Responses with gRPC status UNAVAILABLE, DEADLINE_EXCEEDED, or RESOURCE_EXHAUSTED.

    • Network timeouts.

    The total retry budget is capped by request.max.retries.time.ms, which defaults to 30 seconds.

  • Offset management: The connector supports custom offset management. For more information, see Manage custom offsets.
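The changed and null field detection can be illustrated with a short Python sketch. It is not the connector's implementation: it assumes a simplified, top-level form of the Salesforce CDC bitmap, a hex string such as 0x0A read as an integer in which bit i (counting from the least significant bit) marks the i-th field of the change event schema. The field list and the helper names decode_bitmap and to_header_value are hypothetical, and nested bitmaps for compound fields are not handled.

```python
def decode_bitmap(bitmap: str, schema_fields: list[str]) -> list[str]:
    """Return the schema field names whose bits are set in a top-level CDC bitmap.

    Assumption: bit i of the hex value (least significant bit first) maps to
    schema_fields[i]. Nested bitmaps such as "3-0x02" are out of scope here.
    """
    if not bitmap.startswith("0x"):
        raise ValueError(f"expected a top-level bitmap like '0x0A', got {bitmap!r}")
    bits = int(bitmap[2:], 16)
    return [name for i, name in enumerate(schema_fields) if (bits >> i) & 1]


def to_header_value(field_names: list[str]) -> str:
    """Join field names into a comma-separated header value."""
    return ",".join(field_names)


# Hypothetical, simplified field order for an Account change event schema.
account_fields = ["ChangeEventHeader", "Name", "Phone", "Industry", "Website"]

changed = decode_bitmap("0x0A", account_fields)  # 0x0A = 0b01010: bits 1 and 3
nulled = decode_bitmap("0x10", account_fields)   # 0x10 = 0b10000: bit 4
print(to_header_value(changed))  # Name,Industry
print(to_header_value(nulled))   # Website
```

Reading the whole hex string as one integer keeps the bit-to-field mapping independent of how many bytes the bitmap contains.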

For more information and examples of using the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Historical snapshot to streaming handoff

When historical.snapshot is set to true and real.time.ingestion.mode is set to Event-Driven Sync (using Pub/Sub API), the connector performs a one-time historical backfill using Bulk API 2.0 and then sustains real-time replication using the Pub/Sub API for Change Data Capture. The snapshot and streaming phases overlap in time, so the handoff requires no manual intervention.

Avoiding event loss at the start of streaming

While the bulk job runs, the connector opens a parallel Pub/Sub subscription with the LATEST preset as a one-shot probe. On the first event, the connector stores the replayId in the offset and closes the subscription. At handoff to event-driven sync, the connector opens a fresh subscription with the CUSTOM preset using the stored replayId, so streaming resumes at or before the point the snapshot ended. No events are lost between the end of the bulk job and the start of streaming.
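The replay decision described here and under Streaming initial start can be summarized in a few lines of Python. This is a simplified model, not the connector's code: choose_replay and ReplayRequest are hypothetical names, and the preset strings mirror the LATEST, EARLIEST, and CUSTOM Pub/Sub replay presets named on this page.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ReplayRequest:
    preset: str                      # "LATEST", "EARLIEST", or "CUSTOM"
    replay_id: Optional[str] = None  # base64 replay ID, set only for CUSTOM


def choose_replay(stored_replay_id: Optional[str], start_point: str) -> ReplayRequest:
    """Pick where a Pub/Sub subscription starts.

    A stored replay ID always wins (CUSTOM preset). Only when none is stored,
    for example on a cold start or after an offset reset, does
    event.sync.start.point decide between LATEST and EARLIEST.
    """
    if stored_replay_id:
        return ReplayRequest("CUSTOM", stored_replay_id)
    if start_point == "latest":
        return ReplayRequest("LATEST")
    if start_point == "all":
        return ReplayRequest("EARLIEST")
    raise ValueError(f"unsupported event.sync.start.point: {start_point!r}")


print(choose_replay(None, "all"))             # cold start, replay the 72-hour buffer
print(choose_replay("AAAAAAAAALY=", "latest"))  # handoff or restart with a stored ID
```

At handoff, the replayId captured by the probe subscription is the stored replay ID, so the sketch returns CUSTOM and event.sync.start.point is not consulted.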

Avoiding duplicate events at the handoff

The brief overlap between the snapshot and streaming phases is protected by a deduplication watermark. When the bulk job completes, the connector records the snapshot completion timestamp, which is the upper bound of the last bulk job. Any Change Data Capture event whose change timestamp is strictly before the snapshot completion timestamp is dropped, because the snapshot already captured that record’s latest state.
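The deduplication rule reduces to a single comparison. The following Python sketch assumes each CDC event is a dict carrying its change timestamp in epoch milliseconds under a hypothetical commitTimestamp key; drop_pre_snapshot_events is an illustrative name. An event whose timestamp equals the watermark is kept, because only events strictly before it are dropped.

```python
from typing import Iterable


def drop_pre_snapshot_events(events: Iterable[dict], snapshot_completion_ms: int) -> list[dict]:
    """Keep only CDC events at or after the snapshot completion watermark.

    Assumption: each event carries its change timestamp in epoch
    milliseconds under the hypothetical key "commitTimestamp".
    """
    return [e for e in events if e["commitTimestamp"] >= snapshot_completion_ms]


watermark = 1777904179496  # snapshotCompletionTimestamp from the offset
events = [
    {"Id": "003A", "commitTimestamp": 1777904179000},  # strictly before: dropped
    {"Id": "003B", "commitTimestamp": 1777904179496},  # equal: kept
    {"Id": "003C", "commitTimestamp": 1777904180123},  # after: kept
]
print([e["Id"] for e in drop_pre_snapshot_events(events, watermark)])  # ['003B', '003C']
```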

GAP event recovery resync

When a GAP event arrives and gap.event.recovery is set to resync, the connector switches the affected SObject to a Bulk API 2.0 re-sync to fetch the missed events. The re-sync starting point depends on whether a CDC event has been emitted for the SObject:

  • Last commit time stored: At least one CDC event has been emitted for the SObject. The connector re-syncs from the commit time of the latest emitted CDC event, minus a safety buffer set by gap.resync.buffer.ms, which defaults to 60 seconds. This is the tightest possible re-fetch.

  • No last commit time: No CDC event has been emitted yet. The connector falls back to the snapshot completion timestamp minus gap.resync.buffer.ms, re-fetching everything since the snapshot ended.
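The choice of re-sync starting point can be sketched in Python as taking the later of the two available anchors and subtracting gap.resync.buffer.ms, with a full re-fetch when neither anchor exists. The function names are hypothetical, and the example reproduces the 10:15 AM, 10:00 AM, 5-minute case from the GAP event reconciliation feature description on an arbitrary date.

```python
from datetime import datetime, timezone
from typing import Optional

DEFAULT_GAP_RESYNC_BUFFER_MS = 60_000  # gap.resync.buffer.ms default (60 seconds)


def resync_start_ms(last_commit_ms: Optional[int],
                    snapshot_completion_ms: Optional[int],
                    buffer_ms: int = DEFAULT_GAP_RESYNC_BUFFER_MS) -> Optional[int]:
    """Return the epoch-ms start of a GAP re-sync, or None for a full re-fetch."""
    anchors = [t for t in (last_commit_ms, snapshot_completion_ms) if t is not None]
    if not anchors:
        # No watermark at all: the Resync policy falls back to a full Bulk re-fetch.
        return None
    return max(anchors) - buffer_ms


def ms(hh: int, mm: int) -> int:
    """Epoch milliseconds for a time on an arbitrary example day (2026-05-28 UTC)."""
    return int(datetime(2026, 5, 28, hh, mm, tzinfo=timezone.utc).timestamp() * 1000)


start = resync_start_ms(ms(10, 15), ms(10, 0), buffer_ms=5 * 60_000)
print(datetime.fromtimestamp(start / 1000, tz=timezone.utc).strftime("%H:%M"))  # 10:10
```

Given the deduplication watermark, emitted CDC events are not older than the snapshot completion timestamp, so taking the later anchor gives the same result as preferring the last commit time whenever one is stored.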

Limitations

Review the following:

Manage custom offsets

You can manage the offsets for this connector. Offsets provide information on the point in the system from which the connector is accessing data. For more information, see Manage Offsets for Fully-Managed Connectors in Confluent Cloud.

To manage offsets:

The Salesforce Source V2 connector emits one offset entry per Salesforce SObject. Each entry tracks the SObject’s progress through the connector’s loading lifecycle. The mode field has three values:

  • Historical Snapshot

  • Periodic Polling

  • Event-Driven Sync

To get the current offset, make a GET request that specifies the environment, Kafka cluster, and connector name.

GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets
Host: api.confluent.cloud
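The same GET request can be assembled with the Python standard library. The Confluent Cloud API authenticates with a Cloud API key and secret over HTTP Basic authentication; the environment ID, cluster ID, connector name, and credentials below are placeholders, and get_offsets_request is a hypothetical helper that builds the request without sending it.

```python
import base64
import urllib.parse
import urllib.request

API_BASE = "https://api.confluent.cloud"


def offsets_url(env_id: str, cluster_id: str, connector_name: str) -> str:
    """URL of the offsets endpoint for one connector."""
    return (f"{API_BASE}/connect/v1/environments/{env_id}/clusters/{cluster_id}"
            f"/connectors/{urllib.parse.quote(connector_name, safe='')}/offsets")


def get_offsets_request(env_id: str, cluster_id: str, connector_name: str,
                        api_key: str, api_secret: str) -> urllib.request.Request:
    """Build (without sending) the GET request, using HTTP Basic auth with a Cloud API key."""
    token = base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()
    req = urllib.request.Request(offsets_url(env_id, cluster_id, connector_name), method="GET")
    req.add_header("Authorization", f"Basic {token}")
    return req


# Placeholder IDs and credentials; replace them with your own values.
req = get_offsets_request("env-abc123", "lkc-abc123", "my-salesforce-v2", "KEY", "SECRET")
print(req.full_url)
# To send it: json.load(urllib.request.urlopen(req))
```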

Response:

Successful calls return HTTP 200 with a JSON payload that describes the offset.

{
    "id": "lcc-example123",
    "name": "{connector_name}",
    "offsets": [
      {
        "partition": {
          "sobject": "Contact"
        },
        "offset": {
          "commitNumber": 1779986819752870000,
          "commitTimestamp": 1779986819000,
          "mode": "Event-Driven Sync",
          "offset.version": 1,
          "replayId": "AAAAAAARXWcAAA==",
          "snapshotCompletionTimestamp": 1779986505317
        }
      },
      {
        "partition": {
          "sobject": "Account"
        },
        "offset": {
          "lastId": "001gL00001MoGikQAF",
          "lastSystemModstamp": "2026-05-27T05:48:54.000Z",
          "mode": "Historical Snapshot",
          "offset.version": 1,
          "snapshotCompletionTimestamp": 1779986499361
        }
      }
    ],
    "metadata": {
        "observed_at": "2026-05-28T16:48:53.969580851Z"
    }
}

Responses include the following information:

  • The current loading phase of each SObject, represented by mode in the offset payload.

  • The position within that phase. For the Bulk API-based modes, such as Historical Snapshot and Periodic Polling, the position is represented by lastSystemModstamp, lastId, jobId, and locator. For Event-Driven Sync, the position is represented by snapshotCompletionTimestamp, replayId, commitTimestamp, and commitNumber.

  • The observed time of the offset in the metadata portion of the payload. The observed_at time indicates a snapshot in time for when the API retrieved the offset.

  • Information about the connector.
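To see at a glance where each SObject stands, a small Python helper can reduce this response to the mode and the position fields relevant to it. summarize_offsets is a hypothetical name, and the sample body is a trimmed copy of the example response on this page.

```python
import json

EVENT_DRIVEN = "Event-Driven Sync"


def summarize_offsets(payload: dict) -> dict:
    """Map each SObject to its loading phase and the position fields for that phase."""
    summary = {}
    for entry in payload.get("offsets", []):
        sobject = entry["partition"]["sobject"]
        offset = entry.get("offset") or {}
        mode = offset.get("mode")
        if mode == EVENT_DRIVEN:
            keys = ("replayId", "commitTimestamp", "commitNumber")
        else:  # Historical Snapshot or Periodic Polling use Bulk API 2.0 positions
            keys = ("lastSystemModstamp", "lastId")
        summary[sobject] = {"mode": mode, **{k: offset.get(k) for k in keys}}
    return summary


response_body = """{"offsets": [
  {"partition": {"sobject": "Contact"},
   "offset": {"mode": "Event-Driven Sync", "offset.version": 1,
              "replayId": "AAAAAAARXWcAAA==", "commitTimestamp": 1779986819000,
              "commitNumber": 1779986819752870000}},
  {"partition": {"sobject": "Account"},
   "offset": {"mode": "Historical Snapshot", "offset.version": 1,
              "lastId": "001gL00001MoGikQAF",
              "lastSystemModstamp": "2026-05-27T05:48:54.000Z"}}]}"""

for sobject, position in summarize_offsets(json.loads(response_body)).items():
    print(sobject, position)
```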

To update the offset, make a POST request that specifies the environment, Kafka cluster, and connector name. Include a JSON payload that specifies the new offset and a patch type.

POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: api.confluent.cloud

 {
     "type": "PATCH",
     "offsets": [
       {
         "partition": { "sobject": "Contact" },
         "offset": {
           "offset.version": 1,
           "mode": "Event-Driven Sync",
           "snapshotCompletionTimestamp": 1777904179496,
           "replayId": "AAAAAAAAALY=",
           "commitTimestamp": 1777904180123,
           "commitNumber": 12345
         }
       }
     ]
 }

Considerations:

  • You can only make one offset change at a time for a given connector.

  • This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.

  • For source connectors, the connector attempts to read from the position defined by the requested offsets. For the Salesforce Source V2 connector, the mode field in the offset determines the position: Historical Snapshot continues the initial Bulk API 2.0 historical backfill from lastSystemModstamp and lastId. When those fields are empty, the snapshot starts from the since configuration. Periodic Polling continues incremental Bulk polling from lastSystemModstamp and lastId. Event-Driven Sync resumes Pub/Sub event consumption from replayId.

  • A PATCH updates only the SObjects listed in the request. Omitted SObjects are unaffected. To reset a single SObject without disturbing others, send a null offset for that SObject’s partition.

  • The replayId field is the base64-encoded Salesforce Pub/Sub replay ID for the last consumed CDC event. For more information, see Replay in the Salesforce Pub/Sub API documentation.

  • Salesforce retains CDC events in the Pub/Sub event bus for 72 hours, so older events cannot be replayed. If the requested replayId falls outside this retention window, the connector applies the policy configured by gap.event.recovery.

  • PATCHing mode to Historical Snapshot has no effect in pure streaming mode, that is, when historical.snapshot is false and real.time.ingestion.mode is Event-Driven Sync (using Pub/Sub API). The connector forces streaming at startup regardless of the offset’s mode. Update the connector configuration to enable historical.snapshot if you need to run a Bulk snapshot.
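A PATCH body that resumes one SObject from a specific replay ID while resetting another with a null offset can be generated as follows. patch_body is a hypothetical helper; send its output to the offsets/request endpoint. Because a PATCH touches only the listed SObjects, any SObject not in the mapping keeps its current offset.

```python
import json
from typing import Optional


def patch_body(offsets_by_sobject: dict[str, Optional[dict]]) -> str:
    """Build a PATCH request body; mapping an SObject to None resets only that SObject."""
    return json.dumps({
        "type": "PATCH",
        "offsets": [
            {"partition": {"sobject": sobject}, "offset": offset}
            for sobject, offset in offsets_by_sobject.items()
        ],
    }, indent=2)


body = patch_body({
    # Resume Contact streaming from a specific replay ID.
    "Contact": {
        "offset.version": 1,
        "mode": "Event-Driven Sync",
        "snapshotCompletionTimestamp": 1777904179496,
        "replayId": "AAAAAAAAALY=",
        "commitTimestamp": 1777904180123,
        "commitNumber": 12345,
    },
    # Reset Account only; None serializes to a JSON null offset.
    "Account": None,
})
print(body)
```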

Response:

Successful calls return HTTP 202 Accepted with a JSON payload that describes the offset.

{
    "id": "lcc-example123",
    "name": "{connector_name}",
    "offsets": [
      {
        "partition": {
          "sobject": "Contact"
        },
        "offset": {
          "commitNumber": 12345,
          "commitTimestamp": 1777904180123,
          "mode": "Event-Driven Sync",
          "offset.version": 1,
          "replayId": "AAAAAAAAALY=",
          "snapshotCompletionTimestamp": 1777904179496
        }
      }
    ],
    "requested_at": "2026-05-28T16:51:39.735139217Z",
    "type": "PATCH"
}

Responses include the following information:

  • The requested position of the offsets in the source.

  • The time of the request to update the offset.

  • Information about the connector.

To delete the offset, make a POST request that specifies the environment, Kafka cluster, and connector name. Include a JSON payload that specifies the delete type.

POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: api.confluent.cloud

{
  "type": "DELETE"
}

Considerations:

  • A delete request removes the stored offsets and resets the connector to its base state, equivalent to creating a new connector.

  • A DELETE resets the offsets for all SObjects tracked by the connector. To reset a single SObject, send a per-partition null offset using a PATCH request instead.

  • This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.

  • Do not issue delete and patch requests at the same time.

  • For source connectors, the connector attempts to read from the position defined in the base state.

Response:

Successful calls return HTTP 202 Accepted with a JSON payload that describes the result.

{
  "id": "lcc-example123",
  "name": "{connector_name}",
  "offsets": [],
  "requested_at": "2026-05-04T14:55:15.179333302Z",
  "type": "DELETE"
}

Responses include the following information:

  • Empty offsets.

  • The time of the request to delete the offset.

  • Information about Kafka cluster and connector.

  • The type of request.

To get the status of a previous offset request, make a GET request that specifies the environment, Kafka cluster, and connector name.

GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request/status
Host: api.confluent.cloud

The status endpoint always shows the status of the most recent PATCH or DELETE operation.
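Because offset requests are asynchronous, a client typically polls this endpoint until the request settles. The Python sketch below assumes the phase values are the upper-case strings APPLIED, PENDING, and FAILED (only APPLIED appears in the example response); fetch_status stands in for any callable that returns the parsed status JSON, and the polling interval and timeout are arbitrary choices.

```python
import time
from typing import Callable

TERMINAL_PHASES = {"APPLIED", "FAILED"}  # assumption: phases are upper-case like "APPLIED"


def wait_for_offset_request(fetch_status: Callable[[], dict],
                            interval_s: float = 5.0,
                            timeout_s: float = 300.0) -> dict:
    """Poll the status endpoint until the most recent PATCH or DELETE settles."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = fetch_status()
        phase = status["status"]["phase"]
        if phase in TERMINAL_PHASES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"offset request still {phase} after {timeout_s} s")
        time.sleep(interval_s)


# Simulated responses standing in for real HTTP calls.
responses = iter([
    {"status": {"phase": "PENDING"}},
    {"status": {"phase": "APPLIED",
                "message": "The offsets for this connector have been reset successfully."}},
])
final = wait_for_offset_request(lambda: next(responses), interval_s=0)
print(final["status"]["phase"])  # APPLIED
```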

Response:

Successful calls return HTTP 200 with a JSON payload that describes the result. The following example shows an applied delete.

{
   "request": {
      "id": "lcc-example123",
      "name": "{connector_name}",
      "offsets": [],
      "requested_at": "2026-05-04T14:55:15.179333302Z",
      "type": "DELETE"
   },
   "status": {
      "phase": "APPLIED",
      "message": "The offsets for this connector have been reset successfully."
   },
   "previous_offsets": [
     {
       "partition": {
         "sobject": "Contact"
       },
       "offset": {
         "commitNumber": 12345,
         "commitTimestamp": 1777904180123,
         "mode": "Event-Driven Sync",
         "offset.version": 1,
         "replayId": "AAAAAAAAALY=",
         "snapshotCompletionTimestamp": 1777904179496
       }
     },
     {
       "partition": {
         "sobject": "Account"
       },
       "offset": {
         "lastId": "001gL00001MoGikQAF",
         "lastSystemModstamp": "2026-05-27T05:48:54.000Z",
         "mode": "Historical Snapshot",
         "offset.version": 1,
         "snapshotCompletionTimestamp": 1779986499361
       }
     }
   ],
   "applied_at": "2026-05-04T14:55:34.115765732Z"
}

Responses include the following information:

  • The original request, including the time it was made.

  • The status of the request: applied, pending, or failed.

  • The time the request was applied, shown as applied_at.

  • The previous offsets, which represent the offset state before the most recent update. Use these to restore the state of your connector if a patch update causes your connector to fail, or to return a connector to its previous state after rolling back.
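Restoring from previous_offsets amounts to replaying them as a PATCH. The following Python sketch, with the hypothetical helper restore_body, copies each partition and offset from a status response into a new PATCH body. Review the result before sending it, because it overwrites the current offsets of every SObject it lists.

```python
import json


def restore_body(status_response: dict) -> str:
    """Turn previous_offsets from a status response into a PATCH request body."""
    return json.dumps({
        "type": "PATCH",
        "offsets": [
            {"partition": e["partition"], "offset": e["offset"]}
            for e in status_response.get("previous_offsets", [])
        ],
    })


status_response = {
    "status": {"phase": "APPLIED"},
    "previous_offsets": [
        {"partition": {"sobject": "Contact"},
         "offset": {"mode": "Event-Driven Sync", "offset.version": 1,
                    "replayId": "AAAAAAAAALY=", "commitTimestamp": 1777904180123,
                    "commitNumber": 12345, "snapshotCompletionTimestamp": 1777904179496}},
    ],
}
print(restore_body(status_response))
```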

JSON payload

The following list describes the unique fields in the JSON payload for managing offsets of the Salesforce Source V2 connector, and whether each field is required or optional.

  • mode (required): The current loading phase of the SObject. Valid values are Historical Snapshot, Periodic Polling, and Event-Driven Sync. Historical Snapshot continues the initial Bulk API 2.0 historical backfill from lastSystemModstamp and lastId. When those fields are empty, the snapshot starts from the since configuration. Periodic Polling continues incremental Bulk polling from lastSystemModstamp and lastId. Event-Driven Sync resumes Pub/Sub event consumption from replayId.

  • offset.version (required): Offset schema version. Always 1.

  • lastId (optional): Salesforce record ID of the last ingested record (15 or 18 characters). Used in Historical Snapshot and Periodic Polling modes.

  • lastSystemModstamp (optional): Timestamp of the last ingested record, formatted as yyyy-MM-dd'T'HH:mm:ss.SSSZ. Used in Historical Snapshot and Periodic Polling modes.

  • jobId (optional): Bulk API 2.0 job ID of an in-flight query job. Present in offsets emitted during Historical Snapshot and Periodic Polling modes while a Bulk query job is in progress. An empty string when no job is active.

  • locator (optional): Bulk API 2.0 result locator for paginated reads of an in-flight job. An empty string when no job is active.

  • snapshotCompletionTimestamp (optional): Unix timestamp in milliseconds when the Bulk snapshot completed.

  • replayId (optional): Base64-encoded Salesforce Pub/Sub API replay ID for the last consumed CDC event. Used in Event-Driven Sync mode. For more information, see Replay in the Salesforce Pub/Sub API documentation.

  • commitTimestamp (optional): Unix timestamp in milliseconds of the last consumed CDC event commit. Used in Event-Driven Sync mode.

  • commitNumber (optional): Salesforce commit number of the last consumed CDC event. Used in Event-Driven Sync mode.
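Before sending a PATCH, a client can check an offset entry against the field definitions for the JSON payload. The Python sketch below is a hypothetical pre-flight check, not a server-side validator: it assumes Salesforce IDs are alphanumeric and parses lastSystemModstamp values such as 2026-05-27T05:48:54.000Z, where the trailing Z denotes UTC.

```python
import re
from datetime import datetime

VALID_MODES = {"Historical Snapshot", "Periodic Polling", "Event-Driven Sync"}
SALESFORCE_ID = re.compile(r"^[A-Za-z0-9]{15}([A-Za-z0-9]{3})?$")  # 15 or 18 characters


def validate_offset(offset: dict) -> list[str]:
    """Return a list of problems found in one offset entry (empty if none)."""
    problems = []
    if offset.get("mode") not in VALID_MODES:
        problems.append(f"invalid mode: {offset.get('mode')!r}")
    if offset.get("offset.version") != 1:
        problems.append("offset.version must be 1")
    last_id = offset.get("lastId")
    if last_id and not SALESFORCE_ID.match(last_id):
        problems.append("lastId must be a 15- or 18-character Salesforce ID")
    stamp = offset.get("lastSystemModstamp")
    if stamp:
        try:
            # %z accepts a trailing "Z" as UTC (Python 3.7+).
            datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S.%f%z")
        except ValueError:
            problems.append("lastSystemModstamp is not in yyyy-MM-dd'T'HH:mm:ss.SSSZ form")
    return problems


account = {"mode": "Historical Snapshot", "offset.version": 1,
           "lastId": "001gL00001MoGikQAF",
           "lastSystemModstamp": "2026-05-27T05:48:54.000Z",
           "snapshotCompletionTimestamp": 1779986499361}
print(validate_offset(account))  # []
```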

Quick start

Use this quick start to get up and running with the Confluent Cloud Salesforce Source V2 connector. The quick start provides the basics of selecting the connector and configuring it to stream events from Salesforce to Kafka topics.

If you’re migrating from the Salesforce Bulk API 2.0 Source or Salesforce CDC Source connector, see Migrate from Salesforce Bulk API 2.0 Source or CDC Source to Salesforce Source V2.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud.

  • A Salesforce account with View All Data, API Enabled, View Setup and Configuration, and Read access to the target objects. When real.time.ingestion.mode is set to Event-Driven Sync (using Pub/Sub API), explicitly enable Change Data Capture for every SObject listed in sobject.names. OAuth2 Client Credentials or JWT Bearer authentication must be configured in Salesforce.

  • The Confluent CLI installed and configured for the cluster. For installation steps, see Install the Confluent CLI.

  • Schema Registry must be enabled to use a Schema Registry-based format, for example, Avro, JSON_SR (JSON Schema), or Protobuf.

  • For networking considerations, see Networking and DNS. To use a set of public egress IP addresses, see Public Egress IP Addresses for Confluent Cloud Connectors.

  • Kafka cluster credentials. The following lists the different ways you can provide credentials.

    • Enter an existing service account resource ID.

    • Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.

    • Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster

To create and launch a Kafka cluster in Confluent Cloud, see Create a Kafka cluster in Confluent Cloud.

Step 2: Add a connector

In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector

Click the Salesforce Source V2 connector card.


Step 4: Enter the connector details

Ensure you have all your prerequisites completed. An asterisk ( * ) designates a required entry.

At the Add Salesforce Source V2 Connector screen, complete the following:

Select the topic you want to send data to from the Topics list.

To create a new topic, click +Add new topic.

  1. Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:

    • My account: This setting allows your connector to globally access everything that you have access to. With a user account, the connector uses an API key and secret to access the Kafka cluster. This option is not recommended for production.

    • Service account: This setting limits the access for your connector by using a service account. This option is recommended for production.

    • Use an existing API key: This setting allows you to specify an API key and a secret pair. You can use an existing pair or create a new one. This method is not recommended for production environments.

    Note

    Freight clusters support only service accounts for Kafka authentication.

  2. Click Continue.

  3. Configure the authentication properties:

    • Salesforce grant type: The OAuth 2.0 grant type the connector uses to authenticate with Salesforce. Valid options are PASSWORD, CLIENT_CREDENTIALS, and JWT_BEARER.

    Salesforce details

    • Salesforce instance: The URL of the Salesforce endpoint to use. When using the CLIENT_CREDENTIALS grant type, provide your Salesforce domain URL. The default is https://login.salesforce.com, which directs the connector to use the endpoint specified in the authentication response.

    • Salesforce username: The Salesforce username the connector uses.

    • Salesforce password: The Salesforce password the connector uses.

    • Salesforce password token: The Salesforce security token associated with the username.

    • Salesforce consumer key: The client ID (consumer key) for the Salesforce Connected App.

    • Salesforce consumer secret: The client secret (consumer secret) for the Salesforce Connected App.

    • Salesforce JWT keystore file: The Salesforce JWT keystore file that contains the private key.

    • Salesforce JWT keystore password: The password the connector uses to access the JWT keystore file.

    Note

    The following properties are used based on the Salesforce grant type you choose.

    • JWT_BEARER: Requires username, consumer key, JWT keystore file, and JWT keystore password.

    • PASSWORD: Requires username, password, password token, consumer key, and consumer secret.

    • CLIENT_CREDENTIALS: Requires consumer key, consumer secret (the client ID and client secret of a Salesforce Connected App), and your Salesforce domain URL in the Salesforce instance option. The default value https://login.salesforce.com does not work for this option. To use CLIENT_CREDENTIALS, you must enable the Client Credentials flow in your Salesforce Connected App and assign an integration user.

  4. Click Continue.
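The grant-type rules in the preceding note can be expressed as a lookup table. In this Python sketch the property keys (username, consumer_key, and so on) are hypothetical names modeled on the Cloud Console fields, not the connector's actual configuration keys, and missing_auth_properties is an illustrative helper.

```python
DEFAULT_INSTANCE = "https://login.salesforce.com"

# Hypothetical keys named after the Cloud Console fields; not the connector's config keys.
REQUIRED_BY_GRANT_TYPE = {
    "JWT_BEARER": {"username", "consumer_key", "jwt_keystore_file", "jwt_keystore_password"},
    "PASSWORD": {"username", "password", "password_token", "consumer_key", "consumer_secret"},
    "CLIENT_CREDENTIALS": {"consumer_key", "consumer_secret", "instance"},
}


def missing_auth_properties(grant_type: str, provided: dict[str, str]) -> list[str]:
    """List required properties that are absent or empty for the chosen grant type."""
    if grant_type not in REQUIRED_BY_GRANT_TYPE:
        raise ValueError(f"unknown grant type: {grant_type!r}")
    missing = sorted(k for k in REQUIRED_BY_GRANT_TYPE[grant_type] if not provided.get(k))
    # CLIENT_CREDENTIALS needs the org's domain URL; the default login URL does not work.
    if grant_type == "CLIENT_CREDENTIALS" and provided.get("instance") == DEFAULT_INSTANCE:
        missing.append("instance")
    return missing


print(missing_auth_properties("PASSWORD", {"username": "u", "password": "p"}))
# ['consumer_key', 'consumer_secret', 'password_token']
```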

Properties not shown in the Cloud Console use the default values. For all property values and definitions, see Configuration properties.

  • Select output record value format: Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, or PROTOBUF.

    Note

    You need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, or PROTOBUF.

Historical snapshot

  • Historical snapshot: When enabled, the connector uses Bulk API 2.0 to retrieve existing Salesforce records starting from the Since date.

  • Since: Specifies the CreatedDate in yyyy-MM-dd format from which the connector pulls existing Salesforce records. Applies only when historical snapshot is enabled and defaults to today.

Real-time ingestion

  • Real-time ingestion mode: Select the mode for continuous ingestion: Periodic Polling using Bulk API 2.0 or Event-Driven Sync using Pub/Sub API.

  • Topic prefix: Prefix added to topic names for multi-SObject configuration, for example, prefix.Account or prefix.Contact.

  • SObject names: Comma-separated list of Salesforce SObjects to ingest, such as Account, Contact, or Lead.
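The topic naming pattern {prefix}.{SObject} and the five-SObject limit described in the Features section can be checked with a short Python sketch. topic_names is a hypothetical helper, and rejecting duplicate SObject names is an added assumption, not a documented rule.

```python
MAX_SOBJECTS = 5  # per connector instance


def topic_names(prefix: str, sobject_names: str) -> dict[str, str]:
    """Map each SObject in a comma-separated list to its {prefix}.{SObject} topic."""
    names = [n.strip() for n in sobject_names.split(",") if n.strip()]
    if len(names) > MAX_SOBJECTS:
        raise ValueError(f"at most {MAX_SOBJECTS} SObjects per connector, got {len(names)}")
    if len(set(names)) != len(names):
        raise ValueError("duplicate SObject names")  # assumption: duplicates are rejected
    return {n: f"{prefix}.{n}" for n in names}


print(topic_names("salesforce", "Account, Contact, Lead"))
# {'Account': 'salesforce.Account', 'Contact': 'salesforce.Contact', 'Lead': 'salesforce.Lead'}
```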

Data encryption

  • Enable Client-Side Field Level Encryption for data encryption. Specify a Service Account to access the Schema Registry and associated encryption rules or keys with that schema. For more information on CSFLE or CSPE setup, see Manage encryption for connectors.

Show advanced configurations
  • Schema context: Select a schema context to use for this connector, if using a schema-based data format. This property defaults to the Default context, which configures the connector to use the default schema set up for Schema Registry in your Confluent Cloud environment. A schema context allows you to use separate schemas (like schema sub-registries) tied to topics in different Kafka clusters that share the same Schema Registry environment. For example, if you select a non-default context, a Source connector uses only that schema context to register a schema and a Sink connector uses only that schema context to read from. For more information about setting up a schema context, see What are schema contexts and when should you use them?.

  • Event sync start point: When set to all, the connector replays buffered events from up to the last 72 hours. When set to latest, it streams the latest events only.

  • Gap event recovery: Defines what happens when the connector detects missed streaming events (gap events). Choose resync to run an incremental Bulk API resync from the last record’s commit timestamp, latest to skip missed events and continue from latest events only, or fail to stop the connector task.

  • Full record on update: When enabled, CDC update events read full records instead of only changed fields. Applies only to Event-Driven Sync mode.

  • Emit tombstone on delete: When enabled, delete events emit Kafka tombstones. Applies only to Event-Driven Sync mode.

  • Gap resync safety buffer (ms): Safety buffer in milliseconds that the connector subtracts from the last commit time during incremental Bulk API resync. The default is 60000 (60 seconds). Applies only when gap event recovery is set to resync. For tuning guidance, see the Salesforce Source V2 connector documentation.

  • Poll interval (ms): Set the time in milliseconds to wait for new change events when no data is returned. Default is 500 ms.

  • Max records per result set: Maximum number of records returned per Bulk API 2.0 result set across all SObjects. The default is 1000.

  • Include deleted records: Whether Bulk API queries include soft-deleted records by using queryAll. Applies to historical snapshot and Periodic Polling modes.

  • Skip epoch conversion fields: Date or datetime field names that stay as ISO 8601 strings instead of epoch milliseconds across all SObjects.
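The effect of Skip epoch conversion fields on a single value can be sketched in Python. The helper convert_date_field is hypothetical, and it assumes datetimes arrive in the form 2026-05-27T05:48:54.000Z and that plain dates are interpreted as midnight UTC; the connector's actual handling of date fields may differ.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Union

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def convert_date_field(name: str, value: Optional[str],
                       skip_fields: set[str]) -> Union[int, str, None]:
    """Return epoch milliseconds for a Salesforce date or datetime string unless skipped.

    Assumptions: datetimes look like 2026-05-27T05:48:54.000Z, and plain dates
    (2026-05-27) are interpreted as midnight UTC.
    """
    if value is None or name in skip_fields:
        return value  # stays an ISO 8601 string (Connect type string)
    if "T" in value:
        parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f%z")
    else:
        parsed = datetime.strptime(value, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    return (parsed - EPOCH) // timedelta(milliseconds=1)  # Connect type int64


print(convert_date_field("LastModifiedDate", "1970-01-01T00:00:01.500Z", set()))  # 1500
print(convert_date_field("CloseDate", "2026-05-27", {"CloseDate"}))              # 2026-05-27
```

Integer division of timedelta values keeps the result exact, avoiding floating-point rounding in the millisecond conversion.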

Additional Configs

  • Value Converter Decimal Format: Specifies the JSON or JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals: BASE64 to serialize DECIMAL logical types as base64 encoded binary data, and NUMERIC to serialize DECIMAL logical type values in JSON or JSON_SR as a number representing the decimal value.

  • value.converter.replace.null.with.default: Whether to replace null fields that have a default value with that default value. When set to true, the default value is used; otherwise, null is used. Applicable for JSON Converter.

  • Key Converter Schema ID Serializer: The class name of the schema ID serializer for keys. This is used to serialize schema IDs in the message headers.

  • Value Converter Reference Subject Name Strategy: Sets the subject reference name strategy for values. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. You can use this strategy only with PROTOBUF format; the default strategy is DefaultReferenceSubjectNameStrategy.

  • value.converter.schemas.enable: Include schemas within each of the serialized values. Input messages must contain schema and payload fields and must not contain additional fields. For plain JSON data, set this to false. Applicable for JSON Converter.

  • errors.tolerance: Use this property to configure the connector’s error handling behavior. WARNING: Use this property with CAUTION for SOURCE CONNECTORS, as it may lead to data loss. If you set this property to ‘all’, the connector does not fail on errant records, but instead logs them (and sends them to the DLQ for sink connectors) and continues processing. If you set this property to ‘none’, the connector task fails on errant records.

  • Value Converter Connect Meta Data: Enables the Connect converter to add its metadata to the output schema. Applies to Avro converters.

  • Value Converter Value Subject Name Strategy: Determines how to construct the subject name under which the value schema is registered with Schema Registry.

  • Key Converter Key Subject Name Strategy: Determines how to construct the subject name for key schema registration.

  • value.converter.ignore.default.for.nullables: When set to true, a null field value stays NULL in the Kafka record instead of being replaced with the field’s default value. Applicable for AVRO, PROTOBUF, and JSON_SR Converters.

  • Value Converter Schema ID Serializer: The class name of the schema ID serializer for values. This is used to serialize schema IDs in the message headers.

Auto-restart policy

  • Enable Connector Auto-restart: Enables the auto-restart behavior of the connector and its task in the event of user-actionable errors. Defaults to true, enabling the connector to automatically restart in case of user-actionable errors. Set this property to false to disable auto-restart for failed connectors. In such cases, you would need to manually restart the connector.

Connection details

  • Max retry time (ms): Maximum time in milliseconds until the connector stops retrying failed Salesforce requests. Default is 30000 and minimum is 1000.

Transforms

Processing position

  • Set offsets: Click Set offsets to define a specific offset from which this connector begins processing data. For more information on managing offsets, see Manage offsets.

For all property values and definitions, see Configuration properties.

  • Click Continue.

Tasks are distributed across SObjects using round-robin assignment. Each task owns its assigned SObjects through both the snapshot and streaming phases. The connector does not reduce to a single task when streaming begins. Setting tasks.max higher than the number of configured SObjects does not increase parallelism. Extra task slots remain idle.

  1. To change the number of tasks, enter the number of tasks for the connector to use in the Tasks field.

  2. Click Continue.

  1. Verify the connection details.

  2. Click Launch.

    The status for the connector should go from Provisioning to Running.

Step 5: Check the Kafka topic

After the connector is running, verify that messages are populating your Kafka topics. When the historical snapshot is enabled, the connector loads historical data into topics named {prefix}.{SObject}, for example, salesforce.Account, and then automatically switches to real-time CDC streaming on the same topics.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Using the Confluent CLI

Complete the following steps to set up and run the connector using the Confluent CLI.

Make sure you have completed all the prerequisites.

Step 1: List the available connectors

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: List the connector configuration properties

Enter the following command to show the connector configuration properties:

confluent connect plugin describe <connector-plugin-name>

The command output shows the required and optional configuration properties.

Step 3: Create the connector configuration file

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties using OAuth2 Client Credentials authentication with the historical snapshot enabled and Event-Driven Sync as the real-time mode.

{
  "name": "SalesforceSourceV2_0",
  "config": {
    "connector.class": "SalesforceSourceV2",
    "name": "SalesforceSourceV2_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<my-kafka-api-key>",
    "kafka.api.secret": "<my-kafka-api-secret>",
    "salesforce.grant.type": "CLIENT_CREDENTIALS",
    "salesforce.instance": "https://<your-domain>.my.salesforce.com",
    "salesforce.consumer.key": "<my-consumer-key>",
    "salesforce.consumer.secret": "<my-consumer-secret>",
    "sobject.names": "Account,Contact",
    "topic.prefix": "salesforce",
    "historical.snapshot": "true",
    "real.time.ingestion.mode": "Event-Driven Sync (using Pub/Sub API)",
    "output.data.format": "AVRO",
    "tasks.max": "1"
  }
}

Note the following property definitions:

  • "name": Sets a name for your new connector.

  • "connector.class": Identifies the connector plugin name.

  • "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:

    confluent iam service-account list
    

    For example:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "salesforce.grant.type": Sets the authentication grant type. Valid entries are CLIENT_CREDENTIALS for OAuth2 Client Credentials (recommended), JWT_BEARER for Salesforce JSON Web Token, or PASSWORD for username and password authentication.

    Note

    The following properties are used based on the Salesforce grant type you choose.

    • JWT_BEARER: Requires username, consumer key, JWT keystore file, and JWT keystore password.

    • PASSWORD: Requires username, password, password token, consumer key, and consumer secret.

    • CLIENT_CREDENTIALS: Requires the consumer key and consumer secret (the client ID and client secret of a Salesforce connected app) and your Salesforce domain URL in the Salesforce instance property. The default value https://login.salesforce.com does not work for this option. To use CLIENT_CREDENTIALS, you must enable the Client Credentials flow in your Salesforce connected app and assign an integration user.

  • "salesforce.instance": The URL of your Salesforce instance. When using CLIENT_CREDENTIALS, provide your Salesforce domain URL, for example, https://your-domain.my.salesforce.com. The default value https://login.salesforce.com does not work for CLIENT_CREDENTIALS.

  • "salesforce.consumer.key": The consumer key for your Salesforce Connected App, also called the client ID. Required for all three grant types: CLIENT_CREDENTIALS, JWT_BEARER, and PASSWORD.

  • "salesforce.consumer.secret": The consumer secret for your Salesforce Connected App, also called the client secret. Required for the CLIENT_CREDENTIALS and PASSWORD grant types.

  • "sobject.names": A comma-separated list of Salesforce SObject API names to ingest, for example, Account,Contact,Lead. Specify SObject names using their exact Salesforce API name with appropriate casing.

  • "topic.prefix": The topic name prefix. Topics are named {prefix}.{SObject}, for example, salesforce.Account.

  • "historical.snapshot": When set to true, the connector performs an initial historical load of existing Salesforce data using Bulk API 2.0 before switching to real-time streaming. Defaults to true.

  • "real.time.ingestion.mode": Sets the real-time ingestion mode. Event-Driven Sync (using Pub/Sub API) is the default and uses the Pub/Sub API for real-time CDC. Periodic Polling (using Bulk API 2.0) uses Bulk API 2.0 for periodic polling.

  • "output.data.format": Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, or PROTOBUF. You must have Confluent Cloud Schema Registry configured if using a schema-based message format.

Note

To enable CSFLE or CSPE for data encryption, specify the following properties:

  • csfle.enabled: Flag to indicate whether the connector honors CSFLE or CSPE rules.

  • sr.service.account.id: A Service Account to access the Schema Registry and associated encryption rules or keys with that schema.

For more information on CSFLE or CSPE setup, see Manage encryption for connectors.

  • "tasks.max": Maximum tasks for the connector. Tasks are distributed across SObjects and each task owns its assigned SObjects through both snapshot and streaming phases. If tasks.max exceeds the number of configured SObjects, extra task slots remain idle.
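The following Python sketch illustrates the round-robin distribution of SObjects across tasks. It assumes that SObjects are dealt to task slots in the order they appear in sobject.names. The function name assign_sobjects is hypothetical, and the connector’s actual assignment code may differ.

```python
from __future__ import annotations


def assign_sobjects(sobject_names: str, tasks_max: int) -> list[list[str]]:
    """Deal SObjects to task slots round-robin (hypothetical helper)."""
    if tasks_max < 1:
        raise ValueError("tasks.max must be at least 1")
    # Split the comma-separated sobject.names value, ignoring stray spaces.
    sobjects = [name.strip() for name in sobject_names.split(",") if name.strip()]
    slots: list[list[str]] = [[] for _ in range(tasks_max)]
    for index, sobject in enumerate(sobjects):
        # SObject i goes to slot i mod tasks.max, so each SObject has exactly one owner.
        slots[index % tasks_max].append(sobject)
    return slots


if __name__ == "__main__":
    # Three SObjects and tasks.max=5: slots 3 and 4 receive nothing and stay idle.
    for task_id, owned in enumerate(assign_sobjects("Account,Contact,Lead", 5)):
        print(task_id, owned if owned else "idle")
```

Each SObject has a single owning task, so raising tasks.max above the number of SObjects only adds empty slots.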

For all property values and descriptions, see Configuration properties.

Step 4: Load the configuration file and create the connector

Enter the following command to load the configuration and start the connector:

confluent connect cluster create --config-file <file-name>.json

For example:

confluent connect cluster create --config-file salesforce-source-v2-config.json

Example output:

Created connector SalesforceSourceV2_0 lcc-ix4dl

Step 5: Check the connector status

Enter the following command to check the connector status:

confluent connect cluster list

Example output:

ID          |          Name               | Status  | Type
+-----------+-----------------------------+---------+------+
lcc-ix4dl   | SalesforceSourceV2_0        | RUNNING | source

Step 6: Check the Kafka topics

After the connector is running, verify that messages are populating your Kafka topics. Topics follow the naming pattern {prefix}.{SObject}, for example, salesforce.Account and salesforce.Contact.
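To identify which topics to check, you can derive them from topic.prefix and sobject.names. The following Python sketch assumes the {prefix}.{SObject} naming pattern described above and the five-SObject limit listed under Features. The helper name topic_names is hypothetical.

```python
from __future__ import annotations

MAX_SOBJECTS = 5  # per-connector limit stated in the Features section


def topic_names(topic_prefix: str, sobject_names: str) -> dict[str, str]:
    """Map each configured SObject to its {prefix}.{SObject} topic (hypothetical helper)."""
    sobjects = [name.strip() for name in sobject_names.split(",") if name.strip()]
    if len(sobjects) > MAX_SOBJECTS:
        raise ValueError(f"at most {MAX_SOBJECTS} SObjects per connector, got {len(sobjects)}")
    # SObject API names are case-sensitive, so keep them exactly as configured.
    return {sobject: f"{topic_prefix}.{sobject}" for sobject in sobjects}


if __name__ == "__main__":
    for sobject, topic in topic_names("salesforce", "Account,Contact").items():
        print(f"{sobject} -> {topic}")
```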

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.

Migrate from Salesforce Bulk API 2.0 Source or CDC Source to Salesforce Source V2

This version introduces changes that are not directly backward compatible, so Confluent provides Salesforce Source V2 as a separate connector type on Confluent Cloud.

Before you migrate

The Salesforce Source V2 connector is not a direct replacement for every Salesforce Bulk API 2.0 Source or Salesforce CDC Source. The following configurations cannot migrate directly:

  • For Bulk API 2.0 Source, you cannot migrate custom SOQL queries, per-object since dates, per-object include.deleted.records settings, or custom per-object topic names.

  • For CDC Source, you cannot migrate custom channels or configurations that rely on Streaming API replay-ID semantics.

The following changes apply to every migration:

  • Topic names change to the format <topic.prefix>.<SObjectName>. You must repoint downstream consumers to the new topics.

  • Record schemas might differ for some fields, so you must evaluate compatibility in Confluent Schema Registry.

  • Salesforce CDC Source offsets cannot be directly translated because the Streaming API and Pub/Sub API have independent replay-ID spaces.

  • The Salesforce Source V2 connector supports only AVRO, JSON_SR, and PROTOBUF output formats.

  • Downstream applications must be idempotent to handle potential duplicates during the cutover window.

Migrate from Salesforce Bulk API 2.0 Source to Salesforce Source V2

Use the following procedure to migrate. Implement and validate the configuration in a pre-production environment before you promote it to production.

  1. Pause the Salesforce Bulk API 2.0 Source connector to stabilize its stored offsets.

  2. Fetch the connector offsets by using the Confluent Cloud Connect offsets API. The offset uses the following structure:

    {
      "partition": {"table": "<SObjectName>"},
      "offset": {
        "time": <epoch-ms>,
        "jobId": "<jobId-or-null>",
        "locator": "<resultsLocator-when-non-null>",
        "offsetCheckpoint": "<watermark>",
        "lastModifiedCheckpoint": "<watermark>"
      }
    }
    

    You must obtain the lastModifiedCheckpoint property value for each SObject.

    For more information, see the Connect offsets API reference and Manage Offsets for Fully-Managed Connectors in Confluent Cloud.

  3. Create a configuration file with connector configurations and offsets.

    {
      "name": "<v2-connector-name>",
      "config": {
        "connector.class": "SalesforceSourceV2",
        "topic.prefix": "<your-prefix>",
        "sobject.names": "Account,Lead",
        "historical.snapshot": "false",
        "real.time.ingestion.mode": "Periodic Polling (using Bulk API 2.0)",
        "salesforce.grant.type": "...",
        "salesforce.instance": "...",
        "salesforce.consumer.key": "...",
        "salesforce.consumer.secret": "...",
        "output.data.format": "AVRO",
        "tasks.max": "1"
      },
      "offsets": [
        {
          "partition": {"sobject": "Account"},
          "offset": {
            "offset.version": 1,
            "mode": "Periodic Polling",
            "lastSystemModstamp":
              "<lastModifiedCheckpoint for Account>",
            "lastId": ""
          }
        },
        {
          "partition": {"sobject": "Lead"},
          "offset": {
            "offset.version": 1,
            "mode": "Periodic Polling",
            "lastSystemModstamp":
              "<lastModifiedCheckpoint for Lead>",
            "lastId": ""
          }
        }
      ]
    }
    
  4. Create the connector by running the following command:

    confluent connect cluster create --config-file config.json
    

    On startup, the connector reads each injected offset and runs its first poll with a SOQL query that filters on the stored watermark, such as WHERE SystemModstamp > <lastSystemModstamp>.

  5. Verify that the connector emits records to the new topics as expected.

  6. Repoint downstream consumers to the new topics, which use the format <topic.prefix>.<SObjectName>.

  7. Delete the Salesforce Bulk API 2.0 Source connector.
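Steps 2 and 3 amount to a mechanical offset translation. The following Python sketch assumes that you have already fetched the Bulk API 2.0 Source offsets into a list of objects with the partition and offset structure shown in step 2. It copies each lastModifiedCheckpoint value verbatim into lastSystemModstamp, as in the step 3 example. The function names are hypothetical, and the Salesforce credential settings are passed in unchanged.

```python
from __future__ import annotations

import json


def translate_offsets(bulk_offsets: list[dict]) -> list[dict]:
    """Turn Bulk API 2.0 Source offsets (step 2) into V2 offsets (step 3)."""
    translated = []
    for entry in bulk_offsets:
        sobject = entry["partition"]["table"]
        watermark = entry["offset"].get("lastModifiedCheckpoint")
        if not watermark:
            raise ValueError(f"missing lastModifiedCheckpoint for {sobject}")
        translated.append({
            "partition": {"sobject": sobject},
            "offset": {
                "offset.version": 1,
                "mode": "Periodic Polling",
                # The old watermark becomes the V2 SystemModstamp watermark.
                "lastSystemModstamp": watermark,
                "lastId": "",
            },
        })
    return translated


def build_v2_config(name: str, topic_prefix: str, salesforce_settings: dict,
                    bulk_offsets: list[dict]) -> dict:
    """Assemble the step 3 configuration file as a dict (hypothetical helper)."""
    sobjects = [entry["partition"]["table"] for entry in bulk_offsets]
    config = {
        "connector.class": "SalesforceSourceV2",
        "topic.prefix": topic_prefix,
        "sobject.names": ",".join(sobjects),
        "historical.snapshot": "false",
        "real.time.ingestion.mode": "Periodic Polling (using Bulk API 2.0)",
        "output.data.format": "AVRO",
        "tasks.max": "1",
    }
    config.update(salesforce_settings)  # grant type, instance, consumer key and secret
    return {"name": name, "config": config, "offsets": translate_offsets(bulk_offsets)}


if __name__ == "__main__":
    fetched = [{
        "partition": {"table": "Account"},
        "offset": {"time": 0, "jobId": None,
                   "lastModifiedCheckpoint": "<lastModifiedCheckpoint for Account>"},
    }]
    # Saving this JSON as config.json produces the input for step 4.
    print(json.dumps(build_v2_config("<v2-connector-name>", "<your-prefix>", {}, fetched), indent=2))
```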

Migrate from Salesforce CDC Source to Salesforce Source V2

A parallel run is necessary because the Salesforce Pub/Sub API and Streaming API use independent replay ID spaces, meaning the Salesforce Source V2 connector cannot resume from the Salesforce CDC Source replay ID. Running both connectors in parallel ensures that the Salesforce Source V2 stream starts before the Salesforce CDC Source connector stops, which prevents event loss during the transition. Downstream deduplication removes any temporary duplicate records caused by this overlap.

  1. Create the Salesforce Source V2 connector while the CDC connector is still running. Configure the Salesforce Source V2 connector for pure streaming from the latest point in the Pub/Sub stream, and add the list of objects that the CDC connector tracks:

    {
      "name": "<v2-connector-name>",
      "config": {
        "connector.class": "SalesforceSourceV2",
        "topic.prefix": "<your-prefix>",
        "sobject.names": "<SObject list>",
        "historical.snapshot": "false",
        "real.time.ingestion.mode": "Event-Driven Sync (using Pub/Sub API)",
        "event.sync.start.point": "latest",
        "salesforce.grant.type": "...",
        "salesforce.instance": "...",
        "salesforce.consumer.key": "...",
        "salesforce.consumer.secret": "...",
        "output.data.format": "AVRO",
        "tasks.max": "1"
      }
    }
    
  2. Deploy the connector by running the following command:

    confluent connect cluster create --config-file config.json
    

    The Salesforce Source V2 connector opens a Pub/Sub API subscription at the next event after the subscription becomes active and begins emitting events while the old CDC connector continues to run in parallel.

  3. Run both connectors in parallel for a buffer period of two to five minutes. Verify that both connectors receive the same change events in their respective topics.

  4. Switch your downstream consumers to the Salesforce Source V2 topic. Your downstream applications must deduplicate these records by using a key such as the Salesforce record ID, the change type, and the commit timestamp. To avoid losing events during the switch, configure the consumer to read the Salesforce Source V2 topic from the earliest offset or from a Kafka offset that corresponds to the timestamp when the consumer stopped reading the Salesforce CDC Source topic.

  5. Stop the Salesforce CDC Source connector after downstream applications read reliably from the new topic.

  6. Delete the Salesforce CDC Source connector.
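Step 4 requires choosing where the consumer starts on the Salesforce Source V2 topic. Kafka clients can look up offsets by timestamp, for example with offsetsForTimes in the Java consumer or offsets_for_times in the confluent-kafka Python client. Both return the earliest offset whose timestamp is at or after the requested time. The following Python sketch mimics that lookup over an in-memory list of (offset, timestamp) pairs for one partition. It adds an optional safety margin so that the overlap is handled by deduplication rather than lost. The function name and the margin parameter are hypothetical.

```python
from __future__ import annotations

from typing import Sequence


def start_offset_for_cutover(
    records: Sequence[tuple[int, int]],
    stop_ts_ms: int,
    safety_margin_ms: int = 0,
) -> int | None:
    """Earliest offset whose timestamp is at or after (stop time - margin).

    records holds (offset, timestamp_ms) pairs for one partition in offset order.
    """
    target_ms = stop_ts_ms - safety_margin_ms
    for offset, timestamp_ms in records:
        if timestamp_ms >= target_ms:
            return offset
    # No record at or after the target yet: start from the end of the partition.
    return None


if __name__ == "__main__":
    partition = [(0, 1_000), (1, 2_000), (2, 3_000)]
    print(start_offset_for_cutover(partition, stop_ts_ms=2_500, safety_margin_ms=1_000))
```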

Considerations for deduplication and schema changes

Plan for downstream deduplication based on the Salesforce record ID and a change-marker field. For bulk-style flows, use the LastModifiedDate or SystemModstamp properties. For CDC flows, use the commit timestamp and the change type.
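The following minimal Python sketch shows that deduplication, assuming each record is available as a dict. The field names Id, changeType, commitTimestamp, and SystemModstamp are assumptions for illustration, so check them against your actual record schema. The in-memory set grows without bound; a production consumer would more likely use an idempotent upsert keyed on the same tuple, or a store with expiry.

```python
from __future__ import annotations

# Hypothetical change-marker field names for each flow type.
CDC_MARKERS = ("changeType", "commitTimestamp")
BULK_MARKERS = ("SystemModstamp",)


class Deduplicator:
    """Drop records whose (record ID, change markers) key was already seen."""

    def __init__(self, marker_fields: tuple[str, ...]) -> None:
        self.marker_fields = marker_fields
        self._seen: set[tuple] = set()

    def key(self, record: dict) -> tuple:
        # "Id" is an assumed name for the Salesforce record ID field.
        return (record["Id"],) + tuple(record[field] for field in self.marker_fields)

    def is_new(self, record: dict) -> bool:
        key = self.key(record)
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


if __name__ == "__main__":
    dedup = Deduplicator(CDC_MARKERS)
    events = [
        {"Id": "001A", "changeType": "UPDATE", "commitTimestamp": 1_700_000_000_000},
        {"Id": "001A", "changeType": "UPDATE", "commitTimestamp": 1_700_000_000_000},  # overlap copy
        {"Id": "001A", "changeType": "UPDATE", "commitTimestamp": 1_700_000_005_000},
    ]
    print([dedup.is_new(event) for event in events])
```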

Evaluate schema compatibility for each subject before you promote the Salesforce Source V2 connector to production because the schemas might differ in field type mappings for certain Salesforce data types.

Configuration properties

Use the following configuration properties with the fully-managed connector. For self-managed connector property definitions and other details, see the connector docs in Self-managed connectors for Confluent Platform.

How should we connect to your data?

name

Sets a name for your connector.

  • Type: string

  • Valid Values: A string at most 64 characters long

  • Importance: high

Kafka Cluster credentials

kafka.auth.mode

Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode, whenever possible.

  • Type: string

  • Valid Values: SERVICE_ACCOUNT, KAFKA_API_KEY

  • Importance: high

kafka.api.key

Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password

  • Importance: high

kafka.service.account.id

The service account that the connector uses to generate the API keys for communicating with the Kafka cluster.

  • Type: string

  • Importance: high

kafka.api.secret

Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password

  • Importance: high

Schema Config

schema.context.name

Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.

  • Type: string

  • Default: default

  • Importance: medium

How should we connect to Salesforce?

salesforce.grant.type

OAuth grant type the connector uses for Salesforce authentication: PASSWORD, CLIENT_CREDENTIALS, or JWT_BEARER

  • Type: string

  • Default: PASSWORD

  • Importance: high

salesforce.instance

Salesforce endpoint URL the connector uses for API requests. For CLIENT_CREDENTIALS, use your Salesforce domain URL. Defaults to https://login.salesforce.com.

salesforce.username

Salesforce username for connector authentication

  • Type: string

  • Importance: high

salesforce.password

Salesforce account password for connector authentication

  • Type: password

  • Importance: high

salesforce.password.token

Security token associated with the Salesforce username

  • Type: password

  • Importance: high

salesforce.consumer.key

Client ID (consumer key) for the Salesforce connected app

  • Type: password

  • Importance: high

salesforce.consumer.secret

Client secret (consumer secret) for the Salesforce connected app

  • Type: password

  • Importance: medium

salesforce.jwt.keystore.file

Keystore file that stores the private key for JWT authentication

  • Type: password

  • Default: [hidden]

  • Importance: medium

salesforce.jwt.keystore.password

Password that unlocks the JWT keystore file

  • Type: password

  • Importance: medium

Historical snapshot

historical.snapshot

When enabled, the connector uses Bulk API 2.0 to retrieve existing Salesforce records starting from the Since date

  • Type: boolean

  • Default: true

  • Importance: high

since

CreatedDate in yyyy-MM-dd format from which the connector pulls existing Salesforce records. Applies only when historical snapshot is enabled and defaults to today.

  • Type: string

  • Importance: medium

Real-time ingestion

real.time.ingestion.mode

Select mode for continuous ingestion: Periodic Polling using Bulk API 2.0 or Event-Driven Sync using Pub/Sub API

  • Type: string

  • Default: Event-Driven Sync (using Pub/Sub API)

  • Importance: high

topic.prefix

Prefix added to topic names for multi-SObject configuration, for example prefix.Account or prefix.Contact.

  • Type: string

  • Importance: high

sobject.names

Comma-separated list of Salesforce SObjects to ingest, such as Account, Contact, or Lead.

  • Type: list

  • Importance: high

Connection details

request.max.retries.time.ms

Maximum time in milliseconds until the connector stops retrying failed Salesforce requests. Default is 30000 and minimum is 1000.

  • Type: long

  • Default: 30000 (30 seconds)

  • Valid Values: [1000,…,250000]

  • Importance: low
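The following Python sketch shows how a total retry budget like request.max.retries.time.ms bounds retries. The doubling backoff, the 100 ms initial delay, and the choice of retryable errors are assumptions, because the documentation defines only the total retry time and its bounds. The clock and sleep functions are injectable so that the behavior can be tested without waiting.

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry_budget(
    request: Callable[[], T],
    max_retry_time_ms: int = 30_000,
    initial_backoff_ms: int = 100,  # assumption: not a documented connector setting
    retryable: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry request() with doubling backoff until it succeeds or the budget runs out."""
    if not 1_000 <= max_retry_time_ms <= 250_000:
        raise ValueError("request.max.retries.time.ms must be in [1000, 250000]")
    deadline = clock() + max_retry_time_ms / 1000
    backoff_s = initial_backoff_ms / 1000
    while True:
        try:
            return request()
        except retryable:
            remaining = deadline - clock()
            if remaining <= 0:
                raise  # budget exhausted: surface the last error
            # Never sleep past the deadline.
            sleep(min(backoff_s, remaining))
            backoff_s *= 2
```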

Output messages

output.data.format

Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, or PROTOBUF.

  • Type: string

  • Default: JSON

  • Importance: high

Number of tasks for this connector

tasks.max

Maximum number of tasks for the connector.

  • Type: int

  • Valid Values: [1,…]

  • Importance: high

Advanced Connector Configs

event.sync.start.point

When set to all, the connector replays buffered events from as far back as the last 72 hours. When set to latest, it streams only events published after it subscribes.

  • Type: string

  • Default: latest

  • Importance: medium

gap.event.recovery

Defines what happens when the connector detects missed streaming events (gap events). Choose resync to run an incremental Bulk API resync from the last record’s commit timestamp, latest to skip missed events and continue from latest events only, or fail to stop the connector task.

  • Type: string

  • Default: resync

  • Importance: high

cdc.full.record.on.update

When enabled, the connector emits the full record for CDC update events instead of only the changed fields. Applies only to Event-Driven Sync mode.

  • Type: boolean

  • Default: false

  • Importance: medium

emit.tombstone.on.delete

When enabled, delete events emit Kafka tombstones. Applies only to Event-Driven Sync mode.

  • Type: boolean

  • Default: false

  • Importance: medium

gap.resync.buffer.ms

Safety buffer in milliseconds that the connector subtracts from the last commit time during incremental Bulk API resync. Applies only when gap event recovery is set to resync. See the Salesforce Source V2 connector documentation for tuning guidance.

  • Type: long

  • Default: 60000 (1 minute)

  • Importance: low
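As a worked example of the buffer arithmetic, with the default gap.resync.buffer.ms of 60000, a last commit timestamp of 1700000060000 ms (2023-11-14T22:14:20Z) gives a resync start of 2023-11-14T22:13:20Z. The following Python sketch only computes that start time. The query field that the connector filters on during the resync is not specified here, and the function name is hypothetical. Records that fall inside the buffer window may be read twice, so downstream deduplication still applies.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def resync_start(last_commit_ts_ms: int, gap_resync_buffer_ms: int = 60_000) -> str:
    """Return the UTC start time of an incremental resync as an ISO 8601 string."""
    # Subtract the safety buffer so that records committed just before the gap are re-read.
    start = EPOCH + timedelta(milliseconds=last_commit_ts_ms - gap_resync_buffer_ms)
    return start.strftime("%Y-%m-%dT%H:%M:%S.") + f"{start.microsecond // 1000:03d}Z"


if __name__ == "__main__":
    print(resync_start(1_700_000_060_000))
```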

poll.interval.ms

Interval in milliseconds between consecutive polls. If the connector restarts during an interval, it starts a new full interval, which can make the actual wait longer than this value. See the Salesforce Source V2 connector documentation for examples and recommended values.

  • Type: long

  • Default: 30000 (30 seconds)

  • Valid Values: [8700,…,10800000]

  • Importance: medium
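To see how a restart lengthens the wait, consider the default interval of 30000 ms. If the connector restarts 20000 ms into an interval and is unavailable for 5000 ms (an assumed downtime), it then waits a new full 30000 ms, so about 55000 ms pass between polls. The following Python sketch encodes that arithmetic. The function name is hypothetical, and the downtime is an input that you estimate.

```python
MIN_POLL_INTERVAL_MS = 8_700
MAX_POLL_INTERVAL_MS = 10_800_000


def gap_between_polls_ms(poll_interval_ms: int, restart_after_ms: int, downtime_ms: int) -> int:
    """Time from the last poll before a restart to the first poll after it."""
    if not MIN_POLL_INTERVAL_MS <= poll_interval_ms <= MAX_POLL_INTERVAL_MS:
        raise ValueError("poll.interval.ms must be in [8700, 10800000]")
    if not 0 <= restart_after_ms < poll_interval_ms:
        raise ValueError("the restart must happen inside the interval")
    # Time already waited + time the connector was down (assumed) + a fresh full interval.
    return restart_after_ms + downtime_ms + poll_interval_ms


if __name__ == "__main__":
    print(gap_between_polls_ms(30_000, restart_after_ms=20_000, downtime_ms=5_000))
```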

bulk.api.max.records.per.poll

Maximum number of records returned per Bulk API 2.0 result set across all SObjects. Default is 1000.

  • Type: int

  • Default: 1000

  • Valid Values: [1,…,3000]

  • Importance: medium
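As a worked example, with the default bulk.api.max.records.per.poll of 1000, 2500 changed records span three result sets of 1000, 1000, and 500 records. The following Python sketch encodes that arithmetic, assuming that each result set is filled to the limit before the next one starts. The function name is hypothetical.

```python
from __future__ import annotations


def result_set_sizes(total_records: int, max_records_per_poll: int = 1000) -> list[int]:
    """Sizes of the result sets needed to return total_records (hypothetical helper)."""
    if not 1 <= max_records_per_poll <= 3000:
        raise ValueError("bulk.api.max.records.per.poll must be in [1, 3000]")
    if total_records < 0:
        raise ValueError("total_records must be non-negative")
    full_sets, remainder = divmod(total_records, max_records_per_poll)
    # Full result sets first, then one partial set for any leftover records.
    return [max_records_per_poll] * full_sets + ([remainder] if remainder else [])


if __name__ == "__main__":
    print(result_set_sizes(2500))
```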

include.deleted.records

Whether Bulk API queries must include soft-deleted records by using queryAll

  • Type: boolean

  • Default: false

  • Importance: medium

skip.epoch.conversion.fields

Date or datetime field names that stay as ISO 8601 strings instead of epoch milliseconds across all SObjects.

  • Type: list

  • Default: “”

  • Importance: low
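The following Python sketch illustrates the effect of skip.epoch.conversion.fields on a single record. The field-type map, the Salesforce-style datetime format, and the treatment of a date as midnight UTC are assumptions for illustration; the sketch does not show how the connector itself detects field types.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
# Assumed Salesforce-style datetime, for example 2023-11-14T22:13:20.000+0000.
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"


def to_epoch_ms(value: str, field_type: str) -> int:
    if field_type == "datetime":
        parsed = datetime.strptime(value, DATETIME_FORMAT)
    elif field_type == "date":
        # Assumption: a date maps to midnight UTC.
        parsed = datetime.strptime(value, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    else:
        raise ValueError(f"unsupported field type: {field_type}")
    # Floor division by a timedelta keeps millisecond precision exact.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


def convert_record(record: dict, field_types: dict[str, str], skip_fields: set[str]) -> dict:
    converted = dict(record)
    for name, field_type in field_types.items():
        if name in skip_fields or converted.get(name) is None:
            continue  # skipped fields and nulls stay unchanged (ISO 8601 strings stay strings)
        converted[name] = to_epoch_ms(converted[name], field_type)
    return converted


if __name__ == "__main__":
    skip = {name.strip() for name in "CloseDate".split(",")}  # skip.epoch.conversion.fields
    record = {"Id": "006X", "CreatedDate": "2023-11-14T22:13:20.000+0000", "CloseDate": "2024-05-01"}
    print(convert_record(record, {"CreatedDate": "datetime", "CloseDate": "date"}, skip))
```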

Additional Configs

header.converter

The converter class for the headers. This is used to serialize and deserialize the headers of the messages.

  • Type: string

  • Importance: low

producer.override.compression.type

The compression type for all data generated by the producer. Valid values are none, gzip, snappy, lz4, and zstd.

  • Type: string

  • Importance: low

producer.override.linger.ms

The producer groups together any records that arrive in between request transmissions into a single batched request. More details can be found in the documentation: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#linger-ms.

  • Type: long

  • Valid Values: [100,…,1000]

  • Importance: low

value.converter.allow.optional.map.keys

Allow optional string map key when converting from Connect Schema to Avro Schema. Applicable for Avro Converters.

  • Type: boolean

  • Importance: low

value.converter.auto.register.schemas

Specify whether the serializer should attempt to register the schema.

  • Type: boolean

  • Importance: low

value.converter.connect.meta.data

Allow the Connect converter to add its metadata to the output schema. Applicable for Avro Converters.

  • Type: boolean

  • Importance: low

value.converter.enhanced.avro.schema.support

Enable enhanced schema support to preserve package information and Enums. Applicable for Avro Converters.

  • Type: boolean

  • Importance: low

value.converter.enhanced.protobuf.schema.support

Enable enhanced schema support to preserve package information. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.flatten.unions

Whether to flatten unions (oneofs). Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.generate.index.for.unions

Whether to generate an index suffix for unions. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.generate.struct.for.nulls

Whether to generate a struct variable for null values. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.int.for.enums

Whether to represent enums as integers. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.latest.compatibility.strict

Verify that the latest subject version is backward compatible when use.latest.version is true.

  • Type: boolean

  • Importance: low

value.converter.object.additional.properties

Whether to allow additional properties for object schemas. Applicable for JSON_SR Converters.

  • Type: boolean

  • Importance: low

value.converter.optional.for.nullables

Whether nullable fields should be specified with an optional label. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.optional.for.proto2

Whether proto2 optionals are supported. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.scrub.invalid.names

Whether to scrub invalid names by replacing invalid characters with valid characters. Applicable for Avro and Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.use.latest.version

Use the latest version of the schema in the subject for serialization when auto.register.schemas is false.

  • Type: boolean

  • Importance: low

value.converter.use.optional.for.nonrequired

Whether to set non-required properties to be optional. Applicable for JSON_SR Converters.

  • Type: boolean

  • Importance: low

value.converter.wrapper.for.nullables

Whether nullable fields should use primitive wrapper messages. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

value.converter.wrapper.for.raw.primitives

Whether a wrapper message should be interpreted as a raw primitive at root level. Applicable for Protobuf Converters.

  • Type: boolean

  • Importance: low

errors.tolerance

Configures the connector’s error handling behavior. WARNING: Use this property with caution for source connectors because it can lead to data loss. If you set this property to ‘all’, the connector does not fail on errant records; instead, it logs them (and, for sink connectors, sends them to the DLQ) and continues processing. If you set this property to ‘none’, the connector task fails on errant records.

  • Type: string

  • Default: none

  • Importance: low

key.converter.key.schema.id.serializer

The class name of the schema ID serializer for keys. This is used to serialize schema IDs in the message headers.

  • Type: string

  • Default: io.confluent.kafka.serializers.schema.id.PrefixSchemaIdSerializer

  • Importance: low

key.converter.key.subject.name.strategy

How to construct the subject name for key schema registration.

  • Type: string

  • Default: TopicNameStrategy

  • Importance: low

value.converter.decimal.format

Specifies the JSON or JSON_SR serialization format for Connect DECIMAL logical type values. Two literals are allowed: BASE64 serializes DECIMAL logical types as base64-encoded binary data, and NUMERIC serializes Connect DECIMAL logical type values in JSON or JSON_SR as a number representing the decimal value.

  • Type: string

  • Default: BASE64

  • Importance: low
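Kafka Connect’s Decimal logical type represents a value as its unscaled integer (the value multiplied by 10 to the power of the scale), stored as big-endian two’s-complement bytes, with the scale kept in the schema. The following Python sketch shows what each setting would produce for a field value of 123.45 with scale 2: BASE64 yields MDk= (the bytes 0x30 0x39, that is, 12345), and NUMERIC yields the number 123.45. The function names are hypothetical; the converter performs this encoding internally.

```python
import base64
from decimal import Decimal


def unscaled_bytes(value: Decimal, scale: int) -> bytes:
    """Minimal big-endian two's-complement bytes of the unscaled value."""
    scaled = value.scaleb(scale)
    if scaled != scaled.to_integral_value():
        raise ValueError(f"{value} has more than {scale} decimal places")
    unscaled = int(scaled)
    # Bits for the magnitude plus one sign bit, as Java's BigInteger.toByteArray() does.
    bits = unscaled.bit_length() if unscaled >= 0 else (~unscaled).bit_length()
    return unscaled.to_bytes(bits // 8 + 1, "big", signed=True)


def encode_decimal(value: Decimal, scale: int, decimal_format: str = "BASE64") -> str:
    if decimal_format == "BASE64":
        return base64.b64encode(unscaled_bytes(value, scale)).decode("ascii")
    if decimal_format == "NUMERIC":
        # Text of the JSON number literal, which appears unquoted in the record.
        return format(value, "f")
    raise ValueError(f"unknown decimal format: {decimal_format}")


if __name__ == "__main__":
    for fmt in ("BASE64", "NUMERIC"):
        print(fmt, encode_decimal(Decimal("123.45"), 2, fmt))
```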

value.converter.flatten.singleton.unions

Whether to flatten singleton unions. Applicable for Avro and JSON_SR Converters.

  • Type: boolean

  • Default: false

  • Importance: low

value.converter.ignore.default.for.nullables

When set to true, a null field value stays NULL in the Kafka record instead of being replaced with the field’s default value. Applicable for AVRO, PROTOBUF, and JSON_SR Converters.

  • Type: boolean

  • Default: false

  • Importance: low

value.converter.reference.subject.name.strategy

Set the subject reference name strategy for value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.

  • Type: string

  • Default: DefaultReferenceSubjectNameStrategy

  • Importance: low

value.converter.replace.null.with.default

Whether to replace null fields that have a default value with that default value. When set to true, the connector uses the default value; otherwise, it uses null. Applicable for JSON Converter.

  • Type: boolean

  • Default: true

  • Importance: low

value.converter.schemas.enable

Include schemas within each of the serialized values. Input messages must contain schema and payload fields and must not contain additional fields. For plain JSON data, set this to false. Applicable for JSON Converter.

  • Type: boolean

  • Default: false

  • Importance: low

value.converter.value.schema.id.serializer

The class name of the schema ID serializer for values. This is used to serialize schema IDs in the message headers.

  • Type: string

  • Default: io.confluent.kafka.serializers.schema.id.PrefixSchemaIdSerializer

  • Importance: low

value.converter.value.subject.name.strategy

Determines how to construct the subject name under which the value schema is registered with Schema Registry.

  • Type: string

  • Default: TopicNameStrategy

  • Importance: low

Auto-restart policy

auto.restart.on.user.error

Enable connector to automatically restart on user-actionable errors.

  • Type: boolean

  • Default: true

  • Importance: medium

Next steps

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud for Apache Flink, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
