PostgreSQL CDC Source V2 (Debezium) Connector for Confluent Cloud

The fully-managed PostgreSQL Change Data Capture (CDC) Source V2 (Debezium) connector for Confluent Cloud can obtain a snapshot of the existing data in a PostgreSQL database and then monitor and record all subsequent row-level changes to that data. The connector supports Avro, JSON Schema, Protobuf, or JSON (schemaless) output data formats. All of the events for each table are recorded in a separate Apache Kafka® topic. The events can then be easily consumed by applications and services.

Note

V2 Improvements

Note the following improvements made to the V2 connector.

  • Added support for PostgreSQL 15.
  • Added support for PostgreSQL 16. Note: The connector does not support logical replication from standby servers.
  • Supports columns of type bytea[] (an array of bytea, that is, an array of byte arrays).
  • Filtered publications are updated automatically when updating the table capture list.
  • Can stop or pause an in-progress incremental snapshot, and can resume an incremental snapshot that was previously paused.
  • Supports regular expressions to specify table names for incremental snapshots.
  • Supports SQL-based predicates to control the subset of records to be included in the incremental snapshot.
  • Supports specifying a single column as a surrogate key for performing incremental snapshots.
  • Can perform ad-hoc blocking snapshots.
  • Indices that rely on hidden, auto-generated columns, or columns wrapped in database functions are no longer considered primary key alternatives for tables that do not have a primary key defined.
  • Configuration options to specify how topic and schema names should be adjusted for compatibility.

Features

The PostgreSQL CDC Source V2 (Debezium) connector provides the following features:

  • Topics created automatically: The connector automatically creates Kafka topics using the naming convention: <topic.prefix>.<schemaName>.<tableName>. The tables are created with the properties: topic.creation.default.partitions=1 and topic.creation.default.replication.factor=3. For more information, see Maximum message size.
  • Logical decoding plugin supported: pgoutput (the default).
  • Database authentication: Uses password authentication.
  • SSL support: Supports SSL encryption.
  • Tables included and Tables excluded: Sets whether a table is or is not monitored for changes. By default, the connector monitors every non-system table.
  • Tombstones on delete: Configures whether a tombstone event should be generated after a delete event. Default is true.
  • Output data formats: The connector supports Avro, JSON Schema, Protobuf, or JSON (schemaless) output Kafka record value format. It supports Avro, JSON Schema, Protobuf, JSON (schemaless), and String output record key format. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). See Schema Registry Enabled Environments for additional information.
  • Tasks per connector: Organizations can run multiple connectors with a limit of one task per connector ("tasks.max": "1").
  • Incremental snapshot: Supports incremental snapshotting via signaling.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.

Supported database versions

The PostgreSQL CDC Source V2 (Debezium) connector is compatible with the following PostgreSQL versions: 10, 11, 12, 13, 14, 15, 16.

Limitations

Be sure to review the following information.

Maximum message size

This connector creates topics automatically. When it creates topics, the internal connector configuration property max.message.bytes is set to the following:

  • Basic cluster: 8 MB
  • Standard cluster: 8 MB
  • Enterprise cluster: 8 MB
  • Dedicated cluster: 20 MB

For more information about Confluent Cloud clusters, see Kafka Cluster Types in Confluent Cloud.

Log retention during snapshot

When launched, the CDC connector creates a snapshot of the existing data in the database to capture the nominated tables. To do this, the connector executes a “SELECT *” statement. Completing the snapshot can take a while if one or more of the nominated tables is very large.

During the snapshot process, the replication slot is not advanced. This ensures that the database server does not remove WAL segments that are still needed for streaming once the snapshot completes. In situations with a high rate of change, the PostgreSQL disk space consumed by WAL files can therefore keep increasing while a long snapshot runs, potentially exhausting the disk space on the database server and leading to database operation failures or a server shutdown.
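
To gauge how much WAL the replication slot is retaining while a long snapshot runs, you can periodically run a query similar to the following against the source database. This is a monitoring sketch that uses standard PostgreSQL system views and functions; replace dbz_slot with your slot.name value.

SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
WHERE slot_name = 'dbz_slot';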

Quick Start

Use this quick start to get up and running with the Confluent Cloud PostgreSQL CDC Source V2 (Debezium) connector. The quick start provides the basics of selecting the connector and configuring it to obtain a snapshot of the existing data in a PostgreSQL database and then monitoring and recording all subsequent row-level changes.

Prerequisites
  • Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud.

  • The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.

  • Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). See Schema Registry Enabled Environments for additional information.

  • The PostgreSQL database must be configured for CDC. For details, see PostgreSQL in the Cloud.

  • Public access may be required for your database. See Manage Networking for Confluent Cloud Connectors for details. The following example shows the AWS Management Console when setting up a PostgreSQL database.

    Public access enabled

  • A parameter group with the property rds.logical_replication=1 is required. An example is shown below. Once created, you must reboot the database. A query that you can use to verify the setting is shown after this list.

    Parameter group

    RDS logical replication

  • For networking considerations, see Networking, DNS, and service endpoints. To use a set of public egress IP addresses, see Public Egress IP Addresses for Confluent Cloud Connectors. The following example shows the AWS Management Console when setting up security group rules for the VPC.

    Open inbound traffic

    Note

    See your specific cloud platform documentation for how to configure security rules for your VPC.

  • Kafka cluster credentials. The following lists the different ways you can provide credentials.
    • Enter an existing service account resource ID.
    • Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
    • Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.
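
After the database reboot described in the parameter group prerequisite, you can verify that logical replication is enabled by connecting to the database and checking the WAL level. The following query should return logical:

SHOW wal_level;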

Using the Confluent Cloud Console

Step 1: Launch your Confluent Cloud cluster

See the Quick Start for Confluent Cloud for installation instructions.

Step 2: Add a connector

In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.

Step 3: Select your connector

Click the PostgreSQL CDC Source V2 connector card.

Step 4: Enter the connector details

At the Add Postgres CDC Source V2 (Debezium) Connector screen, complete the following:

  1. Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:
    • Global Access: Allows your connector to access everything you have access to. With global access, connector access will be linked to your account. This option is not recommended for production.
    • Granular access: Limits the access for your connector. You will be able to manage connector access through a service account. This option is recommended for production.
    • Use an existing API key: Allows you to use an API key and secret that you have stored. You can enter the API key and secret manually or generate them in the Cloud Console.
  2. Click Continue.

Step 5: Check the Kafka topic

After the connector is running, verify that messages are populating your Kafka topic.

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.

Using the Confluent CLI

Complete the following steps to set up and run the connector using the Confluent CLI.

Note

Make sure you have all your prerequisites completed.

Step 1: List the available connectors

Enter the following command to list available connectors:

confluent connect plugin list

Step 2: List the connector configuration properties

Enter the following command to show the connector configuration properties:

confluent connect plugin describe <connector-plugin-name>
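
For example, using the plugin name for this connector:

confluent connect plugin describe PostgresCdcSourceV2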

The command output shows the required and optional configuration properties.

Step 3: Create the connector configuration file

Create a JSON file that contains the connector configuration properties. The following example shows the required connector properties.

{
  "connector.class": "PostgresCdcSourceV2",
  "name": "PostgresCdcSourceV2Connector_0",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "****************",
  "kafka.api.secret": "****************************************************************",
  "database.hostname": "debezium-1.<host-id>.us-east-2.rds.amazonaws.com",
  "database.port": "5432",
  "database.user": "postgres",
  "database.password": "**************",
  "database.dbname": "postgres",
  "topic.prefix": "cdc",
  "slot.name": "dbz_slot",
  "publication.name": "dbz_publication",
  "table.include.list":"public.passengers",
  "output.data.format": "JSON",
  "tasks.max": "1"
}

Note the following property definitions:

  • "connector.class": Identifies the connector plugin name.
  • "name": Sets a name for your new connector.
  • "kafka.auth.mode": Identifies the connector authentication mode you want to use. There are two options: SERVICE_ACCOUNT or KAFKA_API_KEY (the default). To use an API key and secret, specify the configuration properties kafka.api.key and kafka.api.secret, as shown in the example configuration (above). To use a service account, specify the Resource ID in the property kafka.service.account.id=<service-account-resource-ID>. To list the available service account resource IDs, use the following command:

    confluent iam service-account list
    

    For example:

    confluent iam service-account list
    
       Id     | Resource ID |       Name        |    Description
    +---------+-------------+-------------------+-------------------
       123456 | sa-l1r23m   | sa-1              | Service account 1
       789101 | sa-l4d56p   | sa-2              | Service account 2
    
  • "database.hostname": IP address or hostname of the PostgreSQL database server.

  • "database.port": Port number of the PostgreSQL database server.

  • "database.user": The name of the PostgreSQL database user that has the required authorization.

  • "database.password": Password of the PostgreSQL database user that has the required authorization.

  • "database.dbname": The name of the PostgreSQL database from which to stream the changes.

  • "topic.prefix": Provides a namespace for the particular database server/cluster that the connector is capturing changes from.

  • "slot.name": The name of the PostgreSQL logical decoding slot that is created for streaming changes from a particular plug-in for a particular database/schema. The slot name can contain only lower-case letters, numbers, and the underscore character.

  • "publication.name": The name of the PostgreSQL publication created for streaming changes when using the standard logical decoding plugin (pgoutput).

  • "table.include.list": An optional, comma-separated list of fully-qualified table identifiers for tables whose changes you want to capture. By default, the connector monitors all non-system tables. A fully-qualified table name is in the form schemaName.tableName. This property cannot be used with the property table.exclude.list.

  • "output.data.format": Sets the output record format (data coming from the connector). Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. You must have Confluent Cloud Schema Registry configured if using a schema-based record format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).

  • "tasks.max": Enter the number of tasks in use by the connector. Organizations can run multiple connectors with a limit of one task per connector (that is, "tasks.max": "1").

Single Message Transforms: See the Single Message Transforms (SMT) documentation for details about adding SMTs using the CLI. For additional information about the Debezium SMTs ExtractNewRecordState and EventRouter (Debezium), see Debezium transformations.

See Configuration Properties for all property values and definitions.

Step 4: Load the properties file and create the connector

Enter the following command to load the configuration and start the connector:

confluent connect cluster create --config-file <file-name>.json

For example:

confluent connect cluster create --config-file postgresql-cdc-source-v2.json

Example output:

Created connector PostgresCdcSourceV2Connector_0 lcc-ix4dl

Step 5: Check the connector status

Enter the following command to check the connector status:

confluent connect cluster list

Example output:

ID          |            Name                | Status  |  Type
+-----------+--------------------------------+---------+-------+
lcc-ix4dl   | PostgresCdcSourceV2Connector_0 | RUNNING | source

Step 6: Check the Kafka topic

After the connector is running, verify that messages are populating your Kafka topic.
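
For example, you can consume events from one of the change event topics with the Confluent CLI. The topic name below is a sketch that assumes the topic.prefix and table from the example configuration; adjust it to match your own settings.

confluent kafka topic consume cdc.public.passengers --from-beginning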

For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Managed and Custom Connectors section.

After-state only output limitation

When a connector is configured with the property after.state.only set to false, you would expect to see the previous values of all columns under before in the record.

However, depending on the REPLICA IDENTITY setting of the corresponding table, the before field may be set to null or contain only a subset of the columns. If Protobuf is used, the record may not contain the before field at all. The following example shows this issue, and a corrective action follows the sample.

{
  "before": null,
  "after": {
    "id": 5,
    "name": "Allen William Henry",
    "sex": "male",
    "age": 25,
    "sibsp": 0,
    "parch": 0,
    "created_at": "2024-01-17T11:30:40.831461Z"
   },
   "source": {
     "version": "2.4.2.Final",
     "connector": "postgresql",
     "name": "test",
     "ts_ms": 1705471663123,
     "snapshot": "false",
     "db": "postgres",
     "sequence": "[null,\"8736500352768\"]",
     "schema": "public",
     "table": "passengers",
     "txId": 572,
     "lsn": 8736500352768,
     "xmin": null
   },
   "op": "u",
   "ts_ms": 1705471663501,
   "transaction": null
}

For an updated record to contain the previous (before) values of all columns in the row, you need to modify the passengers table by running ALTER TABLE passengers REPLICA IDENTITY FULL. After you make this change in the PostgreSQL database, and records are updated, you should see records similar to the following sample.

{
  "before": {
    "id": 8,
    "name": "Gosta Leonard",
    "sex": "male",
    "age": 2,
    "sibsp": 3,
    "parch": 1,
    "created_at": "2024-01-17T11:30:55.955056Z"
  },
  "after": {
    "id": 8,
    "name": "Gosta Leonard",
    "sex": "male",
    "age": 25,
    "sibsp": 3,
    "parch": 1,
    "created_at": "2024-01-17T11:30:55.955056Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "postgresql",
    "name": "test",
    "ts_ms": 1705471953456,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"8736433249408\",\"8736500352768\"]",
    "schema": "public",
    "table": "passengers",
    "txId": 581,
    "lsn": 8736500482568,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1705471953986,
  "transaction": null
}
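
The REPLICA IDENTITY change and a follow-up check can be run with statements similar to the following. This is a sketch that assumes the public.passengers table from the example configuration; the catalog query simply confirms the setting ('f' indicates FULL).

ALTER TABLE public.passengers REPLICA IDENTITY FULL;

SELECT relreplident FROM pg_class WHERE oid = 'public.passengers'::regclass;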

Moving from V1 to V2

Version 2 of this connector supports new features and breaking changes that are not backward compatible with version 1 of the connector. To understand these changes and to plan for moving to version 2, see Backward Incompatible Changes in Debezium CDC V2 Connectors.

Given the backward-incompatible changes between version 1 and 2 of the CDC connectors, version 2 is being provided in a new set of CDC connectors on Confluent Cloud. You can provision either version 1 or version 2. However, note that eventually version 1 will be deprecated and no longer supported.

Before exploring your options for moving from version 1 to 2, be sure to make the required changes documented in Backward Incompatible Changes in Debezium CDC V2 Connectors.

The following lists options for moving from version 1 to 2 (v1 to v2).

  • The easiest option is to delete the existing v1 connector and then provision a v2 connector with an identical configuration, except with snapshot.mode set to initial. This triggers a complete re-snapshot of the captured tables, which can take a long time for very large tables and can result in duplicate events in the existing change event topics. To emit the change events to a new set of topics instead, use a different value for topic.prefix.
  • If you don’t mind duplicating a few change events, you can delete the existing v1 connector and then provision a v2 connector with an identical configuration, except with snapshot.mode set to never. Since this is a new connector and there are no previously stored offsets in the Kafka offsets topic, the connector starts streaming changes from the earliest available position in the replication slot.
  • If temporarily halting writes to the database is an option, you can use the following steps to move from v1 to v2 with no data loss and no duplicates.
    1. Configure every application to stop doing writes to the database, or put them into read-only mode.
    2. Give the connector enough time to capture all the change events that are written to the transaction log.
    3. Verify that the existing connector has completed consuming entries from the CDC tables by checking the last consumed message on the Kafka topic. (A database-side check is shown after this list.)
    4. Delete the existing v1 connector.
    5. Provision a v2 connector with a configuration similar to the previous one.
      • Set snapshot.mode to schema_only to prevent re-snapshotting.
      • Set slot.name to a new slot name to ensure that only the latest change events are captured.
    6. Re-configure the applications to resume the writes, or put them back into read/write mode.
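
As a database-side check for step 3, you can confirm that the existing replication slot has caught up to the current WAL position before deleting the v1 connector. This is a sketch that assumes you can query the source database; substitute the slot name used by your v1 connector.

SELECT slot_name,
       confirmed_flush_lsn,
       pg_current_wal_lsn() AS current_wal_lsn
FROM pg_replication_slots
WHERE slot_name = '<v1-slot-name>';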

Important

Implement and validate any connector changes in a pre-production environment before promoting to production.

Configuration Properties

Use the following configuration properties with the fully-managed connector. For self-managed connector property definitions and other details, see the connector docs in Self-managed connectors for Confluent Platform.

How should we connect to your data?

name

Sets a name for your connector.

  • Type: string
  • Valid Values: A string at most 64 characters long
  • Importance: high

Kafka Cluster credentials

kafka.auth.mode

Kafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.

  • Type: string
  • Default: KAFKA_API_KEY
  • Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
  • Importance: high
kafka.api.key

Kafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password
  • Importance: high
kafka.service.account.id

The service account that is used to generate the API keys to communicate with the Kafka cluster.

  • Type: string
  • Importance: high
kafka.api.secret

Secret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.

  • Type: password
  • Importance: high

How should we connect to your database?

database.hostname

IP address or hostname of the PostgreSQL database server.

  • Type: string
  • Importance: high
database.port

Port number of the PostgreSQL database server.

  • Type: int
  • Valid Values: [0,…,65535]
  • Importance: high
database.user

The name of the PostgreSQL database user that has the required authorization.

  • Type: string
  • Importance: high
database.password

Password of the PostgreSQL database user that has the required authorization.

  • Type: password
  • Importance: high
database.dbname

The name of the PostgreSQL database from which to stream the changes.

  • Type: string
  • Importance: high
database.sslmode

Whether to use an encrypted connection to the PostgreSQL server. Possible settings are: disable, prefer, and require.

disable uses an unencrypted connection.

prefer attempts to use a secure (encrypted) connection first and, failing that, an unencrypted connection.

require uses a secure (encrypted) connection, and fails if one cannot be established.

  • Type: string
  • Default: prefer
  • Importance: high

Output messages

output.data.format

Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format such as AVRO, JSON_SR, or PROTOBUF.

  • Type: string
  • Importance: high
output.key.format

Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, PROTOBUF, STRING, or JSON. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format such as AVRO, JSON_SR, or PROTOBUF.

  • Type: string
  • Default: JSON
  • Valid Values: AVRO, JSON, JSON_SR, PROTOBUF, STRING
  • Importance: high
json.output.decimal.format

Specify the JSON/JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals:

BASE64 to serialize DECIMAL logical types as base64 encoded binary data and

NUMERIC to serialize Connect DECIMAL logical type values in JSON/JSON_SR as a number representing the decimal value.

  • Type: string
  • Default: BASE64
  • Importance: low
after.state.only

Controls whether the generated Kafka record should contain only the state of the row after the event occurred.

  • Type: boolean
  • Default: false
  • Importance: low
tombstones.on.delete

Controls whether a tombstone event should be generated after a delete event.

true - a delete operation is represented by a delete event and a subsequent tombstone event.

false - only a delete event is emitted.

After a source record is deleted, emitting the tombstone event (the default behavior) allows Kafka to completely delete all events that pertain to the key of the deleted row in case log compaction is enabled for the topic.

  • Type: boolean
  • Default: true
  • Importance: medium

How should we name your topic(s)?

topic.prefix

Topic prefix that provides a namespace (logical server name) for the particular PostgreSQL database server or cluster in which Debezium is capturing changes. The prefix should be unique across all other connectors, since it is used as a topic name prefix for all Kafka topics that receive records from this connector. Only alphanumeric characters, hyphens, dots and underscores must be used. The connector automatically creates Kafka topics using the naming convention: <topic.prefix>.<schemaName>.<tableName>.

  • Type: string
  • Importance: high

Database config

slot.name

The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the Debezium connector that you are configuring. Slot names must conform to PostgreSQL replication slot naming rules, which state: “Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.”

  • Type: string
  • Default: debezium
  • Valid Values: Must match the regex ^[a-z0-9_]+$
  • Importance: medium
publication.name

The name of the PostgreSQL publication created for streaming changes when using pgoutput. Based on the value of publication.autocreate.mode the publication is created at start-up if it does not already exist and it includes all tables. Debezium then applies its own include/exclude list filtering, if configured, to limit the publication to change events for the specific tables of interest. The connector user must have superuser permissions to create this publication, so it is usually preferable to create the publication before starting the connector for the first time. If the publication already exists, either for all tables or configured with a subset of tables, Debezium uses the publication as it is defined.

  • Type: string
  • Default: dbz_publication
  • Valid Values: Must match the regex ^[^\s\"\'\`]+$
  • Importance: medium
publication.autocreate.mode

Applies only when streaming changes by using the pgoutput plug-in. Possible settings are all_tables, disabled, and filtered.

all_tables - If a publication exists, the connector uses it. If a publication does not exist, the connector creates a publication for all tables in the database for which the connector is capturing changes. For the connector to create a publication, it must access the database through a database user account that has permission to create publications and perform replications. You can create the publication using the following SQL command: CREATE PUBLICATION <publication_name> FOR ALL TABLES;.

disabled - The connector does not attempt to create a publication. A database administrator or the user configured to perform replications must have created the publication before running the connector. If the connector cannot find the publication, the connector throws an exception and stops.

filtered - If a publication exists, the connector uses it. If no publication exists, the connector creates a new publication for tables that match the current filter configuration as specified by the table.include.list, and table.exclude.list connector configuration properties. For example: CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, tbl3>. If the publication exists, the connector updates the publication for tables that match the current filter configuration. For example: ALTER PUBLICATION <publication_name> SET TABLE <tbl1, tbl2, tbl3>. For the connector to alter a publication it must access the database through a database user account that has ownership of the publication and the tables it is capturing.

  • Type: string
  • Default: all_tables
  • Valid Values: all_tables, disabled, filtered
  • Importance: medium
signal.data.collection

Fully-qualified name of the data collection that needs to be used to send signals to the connector. Use the following format to specify the fully-qualified collection name: schemaName.tableName

  • Type: string
  • Importance: medium
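
For example, to trigger an ad-hoc incremental snapshot, you insert a row into the signaling table. This is a minimal sketch following the Debezium signaling convention; the table name public.debezium_signal and the id value are illustrative and must match your signal.data.collection setting, and the signaling table itself must be captured by the connector.

INSERT INTO public.debezium_signal (id, type, data)
VALUES ('signal-1', 'execute-snapshot', '{"data-collections": ["public.passengers"], "type": "incremental"}');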

Connector config

snapshot.mode

Specifies the criteria for running a snapshot upon startup of the connector. Possible settings are: initial, never, and initial_only.

initial - The connector performs a snapshot only when no offsets have been recorded for the logical server name.

never - The connector never performs snapshots. When a connector is configured this way, its behavior when it starts is as follows. If there is a previously stored LSN in the Kafka offsets topic, the connector continues streaming changes from that position. If no LSN has been stored, the connector starts streaming changes from the starting position available in the replication slot. The never snapshot mode is useful only when you know all data of interest is still reflected in the WAL.

initial_only - The connector performs an initial snapshot and then stops, without processing any subsequent changes.

  • Type: string
  • Default: initial
  • Valid Values: initial, initial_only, never
  • Importance: medium
table.include.list

An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you want to capture. When this property is set, the connector captures changes only from the specified tables. Each identifier is of the form schemaName.tableName. By default, the connector captures changes in every non-system table in each schema whose changes are being captured.

To match the name of a table, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name.

If you include this property in the configuration, do not also set the table.exclude.list property.

  • Type: list
  • Importance: medium
table.exclude.list

An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you do not want to capture. Each identifier is of the form schemaName.tableName. When this property is set, the connector captures changes from every table that you do not specify.

To match the name of a table, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name.

If you include this property in the configuration, do not set the table.include.list property.

  • Type: list
  • Importance: medium
event.processing.failure.handling.mode

Specifies how the connector should react to exceptions during processing of events. Possible settings are: fail, skip, and warn.

fail propagates the exception, indicates the offset of the problematic event, and causes the connector to stop.

warn logs the offset of the problematic event, skips that event, and continues processing.

skip skips the problematic event and continues processing.

  • Type: string
  • Default: fail
  • Valid Values: fail, skip, warn
  • Importance: low
schema.name.adjustment.mode

Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings are: none, avro, and avro_unicode.

none does not apply any adjustment.

avro replaces the characters that cannot be used in the Avro type name with underscore.

avro_unicode replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java.

  • Type: string
  • Default: none
  • Valid Values: avro, avro_unicode, none
  • Importance: medium
field.name.adjustment.mode

Specifies how field names should be adjusted for compatibility with the message converter used by the connector. Possible settings are: none, avro, and avro_unicode.

none does not apply any adjustment.

avro replaces the characters that cannot be used in the Avro type name with underscore.

avro_unicode replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java.

  • Type: string
  • Default: none
  • Valid Values: avro, avro_unicode, none
  • Importance: medium
heartbeat.interval.ms

Controls how frequently the connector sends heartbeat messages to a Kafka topic. The behavior of default value 0 is that the connector does not send heartbeat messages. Heartbeat messages are useful for monitoring whether the connector is receiving change events from the database. Heartbeat messages might help decrease the number of change events that need to be re-sent when a connector restarts. To send heartbeat messages, set this property to a positive integer, which indicates the number of milliseconds between heartbeat messages.

  • Type: int
  • Default: 0
  • Valid Values: [0,…]
  • Importance: low

Schema Config

schema.context.name

Add a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.

  • Type: string
  • Default: default
  • Importance: medium
key.converter.reference.subject.name.strategy

Set the subject reference name strategy for key. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.

  • Type: string
  • Default: DefaultReferenceSubjectNameStrategy
  • Importance: high
value.converter.reference.subject.name.strategy

Set the subject reference name strategy for value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for PROTOBUF format with the default strategy being DefaultReferenceSubjectNameStrategy.

  • Type: string
  • Default: DefaultReferenceSubjectNameStrategy
  • Importance: high

How should we handle data types?

decimal.handling.mode

Specifies how the connector should handle values for DECIMAL and NUMERIC columns. Possible settings are: precise, double, and string.

precise represents values precisely by using java.math.BigDecimal, encoded in binary form in change events.

double represents values by using double values, which might result in a loss of precision but is easier to use.

string encodes values as formatted strings, which are easy to consume, but semantic information about the real type is lost.

  • Type: string
  • Default: precise
  • Valid Values: double, precise, string
  • Importance: medium
time.precision.mode

Time, date, and timestamps can be represented with different kinds of precisions:

adaptive captures the time and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column’s type.

adaptive_time_microseconds captures the date, datetime and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column’s type. An exception is TIME type fields, which are always captured as microseconds.

connect always represents time and timestamp values by using Kafka Connect’s built-in representations for Time, Date, and Timestamp, which use millisecond precision regardless of the database columns’ precision.

  • Type: string
  • Default: adaptive
  • Valid Values: adaptive, adaptive_time_microseconds, connect
  • Importance: medium

Number of tasks for this connector

tasks.max

Maximum number of tasks for the connector.

  • Type: int
  • Valid Values: [1,…,1]
  • Importance: high

Next Steps

For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
