The Kafka Connect JDBC Sink connector allows you to export data from Apache Kafka®
topics to any relational database with a JDBC driver, and supports a wide variety
of databases. The connector polls data from Kafka and writes it to the database
based on its topic subscription. It is possible to achieve idempotent writes with
upserts. Auto-creation of tables and limited auto-evolution are also supported.
Features
The JDBC Sink Connector includes the following features:
At least once delivery
This connector guarantees that records are delivered at least once from the Kafka
topic.
Data mapping
The sink connector requires knowledge of schemas, so you should use a suitable
converter, for example, the Avro converter that comes with Schema Registry, or
the JSON converter with schemas enabled. Kafka record keys, if present, can be
primitive types or a Connect struct, and the record value must be a Connect
struct. Fields selected from Connect structs must be of primitive types. If the
data in the topic is not in a compatible format, implementing a custom Converter
may be necessary.
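For example, a minimal sketch of converter settings in a connector configuration,
assuming a Schema Registry reachable at http://localhost:8081 (the URL is an
assumption for illustration):

    # Avro converter backed by Schema Registry (assumed local URL)
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081

    # Alternatively, the JSON converter with embedded schemas enabled:
    # value.converter=org.apache.kafka.connect.json.JsonConverter
    # value.converter.schemas.enable=true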
Key handling
The default is for primary keys to not be extracted, with pk.mode set to none.
This is not suitable for advanced usage such as upsert semantics, or when the
connector is responsible for auto-creating the destination table. There are
different modes that enable using fields from the Kafka record key, the Kafka
record value, or the Kafka coordinates for the record. Refer to the primary key
configuration options for further detail.
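As a sketch, the following settings extract the primary key from a field of the
record value (the field name id is hypothetical):

    # Build the primary key from the record value's "id" field (hypothetical name)
    pk.mode=record_value
    pk.fields=id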
Delete mode
The connector can delete rows in a database table when it consumes a tombstone
record, which is a Kafka record that has a non-null key and a null value. This
behavior is disabled by default, meaning that any tombstone records will result
in a failure of the connector, making it easy to upgrade the JDBC connector and
keep prior behavior.
Deletes can be enabled with delete.enabled=true, but only when pk.mode is set to
record_key. This is because deleting a row from the table requires the primary
key to be used as the criteria. Enabling delete mode does not affect the
insert.mode.
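A minimal sketch of the settings that enable deletes, assuming the record key
carries the primary key:

    # A tombstone record (non-null key, null value) deletes the matching row
    delete.enabled=true
    pk.mode=record_key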
Idempotent writes
The default insert.mode is insert. If it is configured as upsert, the connector
will use upsert semantics rather than plain INSERT statements. Upsert semantics
refer to atomically adding a new row or updating the existing row if there is a
primary key constraint violation, which provides idempotence.
If there are failures, the Kafka offset used for recovery may not be up-to-date
with what was committed as of the time of the failure, which can lead to
re-processing during recovery. The upsert mode is highly recommended, as it helps
avoid constraint violations or duplicate data if records need to be re-processed.
Aside from failure recovery, the source topic may also naturally contain
multiple records over time with the same primary key, making upserts desirable.
As there is no standard syntax for upsert, the following table describes the
database-specific DML that is used.
Database   | Upsert style
-----------|------------------------------------------
MySQL      | INSERT .. ON DUPLICATE KEY UPDATE ..
Oracle     | MERGE ..
PostgreSQL | INSERT .. ON CONFLICT .. DO UPDATE SET ..
SQLite     | INSERT OR REPLACE ..
SQL Server | MERGE ..
Other      | not supported
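As a sketch, the settings below enable upsert semantics, assuming a primary key
taken from a record key field named id (the field name is hypothetical):

    # Use upserts instead of plain INSERTs; a primary key definition is required
    insert.mode=upsert
    pk.mode=record_key
    pk.fields=id

Against PostgreSQL, for example, the connector would then issue statements in the
INSERT .. ON CONFLICT .. DO UPDATE SET .. style shown above.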
Auto-creation and auto-evolution
Tip
Ensure the JDBC user has the appropriate permissions for DDL.
If auto.create is enabled, the connector can CREATE the destination table if it
is found to be missing. The creation takes place online as records are consumed
from the topic, since the connector uses the record schema as a basis for the
table definition. Primary keys are specified based on the key configuration
settings.
If auto.evolve is enabled, the connector can perform limited auto-evolution by
issuing ALTER on the destination table when it encounters a record for which a
column is found to be missing. Since data-type changes and removal of columns
can be dangerous, the connector does not attempt to perform such evolutions on
the table. Addition of primary key constraints is also not attempted. In
contrast, if auto.evolve is disabled, no evolution is performed and the
connector task fails with an error stating the missing columns.
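As a sketch, the following enables both features; the DDL shown in the comments
is illustrative only, with hypothetical table and column names:

    # Allow the connector to create and evolve the destination table
    auto.create=true
    auto.evolve=true
    # The connector derives DDL from the record schema, roughly of this shape:
    #   CREATE TABLE "orders" ("id" INT NOT NULL, "amount" REAL, PRIMARY KEY ("id"))
    #   ALTER TABLE "orders" ADD "status" TEXT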
For both auto-creation and auto-evolution, the nullability of a column is based
on the optionality of the corresponding field in the schema, and default values
are also specified based on the default value of the corresponding field if
applicable. We use the following mapping from Connect schema types to
database-specific types:
Schema Type | MySQL           | Oracle        | PostgreSQL       | SQLite  | SQL Server     | Vertica
------------|-----------------|---------------|------------------|---------|----------------|----------------
INT8        | TINYINT         | NUMBER(3,0)   | SMALLINT         | NUMERIC | TINYINT        | INT
INT16       | SMALLINT        | NUMBER(5,0)   | SMALLINT         | NUMERIC | SMALLINT       | INT
INT32       | INT             | NUMBER(10,0)  | INT              | NUMERIC | INT            | INT
INT64       | BIGINT          | NUMBER(19,0)  | BIGINT           | NUMERIC | BIGINT         | INT
FLOAT32     | FLOAT           | BINARY_FLOAT  | REAL             | REAL    | REAL           | FLOAT
FLOAT64     | DOUBLE          | BINARY_DOUBLE | DOUBLE PRECISION | REAL    | FLOAT          | FLOAT
BOOLEAN     | TINYINT         | NUMBER(1,0)   | BOOLEAN          | NUMERIC | BIT            | BOOLEAN
STRING      | VARCHAR(256)    | NCLOB         | TEXT             | TEXT    | VARCHAR(MAX)   | VARCHAR(1024)
BYTES       | VARBINARY(1024) | BLOB          | BYTEA            | BLOB    | VARBINARY(MAX) | VARBINARY(1024)
'Decimal'   | DECIMAL(65,s)   | NUMBER(*,s)   | DECIMAL          | NUMERIC | DECIMAL(38,s)  | DECIMAL(18,s)
'Date'      | DATE            | DATE          | DATE             | NUMERIC | DATE           | DATE
'Time'      | TIME(3)         | DATE          | TIME             | NUMERIC | TIME           | TIME
'Timestamp' | TIMESTAMP(3)    | TIMESTAMP     | TIMESTAMP        | NUMERIC | DATETIME2      | TIMESTAMP
Auto-creation or auto-evolution is not supported for databases not mentioned here.
Important
For backwards-compatible table schema evolution, new fields in record
schemas must be optional or have a default value. If you need to delete a
field, the table schema should be manually altered to either drop the
corresponding column, assign it a default value, or make it nullable.
Identifier quoting and case sensitivity
When this connector consumes a record and the referenced database table does not
exist or is missing columns, it can issue a CREATE TABLE or ALTER TABLE
statement to create a table or add columns. The ability of the connector to
create a table or add columns depends on how you set the auto.create and
auto.evolve DDL support properties.
By default, CREATE TABLE and ALTER TABLE use the topic name for a missing table
and the record schema field name for a missing column. Also by default, these
statements attempt to preserve the case of the names by quoting the table and
column names.
You can use the quote.sql.identifiers configuration to control the quoting
behavior. For example, when quote.sql.identifiers=never, the connector never
uses quotes within any SQL DDL or DML statement it generates. The default for
this property is always.
Note that SQL standards define databases to be case insensitive for identifiers
and keywords unless they are quoted. This means that CREATE TABLE test_case
creates a table named TEST_CASE, and CREATE TABLE "test_case" creates a table
named test_case.
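As a sketch, quoting can be turned off when unquoted, case-folded identifiers
are acceptable:

    # Never quote identifiers in generated DDL and DML; the database's
    # default case-folding rules then apply to table and column names
    quote.sql.identifiers=never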