JDBC Sink Connector for Confluent Platform

The Kafka Connect JDBC Sink connector allows you to export data from Apache Kafka® topics to any relational database with a JDBC driver. This connector supports a wide variety of databases. The connector polls data from Kafka and writes it to the database based on its topic subscription. It is possible to achieve idempotent writes with upserts. Auto-creation of tables and limited auto-evolution are also supported.

Features

The JDBC Sink connector includes the following features:

At least once delivery

This connector guarantees that records are delivered at least once from the Kafka topic.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The JDBC Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve throughput when the subscribed topics have multiple partitions.

Data mapping

The sink connector requires knowledge of schemas, so you should use a suitable converter. For example, the Avro converter that comes with Schema Registry, the JSON converter with schemas enabled, or the Protobuf converter. Kafka record keys, if present, can be primitive types or a Connect struct, and the record value must be a Connect struct. Fields selected from Connect structs must be of primitive types. If the data in the topic is not in a compatible format, implementing a custom Converter may be necessary.

Key handling

By default, primary keys are not extracted (pk.mode is set to none). This default is not suitable for advanced usage such as upsert semantics, or for cases where the connector is responsible for auto-creating the destination table. Other modes let you use fields from the Kafka record key, the Kafka record value, or the Kafka coordinates for the record.

Refer to primary key configuration options for further detail.
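
As a rough illustration of what each mode selects, the following sketch models a record as plain Python values. The function primary_key_columns, its pk_fields argument, and the coordinate column names are hypothetical. The mode names kafka and record_value are assumed from the connector's configuration reference rather than taken from this page.

```python
def primary_key_columns(pk_mode, topic, partition, offset, key, value, pk_fields=()):
    """Return a dict of primary key column -> value for one record (illustrative only)."""
    if pk_mode == "none":
        return {}
    if pk_mode == "kafka":
        # Kafka coordinates identify a record uniquely; these column names are made up.
        return {"topic": topic, "partition": partition, "offset": offset}
    if pk_mode == "record_key":
        if isinstance(key, dict):  # struct key: use the listed fields, or all of them
            return {name: key[name] for name in (pk_fields or key)}
        if len(pk_fields) != 1:  # primitive key: needs exactly one column name
            raise ValueError("a primitive key needs exactly one entry in pk_fields")
        return {pk_fields[0]: key}
    if pk_mode == "record_value":
        return {name: value[name] for name in (pk_fields or value)}
    raise ValueError(f"unknown pk.mode: {pk_mode}")


record = dict(topic="orders", partition=0, offset=42,
              key=999, value={"id": 999, "product": "foo"})
by_key = primary_key_columns("record_key", pk_fields=("id",), **record)
by_value = primary_key_columns("record_value", pk_fields=("id",), **record)
by_coordinates = primary_key_columns("kafka", **record)
```

Here the record key and the id field of the value both yield the same key column, while the kafka mode ignores the record contents entirely.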

Delete mode

The connector can delete rows in a database table when it consumes a tombstone record, which is a Kafka record that has a non-null key and a null value. This behavior is disabled by default, so any tombstone record causes the connector to fail. Keeping deletes off by default preserves prior behavior when you upgrade the JDBC connector.

Deletes can be enabled with delete.enabled=true, but only when pk.mode is set to record_key. This is because deleting a row from the table requires the primary key to be used as the selection criterion.

Enabling delete mode does not affect the insert.mode.
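
The constraint between delete.enabled and pk.mode can be checked before a configuration is submitted. The following is a minimal sketch, not connector code: check_delete_settings and is_tombstone are hypothetical helpers, and the defaults they assume (delete.enabled=false, pk.mode=none) are the ones described on this page.

```python
def check_delete_settings(config):
    """Return a list of problems with the delete-related settings (empty if none)."""
    delete_enabled = str(config.get("delete.enabled", "false")).lower() == "true"
    pk_mode = config.get("pk.mode", "none")
    problems = []
    if delete_enabled and pk_mode != "record_key":
        problems.append(
            f"delete.enabled=true requires pk.mode=record_key, but pk.mode={pk_mode}"
        )
    return problems


def is_tombstone(key, value):
    """A tombstone is a record with a non-null key and a null value."""
    return key is not None and value is None


# insert.mode is independent of delete mode, so any value is accepted here.
ok = check_delete_settings({"delete.enabled": "true", "pk.mode": "record_key",
                            "insert.mode": "upsert"})
bad = check_delete_settings({"delete.enabled": "true", "pk.mode": "record_value"})
```

In this sketch ok is an empty list and bad contains one message describing the mismatch.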

Idempotent writes

The default insert.mode is insert. If it is configured as upsert, the connector will use upsert semantics rather than plain INSERT statements. Upsert semantics refer to atomically adding a new row or updating the existing row if there is a primary key constraint violation, which provides idempotence.

If there are failures, the Kafka offset used for recovery may not be up-to-date with what was committed as of the time of the failure, which can lead to re-processing during recovery. The upsert mode is highly recommended as it helps avoid constraint violations or duplicate data if records need to be re-processed.

Important

When a target table includes columns with CLOB, INSERT or UPSERT performance may be degraded. Try to use VARCHAR or VARCHAR2 instead.

Aside from failure recovery, the source topic may also naturally contain multiple records over time with the same primary key, making upserts desirable.

As there is no standard syntax for upsert, the following table describes the database-specific DML (dialect) that is used.

Database      Upsert style
MySQL         INSERT .. ON DUPLICATE KEY UPDATE ..
Oracle        MERGE ..
PostgreSQL    INSERT .. ON CONFLICT .. DO UPDATE SET ..
SQLite        INSERT OR REPLACE ..
SQL Server    MERGE ..
Sybase        MERGE ..
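
To see why upserts make re-processing safe, the following sketch replays the same batch twice against an in-memory SQLite database using the INSERT OR REPLACE style listed above. It uses Python's sqlite3 module directly and does not involve the connector. The orders table and its rows are illustrative assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, product TEXT, quantity INTEGER)"
)

batch = [(999, "foo", 100), (1000, "bar", 5)]


def write_batch(rows, upsert):
    """Write rows in one transaction, using upsert or plain INSERT semantics."""
    verb = "INSERT OR REPLACE" if upsert else "INSERT"
    with conn:  # commits on success, rolls back if an exception is raised
        conn.executemany(
            f"{verb} INTO orders (id, product, quantity) VALUES (?, ?, ?)", rows
        )


write_batch(batch, upsert=True)
write_batch(batch, upsert=True)  # simulated re-processing after a failure
row_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]

# Replaying the same batch with plain INSERT violates the primary key constraint.
try:
    write_batch(batch, upsert=False)
    replay_with_insert_failed = False
except sqlite3.IntegrityError:
    replay_with_insert_failed = True
```

After the replay the table still holds two rows, whereas the plain INSERT replay fails with a constraint violation.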

Auto-creation and auto-evolution

Tip

Ensure the JDBC user has the appropriate permissions for DDL.

If auto.create is enabled, the connector can CREATE the destination table if it is found to be missing. The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition. Primary keys are specified based on the key configuration settings.

If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing. Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table. Addition of primary key constraints is also not attempted. In contrast, if auto.evolve is disabled no evolution is performed and the connector task fails with an error stating the missing columns.
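
The add-only behavior described above can be sketched as follows. This is an illustration using SQLite through Python's sqlite3 module, not the connector's implementation. The helpers missing_columns and evolve are hypothetical, and the column types are passed in directly instead of being derived from a Connect schema.

```python
import sqlite3


def missing_columns(conn, table, record_fields):
    """Return record field names that have no matching column in the table."""
    # Table names are trusted in this sketch; row[1] of table_info is the column name.
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    return [name for name in record_fields if name not in existing]


def evolve(conn, table, record_fields, auto_evolve):
    """Add missing columns, or fail if evolution is disabled. Never drops or retypes."""
    missing = missing_columns(conn, table, record_fields)
    if missing and not auto_evolve:
        raise RuntimeError(f"Table {table} is missing columns: {missing}")
    for name in missing:
        conn.execute(f'ALTER TABLE {table} ADD COLUMN "{name}" {record_fields[name]}')
    return missing


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, product TEXT)")
record_fields = {"id": "INTEGER", "product": "TEXT", "quantity": "INTEGER"}
added = evolve(conn, "orders", record_fields, auto_evolve=True)

# With evolution disabled, a further unknown field causes a failure instead.
try:
    evolve(conn, "orders", {**record_fields, "price": "REAL"}, auto_evolve=False)
    failed_without_evolve = False
except RuntimeError:
    failed_without_evolve = True
```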

For both auto-creation and auto-evolution, the nullability of a column is based on the optionality of the corresponding field in the schema, and default values are also specified based on the default value of the corresponding field if applicable. We use the following mapping from Connect schema types to database-specific types:

Schema Type  MySQL            Oracle         PostgreSQL        SQLite   SQL Server      Vertica
INT8         TINYINT          NUMBER(3,0)    SMALLINT          NUMERIC  TINYINT         INT
INT16        SMALLINT         NUMBER(5,0)    SMALLINT          NUMERIC  SMALLINT        INT
INT32        INT              NUMBER(10,0)   INT               NUMERIC  INT             INT
INT64        BIGINT           NUMBER(19,0)   BIGINT            NUMERIC  BIGINT          INT
FLOAT32      FLOAT            BINARY_FLOAT   REAL              REAL     REAL            FLOAT
FLOAT64      DOUBLE           BINARY_DOUBLE  DOUBLE PRECISION  REAL     FLOAT           FLOAT
BOOLEAN      TINYINT          NUMBER(1,0)    BOOLEAN           NUMERIC  BIT             BOOLEAN
STRING       TEXT             NCLOB          TEXT              TEXT     VARCHAR(MAX)    VARCHAR(1024)
BYTES        VARBINARY(1024)  BLOB           BYTEA             BLOB     VARBINARY(MAX)  VARBINARY(1024)
'Decimal'    DECIMAL(65,s)    NUMBER(*,s)    DECIMAL           NUMERIC  DECIMAL(38,s)   DECIMAL(18,s)
'Date'       DATE             DATE           DATE              NUMERIC  DATE            DATE
'Time'       TIME(3)          DATE           TIME              NUMERIC  TIME            TIME
'Timestamp'  TIMESTAMP(3)     TIMESTAMP      TIMESTAMP         NUMERIC  DATETIME2       TIMESTAMP

Auto-creation or auto-evolution is not supported for databases not mentioned here.
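
As an illustration of how the mapping and the nullability rule combine, the following sketch builds a CREATE TABLE statement from a list of fields using the PostgreSQL column of the table above. The function create_table_sql and its tuple-based field format are hypothetical, and the exact DDL the connector emits may differ.

```python
# PostgreSQL column of the type mapping table (primitive types only).
POSTGRES_TYPES = {
    "INT8": "SMALLINT",
    "INT16": "SMALLINT",
    "INT32": "INT",
    "INT64": "BIGINT",
    "FLOAT32": "REAL",
    "FLOAT64": "DOUBLE PRECISION",
    "BOOLEAN": "BOOLEAN",
    "STRING": "TEXT",
    "BYTES": "BYTEA",
}


def create_table_sql(table, fields):
    """Build DDL from (name, connect_type, optional) tuples.

    Optional fields become nullable columns; required fields become NOT NULL.
    """
    columns = []
    for name, connect_type, optional in fields:
        null_clause = "NULL" if optional else "NOT NULL"
        columns.append(f'"{name}" {POSTGRES_TYPES[connect_type]} {null_clause}')
    return f'CREATE TABLE "{table}" (\n  ' + ",\n  ".join(columns) + "\n)"


ddl = create_table_sql(
    "orders",
    [("id", "INT32", False), ("product", "STRING", True), ("price", "FLOAT64", True)],
)
```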

Important

For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value. If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.

Identifier quoting and case sensitivity

When this connector consumes a record and the referenced database table does not exist or is missing columns, it can issue a CREATE TABLE or ALTER TABLE statement to create the table or add columns. The ability for the connector to create a table or add columns depends on how you set the auto.create and auto.evolve DDL support properties.

By default, CREATE TABLE and ALTER TABLE use the topic name for a missing table and the record schema field name for a missing column. Also by default, these statements attempt to preserve the case of the names by quoting the table and column names.

You can use the quote.sql.identifiers configuration to control the quoting behavior. For example, when quote.sql.identifiers=never, the connector never uses quotes within any SQL DDL or DML statement it generates. The default for this property is always.

Note that SQL standards define databases to be case insensitive for identifiers and keywords unless they are quoted. What this means is that CREATE TABLE test_case creates a table named TEST_CASE and CREATE TABLE "test_case" creates a table named test_case.
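
The folding rule can be modeled in a few lines. The sketch below is an assumption-laden model of the standard behavior described above, not a database client: render_identifier and stored_identifier are hypothetical helpers, double quotes are assumed as the quote character, and individual databases can deviate (PostgreSQL, for example, folds unquoted identifiers to lower case).

```python
def render_identifier(name, quote_sql_identifiers="always"):
    """Render a name as it would appear in a statement for the given quoting mode."""
    if quote_sql_identifiers == "never":
        return name
    # Embedded double quotes are escaped by doubling them.
    return '"' + name.replace('"', '""') + '"'


def stored_identifier(identifier):
    """Return the name a standard-conforming database stores for an identifier."""
    if len(identifier) >= 2 and identifier.startswith('"') and identifier.endswith('"'):
        return identifier[1:-1].replace('""', '"')  # quoted: case is preserved
    return identifier.upper()  # unquoted: folded to upper case


quoted = stored_identifier(render_identifier("test_case", "always"))
unquoted = stored_identifier(render_identifier("test_case", "never"))
```

In this model quoted keeps the original lower-case name and unquoted is folded to upper case, which is why a table created with one style may not be found with the other.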

See also

For additional information about identifier quoting, see Database Identifiers, Quoting, and Case Sensitivity.

License

This connector is available under the Confluent Community License.

Configuration Properties

For a complete list of configuration properties for this connector, see JDBC Sink Connector Configuration Properties.

Quick Start

To see the basic functionality of the connector, we'll be copying Avro data from a single topic to a local SQLite database.

Prerequisites

  • Confluent Platform is installed and services are running by using the Confluent CLI. This quick start assumes that you are using the Confluent CLI, but standalone installations are also supported. By default ZooKeeper, Kafka, Schema Registry, Kafka Connect REST API, and Kafka Connect are started with the confluent local services start command. For more information, see Confluent Platform.
  • SQLite is installed. You can also use another database. If you are using another database, be sure to adjust the connection.url setting. Confluent Platform includes JDBC drivers for SQLite and PostgreSQL, but if you're using a different database you must also verify that the JDBC driver is available on the Kafka Connect process's CLASSPATH.
  • Kafka and Schema Registry are running locally on the default ports.

Load the JDBC Sink Connector

Load the predefined JDBC sink connector.

  1. Optional: View the available predefined connectors with this command.

    confluent local services connect connector list
    

    Your output should resemble:

    Bundled Predefined Connectors (edit configuration under etc/):
      elasticsearch-sink
      file-source
      file-sink
      jdbc-source
      jdbc-sink
      hdfs-sink
      s3-sink
    
  2. Load the jdbc-sink connector:

    confluent local services connect connector load jdbc-sink
    

    Your output should resemble:

    {
      "name": "jdbc-sink",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "orders",
        "connection.url": "jdbc:sqlite:test.db",
        "auto.create": "true",
        "name": "jdbc-sink"
      },
      "tasks": [],
      "type": null
    }
    

Tip

For non-CLI users, you can load the JDBC sink connector with this command:

<path-to-confluent>/bin/connect-standalone \
<path-to-confluent>/etc/schema-registry/connect-avro-standalone.properties \
<path-to-confluent>/etc/kafka-connect-jdbc/sink-quickstart-sqlite.properties

Produce a Record in SQLite

  1. Produce a record into the orders topic.

    ./bin/kafka-avro-console-producer \
      --broker-list localhost:9092 --topic orders \
      --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'
    

    The console producer waits for input.

  2. Copy and paste the following record into the terminal and press Enter:

    {"id": 999, "product": "foo", "quantity": 100, "price": 50}
    
  3. Query the SQLite database and you should see that the orders table was automatically created and contains the record.

    sqlite3 test.db
    sqlite> SELECT * from orders;
    foo|50.0|100|999
    

Database Considerations

Keep the following issues in mind.

  1. In Oracle Database, the String type is mapped to CLOB when auto.create=true. For example, if you have the following Avro schema:

    {
      "connect.name": "ksql.ratings",
      "fields": [
        {
          "name": "rating_id",
          "type": "long"
        },
        {
          "name": "user_id",
          "type": "int"
        },
        ...
        {
          "name": "channel",
          "type": "string"
        },
        {
          "name": "message",
          "type": "string"
        }
      ],
      "name": "ratings",
      "namespace": "ksql",
      "type": "record"
    }
    

    The string fields are mapped to CLOB in the table schema:

    Name        Null?    Type
    ----------- -------- ----------
    rating_id   NOT NULL NUMBER(19)
    user_id     NOT NULL NUMBER(10)
    stars       NOT NULL NUMBER(10)
    route_id    NOT NULL NUMBER(10)
    rating_time NOT NULL NUMBER(19)
    channel     NOT NULL CLOB
    message     NOT NULL CLOB
    

    Since String is mapped to CLOB when auto.create=true, a field using the String type cannot be used as a primary key. If you want to use a String type field as a primary key, create the table in the database first and then use auto.create=false. Otherwise, an exception occurs that contains the following line:

    ...
    "stringValue": "Exception chain:\njava.sql.SQLException: ORA-02329:
    column of datatype LOB cannot be unique or a primary key
    ...
    
  2. The table name and column names are case sensitive. For example, if you have the following Avro schema:

    {
      "connect.name": "ksql.pageviews",
      "fields": [
        {
          "name": "viewtime",
          "type": "long"
        },
        {
          "name": "userid",
          "type": "string"
        },
        {
          "name": "pageid",
          "type": "string"
        }
      ],
      "name": "pageviews",
      "namespace": "ksql",
      "type": "record"
    }
    

    If you create the table with unquoted identifiers, a table named PAGEVIEWS is created. The connector then looks for a table named pageviews, does not find it, and raises an exception.

    create table pageviews (
      userid VARCHAR(10) NOT NULL PRIMARY KEY,
      pageid VARCHAR(50),
      viewtime VARCHAR(50)
      );
    
    Table PAGEVIEWS created.
    
    DESC pageviews;
    Name     Null?    Type
    -------- -------- ------------
    USERID   NOT NULL VARCHAR2(10)
    PAGEID            VARCHAR2(50)
    VIEWTIME          VARCHAR2(50)
    

    An exception message similar to the following one will be in the DLQ:

    {
      "key": "__connect.errors.exception.message",
      "stringValue": "Table \"pageviews\" is missing and auto-creation
      is disabled"
    }
    

    To resolve this issue, create the table in Oracle Database first with quoted identifiers and use auto.create=false.

    create table "pageviews" (
      "userid" VARCHAR(10) NOT NULL PRIMARY KEY,
      "pageid" VARCHAR(50),
      "viewtime" VARCHAR(50)
      );
    
    Table "pageviews" created.
    
    
    DESC "pageviews";
    
    Name     Null?    Type
    -------- -------- ------------
    userid   NOT NULL VARCHAR2(10)
    pageid            VARCHAR2(50)
    viewtime          VARCHAR2(50)
    

    Note

    The SQL standards define databases to be case insensitive for identifiers and keywords unless they are quoted. This means that CREATE TABLE test_case creates a table named TEST_CASE and CREATE TABLE "test_case" creates a table named test_case. This is also true of table column identifiers. For more information about identifier quoting, see Database Identifiers, Quoting, and Case Sensitivity.