Configuration Reference for Debezium PostgreSQL Source Connector for Confluent Platform

You can configure the Postgres Source connector using a variety of configuration properties.

Note

These are properties for the self-managed connector. If you are using Confluent Cloud, see PostgreSQL Source connector for Confluent Cloud.

Required properties

name
Unique name for the connector. Trying to register again with the same name will fail.
connector.class
The name of the Java class for the connector. You must use a value of io.debezium.connector.postgresql.PostgresConnector for the PostgreSQL connector.
tasks.max

The maximum number of tasks that should be created for this connector. The connector always uses a single task and therefore does not use this value; the default is always acceptable.

  • Type: int
  • Default: 1
plugin.name

The name of the Postgres logical decoding plugin installed on the server. When the processed transactions are very large it is possible that the JSON batch event with all changes in the transaction will not fit into the hard-coded memory buffer of size 1 GB. In such cases, it is possible to switch to streaming mode when every change in transactions is sent as a separate message from PostgreSQL into Debezium. You can configure the streaming mode by setting plugin.name to pgoutput. For more details, see PostgreSQL 10+ logical decoding support (pgoutput) in the Debezium documentation.

  • Type: string
  • Importance: medium
  • Default: decoderbufs
  • Valid values: decoderbufs, pgoutput, wal2json, and wal2json_rds. Two additional options have been supported since 0.8.0.Beta1: wal2json_streaming and wal2json_rds_streaming.
slot.name

The name of the Postgres logical decoding slot created for streaming changes from a plugin and database instance. Values must conform to Postgres replication slot naming rules which state: “Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.”

  • Type: string
  • Importance: medium
  • Default: debezium
slot.drop.on.stop

Indicates whether to drop the logical replication slot when the connector stops in an orderly, expected way. This should only be set to true in testing or development environments. Dropping the slot allows WAL segments to be discarded by the database. If set to true, the connector may not be able to resume from the WAL position where it left off.

  • Type: string
  • Importance: low
  • Default: false
publication.name

The name of the PostgreSQL publication created for streaming changes when using pgoutput. This publication is created at start-up if it does not already exist and it includes all tables. If the publication already exists, either for all tables or configured with a subset of tables, the connector uses the publication as it is defined.

  • Type: string
  • Importance: low
  • Default: dbz_publication
database.hostname

IP address or hostname of the PostgreSQL database server.

  • Type: string
  • Importance: high
database.port

Integer port number of the PostgreSQL database server.

  • Type: int
  • Importance: low
  • Default: 5432
database.user

Username to use when connecting to the PostgreSQL database server.

  • Type: string
  • Importance: high
database.password

Password to use when connecting to the PostgreSQL database server.

  • Type: password
  • Importance: high
database.dbname

The name of the PostgreSQL database from which to stream the changes.

  • Type: string
  • Importance: high
topic.prefix

Topic prefix that provides a namespace for the particular PostgreSQL database server or cluster in which Debezium is capturing changes. The prefix should be unique across all other connectors, since it is used as a topic name prefix for all Kafka topics that receive records from this connector. Only alphanumeric characters, hyphens, dots and underscores must be used in the database server logical name. Do not change the value of this property. If you change the name value, after a restart, instead of continuing to emit events to the original topics, the connector emits subsequent events to topics whose names are based on the new value.

  • Type: string
  • Default: No default
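
As a point of reference, the required properties above typically appear together when registering the connector through the Kafka Connect REST API. The following is a minimal sketch only; the hostname, credentials, database name, and topic prefix are placeholders, and plugin.name is set to pgoutput as described earlier.

  {
    "name": "inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max": "1",
      "plugin.name": "pgoutput",
      "database.hostname": "postgres.example.com",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "**********",
      "database.dbname": "inventory",
      "topic.prefix": "server1"
    }
  }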
schema.include.list

An optional comma-separated list of regular expressions that match schema names to be monitored. Any schema name not included in the include list is excluded from monitoring. By default, all non-system schemas are monitored. May not be used with schema.exclude.list.

  • Type: list of strings
  • Importance: low
schema.exclude.list

An optional comma-separated list of regular expressions that match schema names to be excluded from monitoring. Any schema name not included in the exclude list is monitored, with the exception of system schemas. May not be used with schema.include.list.

  • Type: list of strings
  • Importance: low
table.include.list

An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be monitored. Any table not included in the include list is excluded from monitoring. Each identifier is in the form schemaName.tableName. By default, the connector monitors every non-system table in each monitored schema. May not be used with table.exclude.list.

  • Type: list of strings
  • Importance: low
table.exclude.list

An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be excluded from monitoring. Any table not included in the exclude list is monitored. Each identifier is in the form schemaName.tableName. May not be used with table.include.list.

  • Type: list of strings
  • Importance: low
column.include.list

An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in change event record values. Fully-qualified names for columns are of the form schemaName.tableName.columnName. Do not also set the column.exclude.list property.

  • Type: list of strings
  • Importance: low
column.exclude.list

An optional comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form schemaName.tableName.columnName.

  • Type: list of strings
  • Importance: low
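
For example, to capture only two tables in the public schema while omitting a sensitive column, the include and exclude properties can be combined as in the following sketch (the schema, table, and column names are placeholders):

  "schema.include.list": "public",
  "table.include.list": "public.customers,public.orders",
  "column.exclude.list": "public.customers.ssn"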
skip.messages.without.change

Specifies whether to skip publishing messages when there is no change in included columns. This would essentially filter messages if there is no change in columns included as per column.include.list or column.exclude.list properties.

  • Type: boolean
  • Default: false
time.precision.mode

Time, date, and timestamps can be represented with different kinds of precision, including:

  • adaptive: Captures the time and timestamp values exactly as they are in the database, using millisecond, microsecond, or nanosecond precision values based on the database column type.
  • adaptive_time_microseconds: Captures the date, datetime, and timestamp values exactly as they are in the database, using millisecond, microsecond, or nanosecond precision values based on the database column type, with the exception of TIME type fields, which are always captured as microseconds.
  • connect: Always represents time and timestamp values using Kafka Connect’s built-in representations for Time, Date, and Timestamp. connect uses millisecond precision regardless of database column precision. For more details, see temporal values.
  • Type: string
  • Importance: high
  • Default: adaptive
decimal.handling.mode

Specifies how the connector should handle values for DECIMAL and NUMERIC columns:

  • precise: Represents values precisely using java.math.BigDecimal, which are represented in change events in binary form.
  • double: Represents them using double values. double may result in a loss of precision but is easier to use.
  • string: Encodes values as formatted strings. This option is easy to consume, but semantic information about the real type is lost. See Decimal Values.
  • Type: string
  • Importance: high
  • Default: precise
hstore.handling.mode

Specifies how the connector should handle values for hstore columns. map represents values using MAP. json represents them using JSON strings. The json option encodes values as formatted strings, such as {"key": "val"}. For more details, see HStore Values.

  • Type: string
  • Importance: low
  • Default: map
interval.handling.mode

Specifies how the connector should handle values for interval columns.

  • Type: string
  • Default: numeric
  • Valid values: [numeric or string]
database.sslmode

Sets whether or not to use an encrypted connection to the PostgreSQL server. The disable option uses an unencrypted connection. require uses a secure (encrypted) connection and fails if one cannot be established. verify-ca is similar to require, but additionally verifies the server TLS certificate against the configured Certificate Authority (CA) certificates; it fails if no valid matching CA certificates are found. verify-full is similar to verify-ca, but additionally verifies that the server certificate matches the host to which the connection is attempted. For more information, see the PostgreSQL documentation.

  • Type: string
  • Importance: low
  • Default: disable
database.sslcert

The path to the file containing the SSL certificate of the client. See the PostgreSQL documentation for more information.

  • Type: string
  • Importance: high
database.sslkey

The path to the file containing the SSL private key of the client. See the PostgreSQL documentation for more information.

  • Type: string
database.sslpassword

The password to access the client private key from the file specified by database.sslkey. See the PostgreSQL documentation for more information.

  • Type: string
  • Importance: low
database.sslrootcert

The path to the file containing the root certificate(s) against which the server is validated. See the PostgreSQL documentation for more information.

  • Type: string
  • Importance: low
database.tcpKeepAlive

Enables a TCP keep-alive probe to verify that the database connection is still alive. This is enabled by default. See the PostgreSQL documentation for more information.

  • Type: string
  • Importance: low
  • Default: true
tombstones.on.delete

Controls whether a tombstone event should be generated after a delete event. When true, the delete operation is represented by a delete event and a subsequent tombstone event. When false, only a delete event is sent. Emitting a tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted.

  • Type: string
  • Importance: high
  • Default: true
column.truncate.to.length.chars

An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form schemaName.tableName.columnName. In change event records, values in these columns are truncated if they are longer than the number of characters specified by length in the property name. You can specify multiple properties with different lengths in a single configuration. Length must be a positive integer, for example, column.truncate.to.20.chars.

  • Type: list of strings
  • Default: No default
column.mask.with.length.chars

An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form schemaName.tableName.columnName. In change event values, the values in the specified table columns are replaced with length number of asterisk (*) characters. You can specify multiple properties with different lengths in a single configuration. Length must be a positive integer or zero. When you specify zero, the connector replaces a value with an empty string.

  • Type: list of strings
  • Default: No default
column.mask.hash.hashAlgorithm.with.salt.salt; column.mask.hash.v2.hashAlgorithm.with.salt.salt

An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form <schemaName>.<tableName>.<columnName>. In the resulting change event record, the values for the specified columns are replaced with pseudonyms, which consist of the hashed value that results from applying the specified hashAlgorithm and salt. Based on the hash function that is used, referential integrity is maintained, while column values are replaced with pseudonyms. Supported hash functions are described in the MessageDigest documentation.

  • Type: list of strings
  • Default: No default
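
The following sketch illustrates how the truncation and masking properties are named in a connector configuration; the column patterns, lengths, hash algorithm, and salt are placeholders:

  "column.truncate.to.20.chars": "public.customers.biography",
  "column.mask.with.12.chars": "public.customers.phone_number",
  "column.mask.hash.SHA-256.with.salt.CzQMA0cB5K": "public.customers.email"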
column.propagate.source.type

An optional, comma-separated list of regular expressions that match the fully-qualified names of columns whose original type and length should be propagated. Fully-qualified column names are of the form databaseName.tableName.columnName, or databaseName.schemaName.tableName.columnName.

For each matching column, the connector adds parameters to the corresponding field schema in emitted change records. The added parameters specify the original type and length of the column: __debezium.source.column.type, __debezium.source.column.length, and __debezium.source.column.scale.

  • Type: list of strings
  • Default: No default
datatype.propagate.source.type

An optional, comma-separated list of regular expressions that match the database-specific data type name for some columns. Fully-qualified data type names are of the form databaseName.tableName.typeName, or databaseName.schemaName.tableName.typeName. For more details, see the Debezium documentation

  • Type: list of strings
  • Default: No default
message.key.columns

A list of expressions that specify the columns that the connector uses to form custom message keys for change event records that it publishes to the Kafka topics for specified tables.

By default, Debezium uses the primary key column of a table as the message key for records that it emits. In place of the default, or to specify a key for tables that lack a primary key, you can configure custom message keys based on one or more columns.

To establish a custom message key for a table, list the table, followed by the columns to use as the message key. Each list entry takes the following format:

<fully-qualified_tableName>:<keyColumn>,<keyColumn>

To base a table key on multiple column names, insert commas between the column names.

Each fully-qualified table name is a regular expression in the following format:

<databaseName>.<tableName>

The property can include entries for multiple tables. Use a semicolon to separate table entries in the list. The following example sets the message key for the tables inventory.customers and purchase.orders:

inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

For the table inventory.customers, the columns pk1 and pk2 are specified as the message key. For purchaseorders tables in any database, the columns pk3 and pk4 serve as the message key.

There is no limit to the number of columns that you use to create custom message keys. However, it’s best to use the minimum number required to specify a unique key.

  • Type: list
  • Default: empty string
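
Expressed as a connector configuration property, the example above would appear as follows:

  "message.key.columns": "inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4"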
publication.autocreate.mode

Applies only when streaming changes by using the pgoutput plug-in. The setting determines how creation of a publication should work.

  • Default: all_tables
  • Valid values: [all_tables, disabled, filtered]
replica.identity.autoset.values

A comma-separated list of regular expressions that match fully-qualified tables and the replica identity value to be used in a table. This property determines the value for replica identity at the table level and will overwrite the existing value in the database. For more details about this property, see the Debezium documentation.

  • Type: list of strings
  • Default: empty string
binary.handling.mode

Specifies how binary (bytea) columns should be represented in change events.

  • Type: string
  • Importance: low
  • Default: bytes
  • Valid values: [bytes, base64, hex]
schema.name.adjustment.mode

Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings are: avro, which replaces the characters that cannot be used in the Avro type name with an underscore, and none, which does not apply any adjustment.

  • Type: string
  • Default: avro
field.name.adjustment.mode

Specifies how field names should be adjusted for compatibility with the message converter used by the connector. The following are possible settings:

  • avro: Replaces the characters that cannot be used in the Avro type name with an underscore
  • none: Does not apply any adjustment
  • avro_unicode: Replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note that _ is an escape sequence like backslash in Java.

For more details, see Avro naming.

  • Type: string
  • Default: none
money.fraction.digits

Specifies how many decimal digits should be used when converting Postgres money type to java.math.BigDecimal, which represents the values in change events. Applicable only when decimal.handling.mode is set to precise.

  • Type: int
  • Default: 2
message.prefix.include.list

An optional, comma-separated list of regular expressions that match the names of logical decoding message prefixes that you want to capture. Any logical decoding message with a prefix not included in message.prefix.include.list is excluded. Do not also set the message.prefix.exclude.list parameter when setting this property.

For information about the structure of message events and about their ordering semantics, see message events.

  • Type: list of strings
  • Default: By default, all logical decoding messages are captured.
message.prefix.exclude.list

An optional, comma-separated list of regular expressions that match the names of logical decoding message prefixes that you do not want to capture. Any logical decoding message with a prefix that is not included in message.prefix.exclude.list is included. Do not also set the message.prefix.include.list parameter when setting this property. To exclude all logical decoding messages, pass .* as the value of this property.

  • Type: list of strings
  • Default: No default

Advanced properties

converters

Enumerates a comma-separated list of the symbolic names of the custom converter instances that the connector can use. For more details about this property, see the Debezium documentation.

  • Default: No default
snapshot.mode

The criteria for running a snapshot upon startup of the connector.

  • Type: string
  • Importance: medium
  • Default: initial
  • Valid values: [always, initial, initial_only, never, custom]
snapshot.custom.class
A full Java class name that is an implementation of the io.debezium.connector.postgresql.spi.Snapshotter interface. Required when the snapshot.mode property is set to custom.
snapshot.include.collection.list

An optional comma-separated list of regular expressions that match the fully-qualified names (<schemaName>.<tableName>) of the tables to include in a snapshot. The specified items must be named in the connector’s table.include.list property. This property takes effect only if the connector’s snapshot.mode property is set to a value other than never.

  • Default: All tables specified in table.include.list
snapshot.lock.timeout.ms

Positive integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If table locks cannot be acquired in this time interval, the snapshot will fail.

  • Type: long
  • Importance: low
  • Default: 10000
snapshot.select.statement.overrides

Controls which rows from tables are included in a snapshot. This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME). SELECT statements for the individual tables are specified in additional configuration properties, one for each table, identified by the id snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]. The value of these properties is the SELECT statement to use when retrieving data from the specific table during the snapshot process. A possible use case for large append-only tables is setting a specific point at which to start (resume) the snapshot process, in case a previous snapshot process was interrupted.

Note that this setting affects snapshots only. Events generated by the logical decoder are not affected by it at all.

  • Type: list of strings
  • Importance: low
  • Default: No default
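
As an illustration, the following sketch restricts the snapshot of a hypothetical orders table to rows above a saved ID while leaving other tables untouched; the qualified table name and the SELECT statement are placeholders:

  "snapshot.select.statement.overrides": "inventory.orders",
  "snapshot.select.statement.overrides.inventory.orders": "SELECT * FROM orders WHERE id > 1000000"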
event.processing.failure.handling.mode

Specifies how the connector should react to exceptions during processing of events.

  • Default: fail
  • Valid values: [fail, warn, skip]
max.queue.size

Positive integer value that specifies the maximum size of the blocking queue into which change events received during streaming replication are placed before they are written to Kafka. This queue can provide backpressure when, for example, writes to Kafka are slower, or if Kafka is not available.

  • Type: int
  • Importance: low
  • Default: 8192
max.batch.size

Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector.

  • Type: int
  • Importance: low
  • Default: 2048
max.queue.size.in.bytes

Long value for the maximum size in bytes of the blocking queue. This feature is disabled by default; it becomes active if the property is set to a positive long value.

  • Type: long
  • Default: 0
poll.interval.ms

Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 500 milliseconds.

  • Type: int
  • Importance: low
  • Default: 500
include.unknown.datatypes

When Debezium encounters a field whose data type is unknown, the field is omitted from the change event and a warning is logged (the default). In some cases it may be preferable to include the field and send it downstream to clients in an opaque binary representation so the clients can decode it. Set to false to filter unknown data from events and true to keep them in binary format.

Note

Clients risk backward compatibility issues with this setting. Not only may the database-specific binary representation change between releases, but when the datatype is eventually supported, it will be sent downstream as a logical type, requiring adjustments by consumers. In general, when encountering unsupported data types, please file a feature request so that support can be added.

  • Type: boolean
  • Importance: low
  • Default: false
database.initial.statements

A semicolon-separated list of SQL statements to be executed when a JDBC connection (not the transaction log reading connection) to the database is established. Use a double semicolon (;;) to use a semicolon as a character and not as a delimiter.

  • Type: list of strings
  • Importance: low
  • Default: No default
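
For example, the following sketch runs two session-level statements on every JDBC connection the connector opens; the statements shown are placeholders:

  "database.initial.statements": "SET search_path TO inventory,public;SET statement_timeout = 0"

A literal semicolon inside a statement would be written as ;; so that it is not interpreted as a statement delimiter.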
status.update.interval.ms

Frequency for sending replication connection status updates to the server, given in milliseconds. The property also controls how frequently the database status is checked to detect a dead connection in case the database was shut down.

  • Type: int
  • Default: 10000
heartbeat.interval.ms

Controls how frequently heartbeat messages are sent. This property (which is disabled by default) contains an interval in milliseconds that defines how frequently the connector sends messages to a heartbeat topic. Heartbeat messages can be used to monitor whether the connector is still receiving change events from the database. They are also useful when, for an extended period, only records in non-captured tables change. In that case, the connector keeps reading the log from the database but never emits any change messages into Kafka, which means that no offset updates are committed to Kafka.

This causes WAL files to be retained by the database longer than needed because the connector processed the files already but did not flush the latest retrieved Log Sequence Number (LSN) to the database. Using heartbeat messages may also result in more re-sent change events after a connector restart. Set this parameter to 0 to not send heartbeat messages.

  • Type: int
  • Importance: low
  • Default: 0
heartbeat.action.query

Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. For more details, see the Debezium documentation.

  • Default: No default
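
A common pattern, sketched below with a hypothetical heartbeat table, combines heartbeat.interval.ms with an action query that writes to a captured table, so the connector regularly produces change events and advances the flushed LSN:

  "heartbeat.interval.ms": "10000",
  "heartbeat.action.query": "INSERT INTO debezium_heartbeat (ts) VALUES (now())"

The debezium_heartbeat table name is only an assumption; any small table that the connector's database user can write to, and that is captured by the connector, serves the same purpose.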
schema.refresh.mode

Specify the conditions that trigger a refresh of the in-memory schema for a table. columns_diff (the default) is the safest mode. This setting ensures the in-memory schema stays in-sync with the database table schema.

columns_diff_exclude_unchanged_toast instructs the connector to refresh the in-memory schema cache if there is a discrepancy between it and the schema derived from the incoming message, unless unchanged TOASTable data fully accounts for the discrepancy.

This setting can improve connector performance significantly if there are frequent table updates for tables that have TOASTed data which are rarely part of the updates. However, it is possible for the in-memory schema to become outdated if TOASTable columns are dropped from the table.

  • Type: string
  • Importance: low
  • Default: columns_diff
snapshot.delay.ms

An interval in milliseconds that the connector should wait before taking a snapshot after starting up. This setting can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which can cause connector re-balancing.

  • Type: int
  • Importance: low
  • Default: No default
snapshot.fetch.size

During a snapshot, the connector reads table content in batches of rows. This property specifies the maximum number of rows in a batch.

  • Type: int
  • Default: 10240
slot.stream.params

Optional list of parameters to be passed to the configured logical decoding plugin. This optional list can be used to enable server-side table filtering when using the wal2json plugin. Allowed values depend on the chosen plugin and are separated by semicolons (for example, add-tables=public.table,public.table2;include-lsn=true).

  • Type: string
  • Importance: low
  • Default: No default
slot.max.retries

If connecting to a replication slot fails, this is the maximum number of consecutive attempts to connect.

  • Type: int
  • Default: 6
slot.retry.delay.ms

The number of milliseconds to wait between retry attempts when the connector fails to connect to a replication slot.

  • Type: int
  • Default: 10000 (10 seconds)
unavailable.value.placeholder

Specifies the constant that the connector provides to indicate that the original value is a toasted value that is not provided by the database. If the setting of unavailable.value.placeholder starts with the hex: prefix, it is expected that the rest of the string represents hexadecimally encoded octets. For more details, see the Debezium documentation.

  • Type: string
  • Default: __debezium_unavailable_value
provide.transaction.metadata

Determines whether the connector generates events with transaction boundaries and enriches change event envelopes with transaction metadata. Specify true if you want the connector to do this.

  • Type: boolean
  • Default: false
flush.lsn.source

Determines whether the connector should commit the LSN of the processed records in the source PostgreSQL database so that the WAL logs can be deleted. Specify false if you don’t want the connector to do this. Note that if this is set to false, the LSN is not acknowledged by Debezium, and as a result the WAL logs are not cleared, which might result in disk space issues. In this case, users are expected to handle the acknowledgement of the LSN outside Debezium.

  • Type: boolean
  • Default: true
retriable.restart.connector.wait.ms

The number of milliseconds to wait before restarting a connector after a retriable error occurs.

  • Type: int
  • Default: 10000
skipped.operations

A comma-separated list of operation types that will be skipped during streaming. Possible values include: c for inserts/create, u for updates, d for deletes, t for truncates, and none to not skip any operation.

  • Type: list
  • Default: t
signal.data.collection

Fully-qualified name of the data collection that is used to send signals to the connector. Use the following format to specify the collection name: <schemaName>.<tableName>.

  • Type: string
  • Default: No default
signal.enabled.channels

A list of the signal channel names that are enabled for the connector. The following channels are available by default: source, kafka, file and jmx.

Note that you may also implement a custom signaling channel.

  • Type: string
  • Default: source
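
For example, to enable source signals through a hypothetical signaling table in the public schema:

  "signal.data.collection": "public.debezium_signal",
  "signal.enabled.channels": "source"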
notification.enabled.channels

A list of the notification channel names that are enabled for the connector. The following channels are available by default: sink, log and jmx.

Note that you may also implement a custom notification channel.

  • Type: string
  • Default: No default
incremental.snapshot.chunk.size

The number of rows fetched from the database for each incremental snapshot iteration.

  • Type: int
  • Default: 1024
xmin.fetch.interval.ms

How often, in milliseconds, the XMIN will be read from the replication slot. The XMIN value provides the lower bounds of where a new replication slot could start from.

  • Type: int
  • Default: 0 (XMIN tracking is disabled)
topic.naming.strategy

The name of the TopicNamingStrategy class that should be used to determine the topic name for data change, schema change, transaction, and heartbeat events, and so forth.

  • Type: string
  • Default: io.debezium.schema.DefaultTopicNamingStrategy
topic.delimiter

Specifies the delimiter for the topic name.

  • Type: string
  • Default: .
topic.cache.size

The size used for holding the topic names in a bounded concurrent hash map. This cache will help to determine the topic name corresponding to a given data collection.

  • Type: int
  • Default: 10000
topic.heartbeat.prefix

Sets the name of the topic to which heartbeat messages are sent. The topic is named according to the pattern <topic.heartbeat.prefix>.<topic.prefix>. For example, if the topic prefix is fulfillment, the default topic name is __debezium-heartbeat.fulfillment.

  • Type: string
  • Importance: low
  • Default: __debezium-heartbeat
topic.transaction

Controls the name of the topic to which the connector sends transaction metadata messages. The topic name has the following pattern: <topic.prefix>.<topic.transaction>. For example, if the topic prefix is fulfillment, the default topic name is fulfillment.transaction.

  • Type: string
  • Default: transaction
snapshot.max.threads

Positive integer value that specifies the maximum number of threads used when performing the initial snapshot.

  • Type: int
  • Importance: medium
  • Default: 1
custom.metric.tags

A comma-separated list of key-value pairs to customize the MBean object name which should be appended to the end of the regular name. For more details, see the Debezium documentation.

  • Type: list of strings
  • Default: No default
errors.max.retries

The maximum number of retries on retriable errors (for example, connection errors) before failing.

  • Type: int
  • Default: -1

Auto topic creation

For more information about Auto topic creation, see Configuring Auto Topic Creation for Source Connectors.

Configuration properties accept regular expressions (regex) that are defined as Java regex.

topic.creation.groups

A list of group aliases that are used to define per-group topic configurations for matching topics. A default group always exists and matches all topics.

  • Type: List of String types
  • Default: empty
  • Possible Values: The values of this property refer to any additional groups. A default group is always defined for topic configurations.
topic.creation.$alias.replication.factor

The replication factor for new topics created by the connector. This value must not be larger than the number of brokers in the Kafka cluster. If this value is larger than the number of Kafka brokers, an error occurs when the connector attempts to create a topic. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.

  • Type: int
  • Default: n/a
  • Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.partitions

The number of topic partitions created by this connector. This is a required property for the default group. This property is optional for any other group defined in topic.creation.groups. Other groups use the Kafka broker default value.

  • Type: int
  • Default: n/a
  • Possible Values: >= 1 for a specific valid value or -1 to use the Kafka broker’s default value.
topic.creation.$alias.include

A list of strings that represent regular expressions that match topic names. This list is used to include topics with matching values, and apply this group’s specific configuration to the matching topics. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group.

  • Type: List of String types
  • Default: empty
  • Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.exclude

A list of strings representing regular expressions that match topic names. This list is used to exclude topics with matching values from getting the group’s specific configuration. $alias applies to any group defined in topic.creation.groups. This property does not apply to the default group. Note that exclusion rules override any inclusion rules for topics.

  • Type: List of String types
  • Default: empty
  • Possible Values: Comma-separated list of exact topic names or regular expressions.
topic.creation.$alias.${kafkaTopicSpecificConfigName}

Any of the topic-level broker configurations (see Changing Broker Configurations Dynamically) for the version of the Kafka broker where the records will be written. The broker’s topic-level configuration value is used if the configuration is not specified for the rule. $alias applies to the default group as well as any group defined in topic.creation.groups.

  • Type: property values
  • Default: Kafka broker value
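
Putting these properties together, a sketch of per-group topic creation settings might look like the following; the group alias, topic pattern, and specific values are placeholders:

  "topic.creation.default.replication.factor": 3,
  "topic.creation.default.partitions": 10,
  "topic.creation.groups": "compacted",
  "topic.creation.compacted.include": "server1.inventory.*",
  "topic.creation.compacted.replication.factor": 3,
  "topic.creation.compacted.partitions": 1,
  "topic.creation.compacted.cleanup.policy": "compact"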

For more details, see the Debezium connector properties documentation.

CSFLE configuration

csfle.enabled

Accepts a boolean value. CSFLE is enabled for the connector if csfle.enabled is set to true.

  • Type: boolean
  • Default: false

auto.register.schemas

Specifies if the Serializer should attempt to register the Schema with Schema Registry.

  • Type: boolean
  • Default: true
  • Importance: medium

use.latest.version

Only applies when auto.register.schemas is set to false. If auto.register.schemas is set to false and use.latest.version is set to true, then instead of deriving a schema for the object passed to the client for serialization, Schema Registry uses the latest version of the schema in the subject for serialization.

  • Type: boolean
  • Default: true
  • Importance: medium

Note

Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.