PostgreSQL Source Connector (Debezium) Configuration Properties¶
You can configure the Postgres Source connector using a variety of configuration properties.
Note
These are properties for the self-managed connector. If you are using Confluent Cloud, see PostgreSQL Source connector for Confluent Cloud.
Required configuration properties¶
name
- Unique name for the connector. Trying to register again with the same name will fail.
connector.class
- The name of the Java class for the connector. You must use a value of io.debezium.connector.postgresql.PostgresConnector for the PostgreSQL connector.
tasks.max
The maximum number of tasks that should be created for this connector. The connector always uses a single task and therefore does not use this value; the default is always acceptable.
- Type: int
- Default: 1
plugin.name
The name of the Postgres logical decoding plugin installed on the server. Supported values are decoderbufs, wal2json, and wal2json_rds. Two additional options are supported since 0.8.0.Beta1: wal2json_streaming and wal2json_rds_streaming. When a processed transaction is very large, it is possible that the JSON batch event with all changes in the transaction will not fit into the hard-coded memory buffer of size 1 GB. In such cases it is possible to switch to so-called streaming mode, in which every change in the transaction is sent as a separate message from PostgreSQL to Debezium.
- Type: string
- Importance: medium
- Default: decoderbufs
slot.name
The name of the Postgres logical decoding slot created for streaming changes from a plugin and database instance. Values must conform to Postgres replication slot naming rules which state: “Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.”
- Type: string
- Importance: medium
- Default: debezium
slot.drop_on_stop
Indicates whether to drop the logical replication slot when the connector stops in an orderly fashion. Should only be set to true in testing or development environments. Dropping the slot allows WAL segments to be discarded by the database. If set to true, the connector may not be able to resume from the WAL position where it left off.
- Type: string
- Importance: low
- Default: false
publication.name
The name of the PostgreSQL publication created for streaming changes when using pgoutput. This publication is created at start-up if it does not already exist, and it includes all tables. If the publication already exists, either for all tables or configured with a subset of tables, the connector uses the publication as it is defined.
- Type: string
- Importance: low
- Default: dbz_publication
database.hostname
IP address or hostname of the PostgreSQL database server.
- Type: string
- Importance: high
database.port
Integer port number of the PostgreSQL database server.
- Type: int
- Importance: low
- Default: 5432
database.user
Username to use when connecting to the PostgreSQL database server.
- Type: string
- Importance: high
database.password
Password to use when connecting to the PostgreSQL database server.
- Type: password
- Importance: high
database.dbname
The name of the PostgreSQL database from which to stream the changes.
- Type: string
- Importance: high
database.server.name
Logical name that identifies and provides a namespace for the particular PostgreSQL database server/cluster being monitored. The logical name should be unique across all other connectors, since it is used as a prefix for all Kafka topic names coming from this connector. Defaults to host:port/dbname, where host is the value of the database.hostname property, port is the value of the database.port property, and dbname is the value of the database.dbname property. Confluent recommends using a meaningful and logical name for dbname.
- Type: string
- Importance: high
- Default: database.hostname:database.port/database.dbname
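The required properties above are typically supplied together as a single JSON payload to the Kafka Connect REST API. The following is a minimal sketch; the connector name, hostname, credentials, and server name are illustrative placeholders, not values from this document:

```json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres.example.com",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "debezium_pw",
    "database.dbname": "inventory_db",
    "database.server.name": "dbserver1"
  }
}
```

With this configuration, change event topics are prefixed with the logical server name, for example dbserver1.public.orders.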
schema.include.list
An optional comma-separated list of regular expressions that match schema names to be monitored. Any schema name not included in the include list is excluded from monitoring. By default all non-system schemas are monitored. May not be used with schema.exclude.list.
- Type: list of strings
- Importance: low
schema.exclude.list
An optional comma-separated list of regular expressions that match schema names to be excluded from monitoring. Any schema name not included in the exclude list is monitored, with the exception of system schemas. May not be used with schema.include.list.
- Type: list of strings
- Importance: low
table.include.list
An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be monitored. Any table not included in the include list is excluded from monitoring. Each identifier is in the form schemaName.tableName. By default the connector monitors every non-system table in each monitored schema. May not be used with table.exclude.list.
- Type: list of strings
- Importance: low
table.exclude.list
An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be excluded from monitoring. Any table not included in the exclude list is monitored. Each identifier is in the form schemaName.tableName. May not be used with table.include.list.
- Type: list of strings
- Importance: low
column.include.list
An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in change event record values. Fully-qualified names for columns are of the form schemaName.tableName.columnName. Do not also set the column.exclude.list property.
- Type: list of strings
- Importance: low
column.exclude.list
An optional comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form schemaName.tableName.columnName.
- Type: list of strings
- Importance: low
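The include/exclude properties above are regular expressions, so literal dots in identifiers should be escaped. A hypothetical fragment (schema, table, and column names are illustrative) that monitors two schemas but hides one sensitive column might look like this:

```json
{
  "schema.include.list": "public,audit",
  "table.include.list": "public\\.orders,audit\\..*",
  "column.exclude.list": "public\\.orders\\.credit_card_number"
}
```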
time.precision.mode
Time, date, and timestamp values can be represented with different kinds of precision, including:
- adaptive: Captures the time and timestamp values exactly as they are in the database, using millisecond, microsecond, or nanosecond precision based on the database column type.
- adaptive_time_microseconds: Captures the date, datetime, and timestamp values exactly as they are in the database, using millisecond, microsecond, or nanosecond precision based on the database column type, with the exception of TIME type fields, which are always captured as microseconds.
- connect: Always represents time and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, which use millisecond precision regardless of database column precision. See temporal values.
- Type: string
- Importance: high
- Default: adaptive
decimal.handling.mode
Specifies how the connector should handle values for DECIMAL and NUMERIC columns:
- precise: Represents values precisely using java.math.BigDecimal, which is represented in change events in binary form.
- double: Represents them using double values, which may result in a loss of precision but is easier to use.
- string: Encodes values as formatted strings, which are easy to consume but lose semantic information about the real type. See Decimal Values.
- Type: string
- Importance: high
- Default: precise
hstore.handling.mode
Specifies how the connector should handle values for hstore columns:
- map: Represents values using MAP.
- json: Represents them using JSON strings. The JSON option encodes values as formatted strings such as {"key":"val"}. See HStore Values.
- Type: string
- Importance: low
- Default: map
interval.handling.mode
Specifies how the connector should handle values for interval columns.
- Type: string
- Default: numeric
- Valid values: [numeric, string]
database.sslmode
Sets whether to use an encrypted connection to the PostgreSQL server. Options include:
- disable: Uses an unencrypted connection.
- require: Uses a secure (encrypted) connection. Fails if one cannot be established.
- verify-ca: Similar to require, but additionally verifies the server TLS certificate against the configured Certificate Authority (CA) certificates. Fails if no valid matching CA certificates are found.
- verify-full: Similar to verify-ca, but additionally verifies that the server certificate matches the host to which the connection is attempted. See the PostgreSQL documentation for more information.
- Type: string
- Importance: low
- Default: disable
database.sslcert
The path to the file containing the SSL certificate of the client. See the PostgreSQL documentation for more information.
- Type: string
- Importance: high
database.sslkey
The path to the file containing the SSL private key of the client. See the PostgreSQL documentation for more information.
- Type: string
database.sslpassword
The password to access the client private key from the file specified by database.sslkey. See the PostgreSQL documentation for more information.
- Type: string
- Importance: low
database.sslrootcert
The path to the file containing the root certificate(s) against which the server is validated. See the PostgreSQL documentation for more information.
- Type: string
- Importance: low
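The SSL properties above work together. A hypothetical fragment that fully verifies the server certificate and presents a client certificate might look like the following; all file paths and the passphrase are placeholders:

```json
{
  "database.sslmode": "verify-full",
  "database.sslrootcert": "/etc/pki/root.crt",
  "database.sslcert": "/etc/pki/client.crt",
  "database.sslkey": "/etc/pki/client.key",
  "database.sslpassword": "client-key-passphrase"
}
```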
database.tcpKeepAlive
Enables a TCP keep-alive probe to verify that the database connection is still alive. Enabled by default. See the PostgreSQL documentation for more information.
- Type: string
- Importance: low
- Default: true
tombstones.on.delete
Controls whether a tombstone event should be generated after a delete event. When true, delete operations are represented by a delete event and a subsequent tombstone event. When false, only a delete event is sent. Emitting a tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted.
- Type: string
- Importance: high
- Default: true
column.truncate.to._length_.chars
An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form schemaName.tableName.columnName. In change event records, values in these columns are truncated if they are longer than the number of characters specified by length in the property name. You can specify multiple properties with different lengths in a single configuration. Length must be a positive integer, for example, column.truncate.to.20.chars.
- Type: list of strings
column.mask.with._length_.chars
An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form schemaName.tableName.columnName. In change event values, the values in the specified table columns are replaced with length number of asterisk (*) characters. You can specify multiple properties with different lengths in a single configuration. Length must be a positive integer or zero. When you specify zero, the connector replaces a value with an empty string.
- Type: list of strings
column.mask.hash.hashAlgorithm.with.salt.salt; column.mask.hash.v2.hashAlgorithm.with.salt.salt
An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form <schemaName>.<tableName>.<columnName>. In the resulting change event record, the values for the specified columns are replaced with pseudonyms, which consist of the hashed value that results from applying the specified hashAlgorithm and salt. Based on the hash function that is used, referential integrity is maintained while column values are replaced with pseudonyms. Supported hash functions are described in the MessageDigest documentation.
- Type: list of strings
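The three masking-style properties above embed the length, or the hash algorithm and salt, in the property name itself. A hypothetical fragment combining them follows; the table and column names and the salt are illustrative, and SHA-256 is one of the MessageDigest algorithms:

```json
{
  "column.truncate.to.20.chars": "inventory\\.customers\\.notes",
  "column.mask.with.10.chars": "inventory\\.customers\\.phone",
  "column.mask.hash.SHA-256.with.salt.CzQMA0cB5K": "inventory\\.customers\\.email"
}
```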
column.propagate.source.type
An optional, comma-separated list of regular expressions that match the database-specific data type name for some columns. Fully-qualified data type names are of the form databaseName.tableName.typeName or databaseName.schemaName.tableName.typeName. For these data types, the connector adds parameters to the corresponding field schemas in emitted change records. The added parameters specify the original type and length of the column: __debezium.source.column.type, __debezium.source.column.length, and __debezium.source.column.scale.
message.key.columns
A list of expressions that specify the columns that the connector uses to form custom message keys for change event records that it publishes to the Kafka topics for specified tables.
By default, Debezium uses the primary key column of a table as the message key for records that it emits. In place of the default, or to specify a key for tables that lack a primary key, you can configure custom message keys based on one or more columns.
To establish a custom message key for a table, list the table, followed by the columns to use as the message key. Each list entry takes the following format:
<fully-qualified_tableName>:<keyColumn>,<keyColumn>
To base a table key on multiple column names, insert commas between the column names.
Each fully-qualified table name is a regular expression in the following format:
<schemaName>.<tableName>
The property can include entries for multiple tables. Use a semicolon to separate table entries in the list. The following example sets the message key for the tables inventory.customers and purchase.orders:
inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4
For the table inventory.customers, the columns pk1 and pk2 are specified as the message key. For the purchaseorders tables in any schema, the columns pk3 and pk4 serve as the message key. There is no limit to the number of columns that you use to create custom message keys. However, it is best to use the minimum number required to specify a unique key.
- Type: list
- Default: n/a
publication.autocreate.mode
Applies only when streaming changes by using the pgoutput plug-in. The setting determines how creation of a publication should work.
- Default: all_tables
- Valid values: [all_tables, disabled, filtered]
binary.handling.mode
Specifies how binary (bytea) columns should be represented in change events.
- Type: string
- Importance: low
- Valid values: [bytes, base64, hex]
money.fraction.digits
Specifies how many decimal digits should be used when converting the Postgres money type to java.math.BigDecimal, which represents the values in change events. Applicable only when decimal.handling.mode is set to precise.
- Type: int
- Default: 2
message.prefix.include.list
- An optional comma-separated list of regular expressions that match the names of logical decoding message prefixes that you want to capture. Any logical decoding message with a prefix not included in message.prefix.include.list is excluded. By default, all logical decoding messages are captured. Do not also set the message.prefix.exclude.list property.
message.prefix.exclude.list
- An optional comma-separated list of regular expressions that match the names of logical decoding message prefixes that you do not want to capture. Any logical decoding message with a prefix that is not included in message.prefix.exclude.list is included. Do not also set the message.prefix.include.list property. To exclude all logical decoding messages, pass .* into this configuration.
Advanced configuration properties¶
snapshot.mode
The criteria for running a snapshot upon startup of the connector.
- Type: string
- Importance: medium
- Default: initial
- Valid values: [always, initial, initial_only, never, custom]
snapshot.custom.class
- A full Java class name that is an implementation of the io.debezium.connector.postgresql.spi.Snapshotter interface. Required when the snapshot.mode property is set to custom.
snapshot.include.collection.list
An optional comma-separated list of regular expressions that match the fully-qualified names (<schemaName>.<tableName>) of the tables to include in a snapshot. The specified items must be named in the connector's table.include.list property. This property takes effect only if the connector's snapshot.mode property is set to a value other than never.
- Default: All tables specified in table.include.list
snapshot.lock.timeout.ms
Positive integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If table locks cannot be acquired in this time interval, the snapshot will fail.
- Type: string
- Importance: low
- Default: 10000
snapshot.select.statement.overrides
Controls which rows from tables are included in a snapshot. This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME). SELECT statements for the individual tables are specified in additional configuration properties, one for each table, identified by the id snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]. The value of these properties is the SELECT statement to use when retrieving data from the specific table during the snapshot process. A possible use case for large append-only tables is setting a specific point where to start (resume) the snapshot process, in case a previous snapshot process was interrupted.
Note: This setting affects snapshots only. Events generated by the logical decoder are not affected by it.
- Type: list of strings
- Importance: low
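As described above, the override SELECT statement lives in a companion property keyed by the table name. A hypothetical fragment (the table name and WHERE clause are illustrative) that resumes a snapshot of a large append-only table from a known id:

```json
{
  "snapshot.select.statement.overrides": "public.orders",
  "snapshot.select.statement.overrides.public.orders": "SELECT * FROM public.orders WHERE id > 1000000 ORDER BY id"
}
```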
event.processing.failure.handling.mode
Specifies how the connector should react to exceptions during processing of events.
- Default: fail
- Valid values: [fail, warn, skip]
max.queue.size
Positive integer value that specifies the maximum size of the blocking queue into which change events received via streaming replication are placed before they are written to Kafka. This queue can provide backpressure when, for example, writes to Kafka are slower or if Kafka is not available.
- Type: int
- Importance: low
- Default: 20240
max.batch.size
Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector.
- Type: int
- Importance: low
- Default: 2048
max.queue.size.in.bytes
Long value for the maximum size in bytes of the blocking queue. This feature is disabled by default; it becomes active if the property is set to a positive long value.
- Type: long
- Default: 0
poll.interval.ms
Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear.
- Type: int
- Importance: low
- Default: 1000
include.unknown.datatypes
When Debezium encounters a field whose data type is unknown, the field is omitted from the change event and a warning is logged (the default). In some cases it may be preferable to include the field and send it downstream to clients in an opaque binary representation so the clients can decode it. Set to false to filter unknown data from events and true to keep them in binary format.
Note: Clients risk backward compatibility issues with this setting. Not only may the database-specific binary representation change between releases, but when the data type is eventually supported, it will be sent downstream as a logical type, requiring adjustments by consumers. In general, when encountering unsupported data types, file a feature request so that support can be added.
- Type: boolean
- Importance: low
- Default: false
database.initial.statements
A semicolon-separated list of SQL statements to be executed when a JDBC connection (not the transaction log reading connection) to the database is established. Use a double semicolon (;;) to use a semicolon as a character and not as a delimiter.
Note: The connector may establish JDBC connections at its own discretion. This setting is typically used only for configuring session parameters, and should not be used for executing DML statements.
- Type: list of strings separated by semicolons
- Importance: low
status.update.interval.ms
Frequency for sending replication connection status updates to the server, given in milliseconds. The property also controls how frequently the database status is checked to detect a dead connection in case the database was shut down.
- Type: int
- Default: 10000
heartbeat.interval.ms
Controls how frequently heartbeat messages are sent. This property (which is disabled by default) contains an interval in milliseconds that defines how frequently the connector sends messages to a heartbeat topic. This can be used to monitor whether the connector is still receiving change events from the database. You should also use heartbeat messages when only records in non-captured tables are changed for a long period of time. In this case, the connector proceeds to read the log from the database but never emits any change messages into Kafka, which means that no offset updates are committed to Kafka.
This causes WAL files to be retained by the database longer than needed, because the connector has already processed the files but did not flush the latest retrieved Log Sequence Number (LSN) to the database. Using heartbeat messages may also result in more re-sent change events after a connector restart. Set this parameter to 0 to not send heartbeat messages.
- Type: int
- Importance: low
- Default: 0
heartbeat.topics.prefix
Sets the name of the topic to which heartbeat messages are sent. The topic is named according to the pattern <heartbeat.topics.prefix>.<server.name>.
- Type: string
- Importance: low
- Default: __debezium-heartbeat
heartbeat.action.query
- Specifies a query that the connector executes on the source database when the connector sends a heartbeat message.
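Taken together, the heartbeat properties can keep the connector's flushed LSN moving even on a quiet database. A hypothetical fragment follows; the heartbeat table and its schema are assumptions, not part of this document:

```json
{
  "heartbeat.interval.ms": "300000",
  "heartbeat.topics.prefix": "__debezium-heartbeat",
  "heartbeat.action.query": "INSERT INTO public.debezium_heartbeat (ts) VALUES (now())"
}
```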
schema.refresh.mode
Specifies the conditions that trigger a refresh of the in-memory schema for a table.
- columns_diff (the default) is the safest mode. This setting ensures the in-memory schema stays in sync with the database table schema.
- columns_diff_exclude_unchanged_toast instructs the connector to refresh the in-memory schema cache if there is a discrepancy between it and the schema derived from the incoming message, unless unchanged TOASTable data fully accounts for the discrepancy. This setting can improve connector performance significantly if there are frequent table updates for tables that have TOASTed data that is rarely part of the updates. However, it is possible for the in-memory schema to become outdated if TOASTable columns are dropped from the table.
- Type: string
- Importance: low
- Default: columns_diff
snapshot.delay.ms
An interval in milliseconds that the connector should wait before taking a snapshot after starting up. This setting can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which can cause connector re-balancing.
- Type: int
- Importance: low
snapshot.fetch.size
During a snapshot, the connector reads table content in batches of rows. This property specifies the maximum number of rows in a batch.
- Type: int
- Default: 10240
slot.stream.params
Optional list of parameters to be passed to the configured logical decoding plugin. This list can be used to enable server-side table filtering when using the wal2json plugin. Allowed values depend on the chosen plugin and are separated by semicolons (for example, add-tables=public.table,public.table2;include-lsn=true).
- Type: string
- Importance: low
sanitize.field.names
- Indicates whether field names are sanitized to adhere to Avro naming requirements. Defaults to true if the connector configuration sets the key.converter or value.converter property to the Avro converter, and false if not.
slot.max.retries
If connecting to a replication slot fails, this is the maximum number of consecutive attempts to connect.
- Type: int
- Default: 6
slot.retry.delay.ms
The number of milliseconds to wait between retry attempts when the connector fails to connect to a replication slot.
- Type: int
- Default: 10000 (10 seconds)
unavailable.value.placeholder
Specifies the constant that the connector provides to indicate that the original value is a toasted value that is not provided by the database. If the setting of unavailable.value.placeholder starts with the hex: prefix, it is expected that the rest of the string represents hexadecimally encoded octets.
- Default: __debezium_unavailable_value
provide.transaction.metadata
Determines whether the connector generates events with transaction boundaries and enriches change event envelopes with transaction metadata. Specify true if you want the connector to do this.
- Type: boolean
- Default: false
transaction.topic
Controls the name of the topic to which the connector sends transaction metadata messages. The placeholder ${database.server.name} can be used to refer to the connector's logical name.
- Default: ${database.server.name}.transaction
retriable.restart.connector.wait.ms
The number of milliseconds to wait before restarting a connector after a retriable error occurs.
- Type: int
- Default: 10000
skipped.operations
A comma-separated list of operation types that will be skipped during streaming. Operations include c for inserts/creates, u for updates, d for deletes, and t for truncates.
- Type: string
- Default: t
- Valid values: [c, d, u, t, none]
signal.data.collection
- Fully-qualified name of the data collection that is used to send signals to the connector. Specify the collection name using the following format: <schemaName>.<tableName>.
incremental.snapshot.chunk.size
The maximum number of rows that the connector fetches and reads into memory during an incremental snapshot chunk.
- Type: int
- Default: 1024
Auto topic creation¶
For more information about Auto topic creation, see Configuring Auto Topic Creation for Source Connectors.
Note
Configuration properties accept regular expressions (regex) that are defined as Java regex.
topic.creation.groups
A list of group aliases that are used to define per-group topic configurations for matching topics. A default group always exists and matches all topics.
- Type: List of String types
- Default: empty
- Possible Values: The values of this property refer to any additional groups. A default group is always defined for topic configurations.
topic.creation.$alias.replication.factor
The replication factor for new topics created by the connector. This value must not be larger than the number of brokers in the Kafka cluster. If this value is larger than the number of Kafka brokers, an error occurs when the connector attempts to create a topic. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.
- Type: int
- Default: n/a
- Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker's default value.
topic.creation.$alias.partitions
The number of topic partitions created by this connector. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.
- Type: int
- Default: n/a
- Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker's default value.
topic.creation.$alias.include
A list of strings that represent regular expressions that match topic names. This list is used to include topics with matching values, and to apply this group's specific configuration to the matching topics. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group.
- Type: List of String types
- Default: empty
- Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.exclude
A list of strings representing regular expressions that match topic names. This list is used to exclude topics with matching values from getting the group's specific configuration. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group. Note that exclusion rules override any inclusion rules for topics.
- Type: List of String types
- Default: empty
- Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.${kafkaTopicSpecificConfigName}
Any of the topic-level configurations described in Changing Broker Configurations Dynamically for the version of the Kafka broker where the records will be written. The broker's topic-level configuration value is used if the configuration is not specified for the rule. $alias applies to the default group as well as any group defined in topic.creation.groups.
- Type: property values
- Default: Kafka broker value
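The group-based properties above compose as follows. This hypothetical fragment defines the required default group settings plus a second group that applies compaction to matching topics; the group alias and topic regex are illustrative, and cleanup.policy is a standard Kafka topic-level configuration:

```json
{
  "topic.creation.groups": "compacted",
  "topic.creation.default.replication.factor": "3",
  "topic.creation.default.partitions": "10",
  "topic.creation.compacted.include": "dbserver1\\.inventory\\..*",
  "topic.creation.compacted.partitions": "20",
  "topic.creation.compacted.cleanup.policy": "compact"
}
```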
For more detail, see the Debezium connector properties documentation.
Note
Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.