Debezium PostgreSQL Source Connector for Confluent Platform¶
The Debezium PostgreSQL Connector is a source connector that can obtain a snapshot of the existing data in a PostgreSQL database and then monitor and record all subsequent row-level changes to that data. All of the events for each table are recorded in a separate Apache Kafka® topic, where they can be easily consumed by applications and services.
- Confluent supports Debezium PostgreSQL connector version 0.9.3 and later.
- Confluent supports using this connector with PostgreSQL 9.6, 10, 11, 12, 13, 14, 15, and 16.
- Debezium might not be able to monitor databases hosted by a service such as Heroku Postgres, because such services may not let you install the logical decoding plugin.
Features¶
The Debezium PostgreSQL Source connector includes the following features:
- At least once delivery
- Supports one task
- Automatic topic creation
- CSFLE (Client-side Field level encryption)
At least once delivery¶
The connector guarantees that records are delivered at least once to the Kafka topic. If a fault occurs (for example, if there are network connectivity issues), or the connector restarts, you may see some duplicate records in the Kafka topic.
Supports one task¶
The Debezium PostgreSQL Source connector supports running only one task.
Automatic topic creation¶
The connector will create the internal database history Kafka topic if it doesn’t exist.
CSFLE (Client-side Field level encryption)¶
This connector supports the CSFLE functionality. For more information, see Manage CSFLE.
Schema definition considerations¶
Unlike in earlier versions, Debezium now defines its schemas in a central location for naming and versioning consistency. If you use Schema Registry, you may experience schema compatibility issues when you upgrade from version 1.x. The current workaround is to set the schema compatibility to NONE if you want to upgrade from version 1.x. For more details about Debezium schema naming and versioning, see the Debezium 2.0.0 release documentation.
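The compatibility workaround can be applied through the Schema Registry REST API (`PUT /config` for the global default, `PUT /config/<subject>` for a single subject). The following is a minimal sketch, assuming Schema Registry listens on `http://localhost:8081` (override with the hypothetical `SR_URL` variable); the function name `set_compatibility_none` is illustrative, not part of any Confluent tool. With the default TopicNameStrategy, subjects are named `<topic>-key` and `<topic>-value`.

```shell
# Sketch only: set Schema Registry compatibility to NONE before upgrading from Debezium 1.x.
# Assumption: Schema Registry is reachable at http://localhost:8081 unless SR_URL is set.
# With a subject argument, only that subject changes; without one, the global default changes.
set_compatibility_none() {
  local sr_url="${SR_URL:-http://localhost:8081}"
  local path="/config"
  if [ -n "${1:-}" ]; then
    path="/config/$1"
  fi
  curl -sS -X PUT \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    "${sr_url}${path}"
}
```

For example, `set_compatibility_none dbserver1.inventory.customers-value` changes one subject. Scoping the change to individual subjects leaves compatibility checks in place for unrelated topics.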
Install the Postgres connector¶
You can install this connector by using the following Confluent CLI command, or you can manually download the ZIP file.
confluent connect plugin install debezium/debezium-connector-postgresql:latest
You can install a specific version by replacing latest with a version number. For example:
confluent connect plugin install debezium/debezium-connector-postgresql:<version-number>
The Debezium PostgreSQL Source connector has specific ACL requirements. See the ACL requirements for Debezium Source connectors to ensure you meet the specified requirements.
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
The Debezium PostgreSQL connector is an open source connector and does not require a Confluent Enterprise License.
Configuration properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Debezium PostgreSQL Source Connector for Confluent Platform.
Setting up PostgreSQL¶
Before using the Debezium PostgreSQL connector to monitor the changes committed on a PostgreSQL server, install a logical decoding plugin on the PostgreSQL server, enable replication slots, and configure a user with sufficient privileges to perform the replication.
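A replication user can be created with psql. The following is a minimal sketch, assuming `psql` is on the PATH and connects as a superuser (for example through `PGHOST` and `PGUSER`). The function name `create_replication_user` and its role, password, and schema arguments are hypothetical placeholders. `REPLICATION` and `LOGIN` are the role attributes replication requires; the `SELECT` grant is an assumption meant to cover the initial snapshot and applies only to tables that exist when it runs.

```shell
# Sketch only: create a role that a Debezium connector could use for replication.
# Assumptions: psql is on PATH and connects as a superuser; arguments are placeholders.
create_replication_user() {
  local role="$1" password="$2" schema="${3:-public}"
  # psql expands :"var" as a quoted identifier and :'var' as a quoted literal,
  # so the values are escaped rather than spliced into the SQL as raw text.
  psql -v ON_ERROR_STOP=1 \
       -v role="$role" -v password="$password" -v schema="$schema" <<'SQL'
CREATE ROLE :"role" WITH REPLICATION LOGIN PASSWORD :'password';
GRANT USAGE ON SCHEMA :"schema" TO :"role";
GRANT SELECT ON ALL TABLES IN SCHEMA :"schema" TO :"role";
SQL
}
```

For example, `PGUSER=postgres create_replication_user debezium 'change-me' inventory`. Passing values as psql variables avoids quoting bugs when a password contains a single quote.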
To monitor a PostgreSQL database running in Amazon RDS, refer to the Debezium documentation for PostgreSQL on Amazon RDS.
Enable logical decoding and replication on the PostgreSQL server¶
The Postgres relational database management system has a feature called logical decoding that allows clients to extract all persistent changes to database tables into a coherent format. This formatted data can be interpreted without detailed knowledge of the internal state of the database. An output plugin transforms the data from the write-ahead log’s internal representation into a format the consumer of a replication slot needs.
The Debezium PostgreSQL connector works with one of the logical decoding plugins supported by Debezium, such as decoderbufs or wal2json. The following steps install wal2json.
Install the wal2json plugin¶
Before running the commands, make sure the user has write privileges on the PostgreSQL lib directory, where the wal2json library is installed. In the test environment, this directory is /usr/pgsql-9.6/lib/. Also in the test environment, add the PostgreSQL binaries to the PATH as shown below:
export PATH="$PATH:/usr/pgsql-9.6/bin"
Enter the wal2json installation commands.
git clone https://github.com/eulerto/wal2json -b master --single-branch \
&& cd wal2json \
&& git checkout 92b33c7d7c2fccbeb9f79455dafbc92e87e00ddd \
&& make && make install \
&& cd .. \
&& rm -rf wal2json
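After `make install`, you can confirm that the library landed where PostgreSQL loads modules from. The following is a minimal sketch, assuming the `pg_config` of the target installation is first on the PATH (as set by the export PATH line above) and a Linux build, where loadable modules use the `.so` suffix. The function name `check_wal2json_installed` is hypothetical.

```shell
# Sketch only: verify that wal2json.so exists in PostgreSQL's package library directory.
# Assumptions: pg_config belongs to the target PostgreSQL; modules use the .so suffix (Linux).
check_wal2json_installed() {
  local libdir
  libdir="$(pg_config --pkglibdir)" || return 1
  if [ -f "$libdir/wal2json.so" ]; then
    echo "found $libdir/wal2json.so"
  else
    echo "wal2json.so not found in $libdir" >&2
    return 1
  fi
}
```

Asking `pg_config` for the directory avoids hard-coding a path such as /usr/pgsql-9.6/lib/, which differs between versions and distributions.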
Enable replication on the PostgreSQL server¶
Add the following lines to the end of the /usr/share/postgresql/postgresql.conf PostgreSQL configuration file. These lines load the plugin as a shared library and adjust some Write-Ahead Log (WAL) and streaming replication settings.
# LOGGING
log_min_error_statement = fatal
# CONNECTION
listen_addresses = '*'
# MODULES
shared_preload_libraries = 'decoderbufs'
# REPLICATION
wal_level = logical # minimal, replica, or logical (change requires restart)
max_wal_senders = 1 # max number of walsender processes (change requires restart)
#wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 1 # max number of replication slots (change requires restart)
Alternatively, if you use the wal2json plugin, set shared_preload_libraries in the PostgreSQL configuration file to the following value:
...
# MODULES
shared_preload_libraries = 'wal2json'
...
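Before restarting PostgreSQL, you may want to sanity-check the edited file. The following is a minimal sketch, assuming settings are written in `name = value` form (PostgreSQL also accepts them without the equals sign, which this sketch does not handle). It reads only the given file, taking the last uncommented assignment of each setting; it ignores include files and `ALTER SYSTEM` overrides in postgresql.auto.conf, so running `SHOW wal_level;` in psql on the restarted server remains the authoritative check. The function names `conf_value` and `check_pg_conf` are hypothetical.

```shell
# Sketch only: check postgresql.conf for the logical-replication settings listed above.
# Assumptions: "name = value" syntax; includes and postgresql.auto.conf are not read.
conf_value() {
  # Print the last uncommented value of setting $2 in file $1, without comment or quotes.
  sed -n "s/^[[:space:]]*$2[[:space:]]*=[[:space:]]*//p" "$1" \
    | tail -n 1 \
    | sed -e 's/[[:space:]]*#.*$//' -e 's/[[:space:]]*$//' -e "s/^'//" -e "s/'\$//"
}

check_pg_conf() {
  local conf="$1" status=0
  [ "$(conf_value "$conf" wal_level)" = "logical" ] \
    || { echo "wal_level must be logical"; status=1; }
  [ "$(conf_value "$conf" max_wal_senders)" -ge 1 ] 2>/dev/null \
    || { echo "max_wal_senders must be at least 1"; status=1; }
  [ "$(conf_value "$conf" max_replication_slots)" -ge 1 ] 2>/dev/null \
    || { echo "max_replication_slots must be at least 1"; status=1; }
  case "$(conf_value "$conf" shared_preload_libraries)" in
    *decoderbufs*|*wal2json*) ;;
    *) echo "shared_preload_libraries should load decoderbufs or wal2json"; status=1 ;;
  esac
  return "$status"
}
```

For example, `check_pg_conf /usr/share/postgresql/postgresql.conf` prints one line per problem and returns non-zero if any setting is missing.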
Initialize replication permissions¶
Add the following lines to the end of the pg_hba.conf PostgreSQL configuration file. These lines configure the client authentication for the database replication.
############ REPLICATION ##############
local replication postgres trust
host replication postgres 127.0.0.1/32 trust
host replication postgres ::1/128 trust
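If you script this step, appending the entries only when they are missing lets you re-run it safely. The following is a minimal sketch; the function name `add_replication_hba` is hypothetical, and it matches lines exactly as written above, so an equivalent entry with different spacing would still be appended. The file path is also an assumption: `SHOW hba_file;` in psql reports the one your server uses. Note that `trust` skips password checks entirely, which suits a local test environment only.

```shell
# Sketch only: append the replication entries to pg_hba.conf if they are not already present.
# Assumption: the entries are matched verbatim; differently spaced duplicates are not detected.
add_replication_hba() {
  local hba="$1" line
  for line in \
    "local replication postgres trust" \
    "host replication postgres 127.0.0.1/32 trust" \
    "host replication postgres ::1/128 trust"; do
    grep -qxF "$line" "$hba" || printf '%s\n' "$line" >> "$hba"
  done
}
```

Changes to pg_hba.conf take effect after a configuration reload, for example with `SELECT pg_reload_conf();` in psql.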
Quick start¶
The Debezium PostgreSQL Connector is a source connector that can record events for each table in a separate Kafka topic, where they can be easily consumed by applications and services.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Install the connector¶
Refer to the Debezium tutorial if you want to use Docker images to set up Kafka, ZooKeeper and Connect. For the following tutorial, you need to have a local Confluent Platform installation. Note that as of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments.
Navigate to your Confluent Platform installation directory and run the following command to install the connector:
confluent connect plugin install debezium/debezium-connector-postgresql:<version-number>
Note that adding a new connector plugin requires restarting Kafka Connect. Use the Confluent CLI to restart Kafka Connect by running the two commands:
confluent local services connect stop
confluent local services connect start
You should see output similar to the following:
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Check if the PostgreSQL plugin has been installed correctly and picked up by the plugin loader.
curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep postgres
"io.debezium.connector.postgresql.PostgresConnector"
Set up PostgreSQL using Docker (optional)¶
If you do not have a native installation of PostgreSQL, you may use the following commands to start a new container running a PostgreSQL database server preconfigured with the logical decoding plugin, a replication slot, and an inventory test database.
# Pull docker image
docker pull debezium/example-postgres
# Run docker container
docker run -it --rm --name postgres -p 5432:5432 \
-e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres \
debezium/example-postgres
# In a separate terminal, launch psql to run SQL queries:
docker run -it --rm --name psql_client \
-e PGOPTIONS="--search_path=inventory" \
-e PGPASSWORD=postgres --link postgres:postgres debezium/example-postgres \
psql -h postgres -U postgres
# To see the list of relations in the inventory database, type \d at the postgres prompt. To exit, type \q
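The server inside the container can take a few seconds to accept connections, so launching psql immediately may fail. The following is a minimal sketch that polls with `pg_isready`, assuming the container is named `postgres` as in the `docker run` command above and that the image ships `pg_isready` (images built on the official postgres image do). The function name `wait_for_postgres` is hypothetical.

```shell
# Sketch only: wait until PostgreSQL in the container accepts connections.
# Assumptions: container name defaults to "postgres"; pg_isready exists in the image.
wait_for_postgres() {
  local container="${1:-postgres}" tries="${2:-30}" i=1
  while [ "$i" -le "$tries" ]; do
    if docker exec "$container" pg_isready -U postgres >/dev/null 2>&1; then
      echo "PostgreSQL in $container is accepting connections"
      return 0
    fi
    sleep 1
    i=$((i + 1))
  done
  echo "PostgreSQL in $container did not become ready after $tries attempts" >&2
  return 1
}
```

Run `wait_for_postgres` in the second terminal before starting the psql client container.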
Enable logical decoding on the PostgreSQL server¶
Logical decoding is already enabled if you set up PostgreSQL using the Docker image (in the previous section). On a native installation, follow the steps in Enable logical decoding and replication on the PostgreSQL server.
Start the Debezium PostgreSQL connector¶
Create the file register-postgres.json to store the following connector configuration:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "0.0.0.0",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "dbserver1",
    "schema.include.list": "inventory"
  }
}
Start the connector.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
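A successful POST means Kafka Connect accepted the configuration, not that the connector is streaming. The following is a minimal sketch that polls the Kafka Connect REST endpoint `GET /connectors/<name>/status` until both the connector and its single task report `RUNNING`, assuming Connect listens on `http://localhost:8083` (override with the hypothetical `CONNECT_URL` variable). It uses plain `grep` on the JSON response instead of a JSON parser, and the function name `wait_for_connector` is hypothetical.

```shell
# Sketch only: poll the Connect REST API until the connector and its task are RUNNING.
# Assumptions: Connect at http://localhost:8083 unless CONNECT_URL is set; one task (tasks.max=1).
wait_for_connector() {
  local name="${1:-inventory-connector}" tries="${2:-30}" i=1 status running
  local url="${CONNECT_URL:-http://localhost:8083}/connectors/$name/status"
  while [ "$i" -le "$tries" ]; do
    status="$(curl -sS "$url" 2>/dev/null || true)"
    if printf '%s' "$status" | grep -q '"state" *: *"FAILED"'; then
      printf '%s\n' "$status" >&2   # the response includes the failure trace
      return 1
    fi
    # Count RUNNING states: one for the connector plus one for its task.
    running="$(printf '%s' "$status" | grep -o '"state" *: *"RUNNING"' | grep -c RUNNING || true)"
    if [ "$running" -ge 2 ]; then
      echo "$name is RUNNING"
      return 0
    fi
    sleep 1
    i=$((i + 1))
  done
  echo "$name did not reach RUNNING after $tries attempts" >&2
  return 1
}
```

Stopping on the first `FAILED` state surfaces configuration errors, such as a wrong database password, immediately instead of after the full timeout.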
Start your Kafka consumer¶
Start the consumer in a new terminal session.
confluent local services kafka consume dbserver1.inventory.customers --from-beginning
When you run SQL statements in the psql client to add or modify records in the database, change events for those records appear in your consumer terminal.
The following is an example psql query to update a record in the customers table.
update customers set first_name = 'Sarah' where id = 1001;
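You can also run such statements without opening an interactive psql session. The following is a minimal sketch, assuming the database container is named `postgres` (override with the hypothetical `PG_CONTAINER` variable) and reusing the `PGOPTIONS="--search_path=inventory"` setting from the psql client command above. The function name `run_inventory_sql` is hypothetical.

```shell
# Sketch only: run one SQL statement in the example database via docker exec.
# Assumptions: container name "postgres"; example tables live in the inventory schema.
run_inventory_sql() {
  docker exec -e PGOPTIONS="--search_path=inventory" "${PG_CONTAINER:-postgres}" \
    psql -U postgres -c "$1"
}
```

For example, `run_inventory_sql "update customers set first_name = 'Sarah' where id = 1001;"` should produce an update event on the consumer terminal.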
Clean up resources¶
Delete the connector and stop Confluent services.
curl -X DELETE localhost:8083/connectors/inventory-connector
confluent local services stop
Stop PostgreSQL containers.
docker stop psql_client  # Alternatively, type \q at the psql prompt
docker stop postgres
Note
Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.