PostgreSQL Source Connector for Confluent Cloud¶
If you are installing the connector locally for Confluent Platform, see JDBC Connector (Source and Sink) for Confluent Platform.
The Kafka Connect PostgreSQL Source connector can obtain a snapshot of the existing data in a PostgreSQL database and then monitor and record all subsequent row-level changes to that data. All of the events for each table are recorded in a separate Apache Kafka® topic. The events can then be easily consumed by applications and services. Note that deleted records are not captured.
The Confluent Cloud PostgreSQL source connector provides the following features:
- timestamp mode is enabled when only a timestamp column is specified when you enter database details.
- timestamp+incrementing mode is enabled when both a timestamp column and an incrementing column are specified when you enter database details.
- A timestamp column must not be nullable.
- Database authentication: password authentication.
- Data formats: Avro or JSON. For JSON, the default value of one underlying configuration property is changed.
- Select configuration properties: configuration properties that are not shown in the Confluent Cloud UI use the default values. See JDBC Source Connector Configuration Properties for default values and property definitions.
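The mode selection above can be sketched in a few lines of Python. This is a hypothetical helper written only to illustrate the rule, not connector code: the mode follows from which column names you supply in the database details.

```python
def query_mode(timestamp_column, incrementing_column):
    """Illustrative only: which query mode the connector enables,
    based on the column names entered in the database details."""
    if timestamp_column and incrementing_column:
        # Both columns supplied: new and modified rows get a unique offset.
        return "timestamp+incrementing"
    if timestamp_column:
        # Timestamp column alone: detects new and modified rows.
        return "timestamp"
    raise ValueError("specify a timestamp column, optionally with an incrementing column")
```

For example, entering only a `modified` timestamp column enables timestamp mode, while adding an `id` incrementing column switches to timestamp+incrementing mode.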
For more information, see the Confluent Cloud connector limitations.
Use this quick start to get up and running with the Confluent Cloud PostgreSQL source connector. The quick start provides the basics of selecting the connector and configuring it to obtain a snapshot of the existing data in a PostgreSQL database and then monitoring and recording all subsequent row-level changes.
Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud Platform (GCP).
The Confluent Cloud CLI installed and configured for the cluster. See Install and Configure the Confluent Cloud CLI.
Public access must be enabled for your database. The example below shows the AWS Management Console when setting up a PostgreSQL database.
Public inbound traffic access (0.0.0.0/0) must be allowed to the VPC where the database is located. The example below shows the AWS Management Console when setting up security group rules for the VPC.
See your specific cloud platform documentation for how to configure security rules for your VPC.
At least one topic must exist before creating the connector. If you want a topic prefix, the name of the topic you create must include the prefix.
Confluent Cloud Schema Registry must be enabled if you use Avro.
Use one of the following for the Kafka cluster credentials fields:
- A Confluent Cloud API key and secret. After you have created your cluster, go to Cluster settings > API access > Create Key.
- A Confluent Cloud service account.
Step 1: Launch your Confluent Cloud cluster.¶
See the Confluent Cloud Quick Start for installation instructions.
Step 2: Add a connector.¶
Click Connectors > Add connector.
Step 3: Select your connector.¶
Click the PostgreSQL Source connector icon.
Step 4: Set up the connection.¶
Complete the following and click Continue.
Make sure you have completed all of the prerequisites.
Enter a connector name.
Enter your Kafka Cluster credentials. The credentials are either the API key and secret or the service account API key and secret.
Enter the topic prefix for the database table name. Use this property to specify the Kafka topic (or topics), since this connector derives topic names directly from the table names in your database.
You must create the topics before creating and launching this connector. To use a topic prefix, name each topic with the prefix beforehand. For example, if the database table being sourced is named products and you want the topic to have the prefix list-, name your topic list-products and then enter list- in the prefix field.
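The naming rule above amounts to simple concatenation. The sketch below uses a hypothetical helper, `topic_for_table`, purely for illustration:

```python
def topic_for_table(prefix: str, table: str) -> str:
    # The connector writes rows from <table> to the topic <prefix><table>,
    # so a topic with that full name must exist before you launch.
    return f"{prefix}{table}"

# A table named "products" with the prefix "list-" maps to "list-products".
print(topic_for_table("list-", "products"))  # prints "list-products"
```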
Add the connection details for the database.
Add the Database details for your database. Review the following notes for more information about field selections.
- Enter a Timestamp column name to enable timestamp mode. This mode uses a timestamp (or timestamp-like) column to detect new and modified rows. This assumes the column is updated with each write, and that values are monotonically incrementing, but not necessarily unique.
- Enter both a Timestamp column name and an Incrementing column name to enable timestamp+incrementing mode. This mode uses two columns, a timestamp column that detects new and modified rows, and a strictly incrementing column which provides a globally unique ID for updates so each row can be assigned a unique stream offset.
- By default, the connector only detects tables with type TABLE from the source database. Use VIEW for virtual tables created from joining one or more tables. Use ALIAS for tables with a shortened or temporary name.
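As a rough illustration of how timestamp+incrementing mode detects rows, the sketch below simulates the offset comparison using Python's standard-library SQLite bindings. The table and column names (`products`, `modified`, `id`) are made up for this example; the real connector issues a similar predicate against PostgreSQL using the columns you configure.

```python
import sqlite3

# Toy table standing in for a sourced database table, with an
# incrementing `id` column and a `modified` timestamp column.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER, modified INTEGER, name TEXT)")
conn.executemany(
    "INSERT INTO products VALUES (?, ?, ?)",
    [(1, 100, "a"), (2, 100, "b"), (3, 101, "c")],
)

# Suppose the last committed stream offset was (timestamp=100, id=1).
last_ts, last_id = 100, 1

# Rows past the stored (timestamp, id) offset pair are treated as new
# or modified; the pair makes each offset globally unique even when
# several rows share one timestamp.
rows = conn.execute(
    "SELECT id, modified, name FROM products "
    "WHERE modified > ? OR (modified = ? AND id > ?) "
    "ORDER BY modified, id",
    (last_ts, last_ts, last_id),
).fetchall()
# rows -> [(2, 100, 'b'), (3, 101, 'c')]
```

Note how row `(1, 100, 'a')` is skipped: its timestamp equals the stored offset and its id is not greater, so it was already captured.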
Add the Connection details for your connection to the database.
Select how your messages are formatted.
Step 5: Launch the connector.¶
Verify the connection details and click Launch.
Step 6: Check the connector status.¶
The status for the connector should go from Provisioning to Running. It may take a few minutes.
Step 7: Check the Kafka topic.¶
After the connector is running, verify that messages are populating your Kafka topic.
For additional information about this connector see JDBC Source Connector for Confluent Platform. Note that not all Confluent Platform connector features are provided in the Confluent Cloud connector.