.. _mysql-source-connector:

Debezium MySQL Source Connector
-------------------------------

The `Debezium’s MySQL Connector`_ is a source connector that can obtain a snapshot of the existing data and record all of the row-level changes in the databases on a MySQL server/cluster. The first time it connects to a MySQL server, it reads a consistent snapshot of all of the databases. When that snapshot is complete, the connector continuously reads the changes that were committed to MySQL and generates corresponding insert, update, and delete events. All of the events for each table are recorded in a separate |ak| topic, where they can be easily consumed by applications and services.

* Confluent supports MySQL connector version 0.9.3 and later. 
* Confluent supports using this connector with MySQL 5.6 or later. 

Install the MySQL Connector
===========================

.. include:: ../includes/connector-install.rst

.. codewithvars:: bash

   confluent-hub install debezium/debezium-connector-mysql:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install debezium/debezium-connector-mysql:0.9.4

License
=======

The Debezium MySQL connector is an open source connector and does not require a Confluent Enterprise License.

Enable the Binary Log on MySQL Server
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The MySQL server must be configured to use a row-level binary log, which is described in more detail in the `MySQL documentation`_. MySQL’s binary log, or binlog, records all operations in the same order they are committed by the database, including changes to the schemas of tables or changes to data stored within the tables. MySQL uses its binlog for replication and recovery.

Debezium’s MySQL connector reads MySQL’s binary log to understand what and in what order data has changed. It then produces a change event for every row-level insert, update, and delete operation in the binlog, recording all the change events for each table in a separate |ak| topic. 

This is most often done in the MySQL server configuration file, and will look similar to the following fragment:
::

   server-id         = 223344
   log_bin           = mysql-bin
   binlog_format     = row
   binlog_row_image  = full
   expire_logs_days  = 10

where:

* the value for **server-id** must be unique for each server and replication client within the MySQL cluster. When you set up the connector, you also assign the connector a unique server ID.
* the value for **log_bin** is the base name for the sequence of binlog files.
* the value for **binlog_format** must be set to row or ROW.
* the value for **binlog_row_image** must be set to full or FULL.
* the value for **expire_log_days** is the number of days for automatic binary log file removal. The default is 0, which means "no automatic removal," so be sure to set a value that is appropriate for your environment.

.. _mysql-source-connector_quickstart:

Quick Start
===========

`Debezium’s MySQL Connector`_ is a source connector that can record events for each table in a separate |ak| topic,
where they can be easily consumed by applications and services.

Install the Connector
^^^^^^^^^^^^^^^^^^^^^

Refer to the `Debezium tutorial`_ if you want to use Docker images for setting up |ak|, |zk|, and |kconnect-long|. For the following tutorial, you need to have a local setup of |cp|.

Navigate to your Confluent Platform installation directory and run the following
command to install the connector:

.. codewithvars:: bash

   confluent-hub install debezium/debezium-connector-mysql:0.9.4

Adding a new connector plugin requires restarting Connect. Use the
Confluent CLI to restart Connect:

.. codewithvars:: bash

   |confluent_stop| connect && |confluent_start| connect
   Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
   Starting zookeeper
   zookeeper is [UP]
   Starting kafka
   kafka is [UP]
   Starting schema-registry
   schema-registry is [UP]
   Starting kafka-rest
   kafka-rest is [UP]
   Starting connect
   connect is [UP]

Check if the MySQL plugin has been installed correctly and picked
up by the plugin loader:

.. codewithvars:: bash

   curl -sS localhost:8083/connector-plugins | jq .[].class | grep mysql
   "io.debezium.connector.mysql.MySqlConnector"

Set up MySQL using Docker (Optional)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

If you do not have a native installation of MySQL, you may use the following command to start a new container that runs a MySQL database server preconfigured with an `inventory` database:

.. codewithvars:: bash

   #Run docker container
   docker run -it --rm --name mysql \
   -p 3306:3306 \
   -e MYSQL_ROOT_PASSWORD=debezium \
   -e MYSQL_USER=mysqluser \
   -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9
   
   
Start a MySQL command line client.

.. codewithvars:: bash

   docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
   

Start the Debezium SQL Server connector
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Create the file ``register-mysql.json`` to store the following connector configuration:

::

   {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
        }
    }

Start the connector.
::

   curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

Start your |ak| consumer
^^^^^^^^^^^^^^^^^^^^^^^^

Start the consumer in a new terminal session.

.. codewithvars:: bash

   |confluent_consume| dbserver1.inventory.customers|dash| --from-beginning
   
When you enter SQL queries in MySQL bash to add or modify records in the database, messages populate and are displayed on your consumer terminal showing the added or modified records. 

.. codewithvars:: bash

  # Explore the sample inventory database already populated in your MySQL client running in Docker
  use inventory;
  SELECT * FROM customers;
  # Type these queries to see change events in the consumer terminal
  UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
  DELETE FROM customers WHERE id=1004;


Clean up resources
^^^^^^^^^^^^^^^^^^

Delete the connector and stop Confluent services.

.. codewithvars:: bash

 curl -X DELETE localhost:8083/connectors/inventory-connector
 |confluent_stop|

Stop SQL Server container.

.. codewithvars:: bash

   docker stop mysqlterm mysql

.. note::
   
   Portions of the information provided here derives from documentation originally
   produced by the `Debezium Community <https://debezium.io/>`_. Work produced
   by Debezium is licensed under `Creative Commons 3.0 <https://creativecommons.org/licenses/by/3.0/>`_.

Additional Documentation
========================

.. toctree::
   :maxdepth: 1
   :titlesonly:
   
   mysql_source_connector_config
   
.. _Debezium’s MySQL Connector: https://debezium.io/docs/connectors/mysql/
.. _Debezium tutorial: https://github.com/debezium/debezium-examples/tree/master/tutorial#using-mysql

.. _Debezium’s MySQL Connector: https://debezium.io/docs/connectors/mysql/
.. _MySQL documentation: https://dev.mysql.com/doc/refman/5.7/en/replication-options.html