PostgreSQL Source Connector (Debezium) Configuration Properties

You can configure the Postgres Source connector using a variety of configuration properties.

Note

These are properties for the self-managed connector. If you are using Confluent Cloud, see PostgreSQL Source connector for Confluent Cloud.

Required configuration properties

name
Unique name for the connector. Trying to register again with the same name will fail.
connector.class
The name of the Java class for the connector. You must use a value of io.debezium.connector.postgresql.PostgresConnector for the PostgreSQL connector.
tasks.max

The maximum number of tasks that should be created for this connector. The connector always uses a single task and therefore does not use this value, so the default is always acceptable.

  • Type: int
  • Default: 1
plugin.name

The name of the Postgres logical decoding plugin installed on the server. Supported values are decoderbufs, wal2json, and wal2json_rds. Two additional options are supported since 0.8.0.Beta1: wal2json_streaming and wal2json_rds_streaming. When the processed transactions are very large, the JSON batch event that contains all changes in the transaction may not fit into the hard-coded 1 GB memory buffer. In such cases, you can switch to streaming mode, in which every change in a transaction is sent as a separate message from PostgreSQL to Debezium.

  • Type: string
  • Importance: medium
  • Default: decoderbufs
slot.name

The name of the Postgres logical decoding slot created for streaming changes from a plugin and database instance. Values must conform to Postgres replication slot naming rules which state: “Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.”

  • Type: string
  • Importance: medium
  • Default: debezium
slot.drop_on_stop

Whether to drop the logical replication slot when the connector shuts down in an orderly fashion. Set this to true only in testing or development environments. Dropping the slot allows the database to discard WAL segments. If set to true, the connector may not be able to resume from the WAL position where it left off.

  • Type: string
  • Importance: low
  • Default: false
publication.name

The name of the PostgreSQL publication created for streaming changes when using pgoutput. This publication is created at start-up if it does not already exist and it includes all tables. If the publication already exists, either for all tables or configured with a subset of tables, the connector uses the publication as it is defined.

  • Type: string
  • Importance: low
  • Default: dbz_publication
database.hostname

IP address or hostname of the PostgreSQL database server.

  • Type: string
  • Importance: high
database.port

Integer port number of the PostgreSQL database server.

  • Type: int
  • Importance: low
  • Default: 5432
database.user

Username to use when connecting to the PostgreSQL database server.

  • Type: string
  • Importance: high
database.password

Password to use when connecting to the PostgreSQL database server.

  • Type: password
  • Importance: high
database.dbname

The name of the PostgreSQL database from which to stream the changes.

  • Type: string
  • Importance: high
database.server.name

Logical name that identifies and provides a namespace for the particular PostgreSQL database server/cluster being monitored. The logical name should be unique across all other connectors, since it is used as a prefix for all Kafka topic names coming from this connector. Defaults to host:port/dbname, where host is the value of the database.hostname property, port is the value of the database.port property, and dbname is the value of the database.dbname property. Confluent recommends using a meaningful and logical name for dbname.

  • Type: string
  • Importance: high
  • Default: database.hostname:database.port/database.dbname
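As an illustration of how the properties described so far fit together, the following Python sketch assembles a minimal configuration and renders it as the JSON document that is typically submitted to the Kafka Connect REST API. The connector name, hostname, credentials, database name, and server name are placeholder assumptions, not values taken from this page.

```python
import json
import re

# Placeholder values (assumptions); replace them with your own details.
connector = {
    "name": "inventory-postgres-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "plugin.name": "decoderbufs",
        "slot.name": "debezium",
        "database.hostname": "postgres.example.internal",
        "database.port": "5432",
        "database.user": "replicator",
        "database.password": "change-me",
        "database.dbname": "inventory",
        "database.server.name": "inventory_server",
    },
}


def slot_name_is_valid(name: str) -> bool:
    """Check the naming rule quoted for slot.name:
    lower-case letters, numbers, and the underscore character."""
    return re.fullmatch(r"[a-z0-9_]+", name) is not None


# JSON body for the connector registration request.
payload = json.dumps(connector, indent=2)
print(payload)
```

Checking slot.name before submitting the configuration catches a naming mistake early instead of at connector start-up.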
schema.include.list

An optional comma-separated list of regular expressions that match schema names to be monitored. Any schema name not included in the include list is excluded from monitoring. By default all non-system schemas are monitored. May not be used with schema.exclude.list.

  • Type: list of strings
  • Importance: low
schema.exclude.list

An optional comma-separated list of regular expressions that match schema names to be excluded from monitoring. Any schema name not included in the exclude list is monitored, with the exception of system schemas. May not be used with schema.include.list.

  • Type: list of strings
  • Importance: low
table.include.list

An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be monitored. Any table not included in the include list is excluded from monitoring. Each identifier is in the form schemaName.tableName. By default the connector monitors every non-system table in each monitored schema. May not be used with table.exclude.list.

  • Type: list of strings
  • Importance: low
table.exclude.list

An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be excluded from monitoring. Any table not included in the exclude list is monitored. Each identifier is in the form schemaName.tableName. May not be used with table.include.list.

  • Type: list of strings
  • Importance: low
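The include and exclude rules for schemas and tables can be sketched in Python as follows. This is an approximation for reasoning about a configuration, not the connector's implementation: it assumes that each expression must match the whole schemaName.tableName identifier, and it uses Python's re module, whose syntax differs from Java regular expressions in some details.

```python
import re


def check_exclusive(config: dict) -> None:
    """Raise if an include list and its matching exclude list are both set."""
    for scope in ("schema", "table"):
        include_key = f"{scope}.include.list"
        exclude_key = f"{scope}.exclude.list"
        if config.get(include_key) and config.get(exclude_key):
            raise ValueError(f"{include_key} may not be used with {exclude_key}")


def _matches(table_id: str, patterns: str) -> bool:
    # Naive comma split: expressions that themselves contain commas
    # (for example a{1,3}) are not handled by this sketch.
    return any(
        re.fullmatch(p.strip(), table_id) for p in patterns.split(",") if p.strip()
    )


def table_is_monitored(table_id: str, config: dict) -> bool:
    """Apply table.include.list or table.exclude.list to schemaName.tableName."""
    check_exclusive(config)
    if config.get("table.include.list"):
        return _matches(table_id, config["table.include.list"])
    if config.get("table.exclude.list"):
        return not _matches(table_id, config["table.exclude.list"])
    return True  # no table filter configured


# Hypothetical table names used only for illustration.
example = {"table.include.list": r"public\.orders,inventory\..*"}
print(table_is_monitored("inventory.customers", example))
```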
column.include.list

An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in change event record values. Fully-qualified names for columns are of the form schemaName.tableName.columnName. Do not also set the column.exclude.list property.

  • Type: list of strings
  • Importance: low
column.exclude.list

An optional comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form schemaName.tableName.columnName.

  • Type: list of strings
  • Importance: low
time.precision.mode

Time, date, and timestamps can be represented with different kinds of precision, including:

  • adaptive: Captures the time and timestamp values exactly as they are in the database. adaptive uses either millisecond, microsecond, or nanosecond precision values based on the database column type.
  • adaptive_time_microseconds: Captures the date, datetime and timestamp values exactly as they are in the database. adaptive_time_microseconds uses either millisecond, microsecond, or nanosecond precision values based on the database column type, with the exception of TIME type fields, which are always captured as microseconds.
  • connect: Always represents time and timestamp values using Kafka Connect’s built-in representations for Time, Date, and Timestamp. connect uses millisecond precision regardless of database column precision. See temporal values.
  • Type: string
  • Importance: high
  • Default: adaptive
decimal.handling.mode

Specifies how the connector should handle values for DECIMAL and NUMERIC columns:

  • precise: Represents values precisely using java.math.BigDecimal, which are represented in change events in binary form.
  • double: Represents them using double values. double may result in a loss of precision but is easier to use.
  • string: Encodes values as a formatted string. string is easy to consume but semantic information about the real type is lost. See Decimal Values.
  • Type: string
  • Importance: high
  • Default: precise
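A consumer that receives values in precise mode has to turn the binary form back into a number. The sketch below assumes the value follows Kafka Connect's Decimal logical type, where the bytes hold the unscaled integer in big-endian two's-complement form and the scale is carried in the field schema. How the raw bytes and the scale reach your code depends on the converter in use, so treat the function signature as illustrative.

```python
from decimal import Decimal


def decode_connect_decimal(raw: bytes, scale: int) -> Decimal:
    """Rebuild a decimal from its unscaled big-endian bytes and its scale."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    # Shift the decimal point left by `scale` digits.
    return Decimal(unscaled).scaleb(-scale)


# 12345 with scale 2 represents 123.45.
raw_value = (12345).to_bytes(2, byteorder="big", signed=True)
print(decode_connect_decimal(raw_value, 2))
```

With double or string mode no decoding step of this kind is needed, which is the ease-of-use trade-off described above.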
hstore.handling.mode

Specifies how the connector should handle values for hstore columns:

  • map: Represents using MAP.
  • json: Represents them using JSON strings. The json option encodes values as formatted strings such as {"key" : "val"}. See HStore Values.
  • Type: string
  • Importance: low
  • Default: map
interval.handling.mode

Specifies how the connector should handle values for interval columns.

  • Type: string
  • Default: numeric
  • Valid values: [numeric, string]
database.sslmode

Sets whether or not to use an encrypted connection to the PostgreSQL server. Options include:

  • disable: Use an unencrypted connection.
  • require: Use a secure (encrypted) connection. Fails if one cannot be established.
  • verify-ca: Similar to require, but additionally verify the server TLS certificate against the configured Certificate Authority (CA) certificates. Fails if no valid matching CA certificates are found.
  • verify-full: Similar to verify-ca but additionally verify that the server certificate matches the host to which the connection is attempted. See the PostgreSQL documentation for more information.
  • Type: string
  • Importance: low
  • Default: disable
database.sslcert

The path to the file containing the SSL certificate of the client. See the PostgreSQL documentation for more information.

  • Type: string
  • Importance: high
database.sslkey

The path to the file containing the SSL private key of the client. See the PostgreSQL documentation for more information.

  • Type: string
database.sslpassword

The password to access the client private key from the file specified by database.sslkey. See the PostgreSQL documentation for more information.

  • Type: string
  • Importance: low
database.sslrootcert

The path to the file containing the root certificate(s) against which the server is validated. See the PostgreSQL documentation for more information.

  • Type: string
  • Importance: low
database.tcpKeepAlive

Enable TCP keep-alive probe to verify that database connection is still alive. Enabled by default. See the PostgreSQL documentation for more information.

  • Type: string
  • Importance: low
  • Default: true
tombstones.on.delete

Controls whether a tombstone event should be generated after a delete event. When true, delete operations are represented by a delete event and a subsequent tombstone event. When false, only a delete event is sent. Emitting a tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted.

  • Type: string
  • Importance: high
  • Default: true
column.truncate.to.length.chars

An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form schemaName.tableName.columnName. In change event records, values in these columns are truncated if they are longer than the number of characters specified by length in the property name. You can specify multiple properties with different lengths in a single configuration. Length must be a positive integer, for example, column.truncate.to.20.chars.

  • Type: list of strings
column.mask.with.length.chars

An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form schemaName.tableName.columnName. In change event values, the values in the specified table columns are replaced with length number of asterisk (*) characters. You can specify multiple properties with different lengths in a single configuration. Length must be a positive integer or zero. When you specify zero, the connector replaces a value with an empty string.

  • Type: list of strings
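Because the length is part of the property name, it can help to see how such properties would act on a value. The following Python sketch simulates the truncation and masking rules described above. The column names are hypothetical, and the whole-name regular expression match is an assumption of this sketch rather than documented behavior.

```python
import re

TRUNCATE_KEY = re.compile(r"column\.truncate\.to\.(\d+)\.chars")
MASK_KEY = re.compile(r"column\.mask\.with\.(\d+)\.chars")


def transform(column: str, value: str, config: dict) -> str:
    """Apply matching truncate and mask properties to one column value."""
    for key, patterns in config.items():
        truncate = TRUNCATE_KEY.fullmatch(key)
        mask = MASK_KEY.fullmatch(key)
        if not (truncate or mask):
            continue
        if not any(re.fullmatch(p.strip(), column) for p in patterns.split(",")):
            continue
        if truncate:
            length = int(truncate.group(1))
            if length < 1:
                raise ValueError("truncate length must be a positive integer")
            value = value[:length]
        else:
            # A length of zero replaces the value with an empty string.
            value = "*" * int(mask.group(1))
    return value


# Hypothetical columns, one property per length.
example = {
    "column.truncate.to.20.chars": r"public\.customers\.notes",
    "column.mask.with.8.chars": r"public\.customers\.ssn",
    "column.mask.with.0.chars": r"public\.customers\.pin",
}
print(transform("public.customers.ssn", "123-45-6789", example))
```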
column.mask.hash.hashAlgorithm.with.salt.salt; column.mask.hash.v2.hashAlgorithm.with.salt.salt

An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form <schemaName>.<tableName>.<columnName>. In the resulting change event record, the values for the specified columns are replaced with pseudonyms. A pseudonym consists of the hashed value that results from applying the specified hashAlgorithm and salt. Based on the hash function that is used, referential integrity is maintained, while column values are replaced with pseudonyms. Supported hash functions are described in the MessageDigest documentation.

  • Type: list of strings
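The idea behind hash-based masking, and why it preserves referential integrity, can be illustrated with a short Python sketch. It is not the connector's implementation: the order in which salt and value are combined, the text encoding, and the hexadecimal output are assumptions made for the example, and the salt shown is a made-up placeholder.

```python
import hashlib


def pseudonymize(value: str, hash_algorithm: str, salt: str) -> str:
    """Replace a value with a salted hash (illustrative layout only)."""
    # Map a Java-style algorithm name such as "SHA-256" to hashlib's "sha256".
    digest = hashlib.new(hash_algorithm.replace("-", "").lower())
    digest.update(salt.encode("utf-8"))
    digest.update(value.encode("utf-8"))
    return digest.hexdigest()


# The same input and salt always yield the same pseudonym, so rows that
# shared a value before masking still share one afterwards.
first = pseudonymize("alice@example.com", "SHA-256", "example-salt")
second = pseudonymize("alice@example.com", "SHA-256", "example-salt")
print(first == second)
```

Under this sketch, a property name would take the form column.mask.hash.SHA-256.with.salt.example-salt, with the list of column expressions as its value.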
column.propagate.source.type

An optional, comma-separated list of regular expressions that match the database-specific data type name for some columns. Fully-qualified data type names are of the form databaseName.tableName.typeName, or databaseName.schemaName.tableName.typeName.

For these data types, the connector adds parameters to the corresponding field schemas in emitted change records. The added parameters specify the original type and length of the column: __debezium.source.column.type, __debezium.source.column.length, and __debezium.source.column.scale.

message.key.columns

A list of expressions that specify the columns that the connector uses to form custom message keys for change event records that it publishes to the Kafka topics for specified tables.

By default, Debezium uses the primary key column of a table as the message key for records that it emits. In place of the default, or to specify a key for tables that lack a primary key, you can configure custom message keys based on one or more columns.

To establish a custom message key for a table, list the table, followed by the columns to use as the message key. Each list entry takes the following format:

<fully-qualified_tableName>:<keyColumn>,<keyColumn>

To base a table key on multiple column names, insert commas between the column names.

Each fully-qualified table name is a regular expression in the following format:

<databaseName>.<tableName>

The property can include entries for multiple tables. Use a semicolon to separate table entries in the list. The following example sets the message key for the tables inventory.customers and purchaseorders:

inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

For the table inventory.customers, the columns pk1 and pk2 are specified as the message key. For the purchaseorders tables in any database, the columns pk3 and pk4 serve as the message key.

There is no limit to the number of columns that you use to create custom message keys. However, it’s best to use the minimum number that are required to specify a unique key.

  • Type: list
  • Default: n/a
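To make the entry format concrete, the following Python sketch parses a message.key.columns value and looks up the key columns for a table. It is a reading aid under two assumptions: the table expression must match the whole table name, and the first matching entry wins. The table name sales.purchaseorders is hypothetical.

```python
import re


def parse_message_key_columns(spec: str) -> list:
    """Split 'table:col1,col2;table2:col3' into (pattern, columns) pairs."""
    entries = []
    for entry in spec.split(";"):
        entry = entry.strip()
        if not entry:
            continue
        table_pattern, separator, columns = entry.rpartition(":")
        if not separator or not table_pattern:
            raise ValueError(f"expected <table>:<columns>, got {entry!r}")
        entries.append((table_pattern, [c.strip() for c in columns.split(",")]))
    return entries


def key_columns_for(table: str, spec: str):
    """Return the custom key columns for a table, or None if none apply."""
    for table_pattern, columns in parse_message_key_columns(spec):
        if re.fullmatch(table_pattern, table):
            return columns
    return None


spec = "inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4"
print(key_columns_for("sales.purchaseorders", spec))
```

A result of None corresponds to the default behavior, in which the primary key of the table is used.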
publication.autocreate.mode

Applies only when streaming changes by using the pgoutput plug-in. The setting determines how creation of a publication should work.

  • Default: all_tables
  • Valid values: [all_tables, disabled, filtered]
binary.handling.mode

Specifies how binary (bytea) columns should be represented in change events.

  • Type: bytes or string
  • Importance: low
  • Valid values: [bytes, base64, hex]
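The difference between the representations can be seen by encoding the same bytea content in each form. The sketch below uses Python's standard library and is only an approximation of what a consumer would observe. The exact wire format also depends on the converter in use.

```python
import base64


def represent_bytea(raw: bytes, mode: str):
    """Return a bytea value as raw bytes, a base64 string, or a hex string."""
    if mode == "bytes":
        return raw
    if mode == "base64":
        return base64.b64encode(raw).decode("ascii")
    if mode == "hex":
        return raw.hex()
    raise ValueError(f"unsupported binary.handling.mode: {mode}")


sample = b"\x01\x02\xff"
for mode in ("bytes", "base64", "hex"):
    print(mode, represent_bytea(sample, mode))
```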
money.fraction.digits

Specifies how many decimal digits should be used when converting Postgres money type to java.math.BigDecimal, which represents the values in change events. Applicable only when decimal.handling.mode is set to precise.

  • Type: int
  • Default: 2
message.prefix.include.list
An optional comma-separated list of regular expressions that match the names of the logical decoding message prefixes that you want to capture. Any logical decoding message with a prefix not included in message.prefix.include.list is excluded. By default, all logical decoding messages are captured. Do not also set the message.prefix.exclude.list property.
message.prefix.exclude.list
An optional comma-separated list of regular expressions that match the names of the logical decoding message prefixes that you do not want to capture. Any logical decoding message with a prefix that is not included in message.prefix.exclude.list is included. Do not also set the message.prefix.include.list property. To exclude all logical decoding messages, pass .* into this config.

Advanced configuration properties

snapshot.mode

The criteria for running a snapshot upon startup of the connector.

  • Type: string
  • Importance: medium
  • Default: initial
  • Valid values: [always, initial, initial_only, never, custom]
snapshot.custom.class
A full Java class name that is an implementation of the io.debezium.connector.postgresql.spi.Snapshotter interface. Required when the snapshot.mode property is set to custom.
snapshot.include.collection.list

An optional comma-separated list of regular expressions that match the fully-qualified names (<schemaName>.<tableName>) of the tables to include in a snapshot. The specified items must be named in the connector’s table.include.list property. This property takes effect only if the connector’s snapshot.mode property is set to a value other than never.

  • Default: All tables specified in table.include.list
snapshot.lock.timeout.ms

Positive integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If table locks cannot be acquired in this time interval, the snapshot will fail.

  • Type: string
  • Importance: low
  • Default: 10000
snapshot.select.statement.overrides

Controls which rows from tables will be included in the snapshot. This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME). Select statements for the individual tables are specified in additional configuration properties, one for each table, identified by the ID snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]. The value of each of these properties is the SELECT statement to use when retrieving data from the specific table during the snapshot process. A possible use case for large append-only tables is setting a specific point at which to start (resume) the snapshot process, in case a previous snapshot process was interrupted.

Note: This setting has impact on snapshots only. Events generated by logical decoder are not affected by it at all.

  • Type: list of strings
  • Importance: low
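The relationship between the list property and the per-table properties can be sketched as follows. The helper function, the table name inventory.orders, and the id threshold are hypothetical and chosen to mirror the append-only resume use case mentioned above.

```python
def snapshot_overrides(statements: dict) -> dict:
    """Build the override properties from a {table: SELECT statement} mapping."""
    config = {"snapshot.select.statement.overrides": ",".join(statements)}
    for table, select in statements.items():
        # One additional property per table, keyed by the table name.
        config[f"snapshot.select.statement.overrides.{table}"] = select
    return config


# Resume a snapshot of an append-only table from a known id (assumed value).
overrides = snapshot_overrides(
    {"inventory.orders": "SELECT * FROM inventory.orders WHERE id > 500000"}
)
for key, value in overrides.items():
    print(f"{key}={value}")
```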
event.processing.failure.handling.mode

Specifies how the connector should react to exceptions during processing of events.

  • Default: fail
  • Valid values: [fail, warn, skip]
max.queue.size

Positive integer value that specifies the maximum size of the blocking queue into which change events received via streaming replication are placed before they are written to Kafka. This queue can provide backpressure when, for example, writes to Kafka are slower or if Kafka is not available.

  • Type: int
  • Importance: low
  • Default: 20240
max.batch.size

Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector.

  • Type: int
  • Importance: low
  • Default: 2048
max.queue.size.in.bytes

Long value for the maximum size in bytes of the blocking queue. The feature is disabled by default. It becomes active when the property is set to a positive long value.

  • Type: long
  • Default: 0
poll.interval.ms

Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 500 milliseconds.

  • Type: int
  • Importance: low
  • Default: 1000
include.unknown.datatypes

When Debezium encounters a field whose data type is unknown, the field is omitted from the change event and a warning is logged (the default). In some cases it may be preferable to include the field and send it downstream to clients in an opaque binary representation so the clients can decode it. Set to false to filter unknown data from events and true to keep them in binary format.

Note: Clients risk backward compatibility issues with this setting. Not only may the database-specific binary representation change between releases, but when the datatype is eventually supported, it will be sent downstream as a logical type, requiring adjustments by consumers. In general, when encountering unsupported data types, please file a feature request so that support can be added.

  • Type: boolean
  • Importance: low
  • Default: false
database.initial.statements

A semicolon-separated list of SQL statements to be executed when a JDBC connection (not the transaction log reading connection) to the database is established. Use a double semicolon (;;) to use a semicolon as a character and not as a delimiter.

Note: The connector may establish JDBC connections at its own discretion. This setting is typically used only for configuring session parameters. It should not be used for executing DML statements.

  • Type: list of strings
  • Importance: low
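The escaping rule for semicolons can be illustrated with a small splitter. This Python sketch follows the rule as described above and is not the connector's parser. The statements in the example are hypothetical session settings.

```python
def split_initial_statements(value: str) -> list:
    """Split on single semicolons; treat ';;' as a literal semicolon."""
    statements, current, i = [], [], 0
    while i < len(value):
        if value[i] == ";":
            if i + 1 < len(value) and value[i + 1] == ";":
                current.append(";")  # escaped semicolon stays in the statement
                i += 2
                continue
            statements.append("".join(current).strip())
            current = []
        else:
            current.append(value[i])
        i += 1
    statements.append("".join(current).strip())
    return [s for s in statements if s]


setting = "SET search_path TO app;SET application_name TO 'a;;b'"
print(split_initial_statements(setting))
```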
status.update.interval.ms

Frequency for sending replication connection status updates to the server, given in milliseconds. The property also controls how frequently the database status is checked to detect a dead connection in case the database was shut down.

  • Type: int
  • Default: 10000
heartbeat.interval.ms

Controls how frequently heartbeat messages are sent. This property (which is disabled by default) contains an interval in milliseconds that defines how frequently the connector sends messages to a heartbeat topic. This can be used to monitor whether the connector is still receiving change events from the database. You also should use heartbeat messages when records in non-captured tables are changed for a longer period of time. In this case, the connector proceeds to read the log from the database but never emits any change messages into Kafka. This means that no offset updates are committed to Kafka.

This causes WAL files to be retained by the database longer than needed because the connector processed the files already but did not flush the latest retrieved Log Sequence Number (LSN) to the database. Using heartbeat messages may also result in more re-sent change events after a connector restart. Set this parameter to 0 to not send heartbeat messages.

  • Type: int
  • Importance: low
  • Default: 0
heartbeat.topics.prefix

Sets the name of the topic to which heartbeat messages are sent. The topic is named according to the pattern <heartbeat.topics.prefix>.<server.name>.

  • Type: string
  • Importance: low
  • Default: __debezium-heartbeat
heartbeat.action.query
Specifies a query that the connector executes on the source database when the connector sends a heartbeat message.
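The heartbeat properties are usually set together. The sketch below shows one possible combination and derives the heartbeat topic name from the naming pattern given above. The interval, the server name, and the public.debezium_heartbeat table used by the action query are assumptions for illustration only.

```python
def heartbeat_topic(prefix: str, server_name: str) -> str:
    """Name of the heartbeat topic: <heartbeat.topics.prefix>.<server.name>."""
    return f"{prefix}.{server_name}"


heartbeat_config = {
    # A value of 0 (the default) disables heartbeat messages.
    "heartbeat.interval.ms": "10000",
    "heartbeat.topics.prefix": "__debezium-heartbeat",
    # Hypothetical table; a write to a captured database on every heartbeat.
    "heartbeat.action.query": (
        "INSERT INTO public.debezium_heartbeat (ts) VALUES (now())"
    ),
}

print(heartbeat_topic(heartbeat_config["heartbeat.topics.prefix"], "inventory_server"))
```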
schema.refresh.mode

Specify the conditions that trigger a refresh of the in-memory schema for a table. columns_diff (the default) is the safest mode. This setting ensures the in-memory schema stays in-sync with the database table schema.

columns_diff_exclude_unchanged_toast instructs the connector to refresh the in-memory schema cache if there is a discrepancy between it and the schema derived from the incoming message, unless unchanged TOASTable data fully accounts for the discrepancy.

This setting can improve connector performance significantly if there are frequent table updates for tables that have TOASTed data which are rarely part of the updates. However, it is possible for the in-memory schema to become outdated if TOASTable columns are dropped from the table.

  • Type: string
  • Importance: low
  • Default: columns_diff
snapshot.delay.ms

An interval in milliseconds that the connector should wait before taking a snapshot after starting up. This setting can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which can cause connector re-balancing.

  • Type: int
  • Importance: low
snapshot.fetch.size

During a snapshot, the connector reads table content in batches of rows. This property specifies the maximum number of rows in a batch.

  • Type: int
  • Default: 10240
slot.stream.params

Optional list of parameters to be passed to the configured logical decoding plugin. This optional list can be used to enable server-side table filtering when using the wal2json plugin. Allowed values depend on the chosen plugin and are separated by semicolons (for example, add-tables=public.table,public.table2;include-lsn=true).

  • Type: string
  • Importance: low
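The value is a semicolon-separated list of name=value pairs, where a value may itself contain commas. The following Python sketch parses the example value from the description. It is a reading aid, not the connector's parser.

```python
def parse_slot_stream_params(value: str) -> dict:
    """Split 'name=value;name=value' into a dictionary of plugin parameters."""
    params = {}
    for item in value.split(";"):
        item = item.strip()
        if not item:
            continue
        name, separator, param_value = item.partition("=")
        if not separator:
            raise ValueError(f"expected name=value, got {item!r}")
        params[name.strip()] = param_value.strip()
    return params


print(parse_slot_stream_params("add-tables=public.table,public.table2;include-lsn=true"))
```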
sanitize.field.names
Indicates whether field names are sanitized to adhere to Avro naming requirements. The default is true if the connector configuration sets the key.converter or value.converter property to the Avro converter, and false otherwise.
slot.max.retries

If connecting to a replication slot fails, this is the maximum number of consecutive attempts to connect.

  • Type: int
  • Default: 6
slot.retry.delay.ms

The number of milliseconds to wait between retry attempts when the connector fails to connect to a replication slot.

  • Type: int
  • Default: 10000 (10 seconds)
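How the retry limit and the retry delay interact can be sketched as a simple loop. This is a model of the described behavior under the assumption that the limit counts total attempts. The connect callable and the injected sleep function are hypothetical and exist only to keep the example self-contained.

```python
import time


def connect_with_retries(connect, max_retries=6, retry_delay_ms=10000, sleep=time.sleep):
    """Call connect() up to max_retries times, pausing between failed attempts."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            return connect()
        except ConnectionError as error:
            last_error = error
            if attempt < max_retries:
                sleep(retry_delay_ms / 1000)  # wait before the next attempt
    raise last_error


# With the defaults, a slot that never becomes available is abandoned after
# six attempts, with five waits of 10 seconds between them in this model.
```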
unavailable.value.placeholder

Specifies the constant that the connector provides to indicate that the original value is a toasted value that is not provided by the database. If the setting of unavailable.value.placeholder starts with the hex: prefix it is expected that the rest of the string represents hexadecimally encoded octets.

  • Default: __debezium_unavailable_value
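The hex: prefix rule can be illustrated with a few lines of Python. The sketch assumes that a value without the prefix is taken as plain text and encoded as UTF-8. That encoding choice is an assumption of the example.

```python
def placeholder_bytes(setting: str = "__debezium_unavailable_value") -> bytes:
    """Resolve the placeholder setting to the bytes it stands for."""
    prefix = "hex:"
    if setting.startswith(prefix):
        # The remainder is read as hexadecimally encoded octets.
        return bytes.fromhex(setting[len(prefix):])
    return setting.encode("utf-8")


print(placeholder_bytes("hex:deadbeef"))
print(placeholder_bytes())
```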
provide.transaction.metadata

Determines whether the connector generates events with transaction boundaries and enriches change event envelopes with transaction metadata. Specify true if you want the connector to do this.

  • Type: boolean
  • Default: false
transaction.topic

Controls the name of the topic to which the connector sends transaction metadata messages. The placeholder ${database.server.name} can be used for referring to the connector’s logical name.

  • Default: ${database.server.name}.transaction
retriable.restart.connector.wait.ms

The number of milliseconds to wait before restarting a connector after a retriable error occurs.

  • Type: int
  • Default: 10000
skipped.operations

A comma-separated list of operation types that will be skipped during streaming.

  • Type: list of strings
  • Default: t
  • Valid values: [c, d, u, t, none]
signal.data.collection
Fully-qualified name of the data collection that is used to send signals to the connector. Specify the collection name using the following format: <schemaName>.<tableName>.
incremental.snapshot.chunk.size

The maximum number of rows that the connector fetches and reads into memory during an incremental snapshot chunk.

  • Type: int
  • Default: 1024

Auto topic creation

For more information about Auto topic creation, see Configuring Auto Topic Creation for Source Connectors.

Note

Configuration properties accept regular expressions (regex) that are defined as Java regex.

topic.creation.groups

A list of group aliases that are used to define per-group topic configurations for matching topics. A default group always exists and matches all topics.

  • Type: List of String types
  • Default: empty
  • Possible Values: The values of this property refer to any additional groups. A default group is always defined for topic configurations.
topic.creation.$alias.replication.factor

The replication factor for new topics created by the connector. This value must not be larger than the number of brokers in the Kafka cluster. If this value is larger than the number of Kafka brokers, an error occurs when the connector attempts to create a topic. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.

  • Type: int
  • Default: n/a
  • Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.partitions

The number of topic partitions created by this connector. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.

  • Type: int
  • Default: n/a
  • Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.include

A list of strings that represent regular expressions that match topic names. This list is used to include topics with matching values, and apply this group’s specific configuration to the matching topics. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group.

  • Type: List of String types
  • Default: empty
  • Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.exclude

A list of strings representing regular expressions that match topic names. This list is used to exclude topics with matching values from getting the group’s specific configuration. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group. Note that exclusion rules override any inclusion rules for topics.

  • Type: List of String types
  • Default: empty
  • Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.${kafkaTopicSpecificConfigName}

Any of the configurations listed in Changing Broker Configurations Dynamically for the version of the Kafka broker where the records will be written. The broker’s topic-level configuration value is used if the configuration is not specified for the rule. $alias applies to the default group as well as any group defined in topic.creation.groups.

  • Type: property values
  • Default: Kafka broker value
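The group properties above can be combined as in the following Python sketch, which also models how a topic name is assigned to a group. The group alias compacted, the topic names, and the replication and partition counts are assumptions for illustration. The matching function is a simplified model in which exclusion overrides inclusion and unmatched topics fall back to the default group.

```python
import re

topic_creation = {
    "topic.creation.groups": "compacted",
    # Required for the default group (assumes at least three brokers).
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.partitions": "6",
    # Hypothetical group for change topics of one server.
    "topic.creation.compacted.include": r"inventory_server\.public\..*",
    "topic.creation.compacted.exclude": r"inventory_server\.public\.audit_.*",
    "topic.creation.compacted.cleanup.policy": "compact",
}


def _split(value: str) -> list:
    return [item.strip() for item in value.split(",") if item.strip()]


def group_for(topic: str, config: dict) -> str:
    """Return the alias of the first group whose rules select the topic."""
    for alias in _split(config.get("topic.creation.groups", "")):
        include = _split(config.get(f"topic.creation.{alias}.include", ""))
        exclude = _split(config.get(f"topic.creation.{alias}.exclude", ""))
        if any(re.fullmatch(p, topic) for p in exclude):
            continue  # exclusion rules override inclusion rules
        if any(re.fullmatch(p, topic) for p in include):
            return alias
    return "default"


print(group_for("inventory_server.public.orders", topic_creation))
```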

For more detail, see the Debezium connector properties documentation.

Note

Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.