Salesforce Source V2 Connector for Confluent Cloud
The fully-managed Salesforce Source V2 connector for Confluent Cloud optionally performs an initial historical load from Salesforce using Bulk API 2.0. The connector then either streams real-time Change Data Capture (CDC) events using the Pub/Sub API or periodically polls Salesforce for changes using Bulk API 2.0, and delivers data to the same Kafka topic per SObject with zero data loss. The connector supports Salesforce up to API version 65.0.
Features
The Salesforce Source V2 connector provides the following features:
Real-time ingestion modes: The connector supports an optional historical snapshot for initial data loads using Bulk API 2.0. For real-time ingestion, configure one of the following modes:
Event-Driven Sync: Streams real-time CDC events using the Pub/Sub API. This is the default real-time ingestion mode.
Periodic Polling: Periodically polls Salesforce for changes using Bulk API 2.0.
When the historical snapshot is enabled, the connector automatically transitions from snapshot to real-time ingestion on the same Kafka topic with zero data loss.
Multi-SObject ingestion: The connector ingests data from up to five Salesforce SObjects per connector instance, each with its own Kafka topic and independent Pub/Sub channel. Topics follow the naming pattern {prefix}.{SObject}, for example, salesforce.Account. The connector processes each SObject with independent state (schema cache, offsets, bulk job, and Pub/Sub subscription), so each SObject’s progress and failures are isolated.
Modern Pub/Sub API: The connector uses Salesforce’s recommended Pub/Sub API, which uses gRPC, HTTP/2, and binary Avro encoding, instead of the legacy CometD-based Streaming API.
Full record retrieval on update: By default, CDC UPDATE events emit only the changed fields, following Salesforce’s partial-payload semantics. When cdc.full.record.on.update is set to true, the connector calls the Salesforce REST API once per UPDATE event to fetch the complete post-image of the record and emits all field values to Kafka. CDC CREATE events already carry the full new record, and DELETE events have no field payload, so this option affects only UPDATE events, and only when real.time.ingestion.mode is set to Event-Driven Sync (using Pub/Sub API). The option is suited for low-to-moderate update volumes because of the extra REST API call per UPDATE event. High-throughput organizations can exhaust their Salesforce daily API quota.
GAP event reconciliation: The connector automatically detects GAP events, which are notifications from the Pub/Sub API that events were missed. For more information, see Gap events in the Salesforce documentation. The connector handles these events according to the gap.event.recovery property:
Resync (the default): The connector switches the affected SObject to an incremental Bulk API 2.0 re-sync to recover missed events while other SObjects continue streaming without interruption. To determine the re-sync start time, the connector selects the later timestamp between the commit time of the last successful CDC event and the completion time of the historical snapshot, and then subtracts a safety buffer. For example, if the last event occurs at 10:15 AM, the snapshot finishes at 10:00 AM, and the safety buffer is 5 minutes, the connector selects 10:15 AM and subtracts the buffer to start the re-sync at 10:10 AM.
Latest: Skips the gap and resubscribes from the newest available events. Events that occur during the gap are lost.
Fail: Stops the task for manual intervention.
If a GAP event arrives before the connector has any watermark to anchor against (for example, when the historical snapshot is disabled and no CDC events have been emitted yet), the Resync policy falls back to a full table re-fetch using Bulk API 2.0 to guarantee coverage. This prevents data loss but can be expensive on large SObjects.
Streaming initial start: The event.sync.start.point property controls the replay position when a Pub/Sub streaming subscription opens with no stored replay ID. This is the case on a cold start or after the offsets are reset or deleted. As soon as the connector emits an event, it stores that event’s replay ID in its offsets and, on any subsequent restart, resumes from the stored replay ID using the CUSTOM preset; event.sync.start.point is not consulted in that case. Note that resetting or deleting the connector’s offsets removes the stored replay ID, so the next start again falls back to event.sync.start.point.
latest: The default. Opens with the Salesforce LATEST preset, starting from new events only.
all: Opens with the Salesforce EARLIEST replay preset, replaying everything still in the 72-hour Pub/Sub retention buffer.
Changed and null field detection: The connector identifies changed and nulled fields by decoding Salesforce CDC hex bitmaps. It exposes the resolved field names as Kafka record headers:
source.changed.fields: comma-separated names of fields whose value changed.
cdc.nulled.fields: comma-separated names of fields explicitly set to null.
Downstream consumers such as Flink SQL and ksqlDB can use these headers to distinguish “field unchanged” from “field set to null”.
CSFLE and CSPE support: The connector supports Client-Side Field Level Encryption (CSFLE) and client-side per-event encryption (CSPE) for sensitive data. For more information about CSFLE or CSPE setup, see the connector configuration.
Supported data formats: The connector supports Avro, JSON Schema, and Protobuf output data. You must enable Schema Registry to use a Schema Registry-based format, for example, Avro, JSON_SR (JSON Schema), or Protobuf.
Configurable date and datetime representation: By default, the connector emits Salesforce date and datetime fields as epoch milliseconds. To keep them as ISO 8601 strings, list the field names in skip.epoch.conversion.fields. Changing this list alters the Connect schema type for those fields (int64 for epoch milliseconds, string for ISO 8601), so ensure your Schema Registry compatibility settings allow the resulting schema evolution.
Tombstone records: When emit.tombstone.on.delete is set to true, DELETE CDC events produce tombstone records with a null value and non-null key for Kafka topic compaction. The default is false.
Soft-delete handling: When include.deleted.records is set to true, the connector uses the Bulk API queryAll call to include soft-deleted records in historical snapshot and periodic polling modes. In CDC modes, the Pub/Sub API surfaces deletes as DELETE events.
Automatic retries: The connector classifies each error by HTTP or gRPC status and retries transient failures with exponential backoff and jitter. Transient failures include:
HTTP 429 and 5xx responses.
Bulk API daily-quota responses such as HTTP 403 REQUEST_LIMIT_EXCEEDED.
Responses with gRPC status UNAVAILABLE, DEADLINE_EXCEEDED, or RESOURCE_EXHAUSTED.
Network timeouts.
The total retry budget is capped by request.max.retries.time.ms, which defaults to 30 seconds.
Offset management: The connector supports custom offset management. For more information, see Manage custom offsets.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
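As a rough illustration of how a downstream consumer might use the changed and nulled field headers listed in the features above, the following Python sketch parses the two comma-separated header values and classifies a single field. The function names and the list-of-(key, bytes) header shape are assumptions made for this example and are not part of the connector; only the two header names come from this page.

```python
from typing import Iterable, Optional, Set, Tuple

# Header names as listed in the Features section of this page.
CHANGED_HEADER = "source.changed.fields"
NULLED_HEADER = "cdc.nulled.fields"


def parse_field_list(raw: Optional[bytes]) -> Set[str]:
    """Split a comma-separated header value into a set of field names."""
    if not raw:
        return set()
    return {name.strip() for name in raw.decode("utf-8").split(",") if name.strip()}


def classify_field(headers: Iterable[Tuple[str, Optional[bytes]]], field: str) -> str:
    """Return 'nulled', 'changed', or 'unchanged' for one field of a CDC record.

    The (key, bytes) tuple shape is an assumption for this sketch.
    """
    by_key = dict(headers)
    if field in parse_field_list(by_key.get(NULLED_HEADER)):
        return "nulled"
    if field in parse_field_list(by_key.get(CHANGED_HEADER)):
        return "changed"
    return "unchanged"
```

A consumer could call classify_field for each column it materializes and overwrite the stored value only when the result is not unchanged.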
Historical snapshot to streaming handoff
When historical.snapshot is set to true and real.time.ingestion.mode is set to Event-Driven Sync (using Pub/Sub API), the connector performs a one-time historical backfill using Bulk API 2.0 and then sustains real-time replication using the Pub/Sub API for Change Data Capture. The snapshot and streaming phases overlap in time, so the handoff requires no manual intervention.
Avoiding event loss at the start of streaming
While the bulk job runs, the connector opens a parallel Pub/Sub subscription with the LATEST preset as a one-shot probe. On the first event, the connector stores the replayId in the offset and closes the subscription. At handoff to event-driven sync, the connector opens a fresh subscription with the CUSTOM preset using the stored replayId, so streaming resumes at or before the point the snapshot ended. No events are lost between the end of the bulk job and the start of streaming.
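The preset choice described here, together with the event.sync.start.point behavior for subscriptions that have no stored replay ID, can be sketched as a small decision function. This is an illustrative Python model of the documented rules, not the connector's actual code; the function name and return shape are hypothetical.

```python
from typing import Optional, Tuple


def choose_replay_preset(stored_replay_id: Optional[str],
                         start_point: str = "latest") -> Tuple[str, Optional[str]]:
    """Return (preset, replay_id) for opening a Pub/Sub subscription.

    Hypothetical helper that mirrors the rules described on this page.
    """
    if stored_replay_id:
        # A stored replay ID always wins; event.sync.start.point is not consulted.
        return ("CUSTOM", stored_replay_id)
    if start_point == "all":
        # Replay everything still in the retention buffer.
        return ("EARLIEST", None)
    if start_point == "latest":
        # New events only.
        return ("LATEST", None)
    raise ValueError(f"unsupported event.sync.start.point: {start_point!r}")
```

At handoff from the snapshot, the replay ID stored by the probe subscription would take the first branch, which is why streaming resumes at or before the point where the snapshot ended.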
Avoiding duplicate events at the handoff
The brief overlap between the snapshot and streaming phases is protected by a deduplication watermark. When the bulk job completes, the connector records the snapshot completion timestamp, which is the upper bound of the last bulk job. Any Change Data Capture event whose change timestamp is strictly before the snapshot completion timestamp is dropped, because the snapshot already captured that record’s latest state.
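The watermark rule can be illustrated with a short Python sketch. It assumes each CDC event is a dictionary with a hypothetical changeTimestamp key in epoch milliseconds; the real event structure is not shown on this page.

```python
from typing import Any, Dict, Iterable, List


def drop_pre_snapshot_events(events: Iterable[Dict[str, Any]],
                             snapshot_completion_ms: int) -> List[Dict[str, Any]]:
    """Keep only events at or after the snapshot completion timestamp.

    Events strictly before the watermark are dropped because the snapshot
    already captured that record's latest state. 'changeTimestamp' is a
    hypothetical field name used for illustration.
    """
    return [e for e in events if e["changeTimestamp"] >= snapshot_completion_ms]
```

Because the comparison is strict, an event whose change timestamp equals the watermark is kept rather than dropped.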
GAP event recovery resync
When a GAP event arrives and gap.event.recovery is set to resync, the connector switches the affected SObject to a Bulk API 2.0 re-sync to fetch the missed events. The re-sync starting point depends on whether a CDC event has been emitted for the SObject:
Last commit time stored: At least one CDC event has been emitted for the SObject. The connector re-syncs from the commit time of the latest emitted CDC event, minus a safety buffer set by gap.resync.buffer.ms, which defaults to 60 seconds. This is the tightest possible re-fetch.
No last commit time: No CDC event has been emitted yet. The connector falls back to the snapshot completion timestamp minus gap.resync.buffer.ms, re-fetching everything since the snapshot ended.
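The two cases above, plus the full re-fetch fallback when no watermark exists at all, can be modeled with a few lines of Python. This is a sketch of the documented selection rule under the assumption that all timestamps are epoch milliseconds; the function name is hypothetical and the connector's internal logic may differ in detail.

```python
from typing import Optional

# Default of gap.resync.buffer.ms as stated on this page (60 seconds).
DEFAULT_GAP_RESYNC_BUFFER_MS = 60_000


def resync_start_ms(last_commit_ms: Optional[int],
                    snapshot_completion_ms: Optional[int],
                    buffer_ms: int = DEFAULT_GAP_RESYNC_BUFFER_MS) -> Optional[int]:
    """Return the re-sync start time, or None when a full re-fetch is needed."""
    if last_commit_ms is not None:
        anchor = last_commit_ms            # tightest possible re-fetch
    elif snapshot_completion_ms is not None:
        anchor = snapshot_completion_ms    # no CDC event emitted yet
    else:
        return None                        # no watermark: full table re-fetch
    return anchor - buffer_ms
```

For example, a last commit time of 1,000,000 ms with the default buffer gives a start of 940,000 ms, regardless of the snapshot completion timestamp.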
Limitations
Review the following:
For connector limitations, see Salesforce Source V2 connector.
If you plan to use one or more Single Message Transforms (SMTs), see SMT Limitations.
Manage custom offsets
You can manage the offsets for this connector. Offsets provide information on the point in the system from which the connector is accessing data. For more information, see Manage Offsets for Fully-Managed Connectors in Confluent Cloud.
To manage offsets:
Manage offsets using Confluent Cloud APIs. For more information, see Connect offsets API reference.
The Salesforce Source V2 connector emits one offset entry per Salesforce SObject. Each entry tracks the SObject’s progress through the connector’s loading lifecycle. The mode field has three values:
Historical Snapshot
Periodic Polling
Event-Driven Sync
To get the current offset, make a GET request that specifies the environment, Kafka cluster, and connector name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets
Host: https://api.confluent.cloud
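For scripted use, the GET request above could be issued with Python's standard library, roughly as follows. This sketch assumes a Confluent Cloud API key and secret sent with HTTP Basic authentication; the helper names are hypothetical, and the IDs passed in are placeholders you supply.

```python
import base64
import json
import urllib.parse
import urllib.request

API_HOST = "https://api.confluent.cloud"


def offsets_url(environment_id: str, kafka_cluster_id: str, connector_name: str) -> str:
    """Build the offsets endpoint URL for one connector."""
    name = urllib.parse.quote(connector_name, safe="")
    return (f"{API_HOST}/connect/v1/environments/{environment_id}"
            f"/clusters/{kafka_cluster_id}/connectors/{name}/offsets")


def basic_auth_header(api_key: str, api_secret: str) -> str:
    """Encode a key and secret as an HTTP Basic Authorization value (assumed auth scheme)."""
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


def get_offsets(environment_id: str, kafka_cluster_id: str, connector_name: str,
                api_key: str, api_secret: str) -> dict:
    """Fetch the current offsets; raises urllib.error.HTTPError on non-2xx responses."""
    request = urllib.request.Request(
        offsets_url(environment_id, kafka_cluster_id, connector_name),
        headers={"Authorization": basic_auth_header(api_key, api_secret)},
        method="GET",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```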
Response:
Successful calls return HTTP 200 with a JSON payload that describes the offset.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"sobject": "Contact"
},
"offset": {
"commitNumber": 1779986819752870000,
"commitTimestamp": 1779986819000,
"mode": "Event-Driven Sync",
"offset.version": 1,
"replayId": "AAAAAAARXWcAAA==",
"snapshotCompletionTimestamp": 1779986505317
}
},
{
"partition": {
"sobject": "Account"
},
"offset": {
"lastId": "001gL00001MoGikQAF",
"lastSystemModstamp": "2026-05-27T05:48:54.000Z",
"mode": "Historical Snapshot",
"offset.version": 1,
"snapshotCompletionTimestamp": 1779986499361
}
}
],
"metadata": {
"observed_at": "2026-05-28T16:48:53.969580851Z"
}
}
Responses include the following information:
The current loading phase of each SObject, represented by mode in the offset payload.
The position within that phase. For the Bulk API-based modes, Historical Snapshot and Periodic Polling, the position is represented by lastSystemModstamp, lastId, jobId, and locator. For Event-Driven Sync, the position is represented by snapshotCompletionTimestamp, replayId, commitTimestamp, and commitNumber.
The observed time of the offset in the metadata portion of the payload. The observed_at time indicates a snapshot in time for when the API retrieved the offset.
Information about the connector.
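To make the per-mode position fields concrete, the following Python sketch reduces an offsets response to the mode and the position fields that matter for that mode. The function name and output shape are hypothetical; the field names and mode values are the ones shown in the example response.

```python
from typing import Any, Dict

BULK_MODES = {"Historical Snapshot", "Periodic Polling"}
BULK_FIELDS = ("lastSystemModstamp", "lastId", "jobId", "locator")
STREAM_FIELDS = ("snapshotCompletionTimestamp", "replayId",
                 "commitTimestamp", "commitNumber")


def summarize_offsets(response: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Map each SObject to its mode and the position fields relevant to that mode."""
    summary: Dict[str, Dict[str, Any]] = {}
    for entry in response.get("offsets", []):
        sobject = entry["partition"]["sobject"]
        offset = entry["offset"]
        mode = offset["mode"]
        fields = BULK_FIELDS if mode in BULK_MODES else STREAM_FIELDS
        # Only keep fields that are actually present in the payload.
        position = {name: offset[name] for name in fields if name in offset}
        summary[sobject] = {"mode": mode, "position": position}
    return summary
```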
To update the offset, make a POST request that specifies the environment, Kafka cluster, and connector name. Include a JSON payload that specifies the new offset and a patch type.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "PATCH",
"offsets": [
{
"partition": { "sobject": "Contact" },
"offset": {
"offset.version": 1,
"mode": "Event-Driven Sync",
"snapshotCompletionTimestamp": 1777904179496,
"replayId": "AAAAAAAAALY=",
"commitTimestamp": 1777904180123,
"commitNumber": 12345
}
}
]
}
Considerations:
You can only make one offset change at a time for a given connector.
This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
For source connectors, the connector attempts to read from the position defined by the requested offsets. For the Salesforce Source V2 connector, the mode field in the offset determines the position:
Historical Snapshot continues the initial Bulk API 2.0 historical backfill from lastSystemModstamp and lastId. When those fields are empty, the snapshot starts from the since configuration.
Periodic Polling continues incremental Bulk polling from lastSystemModstamp and lastId.
Event-Driven Sync resumes Pub/Sub event consumption from replayId.
A PATCH updates only the SObjects listed in the request. Omitted SObjects are unaffected. To reset a single SObject without disturbing others, send a null offset for that SObject’s partition.
The replayId field is the base64-encoded Salesforce Pub/Sub replay ID for the last consumed CDC event. For more information, see Replay in the Salesforce Pub/Sub API documentation.
The connector discards events outside the 72-hour Salesforce retention period. If the requested replayId falls outside the retention window, the connector applies the policy configured by gap.event.recovery.
PATCHing mode to Historical Snapshot has no effect in pure streaming mode, that is, when historical.snapshot is false and real.time.ingestion.mode is Event-Driven Sync (using Pub/Sub API). The connector forces streaming at startup regardless of the offset’s mode. Update the connector configuration to enable historical.snapshot if you need to run a Bulk snapshot.
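The PATCH body for a single SObject, including the null offset that resets only that SObject, might be assembled as in the following Python sketch. The helper names are hypothetical; the payload shape follows the request example above, and offset.version is set to 1 as in the examples on this page.

```python
import json
from typing import Any, Dict, Optional


def streaming_offset(replay_id: str, snapshot_completion_ms: int,
                     commit_timestamp_ms: int, commit_number: int) -> Dict[str, Any]:
    """Build an Event-Driven Sync offset with the fields shown in the example."""
    return {
        "offset.version": 1,
        "mode": "Event-Driven Sync",
        "snapshotCompletionTimestamp": snapshot_completion_ms,
        "replayId": replay_id,
        "commitTimestamp": commit_timestamp_ms,
        "commitNumber": commit_number,
    }


def build_patch(sobject: str, offset: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Build a PATCH body for one SObject; offset=None resets only that SObject."""
    return {
        "type": "PATCH",
        "offsets": [{"partition": {"sobject": sobject}, "offset": offset}],
    }


if __name__ == "__main__":
    # Serializes the reset request; None becomes JSON null.
    print(json.dumps(build_patch("Account", None), indent=2))
```

SObjects that are not listed in the body are left untouched, so one request per SObject keeps each change easy to verify through the status endpoint.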
Response:
Successful calls return HTTP 202 Accepted with a JSON payload that describes the offset.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"sobject": "Contact"
},
"offset": {
"commitNumber": 12345,
"commitTimestamp": 1777904180123,
"mode": "Event-Driven Sync",
"offset.version": 1,
"replayId": "AAAAAAAAALY=",
"snapshotCompletionTimestamp": 1777904179496
}
}
],
"requested_at": "2026-05-28T16:51:39.735139217Z",
"type": "PATCH"
}
Responses include the following information:
The requested position of the offsets in the source.
The time of the request to update the offset.
Information about the connector.
To delete the offset, make a POST request that specifies the environment, Kafka cluster, and connector name. Include a JSON payload that specifies the delete type.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "DELETE"
}
Considerations:
A delete request deletes the stored offsets and resets the connector to its base state, equivalent to creating a new connector.
A DELETE resets the offsets for all SObjects tracked by the connector. To reset a single SObject, send a per-partition null offset using a PATCH request instead.
This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
Do not issue delete and patch requests at the same time.
For source connectors, the connector attempts to read from the position defined in the base state.
Response:
Successful calls return HTTP 202 Accepted with a JSON payload that describes the result.
{
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [],
"requested_at": "2026-05-04T14:55:15.179333302Z",
"type": "DELETE"
}
Responses include the following information:
Empty offsets.
The time of the request to delete the offset.
Information about Kafka cluster and connector.
The type of request.
To get the status of a previous offset request, make a GET request that specifies the environment, Kafka cluster, and connector name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request/status
Host: https://api.confluent.cloud
The status endpoint always shows the status of the most recent PATCH or DELETE operation.
Response:
Successful calls return HTTP 200 with a JSON payload that describes the result. The following example shows an applied delete.
{
"request": {
"id": "lcc-example123",
"name": "{connector_name}",
"offsets": [],
"requested_at": "2026-05-04T14:55:15.179333302Z",
"type": "DELETE"
},
"status": {
"phase": "APPLIED",
"message": "The offsets for this connector have been reset successfully."
},
"previous_offsets": [
{
"partition": {
"sobject": "Contact"
},
"offset": {
"commitNumber": 12345,
"commitTimestamp": 1777904180123,
"mode": "Event-Driven Sync",
"offset.version": 1,
"replayId": "AAAAAAAAALY=",
"snapshotCompletionTimestamp": 1777904179496
}
},
{
"partition": {
"sobject": "Account"
},
"offset": {
"lastId": "001gL00001MoGikQAF",
"lastSystemModstamp": "2026-05-27T05:48:54.000Z",
"mode": "Historical Snapshot",
"offset.version": 1,
"snapshotCompletionTimestamp": 1779986499361
}
}
],
"applied_at": "2026-05-04T14:55:34.115765732Z"
}
Responses include the following information:
The original request, including the time it was made.
The status of the request: applied, pending, or failed.
The time you issued the status request.
The previous offsets, which represent the offset state before the most recent update. Use these to restore the state of your connector if a patch update causes your connector to fail, or to return a connector to its previous state after rolling back.
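One way to act on the previous offsets is to turn a status response into a PATCH body that restores them. The Python sketch below does only that transformation; the function names are hypothetical, and the phase value APPLIED is taken from the example response (other phase spellings are not shown on this page).

```python
from typing import Any, Dict


def request_applied(status_response: Dict[str, Any]) -> bool:
    """Return True when the most recent offset request has been applied."""
    return status_response.get("status", {}).get("phase") == "APPLIED"


def build_restore_patch(status_response: Dict[str, Any]) -> Dict[str, Any]:
    """Build a PATCH body that restores the offsets recorded in previous_offsets."""
    previous = status_response.get("previous_offsets", [])
    if not previous:
        raise ValueError("status response carries no previous_offsets to restore")
    return {
        "type": "PATCH",
        "offsets": [
            {"partition": dict(entry["partition"]), "offset": dict(entry["offset"])}
            for entry in previous
        ],
    }
```

The resulting body can be sent to the same offsets/request endpoint used for any other PATCH.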
JSON payload
The following table describes the unique fields in the JSON payload for managing offsets of the Salesforce Source V2 connector.
| Field | Definition | Required or optional |
|---|---|---|
| mode | The current loading phase of the SObject. Valid values are Historical Snapshot, Periodic Polling, and Event-Driven Sync. | Required |
| offset.version | Offset schema version. Always 1. | Required |
| lastId | Salesforce record ID of the last ingested record (15 or 18 characters). Used in Historical Snapshot and Periodic Polling modes. | Optional |
| lastSystemModstamp | Timestamp of the last ingested record, formatted as an ISO 8601 UTC string such as 2026-05-27T05:48:54.000Z. | Optional |
| jobId | Bulk API 2.0 job ID of an in-flight query job. Empty string when no job is currently active. Present on offsets emitted during the snapshot-style modes when the connector is mid-Bulk-poll. | Optional |
| locator | Bulk API 2.0 result locator for paginated reads of an in-flight job. Empty string when no job is currently active. | Optional |
| snapshotCompletionTimestamp | Unix timestamp in milliseconds when the Bulk snapshot completed. | Optional |
| replayId | Base64-encoded Salesforce Pub/Sub API replay ID for the last consumed CDC event. Used in Event-Driven Sync mode. | Optional |
| commitTimestamp | Unix timestamp in milliseconds of the last consumed CDC event commit. Used in Event-Driven Sync mode. | Optional |
| commitNumber | Salesforce commit number of the last consumed CDC event. Used in Event-Driven Sync mode. | Optional |
Quick start
Use this quick start to get up and running with the Confluent Cloud Salesforce Source V2 connector. The quick start provides the basics of selecting the connector and configuring it to stream events from Salesforce to Kafka topics.
If you’re migrating from the Salesforce Bulk API 2.0 Source or Salesforce CDC Source connector, see Migrate from Salesforce Bulk API 2.0 Source or CDC Source to Salesforce Source V2.
Prerequisites
Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud.
A Salesforce account with View All Data, API Enabled, View Setup and Configuration, and Read access to the target objects. When real.time.ingestion.mode is set to Event-Driven Sync (using Pub/Sub API), explicitly enable Change Data Capture for every SObject listed in sobject.names. OAuth2 Client Credentials or JWT Bearer authentication must be configured in Salesforce.
The Confluent CLI installed and configured for the cluster. For installation steps, see Install the Confluent CLI.
Schema Registry must be enabled to use a Schema Registry-based format, for example, Avro, JSON_SR (JSON Schema), or Protobuf.
For networking considerations, see Networking and DNS. To use a set of public egress IP addresses, see Public Egress IP Addresses for Confluent Cloud Connectors.
Kafka cluster credentials. The following lists the different ways you can provide credentials.
Enter an existing service account resource ID.
Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.
Using the Confluent Cloud Console
Step 1: Launch your Confluent Cloud cluster
To create and launch a Kafka cluster in Confluent Cloud, see Create a Kafka cluster in Confluent Cloud.
Step 2: Add a connector
In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.
Step 3: Select your connector
Click the Salesforce Source V2 connector card.

Step 4: Enter the connector details
Ensure you have all your prerequisites completed. An asterisk ( * ) designates a required entry.
At the Add Salesforce Source V2 Connector screen, complete the following:
Select the topic you want to send data to from the Topics list.
To create a new topic, click +Add new topic.
Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:
My account: This setting allows your connector to globally access everything that you have access to. With a user account, the connector uses an API key and secret to access the Kafka cluster. This option is not recommended for production.
Service account: This setting limits the access for your connector by using a service account. This option is recommended for production.
Use an existing API key: This setting allows you to specify an API key and a secret pair. You can use an existing pair or create a new one. This method is not recommended for production environments.
Note
Freight clusters support only service accounts for Kafka authentication.
Click Continue.
Configure the authentication properties:
Salesforce grant type: The grant type the connector uses to authenticate with Salesforce. Valid options are PASSWORD, CLIENT_CREDENTIALS, and JWT_BEARER.
Salesforce details
Salesforce instance: The URL of the Salesforce endpoint to use. When using the CLIENT_CREDENTIALS grant type, provide your Salesforce domain URL. The default is https://login.salesforce.com, which directs the connector to use the endpoint specified in the authentication response.
Salesforce username: The Salesforce username the connector uses.
Salesforce password: The Salesforce password the connector uses.
Salesforce password token: The Salesforce security token associated with the username.
Salesforce consumer key: The client ID (consumer key) for the Salesforce Connected App.
Salesforce consumer secret: The client secret (consumer secret) for the Salesforce Connected App.
Salesforce JWT keystore file: The Salesforce JWT keystore file that contains the private key.
Salesforce JWT keystore password: The password the connector uses to access the JWT keystore file.
Note
The following properties are used based on the Salesforce grant type you choose.
JWT_BEARER: Requires username, consumer key, JWT keystore file, and JWT keystore password.
PASSWORD: Requires username, password, password token, consumer key, and consumer secret.
CLIENT_CREDENTIALS: Requires the consumer key and consumer secret (the client ID and client secret of a Salesforce connected application) and the Salesforce domain URL in the Salesforce instance option. The default value https://login.salesforce.com does not work for this option. To use CLIENT_CREDENTIALS, you must enable the Client Credentials flow in your connected Salesforce application and assign an integration user.
Click Continue.
Properties not shown in the Cloud Console use the default values. For all property values and definitions, see Configuration properties.
Select output record value format: Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, or PROTOBUF.
Note
You need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, or PROTOBUF.
Historical snapshot
Historical snapshot: When enabled, the connector uses Bulk API 2.0 to retrieve existing Salesforce records starting from the Since date.
Since: Specifies the CreatedDate in yyyy-MM-dd format from which the connector pulls existing Salesforce records. Applies only when historical snapshot is enabled and defaults to today.
Real-time ingestion
Real-time ingestion mode: Select the mode for continuous ingestion: Periodic Polling using Bulk API 2.0 or Event-Driven Sync using Pub/Sub API.
Topic prefix: Prefix added to topic names for multi-SObject configuration, for example, prefix.Account or prefix.Contact.
SObject names: Comma-separated list of Salesforce SObjects to ingest, such as Account, Contact, or Lead.
Data encryption
Enable Client-Side Field Level Encryption for data encryption. Specify a Service Account to access the Schema Registry and associated encryption rules or keys with that schema. For more information on CSFLE or CSPE setup, see Manage encryption for connectors.
Show advanced configurations
Schema context: Select a schema context to use for this connector, if using a schema-based data format. This property defaults to the Default context, which configures the connector to use the default schema set up for Schema Registry in your Confluent Cloud environment. A schema context allows you to use separate schemas (like schema sub-registries) tied to topics in different Kafka clusters that share the same Schema Registry environment. For example, if you select a non-default context, a Source connector uses only that schema context to register a schema and a Sink connector uses only that schema context to read from. For more information about setting up a schema context, see What are schema contexts and when should you use them?.
Event sync start point: When set to all, the connector replays buffered events from up to the last 72 hours. When set to latest, it streams the latest events only.
Gap event recovery: Defines what happens when the connector detects missed streaming events (gap events). Choose resync to run an incremental Bulk API resync from the last record’s commit timestamp, latest to skip missed events and continue from latest events only, or fail to stop the connector task.
Full record on update: When enabled, CDC update events read full records instead of only changed fields. Applies only to Event-Driven Sync mode.
Emit tombstone on delete: When enabled, delete events emit Kafka tombstones. Applies only to Event-Driven Sync mode.
Gap resync safety buffer (ms): Safety buffer in milliseconds that the connector subtracts from the last commit time during incremental Bulk API resync. Applies only when gap event recovery is set to resync. For tuning guidance, see the Salesforce Source V2 connector documentation.
Poll interval (ms): Set the time in milliseconds to wait for new change events when no data is returned. Default is 500 ms.
Max records per result set: Maximum number of records returned per Bulk API 2.0 result set across all SObjects. The default is 1000.
Include deleted records: Whether Bulk API queries must include soft-deleted records by using queryAll.
Skip epoch conversion fields: Date or datetime field names that stay as ISO 8601 strings instead of epoch milliseconds across all SObjects.
Additional Configs
Value Converter Decimal Format: Specifies the JSON or JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals: BASE64 to serialize DECIMAL logical types as base64 encoded binary data, and NUMERIC to serialize DECIMAL logical type values in JSON or JSON_SR as a number representing the decimal value.
value.converter.replace.null.with.default: Whether to replace null fields that have a default value with that default value. When set to true, the default value is used; otherwise, null is used. Applicable for JSON Converter.
Key Converter Schema ID Serializer: The class name of the schema ID serializer for keys. This is used to serialize schema IDs in the message headers.
Value Converter Reference Subject Name Strategy: Sets the subject reference name strategy for values. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. You can use this strategy only with PROTOBUF format; the default strategy is DefaultReferenceSubjectNameStrategy.
value.converter.schemas.enable: Include schemas within each of the serialized values. Input messages must contain schema and payload fields and may not contain additional fields. For plain JSON data, set this to false. Applicable for JSON Converter.
errors.tolerance: Use this property to configure the connector’s error handling behavior. WARNING: Use this property with caution for source connectors because it may lead to data loss. If you set this property to ‘all’, the connector does not fail on errant records, but instead logs them (and sends them to the DLQ for sink connectors) and continues processing. If you set this property to ‘none’, the connector task fails on errant records.
Value Converter Connect Meta Data: Enables the Connect converter to add its metadata to the output schema. Applies to Avro converters.
Value Converter Value Subject Name Strategy: Determines how to construct the subject name under which the value schema is registered with Schema Registry.
Key Converter Key Subject Name Strategy: Determines how to construct the subject name for key schema registration.
value.converter.ignore.default.for.nullables: When set to true, this property ensures that the corresponding record in Kafka is NULL, instead of showing the default column value. Applicable for AVRO, PROTOBUF, and JSON_SR Converters.
Value Converter Schema ID Serializer: The class name of the schema ID serializer for values. This is used to serialize schema IDs in the message headers.
Auto-restart policy
Enable Connector Auto-restart: Enables the auto-restart behavior of the connector and its task in the event of user-actionable errors. Defaults to true, enabling the connector to automatically restart in case of user-actionable errors. Set this property to false to disable auto-restart for failed connectors. In such cases, you would need to manually restart the connector.
Connection details
Max retry time (ms): Maximum time in milliseconds until the connector stops retrying failed Salesforce requests. Default is 30000 and minimum is 1000.
Transforms
Single Message Transforms: To add a new SMT, see Add transforms. For more information about unsupported SMTs, see Unsupported transformations.
Processing position
Set offsets: Click Set offsets to define a specific offset for this connector to begin processing data from. For more information on managing offsets, see Manage offsets.
For all property values and definitions, see Configuration properties.
Click Continue.
Tasks are distributed across SObjects using round-robin assignment. Each task owns its assigned SObjects through both the snapshot and streaming phases. The connector does not reduce to a single task when streaming begins. Setting tasks.max higher than the number of configured SObjects does not increase parallelism. Extra task slots remain idle.
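The effect of tasks.max on parallelism can be pictured with a small round-robin model in Python. This is only an illustration of the distribution described above; the function name is hypothetical, and the actual assignment order inside the connector is an assumption.

```python
from typing import Dict, List


def assign_sobjects(sobjects: List[str], tasks_max: int) -> Dict[int, List[str]]:
    """Distribute SObjects across task slots in round-robin order."""
    if tasks_max < 1:
        raise ValueError("tasks_max must be at least 1")
    assignment: Dict[int, List[str]] = {task: [] for task in range(tasks_max)}
    for index, sobject in enumerate(sobjects):
        assignment[index % tasks_max].append(sobject)
    return assignment


def idle_tasks(assignment: Dict[int, List[str]]) -> List[int]:
    """Return the task slots that received no SObject."""
    return [task for task, owned in assignment.items() if not owned]
```

With three SObjects and two tasks, one task owns two SObjects; with one SObject and three tasks, two task slots stay idle.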
To change the number of tasks, enter the number of tasks for the connector to use in the Tasks field.
Click Continue.
Verify the connection details.
Click Launch.
The status for the connector should go from Provisioning to Running.
Step 5: Check the Kafka topic
After the connector is running, verify that messages are populating your Kafka topics. When the historical snapshot is enabled, the connector loads historical data into topics named {prefix}.{SObject}, for example, salesforce.Account, and then automatically switches to real-time CDC streaming on the same topics.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
Using the Confluent CLI
Complete the following steps to set up and run the connector using the Confluent CLI.
Ensure you have all your prerequisites completed.
Step 1: List the available connectors
Enter the following command to list available connectors:
confluent connect plugin list
Step 2: List the connector configuration properties
Enter the following command to show the connector configuration properties:
confluent connect plugin describe <connector-plugin-name>
The command output shows the required and optional configuration properties.
Step 3: Create the connector configuration file
Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties using OAuth2 Client Credentials authentication with the historical snapshot enabled and Event-Driven Sync as the real-time mode.
{
"name": "SalesforceSourceV2_0",
"config": {
"connector.class": "SalesforceSourceV2",
"name": "SalesforceSourceV2_0",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<my-kafka-api-key>",
"kafka.api.secret": "<my-kafka-api-secret>",
"salesforce.grant.type": "CLIENT_CREDENTIALS",
"salesforce.instance": "https://<your-domain>.my.salesforce.com",
"salesforce.consumer.key": "<my-consumer-key>",
"salesforce.consumer.secret": "<my-consumer-secret>",
"sobject.names": "Account,Contact",
"topic.prefix": "salesforce",
"historical.snapshot": "true",
"real.time.ingestion.mode": "Event-Driven Sync (using Pub/Sub API)",
"output.data.format": "AVRO",
"tasks.max": "1"
}
}
Note the following property definitions:
"name": Sets a name for your new connector.
"connector.class": Identifies the connector plugin name.
"kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration above. To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:
confluent iam service-account list
For example:
confluent iam service-account list
   Id     | Resource ID |       Name        |    Description
+---------+-------------+-------------------+-------------------
   123456 | sa-l1r23m   | sa-1              | Service account 1
   789101 | sa-l4d56p   | sa-2              | Service account 2
"salesforce.grant.type": Sets the authentication grant type. Valid entries are CLIENT_CREDENTIALS for OAuth2 Client Credentials (recommended), JWT_BEARER for Salesforce JSON Web Token, or PASSWORD for username and password authentication.
Note
The following properties are used based on the Salesforce grant type you choose.
JWT_BEARER: Requires username, consumer key, JWT keystore file, and JWT keystore password.
PASSWORD: Requires username, password, password token, consumer key, and consumer secret.
CLIENT_CREDENTIALS: Requires consumer key, consumer secret (client ID and client secret of a Salesforce connected application), and the Salesforce domain URL in the Salesforce instance option. The default value https://login.salesforce.com does not work for this option. To use CLIENT_CREDENTIALS, you must enable the Client Credentials flow in your connected Salesforce application and assign an integration user.
"salesforce.instance": The URL of your Salesforce instance. When using CLIENT_CREDENTIALS, provide your Salesforce domain URL, for example, https://your-domain.my.salesforce.com. The default value https://login.salesforce.com does not work for CLIENT_CREDENTIALS.
"salesforce.consumer.key": The consumer key for your Salesforce Connected App, also called the client ID. Required for CLIENT_CREDENTIALS and JWT_BEARER grant types.
"salesforce.consumer.secret": The consumer secret for your Salesforce Connected App, also called the client secret. Required for the CLIENT_CREDENTIALS grant type.
"sobject.names": A comma-separated list of Salesforce SObject API names to ingest, for example, Account,Contact,Lead. Specify SObject names using their exact Salesforce API name with appropriate casing.
"topic.prefix": The topic name prefix. Topics are named {prefix}.{SObject}, for example, salesforce.Account.
"historical.snapshot": When set to true, the connector performs an initial historical load of existing Salesforce data using Bulk API 2.0 before switching to real-time streaming. Defaults to true.
"real.time.ingestion.mode": Sets the real-time ingestion mode. Event-Driven Sync (using Pub/Sub API) is the default and uses the Pub/Sub API for real-time CDC. Periodic Polling (using Bulk API 2.0) uses Bulk API 2.0 for periodic polling.
"output.data.format": Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, or PROTOBUF. You must have Confluent Cloud Schema Registry configured if using a schema-based message format.
Note
To enable CSFLE or CSPE for data encryption, specify the following properties:
csfle.enabled: Flag to indicate whether the connector honors CSFLE or CSPE rules.
sr.service.account.id: A Service Account to access the Schema Registry and associated encryption rules or keys with that schema.
For more information on CSFLE or CSPE setup, see Manage encryption for connectors.
"tasks.max": Maximum tasks for the connector. Tasks are distributed across SObjects and each task owns its assigned SObjects through both snapshot and streaming phases. If tasks.max exceeds the number of configured SObjects, extra task slots remain idle.
For all property values and descriptions, see Configuration properties.
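Before you create the connector, a quick local check of the configuration file can catch the constraints described above. The following is a minimal sketch, not a Confluent tool: check_config is a hypothetical helper, and it covers only rules stated on this page (at most five SObjects per connector, a non-default salesforce.instance for CLIENT_CREDENTIALS, and the three documented output formats).

```python
DEFAULT_INSTANCE = "https://login.salesforce.com"
VALID_FORMATS = {"AVRO", "JSON_SR", "PROTOBUF"}
MAX_SOBJECTS = 5  # per-connector limit stated in the Features section


def check_config(config):
    """Return a list of problems found in the "config" object; empty means none found."""
    problems = []
    names = config.get("sobject.names", "")
    sobjects = [name.strip() for name in names.split(",") if name.strip()]
    if not sobjects:
        problems.append("sobject.names is empty")
    elif len(sobjects) > MAX_SOBJECTS:
        problems.append(f"{len(sobjects)} SObjects configured; the limit is {MAX_SOBJECTS}")
    if (config.get("salesforce.grant.type") == "CLIENT_CREDENTIALS"
            and config.get("salesforce.instance", DEFAULT_INSTANCE) == DEFAULT_INSTANCE):
        problems.append("CLIENT_CREDENTIALS needs your Salesforce domain URL in salesforce.instance")
    fmt = config.get("output.data.format")
    if fmt is not None and fmt not in VALID_FORMATS:
        problems.append(f"output.data.format {fmt!r} is not one of {sorted(VALID_FORMATS)}")
    return problems


# Values mirror the example configuration; placeholders stay placeholders.
example = {
    "salesforce.grant.type": "CLIENT_CREDENTIALS",
    "salesforce.instance": "https://<your-domain>.my.salesforce.com",
    "sobject.names": "Account,Contact",
    "output.data.format": "AVRO",
}
problems = check_config(example)
```

The check runs on the inner "config" object of the JSON file. The Confluent Cloud API still performs the authoritative validation when you create the connector.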
Step 4: Load the configuration file and create the connector
Enter the following command to load the configuration and start the connector:
confluent connect cluster create --config-file <file-name>.json
For example:
confluent connect cluster create --config-file salesforce-source-v2-config.json
Example output:
Created connector SalesforceSourceV2_0 lcc-ix4dl
Step 5: Check the connector status
Enter the following command to check the connector status:
confluent connect cluster list
Example output:
ID | Name | Status | Type
+-----------+-----------------------------+---------+------+
lcc-ix4dl | SalesforceSourceV2_0 | RUNNING | source
Step 6: Check the Kafka topics
After the connector is running, verify that messages are populating your Kafka topics. Topics follow the naming pattern {prefix}.{SObject}, for example, salesforce.Account and salesforce.Contact.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
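To know which topics to inspect, you can derive the expected topic names from the configuration. This small sketch applies the {prefix}.{SObject} naming pattern; expected_topics is a hypothetical helper name.

```python
def expected_topics(topic_prefix, sobject_names):
    """Build the topic name for each SObject in a comma-separated list."""
    names = [name.strip() for name in sobject_names.split(",") if name.strip()]
    return [f"{topic_prefix}.{name}" for name in names]


topics = expected_topics("salesforce", "Account,Contact")
```

For the example configuration, the result is salesforce.Account and salesforce.Contact.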
Migrate from Salesforce Bulk API 2.0 Source or CDC Source to Salesforce Source V2
This version introduces changes that are not directly backward compatible, so Confluent provides Salesforce Source V2 as a separate connector type on Confluent Cloud.
Before you migrate
The Salesforce Source V2 connector is not a direct replacement for every Salesforce Bulk API 2.0 Source or Salesforce CDC Source. The following configurations cannot migrate directly:
For Bulk API 2.0 Source, you cannot migrate custom SOQL queries, per-object since dates, per-object include.deleted.records settings, or custom per-object topic names.
For CDC Source, you cannot migrate custom channels or configurations that rely on Streaming API replay-ID semantics.
The following changes apply to every migration:
Topic names change to the format <topic.prefix>.<SObjectName>. You must repoint downstream consumers to the new topics.
Record schemas might differ for some fields, which requires Confluent Schema Registry compatibility evaluation.
Salesforce CDC Source offsets cannot be directly translated because the Streaming API and Pub/Sub API have independent replay-ID spaces.
The Salesforce Source V2 connector supports only AVRO, JSON_SR, and PROTOBUF output formats.
Downstream applications must be idempotent to handle potential duplicates during the cutover window.
Migrate from Salesforce Bulk API 2.0 Source to Salesforce Source V2
Use the following procedure to migrate. Implement and validate the configuration in a pre-production environment before you promote it to production.
Pause the Salesforce Bulk API 2.0 Source connector to stabilize its stored offsets.
Fetch the connector offsets by using the Confluent Cloud Connect offsets API. The offset uses the following structure:
{
  "partition": {"table": "<SObjectName>"},
  "offset": {
    "time": <epoch-ms>,
    "jobId": "<jobId-or-null>",
    "locator": "<resultsLocator-when-non-null>",
    "offsetCheckpoint": "<watermark>",
    "lastModifiedCheckpoint": "<watermark>"
  }
}
You must obtain the lastModifiedCheckpoint property value for each SObject.
For more information, see the Connect offsets API reference and Manage Offsets for Fully-Managed Connectors in Confluent Cloud.
Create a configuration file with connector configurations and offsets.
{
  "name": "<v2-connector-name>",
  "config": {
    "connector.class": "SalesforceSourceV2",
    "topic.prefix": "<your-prefix>",
    "sobject.names": "Account,Lead",
    "historical.snapshot": "false",
    "real.time.ingestion.mode": "Periodic Polling (using Bulk API 2.0)",
    "salesforce.grant.type": "...",
    "salesforce.instance": "...",
    "salesforce.consumer.key": "...",
    "salesforce.consumer.secret": "...",
    "output.data.format": "AVRO",
    "tasks.max": "1"
  },
  "offsets": [
    {
      "partition": {"sobject": "Account"},
      "offset": {
        "offset.version": 1,
        "mode": "Periodic Polling",
        "lastSystemModstamp": "<lastModifiedCheckpoint for Account>",
        "lastId": ""
      }
    },
    {
      "partition": {"sobject": "Lead"},
      "offset": {
        "offset.version": 1,
        "mode": "Periodic Polling",
        "lastSystemModstamp": "<lastModifiedCheckpoint for Lead>",
        "lastId": ""
      }
    }
  ]
}
Create the connector by running the following command:
confluent connect cluster create --config-file config.json
On startup, the connector reads each injected offset and runs its first-poll SOQL query by using the watermark filter branch, such as WHERE SystemModstamp > <lastSystemModstamp>.
Verify that the connector emits records to the new topics as expected.
Repoint downstream consumers to the new topics, which use the format <topic.prefix>.<SObjectName>.
Delete the Salesforce Bulk API 2.0 Source connector.
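The offset translation in this procedure can be scripted rather than edited by hand. The following sketch maps offsets in the Bulk API 2.0 Source structure to the "offsets" array of the Salesforce Source V2 configuration file, using only the field names shown in the two structures above. The function name to_v2_offsets is hypothetical, and the sketch assumes you have already fetched the old offsets as a list of parsed JSON objects.

```python
def to_v2_offsets(v1_offsets):
    """Map Bulk API 2.0 Source offsets to Salesforce Source V2 offset entries."""
    v2_offsets = []
    for entry in v1_offsets:
        sobject = entry["partition"]["table"]
        # The V2 watermark is seeded from the old connector's lastModifiedCheckpoint.
        watermark = entry["offset"]["lastModifiedCheckpoint"]
        v2_offsets.append({
            "partition": {"sobject": sobject},
            "offset": {
                "offset.version": 1,
                "mode": "Periodic Polling",
                "lastSystemModstamp": watermark,
                "lastId": "",
            },
        })
    return v2_offsets
```

Pass the result as the "offsets" value in the configuration file, next to the "config" object. Fields such as time, jobId, locator, and offsetCheckpoint are not carried over because the V2 offset structure shown above has no place for them.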
Migrate from Salesforce CDC Source to Salesforce Source V2
A parallel run is necessary because the Salesforce Pub/Sub API and Streaming API use independent replay ID spaces, meaning the Salesforce Source V2 connector cannot resume from the Salesforce CDC Source replay ID. Running both connectors in parallel ensures that the Salesforce Source V2 stream starts before the Salesforce CDC Source connector stops, which prevents event loss during the transition. Downstream deduplication removes any temporary duplicate records caused by this overlap.
Create the Salesforce Source V2 connector while the CDC connector is still running. Configure the Salesforce Source V2 connector for pure streaming from the latest point in the Pub/Sub stream, and add the list of objects that the CDC connector tracks:
{
  "name": "<v2-connector-name>",
  "config": {
    "connector.class": "SalesforceSourceV2",
    "topic.prefix": "<your-prefix>",
    "sobject.names": "<SObject list>",
    "historical.snapshot": "false",
    "real.time.ingestion.mode": "Event-Driven Sync (using Pub/Sub API)",
    "event.sync.start.point": "latest",
    "salesforce.grant.type": "...",
    "salesforce.instance": "...",
    "salesforce.consumer.key": "...",
    "salesforce.consumer.secret": "...",
    "output.data.format": "AVRO",
    "tasks.max": "1"
  }
}
Deploy the connector by running the following command:
confluent connect cluster create --config-file config.json
The Salesforce Source V2 connector opens a Pub/Sub API subscription at the next event after the subscription becomes active and begins emitting events while the old CDC connector continues to run in parallel.
Run both connectors in parallel for a buffer period of two to five minutes. Verify that both connectors receive the same change events in their respective topics.
Switch your downstream consumers to the Salesforce Source V2 topic. Your downstream applications must deduplicate these records by using a key such as the Salesforce record ID, the change type, and the commit timestamp. To avoid losing events during the switch, configure the consumer to read the Salesforce Source V2 topic from the earliest offset or from a Kafka offset that corresponds to the timestamp when the consumer stopped reading the Salesforce CDC Source topic.
Stop the Salesforce CDC Source connector after downstream applications read reliably from the new topic.
Delete the Salesforce CDC Source connector.
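The deduplication that this procedure relies on can be as simple as tracking a composite key. The following is a minimal in-memory sketch. The field names record_id, change_type, and commit_timestamp are hypothetical placeholders; map them to the fields that carry these values in your own records. A production consumer would also need a bounded or persistent store for the keys it has seen.

```python
def dedup_key(event):
    """Composite key: Salesforce record ID, change type, and commit timestamp."""
    return (event["record_id"], event["change_type"], event["commit_timestamp"])


def deduplicate(events):
    """Keep the first occurrence of each event and preserve arrival order."""
    seen = set()
    unique = []
    for event in events:
        key = dedup_key(event)
        if key in seen:
            continue  # same change already received during the overlap window
        seen.add(key)
        unique.append(event)
    return unique
```

Two events for the same record count as duplicates only when the change type and commit timestamp also match, so successive updates to one record are all kept.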
Considerations for deduplication and schema changes
Plan for downstream deduplication based on the Salesforce record ID and a change-marker field. For bulk-style flows, use the LastModifiedDate or SystemModstamp field. For CDC flows, use the commit timestamp and the change type.
Evaluate schema compatibility for each subject before you promote the Salesforce Source V2 connector to production because the schemas might differ in field type mappings for certain Salesforce data types.
Configuration properties
Use the following configuration properties with the fully-managed connector. For self-managed connector property definitions and other details, see the connector docs in Self-managed connectors for Confluent Platform.
How should we connect to your data?
name
Sets a name for your connector.
Type: string
Valid Values: A string at most 64 characters long
Importance: high
Kafka Cluster credentials
kafka.auth.mode
Kafka authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode, whenever possible.
Type: string
Valid Values: SERVICE_ACCOUNT, KAFKA_API_KEY
Importance: high
kafka.api.key
Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.
Type: password
Importance: high
kafka.service.account.id
The service account that is used to generate the API keys to communicate with the Kafka cluster.
Type: string
Importance: high
kafka.api.secret
Secret associated with the Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.
Type: password
Importance: high
Schema Config
schema.context.name
Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.
Type: string
Default: default
Importance: medium
How should we connect to Salesforce?
salesforce.grant.type
OAuth grant type the connector uses for Salesforce authentication: PASSWORD, CLIENT_CREDENTIALS, or JWT_BEARER.
Type: string
Default: PASSWORD
Importance: high
salesforce.instance
Salesforce endpoint URL the connector uses for API requests. For CLIENT_CREDENTIALS, use your Salesforce domain URL. Defaults to https://login.salesforce.com.
Type: string
Default: https://login.salesforce.com
Importance: high
salesforce.username
Salesforce username for connector authentication.
Type: string
Importance: high
salesforce.password
Salesforce account password for connector authentication.
Type: password
Importance: high
salesforce.password.token
Security token associated with the Salesforce username.
Type: password
Importance: high
salesforce.consumer.key
Client ID (consumer key) for the Salesforce connected app.
Type: password
Importance: high
salesforce.consumer.secret
Client secret (consumer secret) for the Salesforce connected app.
Type: password
Importance: medium
salesforce.jwt.keystore.file
Keystore file that stores the private key for JWT authentication.
Type: password
Default: [hidden]
Importance: medium
salesforce.jwt.keystore.password
Password that unlocks the JWT keystore file.
Type: password
Importance: medium
Historical snapshot
historical.snapshot
When enabled, the connector uses Bulk API 2.0 to retrieve existing Salesforce records starting from the since date.
Type: boolean
Default: true
Importance: high
since
CreatedDate value, in yyyy-MM-dd format, from which the connector pulls existing Salesforce records. Applies only when the historical snapshot is enabled. Defaults to today's date.
Type: string
Importance: medium
Real-time ingestion
real.time.ingestion.mode
Selects the mode for continuous ingestion: Periodic Polling (using Bulk API 2.0) or Event-Driven Sync (using Pub/Sub API).
Type: string
Default: Event-Driven Sync (using Pub/Sub API)
Importance: high
topic.prefix
Prefix added to topic names for multi-SObject configuration, for example, prefix.Account or prefix.Contact.
Type: string
Importance: high
sobject.names
Comma-separated list of Salesforce SObjects to ingest, such as Account, Contact, or Lead.
Type: list
Importance: high
Connection details
request.max.retries.time.ms
Maximum time in milliseconds until the connector stops retrying failed Salesforce requests. Default is 30000 and minimum is 1000.
Type: long
Default: 30000 (30 seconds)
Valid Values: [1000,…,250000]
Importance: low
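A time-bounded retry of this kind can be sketched as follows. This is an illustration of the general technique, not the connector's implementation: the exponential backoff, the initial_backoff_ms parameter, and the choice of ConnectionError as the retryable failure are all assumptions. The only documented behavior is that retries stop once the configured maximum time has elapsed.

```python
import time


def call_with_retry_budget(request, max_retry_time_ms=30000, initial_backoff_ms=500,
                           clock=time.monotonic, sleep=time.sleep):
    """Retry request() until it succeeds or the time budget is used up."""
    deadline = clock() + max_retry_time_ms / 1000
    backoff = initial_backoff_ms / 1000
    while True:
        try:
            return request()
        except ConnectionError:
            remaining = deadline - clock()
            if remaining <= 0:
                raise  # budget exhausted: surface the last failure
            # Never sleep past the deadline; double the wait each round (assumed policy).
            sleep(min(backoff, remaining))
            backoff *= 2
```

The clock and sleep parameters exist only so that the function can be exercised with a fake clock in tests.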
Output messages
output.data.format
Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, or PROTOBUF.
Type: string
Default: JSON
Importance: high
Number of tasks for this connector
tasks.max
Maximum number of tasks for the connector.
Type: int
Valid Values: [1,…]
Importance: high
Advanced Connector Configs
event.sync.start.point
When set to all, the connector replays buffered events from up to the last 72 hours. When set to latest, it streams the latest events only.
Type: string
Default: latest
Importance: medium
gap.event.recovery
Defines what happens when the connector detects missed streaming events (gap events). Choose resync to run an incremental Bulk API resync from the last record’s commit timestamp, latest to skip missed events and continue from latest events only, or fail to stop the connector task.
Type: string
Default: resync
Importance: high
cdc.full.record.on.update
When enabled, CDC update events read full records instead of only changed fields. Applies only to Event-Driven Sync mode.
Type: boolean
Default: false
Importance: medium
emit.tombstone.on.delete
When enabled, delete events emit Kafka tombstones. Applies only to Event-Driven Sync mode.
Type: boolean
Default: false
Importance: medium
gap.resync.buffer.ms
Safety buffer in milliseconds that the connector subtracts from the last commit time during incremental Bulk API resync. Applies only when gap event recovery is set to resync. See the Salesforce Source V2 connector documentation for tuning guidance.
Type: long
Default: 60000 (1 minute)
Importance: low
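As a worked example of the subtraction described for gap.resync.buffer.ms, the following sketch computes the point in time from which a resync would start. The function name resync_start is hypothetical, and the sketch shows only the arithmetic, not how the connector builds its Bulk API query.

```python
from datetime import datetime, timedelta, timezone


def resync_start(last_commit_time, gap_resync_buffer_ms=60000):
    """Subtract the safety buffer from the last commit time."""
    return last_commit_time - timedelta(milliseconds=gap_resync_buffer_ms)


last_commit = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
start = resync_start(last_commit)
```

With the default buffer of 60000 ms and a last commit time of 12:00:00 UTC, the resync window starts at 11:59:00 UTC. A larger buffer re-reads more records, which increases the chance of duplicates downstream.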
poll.interval.ms
Interval in milliseconds between consecutive polls. If the connector restarts during an interval, it starts a new full interval, which can make the actual wait longer than this value. See the Salesforce Source V2 connector documentation for examples and recommended values.
Type: long
Default: 30000 (30 seconds)
Valid Values: [8700,…,10800000]
Importance: medium
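The effect of a restart on the wait between polls is simple arithmetic. The following sketch is an illustration with a hypothetical helper name, and it ignores the time the restart itself takes.

```python
def effective_wait_ms(poll_interval_ms, elapsed_before_restart_ms):
    """Time between two polls when a restart interrupts the interval."""
    # The time already waited is lost; a new full interval starts after the restart.
    return elapsed_before_restart_ms + poll_interval_ms


wait = effective_wait_ms(poll_interval_ms=30000, elapsed_before_restart_ms=20000)
```

With a 30000 ms interval and a restart 20000 ms into the interval, the next poll runs about 50000 ms after the previous one.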
bulk.api.max.records.per.poll
Maximum number of records returned per Bulk API 2.0 result set across all SObjects. Default is 1000.
Type: int
Default: 1000
Valid Values: [1,…,3000]
Importance: medium
include.deleted.records
Whether Bulk API queries must include soft-deleted records by using queryAll.
Type: boolean
Default: false
Importance: medium
skip.epoch.conversion.fields
Date or datetime field names that stay as ISO 8601 strings instead of epoch milliseconds across all SObjects.
Type: list
Default: “”
Importance: low
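The difference between a converted and a skipped field can be shown with a short sketch. This is an illustration only: convert_record is a hypothetical helper, it handles datetime values only, and the exact string format the connector receives from Salesforce is an assumption.

```python
from datetime import datetime


def convert_record(record, datetime_fields, skip_fields=()):
    """Convert ISO 8601 datetime strings to epoch milliseconds, except skipped fields."""
    converted = dict(record)
    for field in datetime_fields:
        value = converted.get(field)
        if field in skip_fields or value is None:
            continue  # skipped fields stay as ISO 8601 strings
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        converted[field] = int(parsed.timestamp() * 1000)
    return converted


record = {
    "Id": "001xx0000000001",  # made-up record ID
    "CreatedDate": "2024-01-01T00:00:00+00:00",
    "LastModifiedDate": "2024-01-01T00:00:00+00:00",
}
result = convert_record(record, ["CreatedDate", "LastModifiedDate"],
                        skip_fields=["LastModifiedDate"])
```

In this example, CreatedDate becomes 1704067200000 and LastModifiedDate keeps its string form because it is listed in skip_fields.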
Additional Configs
header.converter
The converter class for the headers. This is used to serialize and deserialize the headers of the messages.
Type: string
Importance: low
producer.override.compression.type
The compression type for all data generated by the producer. Valid values are none, gzip, snappy, lz4, and zstd.
Type: string
Importance: low
producer.override.linger.ms
The producer groups together any records that arrive in between request transmissions into a single batched request. More details can be found in the documentation: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#linger-ms.
Type: long
Valid Values: [100,…,1000]
Importance: low
value.converter.allow.optional.map.keys
Allow optional string map key when converting from Connect Schema to Avro Schema. Applicable for Avro Converters.
Type: boolean
Importance: low
value.converter.auto.register.schemas
Specify if the Serializer should attempt to register the Schema.
Type: boolean
Importance: low
value.converter.connect.meta.data
Allow the Connect converter to add its metadata to the output schema. Applicable for Avro Converters.
Type: boolean
Importance: low
value.converter.enhanced.avro.schema.support
Enable enhanced schema support to preserve package information and Enums. Applicable for Avro Converters.
Type: boolean
Importance: low
value.converter.enhanced.protobuf.schema.support
Enable enhanced schema support to preserve package information. Applicable for Protobuf Converters.
Type: boolean
Importance: low
value.converter.flatten.unions
Whether to flatten unions (oneofs). Applicable for Protobuf Converters.
Type: boolean
Importance: low
value.converter.generate.index.for.unions
Whether to generate an index suffix for unions. Applicable for Protobuf Converters.
Type: boolean
Importance: low
value.converter.generate.struct.for.nulls
Whether to generate a struct variable for null values. Applicable for Protobuf Converters.
Type: boolean
Importance: low
value.converter.int.for.enums
Whether to represent enums as integers. Applicable for Protobuf Converters.
Type: boolean
Importance: low
value.converter.latest.compatibility.strict
Verify latest subject version is backward compatible when use.latest.version is true.
Type: boolean
Importance: low
value.converter.object.additional.properties
Whether to allow additional properties for object schemas. Applicable for JSON_SR Converters.
Type: boolean
Importance: low
value.converter.optional.for.nullables
Whether nullable fields should be specified with an optional label. Applicable for Protobuf Converters.
Type: boolean
Importance: low
value.converter.optional.for.proto2
Whether proto2 optionals are supported. Applicable for Protobuf Converters.
Type: boolean
Importance: low
value.converter.scrub.invalid.names
Whether to scrub invalid names by replacing invalid characters with valid characters. Applicable for Avro and Protobuf Converters.
Type: boolean
Importance: low
value.converter.use.latest.version
Use latest version of schema in subject for serialization when auto.register.schemas is false.
Type: boolean
Importance: low
value.converter.use.optional.for.nonrequired
Whether to set non-required properties to be optional. Applicable for JSON_SR Converters.
Type: boolean
Importance: low
value.converter.wrapper.for.nullables
Whether nullable fields should use primitive wrapper messages. Applicable for Protobuf Converters.
Type: boolean
Importance: low
value.converter.wrapper.for.raw.primitives
Whether a wrapper message should be interpreted as a raw primitive at root level. Applicable for Protobuf Converters.
Type: boolean
Importance: low
errors.tolerance
Use this property to configure the connector’s error handling behavior. WARNING: Use this property with caution for source connectors because it can lead to data loss. If you set this property to ‘all’, the connector does not fail on errant records, but instead logs them (and sends them to the DLQ for sink connectors) and continues processing. If you set this property to ‘none’, the connector task fails on errant records.
Type: string
Default: none
Importance: low
key.converter.key.schema.id.serializer
The class name of the schema ID serializer for keys. This is used to serialize schema IDs in the message headers.
Type: string
Default: io.confluent.kafka.serializers.schema.id.PrefixSchemaIdSerializer
Importance: low
key.converter.key.subject.name.strategy
How to construct the subject name for key schema registration.
Type: string
Default: TopicNameStrategy
Importance: low
value.converter.decimal.format
Specify the JSON/JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals:
BASE64 to serialize DECIMAL logical types as base64 encoded binary data and
NUMERIC to serialize Connect DECIMAL logical type values in JSON/JSON_SR as a number representing the decimal value.
Type: string
Default: BASE64
Importance: low
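To make the difference between the two literals concrete, the following sketch encodes one decimal value both ways. It assumes the usual Kafka Connect representation of a DECIMAL value, which is the unscaled integer as big-endian two's-complement bytes with the scale kept in the schema. The function name decimal_to_json is hypothetical, and the byte length for some negative values can differ from what the Java converter produces, although both decode to the same number.

```python
import base64
import json
from decimal import Decimal


def decimal_to_json(value, scale, decimal_format="BASE64"):
    """Return the JSON fragment for a decimal under the chosen format (illustrative)."""
    if decimal_format == "NUMERIC":
        return format(value, "f")  # plain JSON number, for example 12.34
    unscaled = int(value.scaleb(scale))  # 12.34 with scale 2 becomes 1234
    length = unscaled.bit_length() // 8 + 1
    raw = unscaled.to_bytes(length, byteorder="big", signed=True)
    return json.dumps(base64.b64encode(raw).decode("ascii"))


as_base64 = decimal_to_json(Decimal("12.34"), scale=2)
as_numeric = decimal_to_json(Decimal("12.34"), scale=2, decimal_format="NUMERIC")
```

With BASE64, a consumer needs the scale from the schema to reconstruct the value. With NUMERIC, the value is readable directly in the JSON payload.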
value.converter.flatten.singleton.unions
Whether to flatten singleton unions. Applicable for Avro and JSON_SR Converters.
Type: boolean
Default: false
Importance: low
value.converter.ignore.default.for.nullables
When set to true, this property ensures that the corresponding record in Kafka is NULL, instead of showing the default column value. Applicable for AVRO, PROTOBUF, and JSON_SR Converters.
Type: boolean
Default: false
Importance: low
value.converter.reference.subject.name.strategy
Set the subject reference name strategy for value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.
Type: string
Default: DefaultReferenceSubjectNameStrategy
Importance: low
value.converter.replace.null.with.default
Whether to replace fields that have a default value and that are null with the default value. When set to true, the default value is used, otherwise null is used. Applicable for JSON Converter.
Type: boolean
Default: true
Importance: low
value.converter.schemas.enable
Include schemas within each of the serialized values. Input messages must contain schema and payload fields and may not contain additional fields. For plain JSON data, set this to false. Applicable for JSON Converter.
Type: boolean
Default: false
Importance: low
value.converter.value.schema.id.serializer
The class name of the schema ID serializer for values. This is used to serialize schema IDs in the message headers.
Type: string
Default: io.confluent.kafka.serializers.schema.id.PrefixSchemaIdSerializer
Importance: low
value.converter.value.subject.name.strategy
Determines how to construct the subject name under which the value schema is registered with Schema Registry.
Type: string
Default: TopicNameStrategy
Importance: low
Auto-restart policy
auto.restart.on.user.error
Enable connector to automatically restart on user-actionable errors.
Type: boolean
Default: true
Importance: medium
Next steps
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud for Apache Flink, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
