Integrating With PostgreSQL¶
This tutorial demonstrates a simple workflow to integrate KSQL with an instance of PostgreSQL.
Prerequisites:
- Confluent Platform is installed and running. This installation includes a Kafka broker, KSQL, ZooKeeper, Schema Registry, and Connect.
- If you installed Confluent Platform via TAR or ZIP, navigate into the installation directory. The paths and commands used throughout this tutorial assume that you are in this installation directory.
- Consider installing the Confluent CLI to start a local installation of Confluent Platform.
- Java: Install Oracle Java JRE or JDK version 1.8 or later on your local machine.
Installing JDBC Source Connector Plugin¶
If you installed Kafka Connect via Confluent Platform, then it comes with an installation of the JDBC source connector. Otherwise, install it via Confluent Hub.
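If you need to install the connector separately, the Confluent Hub CLI can fetch it into your plugin path (the `latest` version tag here is an example; pin a specific version for production):

```shell
confluent-hub install confluentinc/kafka-connect-jdbc:latest
```

Restart the Connect worker after installing so it picks up the new plugin.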
Installing Postgres via Docker¶
If you are just playing around with the KSQL-Connect integration and do not have a local PostgreSQL instance, you can run one with Docker and populate some data:

1. Pull the Postgres image:

   docker pull postgres

2. Start the database and expose the JDBC port:

   docker run -p 5432:5432 --name some-postgres -e POSTGRES_USER=$USER -e POSTGRES_DB=$USER -d postgres

3. Run psql to generate some data:

   docker exec -it some-postgres psql -U $USER
   psql (11.5 (Debian 11.5-1.pgdg90+1))
   Type "help" for help.

   postgres=# CREATE TABLE users (username VARCHAR, popularity INT);
   CREATE TABLE
   postgres=# INSERT INTO users (username, popularity) VALUES ('user1', 100);
   INSERT 0 1
   postgres=# INSERT INTO users (username, popularity) VALUES ('user2', 5);
   INSERT 0 1
   postgres=# INSERT INTO users (username, popularity) VALUES ('user3', 75);
   INSERT 0 1

4. When you're done, clear your local state:

   docker kill some-postgres && docker rm some-postgres
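To confirm the container is accepting connections before moving on, you can use pg_isready, which ships with the official Postgres image:

```shell
docker exec some-postgres pg_isready -U $USER
```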
Create a JDBC Source Connector¶
Now that Postgres is up and running with a database for your user, you can connect to it via KSQL. If you're using the default configurations, KSQL connects automatically to your Connect cluster. Otherwise, you must change the ksql.connect.url property to point to your Connect deployment.
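For example, in your KSQL server properties file (the exact path varies by installation; 8083 is the default Connect REST port):

```
ksql.connect.url=http://localhost:8083
```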
ksql> CREATE SOURCE CONNECTOR `jdbc-connector` WITH(\
    "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',\
    "connection.url"='jdbc:postgresql://localhost:5432/YOUR_USERNAME',\
    "mode"='bulk',\
    "topic.prefix"='jdbc-',\
    "key"='username');
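The JDBC source connector names each output topic by concatenating topic.prefix with the source table name, so the users table lands in the jdbc-users topic. A trivial shell sketch of that naming rule:

```shell
# JDBC source topic naming: <topic.prefix> + <table name>
prefix="jdbc-"
table="users"
echo "${prefix}${table}"   # prints jdbc-users
```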
Profit¶
At this point, data should automatically start flowing in from Postgres to KSQL. Confirm this by running:

ksql> DESCRIBE CONNECTOR "jdbc-connector";

Your output should resemble:
Name : jdbc-connector
Class : io.confluent.connect.jdbc.JdbcSourceConnector
Type : source
State : RUNNING
WorkerId : 10.200.7.69:8083
Task ID | State | Error Trace
---------------------------------
0 | RUNNING |
---------------------------------
Related Topics
----------------
jdbc-users
----------------
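You can also query the Connect worker's REST interface directly for the same status information (assuming it is listening on its default port, 8083):

```shell
curl -s http://localhost:8083/connectors/jdbc-connector/status
```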
Import that topic as a table into KSQL:

ksql> CREATE TABLE JDBC_USERS WITH(value_format='AVRO', kafka_topic='jdbc-users');

Then select everything from the table to see how it gets auto-populated:

ksql> SELECT * FROM JDBC_USERS EMIT CHANGES;
+------------------+------------------+------------------+------------------+
|ROWTIME |ROWKEY |USERNAME |POPULARITY |
+------------------+------------------+------------------+------------------+
|1566336783102 |user1 |user1 |100 |
|1566336783102 |user2 |user2 |5 |
|1566336783102 |user3 |user3 |75 |
|1566336788106 |user1 |user1 |100 |
|1566336788106 |user2 |user2 |5 |
|1566336788106 |user3 |user3 |75 |
Note that users are repeated multiple times. This is because bulk mode is specified, which re-imports the entire table on every poll. Obviously, this isn't appropriate for production. For more information on changelog capture, see Incremental Query Modes.
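As a sketch of an incremental alternative: if the users table had a monotonically increasing integer column (say id, which the example table above does not define), the connector could use the JDBC source connector's incrementing mode so that only new rows are imported on each poll:

```
ksql> CREATE SOURCE CONNECTOR `jdbc-incremental` WITH(\
    "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',\
    "connection.url"='jdbc:postgresql://localhost:5432/YOUR_USERNAME',\
    "mode"='incrementing',\
    "incrementing.column.name"='id',\
    "topic.prefix"='jdbc-',\
    "key"='username');
```

The timestamp and timestamp+incrementing modes offer similar behavior keyed on a timestamp column; see the connector's configuration reference for the trade-offs.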