PostgreSQL Source Connector (Debezium) Configuration Properties
The Postgres Source Connector can be configured using a variety of configuration properties.
plugin.name
The name of the Postgres logical decoding plugin installed on the server. Supported values are `decoderbufs`, `wal2json`, and `wal2json_rds`. Two additional options are supported since 0.8.0.Beta1: `wal2json_streaming` and `wal2json_rds_streaming`. When processed transactions are very large, the JSON batch event containing all changes in the transaction may not fit into the hard-coded 1 GB memory buffer. In such cases, it is possible to switch to a streaming mode in which every change in the transaction is sent as a separate message from PostgreSQL to Debezium.
- Type: String
- Importance: Medium
- Default: decoderbufs
slot.name
The name of the Postgres logical decoding slot created for streaming changes from a plugin and database instance. Values must conform to Postgres replication slot naming rules which state: “Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.”
- Type: String
- Importance: Medium
- Default: debezium
slot.drop_on_stop
Indicates whether to drop the logical replication slot when the connector stops in an orderly manner. This should only be set to `true` in testing or development environments. Dropping the slot allows WAL segments to be discarded by the database, so if set to `true` the connector may not be able to resume from the WAL position where it left off.
- Type: String
- Importance: Low
- Default: false
database.hostname
IP address or hostname of the PostgreSQL database server.
- Type: String
- Importance: High
database.port
Integer port number of the PostgreSQL database server.
- Type: Integer
- Importance: Low
- Default: 5432
database.user
Username to use when connecting to the PostgreSQL database server.
- Type: String
- Importance: High
database.password
Password to use when connecting to the PostgreSQL database server.
- Type: Password
- Importance: High
database.dbname
The name of the PostgreSQL database from which to stream the changes.
- Type: String
- Importance: High
database.server.name
Logical name that identifies and provides a namespace for the particular PostgreSQL database server or cluster being monitored. The logical name should be unique across all other connectors, since it is used as a prefix for all Kafka topic names coming from this connector. Defaults to `host:port/dbname`, where `host` is the value of the `database.hostname` property, `port` is the value of the `database.port` property, and `dbname` is the value of the `database.dbname` property. Confluent recommends using a meaningful and logical name for `dbname`. A combined connection sketch follows this entry.
- Type: String
- Importance: High
- Default: database.hostname:database.port/database.dbname
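For orientation, the connection-related properties above might be combined as in the following minimal sketch of a connector configuration in properties-file form. The connector name, host, credentials, and database name are placeholders invented for this example, not values from this documentation.

```properties
# Minimal sketch; name, host, credentials, and dbname are placeholders.
name=postgres-source-example
connector.class=io.debezium.connector.postgresql.PostgresConnector
plugin.name=wal2json
slot.name=debezium
database.hostname=postgres.example.com
database.port=5432
database.user=debezium_user
database.password=secret
database.dbname=inventory
database.server.name=server1
```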
schema.whitelist
An optional comma-separated list of regular expressions that match schema names to be monitored. Any schema name not included in the whitelist is excluded from monitoring. By default all non-system schemas are monitored. May not be used with `schema.blacklist`.
- Type: List of Strings
- Importance: Low
schema.blacklist
An optional comma-separated list of regular expressions that match schema names to be excluded from monitoring. Any schema name not included in the blacklist is monitored, with the exception of system schemas. May not be used with `schema.whitelist`.
- Type: List of Strings
- Importance: Low
table.whitelist
An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be monitored. Any table not included in the whitelist is excluded from monitoring. Each identifier is in the form `schemaName.tableName`. By default the connector monitors every non-system table in each monitored schema. May not be used with `table.blacklist`.
- Type: List of Strings
- Importance: Low
table.blacklist
An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be excluded from monitoring. Any table not included in the blacklist is monitored. Each identifier is in the form `schemaName.tableName`. May not be used with `table.whitelist`.
- Type: List of Strings
- Importance: Low
column.blacklist
An optional comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form `schemaName.tableName.columnName`. A combined filtering sketch follows this entry.
- Type: List of Strings
- Importance: Low
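To illustrate how the filter properties combine, the following sketch (the schema, table, and column names are invented for the example) monitors two tables in the public schema and excludes one sensitive column:

```properties
# Placeholder names; values are comma-separated regular expressions.
schema.whitelist=public
table.whitelist=public.customers,public.orders
column.blacklist=public.customers.ssn
```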
time.precision.mode
Time, date, and timestamp values can be represented with different kinds of precision, including: `adaptive` (the default), which captures the time and timestamp values exactly as they are in the database; `adaptive` uses millisecond, microsecond, or nanosecond precision values based on the database column type. `adaptive_time_microseconds` captures the date, datetime, and timestamp values exactly as they are in the database; `adaptive_time_microseconds` uses millisecond, microsecond, or nanosecond precision values based on the database column type, with the exception of `TIME` type fields, which are always captured as microseconds. `connect` always represents time and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp; `connect` uses millisecond precision regardless of database column precision. See temporal values.
- Type: String
- Importance: High
- Default: adaptive
decimal.handling.mode
Specifies how the connector should handle values for `DECIMAL` and `NUMERIC` columns: `precise` (the default) represents values precisely using `java.math.BigDecimal`, which are represented in change events in binary form; `double` represents them using double values, which may result in a loss of precision but is easier to use; `string` encodes values as formatted strings, which is easy to consume but loses the semantic information about the real type. See Decimal Values.
- Type: String
- Importance: High
- Default: precise
hstore.handling.mode
Specifies how the connector should handle values for hstore columns: `map` (the default) represents them using MAP; `json` represents them using JSON strings. The `json` option encodes values as formatted strings such as `{"key": "val"}`. See HStore Values. A sketch combining the type-handling modes follows this entry.
- Type: List of Strings
- Importance: Low
- Default: n/a
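As a sketch of how the three type-handling modes above might be chosen together for consumers that prefer simple, readable values over exact fidelity (the choices are illustrative, not recommendations):

```properties
# Illustrative choices; the defaults are adaptive, precise, and map respectively.
time.precision.mode=connect
decimal.handling.mode=string
hstore.handling.mode=json
```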
database.sslmode
Sets whether or not to use an encrypted connection to the PostgreSQL server. Options include: `disable` (the default) to use an unencrypted connection; `require` to use a secure (encrypted) connection, failing if one cannot be established; `verify-ca`, which is similar to `require` but additionally verifies the server TLS certificate against the configured Certificate Authority (CA) certificates, failing if no valid matching CA certificates are found; `verify-full`, which is similar to `verify-ca` but additionally verifies that the server certificate matches the host to which the connection is attempted. See the PostgreSQL documentation for more information, and the sketch after the SSL properties below.
- Type: String
- Importance: Low
- Default: disable
database.sslcert
The path to the file containing the SSL Certificate for the client. See the PostgreSQL documentation for more information.
- Type: String
- Importance: High
database.sslpassword
The password to access the client private key from the file specified by `database.sslkey`. See the PostgreSQL documentation for more information.
- Type: String
- Importance: Low
database.sslrootcert
The path to the file containing the root certificate(s) against which the server is validated. See the PostgreSQL documentation for more information.
- Type: String
- Importance: Low
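A sketch of an encrypted connection using the SSL properties above, plus `database.sslkey` (the property referenced in the `database.sslpassword` description); the file paths and key password are placeholders:

```properties
# Placeholder file paths and key password.
database.sslmode=verify-full
database.sslcert=/path/to/client.crt
database.sslkey=/path/to/client.key
database.sslpassword=secret
database.sslrootcert=/path/to/root.crt
```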
database.tcpKeepAlive
Enable the TCP keep-alive probe to verify that the database connection is still alive (enabled by default). See the PostgreSQL documentation for more information.
- Type: String
- Importance: Low
tombstones.on.delete
Controls whether a tombstone event should be generated after a delete event. When `true`, delete operations are represented by a delete event and a subsequent tombstone event. When `false`, only a delete event is sent. Emitting a tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted.
- Type: String
- Importance: High
- Default: true
column.propagate.source.type
An optional comma-separated list of regular expressions that match the fully-qualified names of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages. The schema parameters `__debezium.source.column.type`, `__debezium.source.column.length`, and `__debezium.source.column.scale` are used to propagate the original type name and length (for variable-width types), respectively. This is useful for properly sizing corresponding columns in sink databases. Fully-qualified names for columns are in the form `databaseName.tableName.columnName` or `databaseName.schemaName.tableName.columnName`. A sketch follows this entry.
- Type: List of Strings
- Importance: Low
- Default: n/a
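For example, to keep tombstones enabled and propagate the original type of columns in a hypothetical products table; the table name and the intentionally loose regular expression are placeholders:

```properties
# Placeholder pattern; "." in the regular expression also matches the literal dots
# in the fully-qualified column names.
tombstones.on.delete=true
column.propagate.source.type=.*products.*
```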
The following are advanced configuration properties:
snapshot.mode
Specifies the criteria for performing a snapshot when the connector starts: `always` performs a snapshot each time the connector starts; `initial` (the default) performs a snapshot only when no offsets have been recorded for the logical server name; `never` skips the snapshot entirely; `initial_only` performs an initial snapshot and then stops without streaming subsequent changes. See snapshots.
- Type: String
- Importance: Low
- Default: initial
- Valid values: [always, initial, never, initial_only]
snapshot.lock.timeout.ms
Positive integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If table locks cannot be acquired in this time interval, the snapshot will fail. See snapshots.
- Type: String
- Importance: Low
- Default: 10000
snapshot.select.statement.overrides
Controls which rows from tables are included in a snapshot. This property contains a comma-separated list of fully-qualified tables (`DB_NAME.TABLE_NAME`). Select statements for the individual tables are specified in additional configuration properties, one for each table, identified by the id `snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]`. The value of these properties is the SELECT statement to use when retrieving data from the specific table during the snapshot process. A possible use case for large append-only tables is setting a specific point at which to start (resume) the snapshot process, in case a previous snapshot process was interrupted; see the sketch after this entry. Note: This setting affects snapshots only. Events generated by the logical decoder are not affected by it at all.
- Type: List of Strings
- Importance: Low
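A sketch combining the snapshot properties above, assuming a hypothetical append-only inventory.orders table whose snapshot should resume from a known id; the database name, table name, and SELECT statement are placeholders:

```properties
# Placeholder database/table names and SELECT; affects snapshots only.
snapshot.mode=initial
snapshot.lock.timeout.ms=10000
snapshot.select.statement.overrides=inventory.orders
snapshot.select.statement.overrides.inventory.orders=SELECT * FROM orders WHERE id > 1000000
```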
rows.fetch.size
Positive integer value that specifies the maximum number of rows that should be read at one time from each table while taking a snapshot. The connector reads the table contents in multiple batches of this size.
- Type: Integer
- Importance: Low
- Default: 10240
max.queue.size
Positive integer value that specifies the maximum size of the blocking queue into which change events received via streaming replication are placed before they are written to Kafka. This queue can provide backpressure when, for example, writes to Kafka are slow or Kafka is not available.
- Type: Integer
- Importance: Low
- Default: 8192
max.batch.size
Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector.
- Type: Integer
- Importance: Low
- Default: 2048
poll.interval.ms
Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 500 milliseconds. A combined tuning sketch follows this entry.
- Type: Integer
- Importance: Low
- Default: 500
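The fetch, queue, batch, and poll settings above interact; this sketch simply spells out the defaults so they can be tuned together (for example, max.batch.size is normally kept smaller than max.queue.size):

```properties
# Defaults written out explicitly for tuning.
rows.fetch.size=10240
max.queue.size=8192
max.batch.size=2048
poll.interval.ms=500
```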
include.unknown.datatypes
When Debezium encounters a field whose data type is unknown, the field is omitted from the change event and a warning is logged (the default behavior). In some cases it may be preferable to include the field and send it downstream to clients in an opaque binary representation so the clients can decode it. Set to `false` to filter unknown data from events and `true` to keep them in binary format. Note: Clients risk backward compatibility issues with this setting. Not only may the database-specific binary representation change between releases, but when the data type is eventually supported, it will be sent downstream as a logical type, requiring adjustments by consumers. In general, when encountering unsupported data types, file a feature request so that support can be added.
- Type: Boolean
- Importance: Low
- Default: false
database.initial.statements
A semicolon-separated list of SQL statements to be executed when a JDBC connection (not the transaction log reading connection) to the database is established. Use a double semicolon (`;;`) to use a semicolon as a character and not as a delimiter. Note: The connector may establish JDBC connections at its own discretion. This setting is typically used only for configuring session parameters, and should not be used for executing DML statements. A sketch follows this entry.
- Type: List of Strings
- Importance: Low
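An illustrative sketch, assuming session parameters that are not part of this documentation; the doubled semicolon would only be needed if a statement itself contained a literal semicolon:

```properties
# Two statements separated by a single semicolon; parameter names/values are placeholders.
database.initial.statements=SET search_path TO inventory,public; SET statement_timeout TO 60000
```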
heartbeat.interval.ms
Controls how frequently heartbeat messages are sent. This property (disabled by default) contains an interval in milliseconds that defines how frequently the connector sends messages to a heartbeat topic. This can be used to monitor whether the connector is still receiving change events from the database. Heartbeat messages are also useful when only records in non-captured tables change for a longer period of time. In that case the connector keeps reading the log from the database but never emits any change messages into Kafka, which means that no offset updates are committed to Kafka. This causes WAL files to be retained by the database longer than needed, because the connector has already processed the files but has not flushed the latest retrieved Log Sequence Number (LSN) to the database. Using heartbeat messages may also result in more re-sent change events after a connector restart. Set this parameter to `0` to not send heartbeat messages.
- Type: Integer
- Importance: Low
- Default: 0
heartbeat.topics.prefix
Sets the name of the topic to which heartbeat messages are sent. The topic is named according to the pattern `<heartbeat.topics.prefix>.<server.name>`. See the sketch after this entry.
- Type: String
- Importance: Low
- Default: __debezium-heartbeat
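A sketch that enables heartbeats every ten seconds (the interval is arbitrary); with `database.server.name` set to server1, heartbeat records would go to a topic named __debezium-heartbeat.server1:

```properties
# Arbitrary 10-second interval; 0 (the default) disables heartbeats.
heartbeat.interval.ms=10000
heartbeat.topics.prefix=__debezium-heartbeat
```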
schema.refresh.mode
Specifies the conditions that trigger a refresh of the in-memory schema for a table. `columns_diff` (the default) is the safest mode; it ensures that the in-memory schema stays in sync with the database table schema. `columns_diff_exclude_unchanged_toast` instructs the connector to refresh the in-memory schema cache if there is a discrepancy between it and the schema derived from the incoming message, unless unchanged TOASTable data fully accounts for the discrepancy. This setting can improve connector performance significantly if there are frequent updates to tables with TOASTed data that is rarely part of the updates. However, it is possible for the in-memory schema to become outdated if TOASTable columns are dropped from the table.
- Type: String
- Importance: Low
- Default: columns_diff
snapshot.delay.ms
An interval in milliseconds that the connector should wait before taking a snapshot after starting up. This setting can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which can cause connector re-balancing.
- Type: Integer
- Importance: Low
slot.stream.params
Optional list of parameters to be passed to the configured logical decoding plugin. This can be used, for example, to enable server-side table filtering when using the wal2json plugin. Allowed values depend on the chosen plugin and are separated by semicolons (for example, `add-tables=public.table,public.table2;include-lsn=true`). See the sketch after this entry.
- Type: List of Strings
- Importance: Low
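Expanding on the example from the description, assuming the wal2json plugin and placeholder table names:

```properties
# Semicolon-separated plugin parameters; table names are placeholders.
slot.stream.params=add-tables=public.table1,public.table2;include-lsn=true
```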
More details can be found in the Debezium connector properties documentation.
Note
Portions of the information provided here are derived from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.