JDBC Source Connector for Confluent Platform

The Kafka Connect JDBC Source connector allows you to import data from any relational database with a JDBC driver into an Apache Kafka® topic. This connector can support a wide variety of databases.

Data is loaded by periodically executing a SQL query and creating an output record for each row in the result set. By default, all tables in a database are copied, each to its own output topic. The database is monitored for new or deleted tables and adapts automatically. When copying data from a table, the connector can load only new or modified rows by specifying which columns should be used to detect new or modified data.

You can configure Java streams applications to deserialize and ingest data in multiple ways, including Kafka console producers, JDBC source connectors, and Java client producers. For full code examples, see Pipelining with Kafka Connect and Kafka Streams.

Important

When you include CLOB data for the JDBC connector, throughput may vary. To process CLOB data, the connector must free up the previous CLOB data to avoid an Out of Memory (OOM) exception–freeing up memory impacts throughput. If CLOB data is small, consider using VARCHAR instead of CLOB.

Features

The source connector supports copying tables with a variety of JDBC data types, adding and removing tables from the database dynamically, whitelists and blacklists, varying polling intervals, and other settings. However, the most important features for most users are the settings controlling how data is incrementally copied from the database.

Kafka Connect tracks the latest record it retrieved from each table, so it can start in the correct location on the next iteration (or in case of a crash). The source connector uses this functionality to only get updated rows from a table (or from the output of a custom query) on each iteration. Several modes are supported, each of which differs in how modified rows are detected.

The JDBC Source connector includes the following features:

At least once delivery

This connector guarantees that records are delivered to the Kafka topic at least once. If the connector restarts, there may be some duplicate records in the Kafka topic.

Supports one task

The JDBC Source connector can read one or more tables from a single task. In query mode, the connector supports running only one task.

Incremental query modes

Each incremental query mode tracks a set of columns for each row, which it uses to keep track of which rows have been processed and which rows are new or have been updated. The mode setting controls this behavior and supports the following options:

  • Incrementing Column: A single column containing a unique ID for each row, where newer rows are guaranteed to have larger IDs— that is, an AUTOINCREMENT column. Note that this mode can only detect new rows. Updates to existing rows cannot be detected, so this mode should only be used for immutable data. One example where you might use this mode is when streaming fact tables in a data warehouse, since those are typically insert-only. Incrementing columns must be integral types.

  • Timestamp Column: In this mode, a single column containing a modification timestamp is used to track the last time data was processed and to query only for rows that have been modified since that time.

    Note

    • Timestamps aren’t necessarily unique. As a result, this mode can’t guarantee that all updated data will be delivered. For example, if two rows share the same timestamp and are returned by an incremental query, and only one has been processed before a failure, the second update will be missed when the system recovers.
    • The connector might not be able to fetch some records from a table in this mode. For example, if you set batch.max.rows to 10 and have more than 10 records with the same timestamp in a table, the additional records (with the same timestamp) will be ignored.
  • Timestamp and Incrementing Columns: This is the most robust and accurate mode, combining an incrementing column with a timestamp column. By combining the two, as long as the timestamp is sufficiently granular, each (id, timestamp) tuple will uniquely identify an update to a row. Even if an update fails after partially completing, unprocessed updates will still be correctly detected and delivered when the system recovers.

  • Custom Query: The source connector supports using custom queries instead of copying whole tables. With a custom query, one of the other update automatic update modes can be used as long as the necessary WHERE clause can be correctly appended to the query. Alternatively, the specified query may handle filtering to new updates itself; however, note that no offset tracking will be performed (unlike the automatic modes where incrementing and/or timestamp column values are recorded for each record), so the query must track offsets itself.

  • Bulk: This mode is unfiltered and therefore not incremental at all. It will load all rows from a table on each iteration. This can be useful if you want to periodically dump an entire table where entries are eventually deleted and the downstream system can safely handle duplicates.

Note that all incremental query modes that use certain columns to detect changes will require indexes on those columns to efficiently perform the queries.

For incremental query modes that use timestamps, the source connector uses a configuration timestamp.delay.interval.ms to control the waiting period after a row with certain timestamp appears before you include it in the result. The additional wait allows transactions with earlier timestamps to complete and the related changes to be included in the result. For more information, see Configuration Reference for JDBC Source Connector for Confluent Platform.

Specifying a WHERE clause with query modes

You can use query with a WHERE clause if you enable mode=bulk. This loads all rows from a table at each iteration.

To use a WHERE clause with mode=timestamp, mode=incrementing, and mode=timestamp+incrementing, it must be possible to append the WHERE clause to the query. A common way to do this is to write your query within a subselect that includes the incremental and/or timestamp field in its fields returned. This means that the connector can then append its own WHERE clause to create an incremental load.

For example:

SELECT * FROM
    (SELECT ID_COL,
            TIMESTAMP_COL,
            COL1,
            COL2
       FROM TABLE_A
            INNER JOIN TABLE_B
            ON PK=FK
      WHERE COL1='FOO');

Using the example above as the value for the query connector configuration property, any one of the following three subsequent mode connector configurations are valid and will produce incremental loading.

mode=timestamp
timestamp.column.name=TIMESTAMP_COL
mode=incrementing
incrementing.column.name=ID_COL
mode=timestamp+incrementing
timestamp.column.name=TIMESTAMP_COL
incrementing.column.name=ID_COL

mode=timestamp+incrementing example connector configuration properties:

mode":"timestamp+incrementing",
"query":"SELECT * FROM (SELECT ID_COL, TIMESTAMP_COL, COL1, COL2 FROM TABLE_A INNER JOIN TABLE_B ON PK=FK WHERE COL1='FOO')",
"timestamp.column.name":"TIMESTAMP_COL",
"incrementing.column.name":"ID_COL",

Message keys

Kafka messages are key/value pairs. For a JDBC connector, the value (payload) is the contents of the table row being ingested. However, the JDBC connector does not generate the key by default.

Message keys are useful in setting up partitioning strategies. Keys can direct messages to a specific partition and can support downstream processing where joins are used. If no message key is used, messages are sent to partitions using round-robin distribution.

To set a message key for the JDBC connector, you use two Single Message Transformations (SMTs): the ValueToKey SMT and the ExtractField SMT. You add these two SMTs to the JDBC connector configuration. For example, the following shows a snippet added to a configuration that takes the id column of the accounts table to use as the message key.

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
     "name": "jdbc_source_mysql_01",
     "config": {
             "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
             "connection.url": "jdbc:mysql://mysql:3306/test",
             "connection.user": "connect_user",
             "connection.password": "connect_password",
             "topic.prefix": "mysql-01-",
             "poll.interval.ms" : 3600000,
             "table.whitelist" : "test.accounts",
             "mode":"bulk",
             "transforms":"createKey,extractInt",
             "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
             "transforms.createKey.fields":"id",
             "transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
             "transforms.extractInt.field":"id"
             }
     }'

Mapping column types

The source connector has a few options for controlling how column types are mapped into Kafka Connect field types. By default, the connector maps SQL/JDBC types to the most accurate representation in Java, which is straightforward for many SQL types but may be a bit unexpected for some types, as described in the following section.

Numeric mapping property

SQL’s NUMERIC and DECIMAL types have exact semantics controlled by precision and scale. The most accurate representation for these types is Connect’s Decimal logical type which uses Java’s BigDecimal representation. Avro serializes Decimal types as bytes that may be difficult to consume and that may require additional conversion to an appropriate data type. The source connector’s numeric.mapping configuration property does this by casting numeric values to the most appropriate primitive type using the numeric.mapping=best_fit value. The following values are available for the numeric.mapping configuration property:

  • none: Use this value if all NUMERIC columns are to be represented by the Kafka Connect Decimal logical type. This is the default value for this property. Decimal types are mapped to their binary representation.

  • best_fit: Use this value if all NUMERIC columns should be cast to Connect INT8, INT16, INT32, INT64, or FLOAT64 based upon the column’s precision and scale. This is the property value you should likely use if you have NUMERIC/NUMBER source data. It attempts to map NUMERIC columns to the Connect INT8, INT16, INT32, INT64, and FLOAT64 primitive type, based upon the column’s precision and scale values, as shown below:

    Precision Scale Connect primitive type
    1 to 2 -84 to 0 INT8
    3 to 4 -84 to 0 INT16
    5 to 9 -84 to 0 INT32
    10 to 18 -84 to 0 INT64
    1 to 18 positive FLOAT64
  • best_fit_eager_double: Use this value if NUMERIC columns with non-positive scale should be cast to Connect INT8, INT16, INT32 or INT64 primitive types with the same precision and scale bounds as best_fit. But, it is desired to cast all NUMERIC columns with positive scale to Connect FLOAT64 primitive types, despite the potential loss of accuracy in representing the data.

  • precision_only: Use this to map NUMERIC columns based only on the column’s precision (assuming that column’s scale is 0). This option attempts to map NUMERIC columns to Connect INT8, INT16, INT32, and INT64 types based only upon the column’s precision, and where the scale is always 0.

    Precision Scale Connect primitive type
    1 to 2 0 INT8
    3 to 4 0 INT16
    5 to 9 0 INT32
    10 to 18 0 INT64

Tip

For a deeper dive into this topic, see the Confluent blog article Bytes, Decimals, Numerics and oh my.

Note

The numeric.precision.mapping property is older and is now deprecated. When enabled, it is equivalent to numeric.mapping=precision_only. When not enabled, it is equivalent to numeric.mapping=none.

Use the TimestampConverter SMT

In your JDBC Source connector configuration file, you must add the TimestampConverter to the columns that need a date for the connector to produce a schema with a logicalType of Date. Otherwise, the connector will generate a schema containing a logicalType of timestamp-millis.

Custom Credentials Provider Support

You can configure the JDBC connector to use a custom credentials provider, instead of the default one provided in the connector. To do this, you implement a custom credentials provider, build it as a JAR file, and deploy the JAR file to use the custom provider.

Complete the following steps to use a custom credentials provider:

  1. Set a custom credentials provider class: Set the jdbc.credentials.provider.class property to a class that implements the io.confluent.connect.jdbc.util.JdbcCredentialsProvider interface. Configure the class to the fully qualified name of your custom credentials provider class.
  2. Configure additional settings (Optional): For additional configuration, prefix the configuration keys with jdbc.credentials.provider. If your custom credentials provider needs to accept additional configuration, implement the org.apache.kafka.common.Configurable interface that lets the connector receive configurations that are prefixed with jdbc.credentials.provider..
  3. Ensure a public no-args constructor: Your custom credentials provider class must have a public no-argument constructor. This is necessary because the connector creates an instance of the provider using this constructor.
  4. Package your provider: Once your custom credentials provider class is implemented, package it into a JAR file.
  5. Copy the JAR file to Connect Worker: Copy the built JAR file to the share/java/kafka-connect-jdbc directory on all Connect workers. This step ensures that the JDBC connector can access and use your custom credentials provider.

Limitations

The JDBC Source connector has the following limitations:

  • The geometry column type isn’t supported for the JDBC Source connector.

  • The connector does not support the array data type.

  • If the connector makes numerous parallel insert operations in a large source table, insert transactions can commit out of order; this is typical and means that a greater auto_increment ID (for example, 101) is committed earlier and a smaller ID (for example, 100) is committed later. The time difference may only be a few milliseconds, but the commits are out of order nevertheless.

    Note that using incrementing mode to load data from such tables always results in some data loss. This happens because when the source connector worker reads, or polls the table, the connector gets the row with a greater offset value (with the smaller offset row remaining uncommitted). In the next iteration, although the uncommitted row is committed, the offset position has moved beyond that value, so the row is skipped. Using timestamp+incrementing mode is not a good choice either, because the tables may be very large (five to eight million rows added daily) and there is a high cost for any indexing approach, with the exception of PK indexing.

License

This connector is available under the Confluent Community License.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for JDBC Source Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

JSON and JSONB for PostgreSQL

PostgreSQL supports storing table data as JSON or JSONB (JSON binary format). Both the JDBC Source and Sink connectors support sourcing from or sinking to PostgreSQL tables containing data stored as JSON or JSONB.

The JDBC Source connector stores JSON or JSONB as STRING type in Kafka. For the JDBC Sink connector, JSON or JSONB should be stored as STRING type in Kafka and matching columns should be defined as JSON or JSONB in PostgreSQL.

Install the JDBC Source Connector

The JDBC Source connector is no longer bundled with Confluent Platform, so you must install it manually.

Prerequisites

  • Confluent Platform. If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
  • SQLite is installed. You can also use another database. If you are using another database, be sure to adjust the connection.url setting. Confluent Platform includes JDBC drivers for SQLite and PostgreSQL, but if you’re using a different database you must also verify that the JDBC driver is available on the Kafka Connect process’s CLASSPATH.
  • Kafka and Schema Registry are running locally on the default ports.
  • If you are running a multi-node Connect cluster, the JDBC connector and JDBC driver JARs must be installed on every Connect worker in the cluster. For more information, see JDBC Drivers.

Install the connector using the Confluent CLI

To install the latest connector version using Confluent Hub Client, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-jdbc:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-jdbc:10.8.0

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Quick Start

To see the basic functionality of the connector, you’ll copy a single table from a local SQLite database. In this quick start, you can assume each entry in the table is assigned a unique ID and is not modified after creation.

Create SQLite Database and Load Data

  1. Create a SQLite database with this command:

    sqlite3 test.db
    

    Your output should resemble:

    SQLite version 3.19.3 2017-06-27 16:48:08
    Enter ".help" for usage hints.
    sqlite>
    
  2. In the SQLite command prompt, create a table and seed it with some data:

    sqlite> CREATE TABLE accounts(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255));
    
    sqlite> INSERT INTO accounts(name) VALUES('alice');
    
    sqlite> INSERT INTO accounts(name) VALUES('bob');
    

    Tip

    You can run SELECT * from accounts; to verify your table has been created.

Load the JDBC Source Connector

  1. Load the JDBC Sink connector using the following command (assuming you installed the connector using Confluent Hub):

    confluent local services connect connector load jdbc-source --config $CONFLUENT_HOME/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/etc/source-quickstart-sqlite.properties
    

    If you installed the connector manually, the --config flag will be different.

  2. Load the jdbc-source connector. The test.db file must be in <path-to-confluent>, such as $CONFLUENT_HOME.

    confluent local services connect connector load jdbc-source
    

    Your output should resemble:

    {
      "name": "jdbc-source",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:sqlite:test.db",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "topic.prefix": "test-sqlite-jdbc-",
        "name": "jdbc-source"
      },
      "tasks": [],
      "type": null
    }
    

    Tip

    For non-CLI users, you can load the JDBC Source connector with this command:

    ${CONFLUENT_HOME}/bin/connect-standalone \
    ${CONFLUENT_HOME}/etc/schema-registry/connect-avro-standalone.properties \
    ${CONFLUENT_HOME}/etc/kafka-connect-jdbc/sink-quickstart-sqlite.properties
    

    To check that it has copied the data that was present when you started Kafka Connect, start a console consumer, reading from the beginning of the topic:

      ./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test-sqlite-jdbc-accounts --from-beginning
    {"id":1,"name":{"string":"alice"}}
    {"id":2,"name":{"string":"bob"}}
    

The output shows the two records as expected, one per line, in the JSON encoding of the Avro records. Each row is represented as an Avro record and each column is a field in the record. You can see both columns in the table, id and name. The IDs were auto-generated and the column is of type INTEGER NOT NULL, which can be encoded directly as an integer. The name column has type STRING and can be NULL. The JSON encoding of Avro encodes the strings in the format {"type": value}, so you can see that both rows have string values with the names specified when you inserted the data.

Add a record to the consumer

Add another record using the SQLite command prompt:

sqlite> INSERT INTO accounts(name) VALUES('cathy');

You can switch back to the console consumer and see the new record is added and, importantly, the old entries are not repeated:

{"id":3,"name":{"string":"cathy"}}

Note that the default polling interval is five seconds, so it may take a few seconds to show up. Depending on your expected rate of updates or desired latency, a smaller poll interval could be used to deliver updates more quickly.

All the features of Kafka Connect, including offset management and fault tolerance, work with the source connector. You can restart and kill the processes and they will pick up where they left off, copying only new data (as defined by the mode setting).

Configuration

The source connector gives you quite a bit of flexibility in the databases you can import data from and how that data is imported. This section first describes how to access databases whose drivers are not included with Confluent Platform, then gives a few example configuration files that cover common scenarios, then provides an exhaustive description of the available configuration options.

The full set of configuration options are listed in Configuration Reference for JDBC Source Connector for Confluent Platform, but here are a few template configurations that cover some common usage scenarios.

Use a whitelist to limit changes to a subset of tables in a MySQL database, using id and modified columns that are standard on all whitelisted tables to detect rows that have been modified. This mode is the most robust because it can combine the unique, immutable row IDs with modification timestamps to guarantee modifications are not missed even if the process dies in the middle of an incremental update query.

name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=alice&password=secret
table.whitelist=users,products,transactions

mode=timestamp+incrementing
timestamp.column.name=modified
incrementing.column.name=id

topic.prefix=mysql-

Use a custom query instead of loading tables, allowing you to join data from multiple tables. As long as the query does not include its own filtering, you can still use the built-in modes for incremental queries (in this case, using a timestamp column). Note that this limits you to a single output per connector and because there is no table name, the topic “prefix” is actually the full topic name in this case.

name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

connection.url=jdbc:postgresql://postgres.example.com/test_db?user=bob&password=secret&ssl=true
query=SELECT users.id, users.name, transactions.timestamp, transactions.user_id, transactions.payment FROM users JOIN transactions ON (users.id = transactions.user_id)
mode=timestamp
timestamp.column.name=timestamp

topic.prefix=mysql-joined-data

Schema Evolution

The JDBC connector supports schema evolution when the Avro converter is used. When there is a change in a database table schema, the JDBC connector can detect the change, create a new Connect schema and try to register a new Avro schema in Schema Registry. Whether you can successfully register the schema or not depends on the compatibility level of Schema Registry, which is backward by default.

For example, if you remove a column from a table, the change is backward compatible and the corresponding Avro schema can be successfully registered in Schema Registry. If you modify the database table schema to change a column type or add a column, when the Avro schema is registered to Schema Registry, it will be rejected as the changes are not backward compatible.

You can change the compatibility level of Schema Registry to allow incompatible schemas or other compatibility levels. There are two ways to do this:

  • Set the compatibility level for subjects which are used by the connector using PUT /config/(string: subject). The subjects have format of topic-key and topic-value where the topic is determined by topic.prefix config and table name.
  • Configure Schema Registry to use other schema compatibility level by setting avro.compatibility.level in Schema Registry. Note that this is a global setting that applies to all schemas in Schema Registry.

However, due to the limitation of the JDBC API, some compatible schema changes may be treated as incompatible change. For example, adding a column with default value is a backward compatible change. However, limitations of the JDBC API make it difficult to map this to default values of the correct type in a Kafka Connect schema, so the default values are currently omitted. The implications is that even some changes of the database table schema is backward compatible, the schema registered in Schema Registry is not backward compatible as it doesn’t contain a default value.

If the JDBC connector is used together with the HDFS connector, there are some restrictions to schema compatibility as well. When Hive integration is enabled, schema compatibility is required to be backward, forward and full to ensure that the Hive schema is able to query the whole data under a topic. As some compatible schema change will be treated as incompatible schema change, those changes will not work as the resulting Hive schema will not be able to query the whole data for a topic.

Pre-execution SQL logging

If the connector does not behave as expected, you can enable the connector to log the actual queries and statements before the connector sends them to the database for execution. This allows you to view the complete SQL statements and queries in the log for troubleshooting.

Complete the steps below to troubleshoot the JDBC source connector using pre-execution SQL logging:

  1. Temporarily change the default Connect log4j.logger.io.confluent.connect.jdbc.source property from INFO to TRACE. You can do this in the connect-log4j.properties file or by entering the following curl command:

    curl -Ss -X PUT '{"level": "TRACE"}' http://localhost:8083/admin/loggers/io.confluent.connect.jdbc.source
    

    Important

    • You should use TRACE logging temporarily. After troubleshooting is completed, change this property back to INFO.
    • This change affects all JDBC source connectors running in the Connect cluster.
  2. Review the log. When using the Confluent CLI to run Confluent Platform locally for development, you can display JDBC source connector log messages using the following CLI command:

    confluent local services connect log | grep JdbcSourceConnector
    

    Search for messages in the output that resemble the example below:

    “Statement to execute: <query-or-statement>“
    
  3. After troubleshooting, return the level to INFO using the following curl command:

    curl -Ss -X PUT '{"level": "INFO"}' http://localhost:8083/admin/loggers/io.confluent.connect.jdbc.source