Debezium MySQL Source Connector for Confluent Platform

Debezium’s MySQL connector is a source connector that takes a snapshot of the existing data and then records every row-level change in the databases on a MySQL server or cluster. The first time it connects to a MySQL server, it reads a consistent snapshot of all of the databases. When that snapshot is complete, the connector continuously reads the changes committed to MySQL and generates corresponding insert, update, and delete events. All of the events for each table are recorded in a separate Kafka topic, where applications and services can consume them.

  • Confluent supports Debezium MySQL connector version 0.9.3 and later.
  • Confluent supports using this connector with MySQL 5.6 or later.

Features

The Debezium MySQL Source Connector includes the following features:

At least once delivery

This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
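
One common way to tolerate redelivered records is to make the consumer idempotent, so that applying the same change twice leaves the same result. The following is a minimal sketch of that idea, not part of the connector. It assumes each event has already been unwrapped to the Debezium envelope fields op, before, and after, and that the table has a primary key column named id (both are assumptions for illustration).

```python
def apply_change(table: dict, event: dict) -> None:
    """Apply one change event to an in-memory copy of a table, idempotently."""
    op = event["op"]
    if op in ("c", "r", "u"):
        # Insert, snapshot read, or update: "after" holds the full row image.
        row = event["after"]
        table[row["id"]] = row
    elif op == "d":
        # Delete: remove the row if present; deleting twice is harmless.
        table.pop(event["before"]["id"], None)


customers = {}
events = [
    {"op": "c", "before": None, "after": {"id": 1, "first_name": "Ann"}},
    {"op": "u", "before": {"id": 1, "first_name": "Ann"},
     "after": {"id": 1, "first_name": "Anne"}},
]

# Simulate a restart that redelivers the whole sequence a second time.
for event in events + events:
    apply_change(customers, event)

print(customers)  # {1: {'id': 1, 'first_name': 'Anne'}}
```

Because the sketch overwrites rows by key instead of appending them, a replayed sequence ends in the same state as a single delivery.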

Supports one task

The Debezium MySQL Source Connector supports running only one task.

Automatic topic creation

The connector automatically creates the internal database history Kafka topic if it doesn’t exist.

Install the MySQL Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install debezium/debezium-connector-mysql:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install debezium/debezium-connector-mysql:2.3.0
    
  • The Debezium MySQL Source connector has specific ACL requirements. See the ACL requirements for Debezium Source connectors to ensure you meet the specified requirements.

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

The Debezium MySQL connector is an open source connector and does not require a Confluent Enterprise License.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Debezium MySQL Source Connector for Confluent Platform.

Enable the Binary Log on MySQL Server

The MySQL server must be configured to use a row-level binary log, which is described in more detail in the MySQL documentation. MySQL’s binary log, or binlog, records all operations in the same order they are committed by the database, including changes to the schemas of tables or changes to data stored within the tables. MySQL uses its binlog for replication and recovery.

Debezium’s MySQL connector reads MySQL’s binary log to determine what data has changed and in what order. It then produces a change event for every row-level insert, update, and delete operation in the binlog, recording all the change events for each table in a separate Kafka topic. The Debezium MySQL connector requires every event it processes to be formatted as a row-level event.

Important

  • If the MySQL server used by the connector is a replica, then all of that replica’s MySQL primary servers must also be configured to use a row-level binary log. Otherwise, the events from the primary are in the wrong format and are replicated to the replica’s binlog in that wrong format.
  • If the MySQL binlog events are formatted incorrectly, the connector fails with an error message similar to binlog probably contains events generated with statement or mixed based replication format, identifying the event. If the event is for a table other than the tables being captured by the connector, you can add a regular expression that matches that problematic event’s DML or DDL statement to the schema.history.internal.ddl.filter connector configuration property. However, if the event is for a table that is being captured, then skipping the event would result in lost changes. To fix this problem, you must capture another snapshot of the table(s) by creating a new connector with a different name. This can be expensive, and is why it’s important to ensure the MySQL server’s binlog format is set correctly before starting a connector.

Setting the MySQL binlog format is most often done in the MySQL server configuration file, and will look similar to the following fragment:

server-id         = 223344
log_bin           = mysql-bin
binlog_format     = row
binlog_row_image  = full
expire_logs_days  = 10

where:

  • the value for server-id must be unique for each server and replication client within the MySQL cluster. When you set up the connector, you also assign the connector a unique server ID.
  • the value for log_bin is the base name for the sequence of binlog files.
  • the value for binlog_format must be set to row or ROW.
  • the value for binlog_row_image must be set to full or FULL.
  • the value for expire_logs_days is the number of days for automatic binary log file removal. The default is 0, which means “no automatic removal,” so be sure to set a value that is appropriate for your environment.
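
Before starting a connector, it can help to check a server configuration file against the requirements listed above. The following is a rough sketch using only the Python standard library. The function name check_binlog_settings is hypothetical, and the sketch assumes the settings live in a [mysqld] section of the file. It only inspects the file text, so it does not replace checking the values on the running server.

```python
import configparser


def check_binlog_settings(cnf_text: str) -> list:
    """Return a list of problems found in the [mysqld] section; empty means OK."""
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section found"]

    # MySQL accepts dashes and underscores interchangeably in option names.
    settings = {
        name.replace("-", "_"): (value or "").strip()
        for name, value in parser.items("mysqld")
    }

    problems = []
    if not settings.get("server_id"):
        problems.append("server-id is not set")
    if "log_bin" not in settings:
        problems.append("log_bin is not set")
    for name, required in (("binlog_format", "row"), ("binlog_row_image", "full")):
        found = settings.get(name, "")
        if found.lower() != required:
            problems.append(f"{name} must be {required}, found {found!r}")
    return problems


SAMPLE = """[mysqld]
server-id         = 223344
log_bin           = mysql-bin
binlog_format     = row
binlog_row_image  = full
expire_logs_days  = 10
"""

print(check_binlog_settings(SAMPLE))  # []
```

The comparison lowercases the values because both row and ROW (and full and FULL) are accepted.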

Quick Start

Debezium’s MySQL Connector is a source connector that can record events for each table in a separate Kafka topic, where they can be easily consumed by applications and services.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Install the Connector

Refer to the Debezium tutorial if you want to use Docker images for setting up Kafka, ZooKeeper, and Kafka Connect. Note that as of Confluent Platform 7.5, ZooKeeper is deprecated for new deployments. Confluent recommends KRaft mode for new deployments. For the following tutorial, you need to have a local setup of Confluent Platform.

Navigate to your Confluent Platform installation directory and run the following command to install the connector:

confluent connect plugin install debezium/debezium-connector-mysql:0.9.4

Adding a new connector plugin requires restarting Connect. Use the Confluent CLI to restart Connect.

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

confluent local services connect stop && confluent local services connect start
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]

Check if the MySQL plugin has been installed correctly and picked up by the plugin loader:

curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep mysql
"io.debezium.connector.mysql.MySqlConnector"

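If jq is not available, the same check can be scripted. The sketch below assumes the Connect REST endpoint shown above returns a JSON array of objects that each carry a class field. The helper names plugin_installed and fetch_plugins are hypothetical.

```python
import json
import urllib.request

CONNECTOR_CLASS = "io.debezium.connector.mysql.MySqlConnector"


def plugin_installed(plugins_json: str, connector_class: str = CONNECTOR_CLASS) -> bool:
    """Return True if the plugin listing contains the given connector class."""
    plugins = json.loads(plugins_json)
    return any(plugin.get("class") == connector_class for plugin in plugins)


def fetch_plugins(base_url: str = "http://localhost:8083") -> str:
    """Fetch the raw plugin listing from a running Connect worker."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins") as response:
        return response.read().decode("utf-8")


# Usage against a running worker (requires Connect to be up):
#     print(plugin_installed(fetch_plugins()))
```
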
Set up MySQL using Docker (Optional)

If you do not have a native installation of MySQL, you may use the following command to start a new container that runs a MySQL database server preconfigured with an inventory database:

# Run the Docker container
docker run -it --rm --name mysql \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9

Start a MySQL command line client.

docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

Start the Debezium MySQL connector

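Create the file register-mysql.json to store the following connector configuration. The database.hostname value must resolve from the machine where Connect runs. If Connect runs directly on your host and MySQL runs in the Docker container started earlier with port 3306 published, use localhost instead of mysql.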

{
 "name": "inventory-connector",
 "config": {
     "connector.class": "io.debezium.connector.mysql.MySqlConnector",
     "tasks.max": "1",
     "database.hostname": "mysql",
     "database.port": "3306",
     "database.user": "debezium",
     "database.password": "dbz",
     "database.server.id": "184054",
     "database.server.name": "dbserver1",
     "database.include.list": "inventory",
     "database.history.kafka.bootstrap.servers": "localhost:9092",
     "database.history.kafka.topic": "schema-changes.inventory"
     }
 }

Start the connector.

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
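
The same registration can be scripted instead of using curl. This is a minimal sketch with the Python standard library. It assumes the Connect REST API is listening on localhost:8083 as in the command above, and the helper names build_register_request and register_connector are hypothetical.

```python
import json
import urllib.request


def build_register_request(config_path: str,
                           connect_url: str = "http://localhost:8083"):
    """Build (but do not send) the POST request that registers a connector."""
    with open(config_path, "rb") as handle:
        body = handle.read()
    json.loads(body)  # Fail early if the file is not valid JSON.
    return urllib.request.Request(
        f"{connect_url}/connectors/",
        data=body,
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def register_connector(config_path: str,
                       connect_url: str = "http://localhost:8083") -> int:
    """Send the registration request and return the HTTP status code."""
    request = build_register_request(config_path, connect_url)
    with urllib.request.urlopen(request) as response:
        return response.status


# Usage (requires a running Connect worker):
#     print(register_connector("register-mysql.json"))
```

Splitting request construction from sending keeps the JSON validation usable even when Connect is not running.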

Start your Kafka consumer

Start the consumer in a new terminal session.

confluent local services kafka consume dbserver1.inventory.customers --from-beginning

When you enter SQL statements in the MySQL command line client to add or modify records in the database, the corresponding change events appear in your consumer terminal.

# Explore the sample inventory database already populated in your MySQL client running in Docker
use inventory;
SELECT * FROM customers;
# Type these queries to see change events in the consumer terminal
UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
DELETE FROM customers WHERE id=1004;
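
To make the consumer output easier to read, a small script can summarize each message. The sketch below assumes the messages are JSON-encoded Debezium envelopes with op, before, and after fields, optionally wrapped in a payload object when schemas are enabled, and that a delete may be followed by a tombstone record with a null value. The function name describe_change and the sample values are illustrative only.

```python
import json
from typing import Optional

OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe_change(raw_value: Optional[str]):
    """Return (operation, changed_columns) for one consumed message value."""
    if raw_value is None:
        # A null value is a tombstone, typically emitted after a delete.
        return ("tombstone", {})
    message = json.loads(raw_value)
    # With schemas enabled, the converter wraps the event in "payload".
    event = message.get("payload", message)
    operation = OPERATIONS.get(event["op"], "unknown")
    before = event.get("before")
    after = event.get("after")
    changed = {}
    if before and after:
        # Only updates carry both images; report columns whose value differs.
        changed = {
            column: (before.get(column), value)
            for column, value in after.items()
            if before.get(column) != value
        }
    return (operation, changed)


sample_update = json.dumps({
    "payload": {
        "op": "u",
        "before": {"id": 1004, "first_name": "Anne"},
        "after": {"id": 1004, "first_name": "Anne Marie"},
    }
})
print(describe_change(sample_update))
# ('update', {'first_name': ('Anne', 'Anne Marie')})
```
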

Clean up resources

Delete the connector and stop Confluent services.

curl -X DELETE localhost:8083/connectors/inventory-connector
confluent local services stop

Stop the MySQL containers.

docker stop mysqlterm mysql

Note

Portions of the information provided here derive from documentation originally produced by the Debezium Community. Work produced by Debezium is licensed under Creative Commons 3.0.