Configuration Reference for Debezium PostgreSQL Source Connector for Confluent Platform¶
You can configure the Postgres Source connector using a variety of configuration properties.
Note
These are properties for the self-managed connector. If you are using Confluent Cloud, see PostgreSQL Source connector for Confluent Cloud.
Required properties¶
name
- Unique name for the connector. Attempting to register another connector with the same name fails.
connector.class
- The name of the Java class for the connector. You must use the value
io.debezium.connector.postgresql.PostgresConnector
for the PostgreSQL connector.
tasks.max
The maximum number of tasks that should be created for this connector. The connector always uses a single task, so it does not use this value; the default is always acceptable.
- Type: int
- Default: 1
plugin.name
The name of the PostgreSQL logical decoding plugin installed on the server. When processed transactions are very large, it is possible that the JSON batch event with all changes in the transaction will not fit into the hard-coded 1 GB memory buffer. In such cases, it is possible to switch to streaming mode, where every change in a transaction is sent as a separate message from PostgreSQL to Debezium. You can configure streaming mode by setting
plugin.name
to
pgoutput
. For more details, see PostgreSQL 10+ logical decoding support (pgoutput) in the Debezium documentation.- Type: string
- Importance: medium
- Default:
decoderbufs
- Valid values:
decoderbufs
,
wal2json
,
wal2json_rds
, and
pgoutput
. Two additional options have been supported since 0.8.0.Beta1:
wal2json_streaming
and
wal2json_rds_streaming
.
slot.name
The name of the Postgres logical decoding slot created for streaming changes from a plugin and database instance. Values must conform to Postgres replication slot naming rules which state: “Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.”
- Type: string
- Importance: medium
- Default:
debezium
slot.drop.on.stop
Specifies whether to drop the logical replication slot when the connector stops in an orderly, expected way. Should only be set to
true
in testing or development environments. Dropping the slot allows WAL segments to be discarded by the database. If set to
true
, the connector may not be able to resume from the WAL position where it left off.- Type: string
- Importance: low
- Default:
false
publication.name
The name of the PostgreSQL publication created for streaming changes when using
pgoutput
. This publication is created at start-up if it does not already exist and it includes all tables. If the publication already exists, either for all tables or configured with a subset of tables, the connector uses the publication as it is defined.- Type: string
- Importance: low
- Default:
dbz_publication
database.hostname
IP address or hostname of the PostgreSQL database server.
- Type: string
- Importance: high
database.port
Integer port number of the PostgreSQL database server.
- Type: int
- Importance: low
- Default:
5432
database.user
Username to use when connecting to the PostgreSQL database server.
- Type: string
- Importance: high
database.password
Password to use when connecting to the PostgreSQL database server.
- Type: password
- Importance: high
database.dbname
The name of the PostgreSQL database from which to stream the changes.
- Type: string
- Importance: high
topic.prefix
Topic prefix that provides a namespace for the particular PostgreSQL database server or cluster in which Debezium is capturing changes. The prefix must be unique across all other connectors, since it is used as a topic name prefix for all Kafka topics that receive records from this connector. Use only alphanumeric characters, hyphens, dots, and underscores in the database server logical name. Do not change the value of this property; if you change it, then after a restart, instead of continuing to emit events to the original topics, the connector emits subsequent events to topics whose names are based on the new value.
- Type: string
- Default: No default
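Taken together, the connection properties above form a minimal connector configuration. The following sketch uses placeholder values for the hostname, credentials, database, and prefix:

```json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres.example.com",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "inventory",
    "topic.prefix": "dbserver1"
  }
}
```

With this configuration, change event topics are prefixed with dbserver1.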
schema.include.list
An optional comma-separated list of regular expressions that match schema names to be monitored. Any schema name not included in this list is excluded from monitoring. By default, all non-system schemas are monitored. May not be used with
schema.exclude.list
.- Type: list of strings
- Importance: low
schema.exclude.list
An optional comma-separated list of regular expressions that match schema names to be excluded from monitoring. Any schema name not included in the exclude list is monitored, with the exception of system schemas. May not be used with
schema.include.list
.- Type: list of strings
- Importance: low
table.include.list
An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be monitored. Any table not included in this list is excluded from monitoring. Each identifier is in the form
schemaName.tableName
. By default the connector will monitor every non-system table in each monitored schema. May not be used withtable.exclude.list
.- Type: list of strings
- Importance: low
table.exclude.list
An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be excluded from monitoring. Any table not included in the exclude list is monitored. Each identifier is in the form
schemaName.tableName
. May not be used with
table.include.list
.- Type: list of strings
- Importance: low
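As an illustration (the schema and table names are hypothetical), the include-list filters can be combined in a connector configuration like this; because the entries are regular expressions, literal dots should be escaped:

```json
{
  "schema.include.list": "inventory",
  "table.include.list": "inventory\\.customers,inventory\\.orders_.*"
}
```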
column.include.list
An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in change event record values. Fully-qualified names for columns are of the form
schemaName.tableName.columnName
. Do not also set thecolumn.exclude.list
property.- Type: list of strings
- Importance: low
column.exclude.list
An optional comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form
schemaName.tableName.columnName
.- Type: list of strings
- Importance: low
skip.messages.without.change
Specifies whether to skip publishing messages when there is no change in the included columns. This essentially filters out messages when there is no change in the columns included per the
column.include.list
orcolumn.exclude.list
properties.- Type: boolean
- Default: false
time.precision.mode
Time, date, and timestamp values can be represented with different kinds of precision, including:
adaptive
: Captures the time and timestamp values exactly as they are in the database.
adaptive
uses either millisecond, microsecond, or nanosecond precision values based on the database column type.
adaptive_time_microseconds
: Captures the date, datetime, and timestamp values exactly as they are in the database, using either millisecond, microsecond, or nanosecond precision values based on the database column type, with the exception of
TIME
type fields, which are always captured as microseconds.
connect
: Always represents time and timestamp values using Kafka Connect’s built-in representations for Time, Date, and Timestamp.
connect
uses millisecond precision regardless of database column precision. For more details, see temporal values.
- Type: string
- Importance: high
- Default:
adaptive
decimal.handling.mode
Specifies how the connector should handle values for
DECIMAL
andNUMERIC
columns:precise
: Represents values precisely usingjava.math.BigDecimal
, which are represented in change events in binary form.double
: Represents them using double values.double
may result in a loss of precision but is easier to use.
string
: Encodes values as a formatted string. The
string
option is easy to consume, but semantic information about the real type is lost. See Decimal Values.
- Type: string
- Importance: high
- Default:
precise
hstore.handling.mode
Specifies how the connector should handle values for hstore columns.
map
represents values using MAP.
json
represents them using JSON strings. The JSON option encodes values as formatted strings, such as
{"key":"val"}
. For more details, see HStore Values.- Type: string
- Importance: low
- Default:
map
interval.handling.mode
Specifies how the connector should handle values for interval columns:
numeric
represents intervals using approximate numbers of microseconds, and
string
represents them exactly, using the string pattern P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S (for example, P1Y2M3DT4H5M6.78S).
- Type: string
- Default:
numeric
- Valid values: [
numeric
orstring
]
database.sslmode
Sets whether or not to use an encrypted connection to the PostgreSQL server. The option of
disable
uses an unencrypted connection.require
uses a secure (encrypted) connection and fails if one cannot be established.
verify-ca
is similar to
require
, but additionally verifies the server TLS certificate against the configured Certificate Authority (CA) certificates, and fails if no valid matching CA certificates are found.
verify-full
is similar to
verify-ca
, but additionally verifies that the server certificate matches the host to which the connection is attempted. For more information, see the PostgreSQL documentation.- Type: string
- Importance: low
- Default:
disable
database.sslcert
The path to the file containing the SSL certificate of the client. See the PostgreSQL documentation for more information.
- Type: string
- Importance: high
database.sslkey
The path to the file containing the SSL private key of the client. See the PostgreSQL documentation for more information.
- Type: string
database.sslpassword
The password to access the client private key from the file specified by
database.sslkey
. See the PostgreSQL documentation for more information.- Type: string
- Importance: low
database.sslrootcert
The path to the file containing the root certificate(s) against which the server is validated. See the PostgreSQL documentation for more information.
- Type: string
- Importance: low
database.tcpKeepAlive
Enable TCP keep-alive probes to verify that the database connection is still alive. Enabled by default. See the PostgreSQL documentation for more information.
- Type: boolean
- Importance: low
- Default:
true
tombstones.on.delete
Controls whether a tombstone event should be generated after a delete event. When
true
the delete operations are represented by a delete event and a subsequent tombstone event. Whenfalse
only a delete event is sent. Emitting a tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key after the source record is deleted.- Type: string
- Importance: high
- Default:
true
column.truncate.to.length.chars
An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form schemaName.tableName.columnName. In change event records, values in these columns are truncated if they are longer than the number of characters specified by length in the property name. You can specify multiple properties with different lengths in a single configuration. Length must be a positive integer, for example,
column.truncate.to.20.chars.
- Type: list of strings
- Default: No default
column.mask.with.length.chars
An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form
schemaName.tableName.columnName
. In change event values, the values in the specified table columns are replaced with length number of asterisk(*)
characters. You can specify multiple properties with different lengths in a single configuration. Length must be a positive integer or zero. When you specify zero, the connector replaces a value with an empty string.- Type: list of strings
- Default: No default
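For example (the table and column names are hypothetical), the length is embedded in the property key and the value lists the matching columns:

```json
{
  "column.truncate.to.20.chars": "inventory\\.customers\\.biography",
  "column.mask.with.12.chars": "inventory\\.customers\\.ssn"
}
```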
column.mask.hash.hashAlgorithm.with.salt.salt
;column.mask.hash.v2.hashAlgorithm.with.salt.salt
An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form
<schemaName>.<tableName>.<columnName>
. In the resulting change event record, the values for the specified columns are replaced with pseudonyms, which consist of the hashed value that results from applying the specified
hashAlgorithm
andsalt
. Based on the hash function that is used, referential integrity is maintained, while column values are replaced with pseudonyms. Supported hash functions are described in the MessageDigest documentation.- Type: list of strings
- Default: No default
column.propagate.source.type
An optional, comma-separated list of regular expressions that match the fully-qualified names of columns. Fully-qualified column names are of the form
schemaName.tableName.columnName
. For these columns, the connector adds parameters to the corresponding field schemas in emitted change records. The added parameters specify the original type and length of the column:
__debezium.source.column.type, __debezium.source.column.length, and __debezium.source.column.scale
.- Type: list of strings
- Default: No default
datatype.propagate.source.type
An optional, comma-separated list of regular expressions that match the database-specific data type name for some columns. Fully-qualified data type names are of the form
databaseName.tableName.typeName
, ordatabaseName.schemaName.tableName.typeName
. For more details, see the Debezium documentation.- Type: list of strings
- Default: No default
message.key.columns
A list of expressions that specify the columns that the connector uses to form custom message keys for change event records that it publishes to the Kafka topics for specified tables.
By default, Debezium uses the primary key column of a table as the message key for records that it emits. In place of the default, or to specify a key for tables that lack a primary key, you can configure custom message keys based on one or more columns.
To establish a custom message key for a table, list the table, followed by the columns to use as the message key. Each list entry takes the following format:
<fully-qualified_tableName>:<keyColumn>,<keyColumn>
To base a table key on multiple column names, insert commas between the column names.
Each fully-qualified table name is a regular expression in the following format:
<schemaName>.<tableName>
The property can include entries for multiple tables. Use a semicolon to separate table entries in the list. The following example sets the message key for the tables
inventory.customers
andpurchase.orders
:inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4
For the table inventory.customers, the columns pk1 and pk2 are specified as the message key. For the
purchaseorders
tables in any schema, the columns
pk3
and
pk4
serve as the message key. There is no limit to the number of columns that you use to create custom message keys. However, it’s best to use the minimum number required to specify a unique key.
- Type: list
- Default: empty string
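Expressed as a configuration fragment (the table and column names are hypothetical):

```json
{
  "message.key.columns": "inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4"
}
```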
publication.autocreate.mode
Applies only when streaming changes by using the pgoutput plug-in. The setting determines how creation of a publication should work.
- Default:
all_tables
- Valid values: [
all_tables
,disabled
,filtered
]
replica.identity.autoset.values
A comma-separated list of regular expressions that match fully-qualified tables and the replica identity value to be used in a table. This property determines the value for replica identity at the table level and will overwrite the existing value in the database. For more details about this property, see the Debezium documentation.
- Type: list of strings
- Default: empty string
binary.handling.mode
Specifies how binary (bytea) columns should be represented in change events.
- Type: string
- Importance: low
- Valid values: [
bytes
,base64
,hex
]
schema.name.adjustment.mode
Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings are:
avro
, which replaces the characters that cannot be used in the Avro type name with an underscore, andnone
, which does not apply any adjustment.- Type: string
- Default:
avro
field.name.adjustment.mode
Specifies how field names should be adjusted for compatibility with the message converter used by the connector. The following are possible settings:
avro
: Replaces the characters that cannot be used in the Avro type name with an underscorenone
: Does not apply any adjustmentavro_unicode
: Replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like_uxxxx
. Note that_
is an escape sequence like backslash in Java.
For more details, see Avro naming.
- Type: string
- Default:
none
money.fraction.digits
Specifies how many decimal digits should be used when converting Postgres money type to
java.math.BigDecimal
, which represents the values in change events. Applicable only whendecimal.handling.mode
is set toprecise
.- Type: int
- Default: 2
message.prefix.include.list
An optional, comma-separated list of regular expressions that match the names of logical decoding message prefixes that you want to capture. Any logical decoding message with a prefix not included in
message.prefix.include.list
is excluded. Do not also set themessage.prefix.exclude.list
parameter when setting this property. For information about the structure of message events and about their ordering semantics, see message events.
- Type: list of strings
- Default: By default, all logical decoding messages are captured.
message.prefix.exclude.list
An optional, comma-separated list of regular expressions that match the names of logical decoding message prefixes that you do not want to capture. Any logical decoding message with a prefix that is not included in
message.prefix.exclude.list
is included. Do not also set themessage.prefix.include.list
parameter when setting this property. To exclude all logical decoding messages, pass.*
into this config.- Type: list of strings
- Default: No default
Advanced properties¶
converters
Enumerates a comma-separated list of the symbolic names of the custom converter instances that the connector can use. For more details about this property, see the Debezium documentation.
- Default: No default
snapshot.mode
The criteria for running a snapshot upon startup of the connector.
- Type: string
- Importance: medium
- Default:
initial
- Valid values: [
always
,initial
,initial_only
,never
,custom
]
snapshot.custom.class
- A full Java class name that is an implementation of the
io.debezium.connector.postgresql.spi.Snapshotter
interface. Required when the
snapshot.mode
property is set to
custom
.
snapshot.include.collection.list
An optional comma-separated list of regular expressions that match the fully-qualified names (
<schemaName>.<tableName>
) of the tables to include in a snapshot. The specified items must be named in the connector’stable.include.list property
. This property takes effect only if the connector’ssnapshot.mode
property is set to a value other thannever
.- Default: All tables specified in
table.include.list
snapshot.lock.timeout.ms
Positive integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If table locks cannot be acquired in this time interval, the snapshot will fail.
- Type: int
- Importance: low
- Default: 10000
snapshot.select.statement.overrides
Controls which rows from tables are included in the snapshot. This property contains a comma-separated list of fully-qualified tables (
DB_NAME.TABLE_NAME
). Select statements for the individual tables are specified in additional configuration properties, one for each table, identified by the idsnapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]
. The value of these properties is the SELECT statement to use when retrieving data from the specific table during the snapshot process. A possible use case for large append-only tables is setting a specific point at which to start (resume) the snapshot process, in case a previous snapshot process was interrupted. Note that this setting impacts snapshots only. Events generated by the logical decoder are not affected by it at all.
- Type: list of strings
- Importance: low
- Default: No default
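A sketch of how the list property and its per-table companion property fit together (the table, columns, and filter are hypothetical):

```json
{
  "snapshot.select.statement.overrides": "customers.orders",
  "snapshot.select.statement.overrides.customers.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0 ORDER BY id DESC"
}
```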
event.processing.failure.handling.mode
Specifies how the connector should react to exceptions during processing of events.
- Default:
fail
- Valid values: [
fail
,warn
,skip
]
max.queue.size
Positive integer value that specifies the maximum size of the blocking queue into which change events received during streaming replication are placed before they are written to Kafka. This queue can provide backpressure when, for example, writes to Kafka are slower, or if Kafka is not available.
- Type: int
- Importance: low
- Default: 8192
max.batch.size
Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector.
- Type: int
- Importance: low
- Default: 2048
max.queue.size.in.bytes
Long value for the maximum size in bytes of the blocking queue. The feature is disabled by default; it is activated when the property is set to a positive long value.
- Type: long
- Default: 0
poll.interval.ms
Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to
500
milliseconds.- Type: int
- Importance: low
- Default: 500
include.unknown.datatypes
When Debezium encounters a field whose data type is unknown, by default the field is omitted from the change event and a warning is logged. In some cases, it may be preferable to include the field and send it downstream to clients in an opaque binary representation so the clients can decode it. Set to
false
to filter unknown data from events and
true
to keep them in binary format.
Note
Clients risk backward compatibility issues with this setting. Not only may the database-specific binary representation change between releases, but when the datatype is eventually supported, it will be sent downstream as a logical type, requiring adjustments by consumers. In general, when encountering unsupported data types, please file a feature request so that support can be added.
- Type: boolean
- Importance: low
- Default:
false
database.initial.statements
A semicolon-separated list of SQL statements to be executed when a JDBC connection (not the transaction log reading connection) to the database is established. Use a double semicolon (
;;
) to use a semicolon as a character and not as a delimiter.- Type: list of strings
- Importance: low
- Default: No default
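For example (the statements themselves are illustrative), two statements separated by a single semicolon delimiter; a doubled semicolon would instead be treated as a literal character within a statement:

```json
{
  "database.initial.statements": "SET search_path TO inventory;SET statement_timeout TO 60000"
}
```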
status.update.interval.ms
Frequency for sending replication connection status updates to the server, given in milliseconds. The property also controls how frequently the database status is checked to detect a dead connection in case the database was shut down.
- Type: int
- Default: 10000
heartbeat.interval.ms
Controls how frequently heartbeat messages are sent. This property (which is disabled by default) contains an interval in milliseconds that defines how frequently the connector sends messages to a heartbeat topic. You can use heartbeat messages to monitor whether the connector is still receiving change events from the database. Heartbeat messages are also useful when only records in non-captured tables change over a long period of time. In that case, the connector continues to read the log from the database but never emits any change messages into Kafka, which means that no offset updates are committed to Kafka.
This causes WAL files to be retained by the database longer than needed because the connector processed the files already but did not flush the latest retrieved Log Sequence Number (LSN) to the database. Using heartbeat messages may also result in more re-sent change events after a connector restart. Set this parameter to
0
to not send heartbeat messages.- Type: int
- Importance: low
- Default: 0
heartbeat.action.query
Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. For more details, see the Debezium documentation.
- Default: No default
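As a sketch, assuming a dedicated heartbeat table exists in the source database (the table and column names are hypothetical):

```json
{
  "heartbeat.interval.ms": "10000",
  "heartbeat.action.query": "INSERT INTO heartbeat_table (ts) VALUES (now())"
}
```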
schema.refresh.mode
Specify the conditions that trigger a refresh of the in-memory schema for a table.
columns_diff
(the default) is the safest mode. This setting ensures the in-memory schema stays in-sync with the database table schema.columns_diff_exclude_unchanged_toast
instructs the connector to refresh the in-memory schema cache if there is a discrepancy between it and the schema derived from the incoming message, unless unchanged TOASTable data fully accounts for the discrepancy. This setting can improve connector performance significantly if there are frequent table updates for tables that have TOASTed data which is rarely part of the updates. However, it is possible for the in-memory schema to become outdated if TOASTable columns are dropped from the table.
- Type: string
- Importance: low
- Default:
columns_diff
snapshot.delay.ms
An interval in milliseconds that the connector should wait before taking a snapshot after starting up. This setting can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which can cause connector re-balancing.
- Type: int
- Importance: low
- Default: No default
snapshot.fetch.size
During a snapshot, the connector reads table content in batches of rows. This property specifies the maximum number of rows in a batch.
- Type: int
- Default: 10240
slot.stream.params
Optional list of parameters to be passed to the configured logical decoding plugin. This list can be used to enable server-side table filtering when using the wal2json plugin. Allowed values depend on the chosen plugin and are separated by semicolons (for example,
add-tables=public.table,public.table2;include-lsn=true
).- Type: string
- Importance: low
- Default: No default
slot.max.retries
If connecting to a replication slot fails, this is the maximum number of consecutive attempts to connect.
- Type: int
- Default: 6
slot.retry.delay.ms
The number of milliseconds to wait between retry attempts when the connector fails to connect to a replication slot.
- Type: int
- Default: 10000 (10 seconds)
unavailable.value.placeholder
Specifies the constant that the connector provides to indicate that the original value is a toasted value that is not provided by the database. If the setting of
unavailable.value.placeholder
starts with thehex:
prefix, it is expected that the rest of the string represents hexadecimally encoded octets. For more details, see the Debezium documentation.- Type: string
- Default:
__debezium_unavailable_value
provide.transaction.metadata
Determines whether the connector generates events with transaction boundaries and enriches change event envelopes with transaction metadata. Specify
true
if you want the connector to do this.- Type: boolean
- Default:
false
flush.lsn.source
Determines whether the connector should commit the LSN of the processed records in the source PostgreSQL database so that the WAL logs can be deleted. Specify
false
if you don’t want the connector to do this. Note that if set to
false
, the LSN is not acknowledged by Debezium, and as a result the WAL logs are not cleared, which might result in disk space issues. In that case, you are expected to handle LSN acknowledgement outside of Debezium.- Type: boolean
- Default:
true
retriable.restart.connector.wait.ms
The number of milliseconds to wait before restarting a connector after a retriable error occurs.
- Type: int
- Default: 10000
skipped.operations
A comma-separated list of operation types that will be skipped during streaming. Possible values include:
c
for inserts/create,u
for updates,d
for deletes,t
for truncates, andnone
to not skip any operation.- Type: list
- Default:
t
signal.data.collection
Fully-qualified name of the data collection that is used to send signals to the connector. Use the following format to specify the collection name:
<schemaName>.<tableName>
.- Type: string
- Default: No default
signal.enabled.channels
A list of the signal channel names that are enabled for the connector. The following channels are available by default:
source
,kafka
,file
andjmx
. Note that you may also implement a custom signaling channel.
- Type: string
- Default:
source
notification.enabled.channels
A list of the notification channel names that are enabled for the connector. The following channels are available by default:
sink
,log
andjmx
. Note that you may also implement a custom notification channel.
- Type: string
- Default: No default
incremental.snapshot.chunk.size
The number of rows fetched from the database for each incremental snapshot iteration.
- Type: int
- Default: 1024
xmin.fetch.interval.ms
How often, in milliseconds, the XMIN will be read from the replication slot. The XMIN value provides the lower bound of where a new replication slot could start from.
- Type: int
- Default: 0 (XMIN tracking is disabled)
topic.naming.strategy
The name of the TopicNamingStrategy class that should be used to determine the topic name for data change, schema change, transaction, and heartbeat events, and so forth.
- Type: string
- Default:
io.debezium.schema.DefaultTopicNamingStrategy
topic.delimiter
Specifies the delimiter for the topic name.
- Type: string
- Default:
.
topic.cache.size
The size used for holding the topic names in a bounded concurrent hash map. This cache will help to determine the topic name corresponding to a given data collection.
- Type: int
- Default: 10000
topic.heartbeat.prefix
Sets the name of the topic to which heartbeat messages are sent. The topic is named according to the pattern
<topic.heartbeat.prefix>.<topic.prefix>
. For example, if the topic prefix isfulfillment
, the default topic name is__debezium-heartbeat.fulfillment
.- Type: string
- Importance: low
- Default:
__debezium-heartbeat
topic.transaction
Controls the name of the topic to which the connector sends transaction metadata messages. The topic name has the following pattern:
<topic.prefix>.<topic.transaction>
. For example, if the topic prefix isfulfillment
, the default topic name isfulfillment.transaction
.- Type: string
- Default:
transaction
snapshot.max.threads
Positive integer value that specifies the maximum number of threads used to perform an initial snapshot of the captured tables.
- Type: int
- Importance: medium
- Default: 1
custom.metric.tags
A comma-separated list of key-value pairs to customize the MBean object name which should be appended to the end of the regular name. For more details, see the Debezium documentation.
- Type: list of strings
- Default: No default
errors.max.retries
The maximum number of retries on retriable errors (for example, connection errors) before failing.
- Type: int
- Default: -1
Auto topic creation¶
For more information about Auto topic creation, see Configuring Auto Topic Creation for Source Connectors.
Configuration properties accept regular expressions (regex) that are defined as Java regex.
topic.creation.groups
A list of group aliases that are used to define per-group topic configurations for matching topics. A
default
group always exists and matches all topics.- Type: List of String types
- Default: empty
- Possible Values: The values of this property refer to any additional groups. A
default
group is always defined for topic configurations.
topic.creation.$alias.replication.factor
The replication factor for new topics created by the connector. This value must not be larger than the number of brokers in the Kafka cluster. If this value is larger than the number of Kafka brokers, an error occurs when the connector attempts to create a topic. This is a required property for the
default
group. This property is optional for any other group defined intopic.creation.groups
. Other groups use the Kafka broker default value.- Type: int
- Default: n/a
- Possible Values:
>= 1
for a specific valid value or-1
to use the Kafka broker’s default value.
topic.creation.$alias.partitions
The number of topic partitions created by this connector. This is a required property for the
default
group. This property is optional for any other group defined intopic.creation.groups
. Other groups use the Kafka broker default value.- Type: int
- Default: n/a
- Possible Values:
>= 1
for a specific valid value or-1
to use the Kafka broker’s default value.
topic.creation.$alias.include
A list of strings that represent regular expressions that match topic names. This list is used to include topics with matching values, and apply this group’s specific configuration to the matching topics.
$alias
applies to any group defined intopic.creation.groups
. This property does not apply to thedefault
group.- Type: List of String types
- Default: empty
- Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.exclude
A list of strings representing regular expressions that match topic names. This list is used to exclude topics with matching values from getting the group’s specific configuration.
$alias
applies to any group defined intopic.creation.groups
. This property does not apply to thedefault
group. Note that exclusion rules override any inclusion rules for topics.- Type: List of String types
- Default: empty
- Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.${kafkaTopicSpecificConfigName}
Any of the topic-level broker configurations (see Changing Broker Configurations Dynamically) for the version of the Kafka broker where the records will be written. The broker’s topic-level configuration value is used if the configuration is not specified for the rule.
$alias
applies to thedefault
group as well as any group defined intopic.creation.groups
.- Type: property values
- Default: Kafka broker value
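For example (the group alias and topic pattern are hypothetical), a configuration with the required default group settings and one additional group could look like this:

```json
{
  "topic.creation.groups": "compacted",
  "topic.creation.default.replication.factor": 3,
  "topic.creation.default.partitions": 1,
  "topic.creation.compacted.include": "dbserver1\\.inventory\\..*",
  "topic.creation.compacted.partitions": 6,
  "topic.creation.compacted.cleanup.policy": "compact"
}
```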
For more details, see the Debezium connector properties documentation.
CSFLE configuration¶
csfle.enabled
Accepts a boolean value. CSFLE is enabled for the connector if
csfle.enabled
is set to
true
.
- Type: boolean
- Default: false
auto.register.schemas
Specifies if the Serializer should attempt to register the Schema with Schema Registry.
- Type: boolean
- Default: true
- Importance: medium
use.latest.version
Only applies when auto.register.schemas
is set to false. If auto.register.schemas
is set to false and use.latest.version
is set to true, then instead of deriving a schema for the object passed to the client for serialization, Schema Registry uses the latest version of the schema in the subject for serialization.
- Type: boolean
- Default: true
- Importance: medium
Note
Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.