.. _postgres-source-connector:

Debezium PostgreSQL Source Connector
------------------------------------

The `Debezium PostgreSQL Connector`_ is a source connector that can obtain a snapshot of the existing data in a PostgreSQL database and then monitor and record all subsequent row-level changes to that data. All of the events for each table are recorded in a separate |ak-tm| topic, where they can be easily consumed by applications and services.

* Confluent supports Debezium PostgreSQL connector versions 0.9.3 and later.
* Confluent supports using this connector with PostgreSQL 9.6, 10, and 11.
* Databases hosted by a service such as `Heroku Postgres`_ may not be monitorable with Debezium, because such services may not let you install the `logical decoding`_ plugin.
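
To check whether a running server falls in the supported range, you can inspect its ``server_version_num`` setting. The following is a minimal sketch. The ``pg_version_supported`` function is hypothetical, and it relies on PostgreSQL's version numbering, in which ``90615`` means 9.6.15 and ``110005`` means 11.5.

.. codewithvars:: bash

   # Hypothetical helper: succeeds if the server_version_num passed as $1
   # belongs to a supported major version (9.6, 10, or 11).
   pg_version_supported() {
     local num="$1" major
     if [ "$num" -ge 100000 ]; then
       # PostgreSQL 10 and later: the major version is the leading digits (110005 -> 11).
       major=$(( num / 10000 ))
     else
       # PostgreSQL 9.x: keep major and minor together (90615 -> 906, meaning 9.6).
       major=$(( num / 100 ))
     fi
     case "$major" in
       906) return 0 ;;
       10) return 0 ;;
       11) return 0 ;;
       *) return 1 ;;
     esac
   }

For example, ``pg_version_supported "$(psql -U postgres -tAc 'SHOW server_version_num')"`` exits with status 0 on PostgreSQL 9.6, 10, or 11.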

Install the Postgres Connector
==============================

.. include:: ../includes/connector-install.rst

.. codewithvars:: bash

   confluent-hub install debezium/debezium-connector-postgresql:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install debezium/debezium-connector-postgresql:0.9.4
   
License
=======

The Debezium PostgreSQL connector is an open source connector and does not require a Confluent Enterprise License.

Setting up PostgreSQL
=====================

Before using the Debezium PostgreSQL connector to monitor the changes committed on a PostgreSQL server, you must install the `logical decoding plugin`_ into the PostgreSQL server, enable replication, and configure a user with sufficient privileges to perform the replication.
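
The user that the connector connects as needs the ``REPLICATION`` and ``LOGIN`` role attributes, plus ``SELECT`` access to the tables it snapshots. As a minimal sketch, the following hypothetical ``replication_user_sql`` function prints the corresponding SQL so that you can review it before running it. The role name, password, and schema are placeholders you supply, and the values are inserted without escaping, so pass only trusted input.

.. codewithvars:: bash

   # Hypothetical helper: print SQL that creates a replication user for the connector.
   # Arguments: role name, password, schema to snapshot. Values are NOT escaped.
   replication_user_sql() {
     local role="$1" password="$2" schema="$3"
     # REPLICATION lets the role stream changes; LOGIN lets it connect at all.
     printf "CREATE ROLE %s REPLICATION LOGIN PASSWORD '%s';\n" "$role" "$password"
     # The initial snapshot reads the existing rows, so the role needs SELECT.
     printf "GRANT USAGE ON SCHEMA %s TO %s;\n" "$schema" "$role"
     printf "GRANT SELECT ON ALL TABLES IN SCHEMA %s TO %s;\n" "$schema" "$role"
   }

For example, ``replication_user_sql debezium dbz-secret inventory | psql -U postgres`` creates a hypothetical ``debezium`` role for the ``inventory`` schema. If you use a dedicated role, also add ``replication`` entries for it to ``pg_hba.conf``.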

To monitor a PostgreSQL database running in `Amazon RDS`_, refer to the Debezium documentation for `PostgreSQL on AmazonRDS`_.

.. _enable-logical-decoding:

Enable Logical Decoding and Replication on the PostgreSQL server
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

PostgreSQL has a feature called logical decoding that lets clients extract all persistent changes to database tables in a coherent format. Consumers can interpret this format without detailed knowledge of the database's internal state. An output plugin transforms the data from the write-ahead log's internal representation into the format that the consumer of a replication slot needs.

The Debezium PostgreSQL connector works with either of the following logical decoding plugins:

- `protobuf`_: encodes changes in Protobuf format (the ``decoderbufs`` plugin)
- `wal2json`_: encodes changes in JSON format
 
Installing the **wal2json** plugin
""""""""""""""""""""""""""""""""""

Before running the installation commands, make sure the user has write privileges on the PostgreSQL library directory, where the ``wal2json`` library is installed. In the test environment used here, this directory is ``/usr/pgsql-9.6/lib/``. Also add the PostgreSQL binaries directory to your ``PATH`` so that the build can find ``pg_config``:

::

   export PATH="$PATH:/usr/pgsql-9.6/bin"

Then run the following commands to build and install **wal2json**:

::

   git clone https://github.com/eulerto/wal2json -b master --single-branch \
   && cd wal2json \
   && git checkout d2b7fef021c46e0d429f2c1768de361069e58696 \
   && make && make install \
   && cd .. \
   && rm -rf wal2json
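
``make install`` copies the plugin library into the PostgreSQL package library directory, which ``pg_config --pkglibdir`` reports. To confirm that the build succeeded, you can check for the library file. The hypothetical ``plugin_installed`` function below is a minimal sketch that assumes the library uses the ``.so`` suffix.

.. codewithvars:: bash

   # Hypothetical helper: succeed if a decoding plugin library exists in DIR.
   # Usage: plugin_installed DIR NAME   (looks for DIR/NAME.so)
   plugin_installed() {
     local dir="$1" name="$2"
     if [ -f "$dir/$name.so" ]; then
       echo "$name found in $dir"
       return 0
     fi
     echo "$name not found in $dir" >&2
     return 1
   }

For example: ``plugin_installed "$(pg_config --pkglibdir)" wal2json``.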


Enable Replication on the PostgreSQL server
"""""""""""""""""""""""""""""""""""""""""""

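Add the following lines to the end of the PostgreSQL configuration file, ``/usr/share/postgresql/postgresql.conf`` in this example. These lines load the decoding plugin as a shared library at server start and adjust several write-ahead log (WAL) and streaming replication settings. Set ``shared_preload_libraries`` to the plugin you installed: ``decoderbufs`` for the protobuf plugin or ``wal2json`` for the JSON plugin.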
  
::

  # LOGGING
  log_min_error_statement = fatal
  # CONNECTION
  listen_addresses = '*'
  # MODULES
  shared_preload_libraries = 'decoderbufs'
  # REPLICATION
  wal_level = logical             # minimal, replica, or logical (change requires restart)
  max_wal_senders = 1             # max number of walsender processes (change requires restart)
  #wal_keep_segments = 4          # in logfile segments, 16MB each; 0 disables
  #wal_sender_timeout = 60s       # in milliseconds; 0 disables
  max_replication_slots = 1       # max number of replication slots (change requires restart)
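
If you automate this step, make it safe to repeat so that rerunning the script does not append duplicate settings. The following minimal sketch uses a hypothetical ``append_replication_settings`` function that appends the module and replication settings from the listing above, and treats an existing ``wal_level = logical`` line as a sign that they are already present.

.. codewithvars:: bash

   # Hypothetical helper: append the logical replication settings to a
   # postgresql.conf file once. Pass the path to your server's config file.
   append_replication_settings() {
     local conf="$1"
     # Skip the append if a previous run already enabled logical decoding.
     if grep -q '^wal_level = logical' "$conf" 2>/dev/null; then
       return 0
     fi
     cat >> "$conf" <<'EOF'
   # MODULES
   shared_preload_libraries = 'decoderbufs'
   # REPLICATION
   wal_level = logical
   max_wal_senders = 1
   max_replication_slots = 1
   EOF
   }

Pass the path of your server's configuration file, which ``psql -U postgres -tAc 'SHOW config_file'`` reports, and then restart PostgreSQL, because ``shared_preload_libraries``, ``wal_level``, ``max_wal_senders``, and ``max_replication_slots`` take effect only at server start.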

Initialize Replication Permissions
""""""""""""""""""""""""""""""""""

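Add the following lines to the end of the ``pg_hba.conf`` PostgreSQL configuration file. These lines configure client authentication for replication: they let the ``postgres`` user open replication connections over the local Unix socket and the IPv4 and IPv6 loopback addresses without a password. ``trust`` authentication is appropriate only for test environments.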
  
::

    ############ REPLICATION ##############
    local   replication     postgres                          trust
    host    replication     postgres  127.0.0.1/32            trust
    host    replication     postgres  ::1/128                 trust
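
To confirm that a ``pg_hba.conf`` file already contains a replication entry for a given user, you can use a check like the following. The ``hba_allows_replication`` function is a hypothetical, minimal sketch: it matches only uncommented entries whose database field is ``replication`` and whose user field names the user explicitly, so entries that use ``all`` or group names are not recognized.

.. codewithvars:: bash

   # Hypothetical helper: succeed if FILE has an uncommented pg_hba.conf entry
   # whose database field is "replication" and whose user field is USER.
   hba_allows_replication() {
     local file="$1" user="$2"
     awk -v user="$user" '
       $1 !~ /^#/ && $2 == "replication" && $3 == user { found = 1 }
       END { exit (found ? 0 : 1) }
     ' "$file"
   }

The server's active file is reported by ``psql -U postgres -tAc 'SHOW hba_file'``. Changes to ``pg_hba.conf`` take effect after a configuration reload, for example ``SELECT pg_reload_conf();``.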



.. _postgres-source-connector-quickstart:

Quick Start
===========

The `Debezium PostgreSQL Connector`_ is a source connector that can record events for each table in a separate |ak| topic,
where they can be easily consumed by applications and services.

Install the Connector
^^^^^^^^^^^^^^^^^^^^^

To use Docker images to set up |ak|, |zk|, and |kconnect|, refer to the `Debezium tutorial`_.
The following tutorial assumes a local |cp| installation.

Navigate to your Confluent Platform installation directory and run the following
command to install the connector:

.. codewithvars:: bash

 confluent-hub install debezium/debezium-connector-postgresql:0.9.4

Adding a new connector plugin requires restarting Connect. Use the
Confluent CLI to restart Connect.

.. codewithvars:: bash

 |confluent_stop| connect && |confluent_start| connect
 Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
 Starting zookeeper
 zookeeper is [UP]
 Starting kafka
 kafka is [UP]
 Starting schema-registry
 schema-registry is [UP]
 Starting kafka-rest
 kafka-rest is [UP]
 Starting connect
 connect is [UP]

Check if the PostgreSQL plugin has been installed correctly and picked
up by the plugin loader.

.. codewithvars:: bash

 curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep postgres
 "io.debezium.connector.postgresql.PostgresConnector"

Set up PostgreSQL using Docker (Optional)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

If you do not have a native installation of PostgreSQL, you can use the following commands to start a container that runs a PostgreSQL database server preconfigured with the `logical decoding`_ plugin, a `replication slot`_, and an ``inventory`` test database.

.. codewithvars:: bash

 # Pull docker image
 docker pull debezium/example-postgres
 
 # Run docker container
 docker run -it --rm --name postgres -p 5432:5432 \
 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres \
 debezium/example-postgres
 
 # In a separate terminal, launch psql to run SQL queries:
 docker run -it --rm --name psql_client \
 -e PGOPTIONS="--search_path=inventory" \
 -e PGPASSWORD=postgres --link postgres:postgres debezium/example-postgres \
 psql -h postgres -U postgres
 
 # To see the list of relations in the inventory database, type \d at the postgres prompt. To exit, type \q
 
 
Enable Logical Decoding on the PostgreSQL server 
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Logical decoding is already enabled if you set up PostgreSQL using the Docker image in the previous section.
On a native installation, follow the steps in :ref:`enable-logical-decoding`.

Start the Debezium PostgreSQL connector
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Create the file ``register-postgres.json`` to store the following connector configuration:

::

 {
   "name": "inventory-connector",
   "config": {
     "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
     "tasks.max": "1",
     "database.hostname": "0.0.0.0",
     "database.port": "5432",
     "database.user": "postgres",
     "database.password": "postgres",
     "database.dbname": "postgres",
     "database.server.name": "dbserver1",
     "schema.whitelist": "inventory"
   }
 }
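
Alternatively, you can create the file from the shell with a here-document. The sketch below writes the same configuration to ``register-postgres.json``, the file that the ``curl`` command below reads, and additionally sets ``plugin.name``, the Debezium connector property that selects the logical decoding plugin. Its default, ``decoderbufs``, matches the Docker image above; change the value to ``wal2json`` if you installed that plugin instead.

.. codewithvars:: bash

   # Write the connector configuration to register-postgres.json.
   # "plugin.name" is an addition to the configuration above; set it to
   # "wal2json" if that is the decoding plugin installed on your server.
   cat > register-postgres.json <<'EOF'
   {
     "name": "inventory-connector",
     "config": {
       "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
       "tasks.max": "1",
       "plugin.name": "decoderbufs",
       "database.hostname": "0.0.0.0",
       "database.port": "5432",
       "database.user": "postgres",
       "database.password": "postgres",
       "database.dbname": "postgres",
       "database.server.name": "dbserver1",
       "schema.whitelist": "inventory"
     }
   }
   EOF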


Start the connector.
::

 curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
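
To confirm that the connector started, you can query the Kafka Connect REST API at ``/connectors/inventory-connector/status``. In the JSON response, the first ``state`` field belongs to the connector itself (for example ``RUNNING`` or ``FAILED``), and later ones belong to its tasks. The hypothetical ``connector_state`` function below is a minimal sketch that extracts that first value without ``jq``; it assumes the connector's ``state`` field appears before any task ``state`` field, so prefer ``jq`` for anything more involved.

.. codewithvars:: bash

   # Hypothetical helper: read a Kafka Connect status response on stdin and
   # print the connector's state, taken from the first "state" field.
   connector_state() {
     grep -o '"state" *: *"[A-Z_]*"' | head -n 1 | sed 's/.*"\([A-Z_]*\)"$/\1/'
   }

For example: ``curl -s localhost:8083/connectors/inventory-connector/status | connector_state``.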


Start your |ak| consumer
^^^^^^^^^^^^^^^^^^^^^^^^

Start the consumer in a new terminal session.

.. codewithvars:: bash

   |confluent_consume| dbserver1.inventory.customers|dash| --from-beginning
 
When you run SQL statements in the psql client to add or modify records in the database, the consumer terminal displays messages that reflect those changes.

For example, run the following statement at the psql prompt to update a record in the ``customers`` table:

.. codewithvars:: bash

   update customers set first_name = 'Sarah' where id = 1001;


Clean up resources
^^^^^^^^^^^^^^^^^^

Delete the connector and stop Confluent services.

.. codewithvars:: bash

   curl -X DELETE localhost:8083/connectors/inventory-connector
   |confluent_stop|

Stop PostgreSQL containers.

.. codewithvars:: bash

 
    docker stop psql_client # Alternatively type \q at the psql prompt
    docker stop postgres


.. note::
   
   Portions of the information provided here derive from documentation originally
   produced by the `Debezium Community <https://debezium.io/>`_. Work produced
   by Debezium is licensed under `Creative Commons 3.0 <https://creativecommons.org/licenses/by/3.0/>`_.
   
Additional Documentation
========================
   
.. toctree::
   :maxdepth: 1

   postgres_source_connector_config
   
.. _Debezium PostgreSQL Connector: https://debezium.io/docs/connectors/postgresql/
.. _Debezium tutorial: https://github.com/debezium/debezium-examples/tree/master/tutorial#using-postgres
.. _logical decoding: https://www.postgresql.org/docs/9.6/logicaldecoding-explanation.html
.. _replication slot: https://www.postgresql.org/docs/9.6/logicaldecoding-walsender.html
.. _logical decoding plugin: https://debezium.io/docs/install/postgres-plugins/
.. _Amazon RDS: https://aws.amazon.com/rds/
.. _Heroku Postgres: https://www.heroku.com/postgres
.. _PostgreSQL on AmazonRDS: https://debezium.io/docs/connectors/postgresql/#amazon-rds
.. _protobuf: https://github.com/debezium/postgres-decoderbufs/blob/master/README.md
.. _wal2json: https://github.com/eulerto/wal2json/blob/master/README.md