The Kafka Connect JDBC Sink connector allows you to export data from Apache Kafka®
topics to any relational database with a JDBC driver. This connector can support
a wide variety of databases. The connector polls data from Kafka and writes it to the
database based on the topic subscription. It is possible to achieve idempotent
writes with upserts. Auto-creation of tables and limited auto-evolution are also
supported.
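For orientation, a minimal sink configuration might look like the following sketch; the connector name, topic, connection URL, and credentials are illustrative placeholders, not values from this document:

    # Minimal JDBC Sink configuration (all values are placeholders)
    name=jdbc-sink-example
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    tasks.max=1
    topics=orders
    # JDBC connection to the destination database
    connection.url=jdbc:postgresql://localhost:5432/mydb
    connection.user=connect_user
    connection.password=connect_password
    # Write semantics and DDL behavior are covered later in this section
    insert.mode=insert
    auto.create=true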
The JDBC Sink Connector includes the following features:
At least once delivery
This connector guarantees that records are delivered at least once from the Kafka topic.
Multiple tasks
The JDBC Sink Connector supports running one or more tasks. You can specify
the number of tasks in the tasks.max configuration parameter. This can lead
to significant performance gains when writes are parallelized across multiple
tasks.
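For example, a sketch of running multiple tasks (the task count is illustrative):

    # Run up to four tasks; Connect distributes topic partitions across them
    tasks.max=4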
Data mapping
The sink connector requires knowledge of schemas, so you should use a suitable
converter, for example, the Avro converter that comes with Schema Registry or the
JSON converter with schemas enabled. Kafka record keys, if present, can be
primitive types or a Connect struct, and the record value must be a Connect
struct. Fields being selected from Connect structs must be of primitive types.
If the data in the topic is not in a compatible format, implementing a custom
Converter may be necessary.
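A sketch of the converter settings described above; the Schema Registry URL is a placeholder:

    # Avro values with Schema Registry
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    # Alternatively, JSON with embedded schemas:
    # value.converter=org.apache.kafka.connect.json.JsonConverter
    # value.converter.schemas.enable=true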
Key handling
By default, primary keys are not extracted: pk.mode is set to
none, which is not suitable for advanced usage such as upsert semantics or
when the connector is responsible for auto-creating the destination table. There
are different modes that enable using fields from the Kafka record key, the Kafka
record value, or the Kafka coordinates for the record.
Refer to the primary key configuration options for further detail.
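For instance, a sketch that extracts the primary key from a field of the record value; the field name id is hypothetical:

    # Take the primary key from a field of the record value
    pk.mode=record_value
    # Hypothetical field name in the value struct
    pk.fields=id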
Delete mode
The connector can delete rows in a database table when it consumes a tombstone
record, which is a Kafka record that has a non-null key and a null value. This
behavior is disabled by default, meaning that any tombstone record results in a
failure of the connector; this makes it easy to upgrade the JDBC connector and
keep prior behavior.
Deletes can be enabled with delete.enabled=true, but only when pk.mode is set
to record_key. This is because deleting a row from the table requires the
primary key to be used as the criteria.
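A sketch of the delete configuration described above:

    # Tombstone records (non-null key, null value) delete the matching row
    delete.enabled=true
    # Deletes require the primary key to come from the record key
    pk.mode=record_key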
Enabling delete mode does not affect insert.mode. If insert.mode is configured
as upsert, the connector will use upsert semantics rather than plain INSERT
statements.
Idempotent writes
Upsert semantics refer to atomically adding a new row or updating the existing
row if there is a primary key constraint violation, which provides idempotence.
If there are failures, the Kafka offset used for recovery may not be up-to-date
with what was committed as of the time of the failure, which can lead to
re-processing during recovery. Upsert mode is highly recommended, as it helps
avoid constraint violations and duplicate data when records must be re-processed.
Aside from failure recovery, the source topic may also naturally contain
multiple records over time with the same primary key, making upserts desirable.
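A sketch of enabling upsert semantics; the key field name is again hypothetical:

    # Atomically insert a new row or update the existing one
    insert.mode=upsert
    pk.mode=record_value
    # Hypothetical field name used as the primary key
    pk.fields=id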
As there is no standard syntax for upsert, the following table describes the
database-specific DML that is used.

    Database     Upsert DML
    MySQL        INSERT .. ON DUPLICATE KEY UPDATE ..
    PostgreSQL   INSERT .. ON CONFLICT .. DO UPDATE SET ..
    SQLite       INSERT OR REPLACE ..
Auto-creation and auto-evolution
Ensure the JDBC user has the appropriate permissions for DDL.
If auto.create is enabled, the connector can CREATE the destination table
if it is found to be missing. The creation takes place online with records being
consumed from the topic, since the connector uses the record schema as a basis
for the table definition. Primary keys are specified based on the key
configuration settings.
If auto.evolve is enabled, the connector can perform limited auto-evolution
by issuing ALTER on the destination table when it encounters a record for
which a column is found to be missing. Since data-type changes and removal of
columns can be dangerous, the connector does not attempt to perform such
evolutions on the table. Addition of primary key constraints is also not
attempted. In contrast, if
auto.evolve is disabled, no evolution is performed and the connector task fails
with an error stating the missing columns.
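A sketch combining the two DDL settings discussed above:

    # Create the destination table from the record schema if it is missing
    auto.create=true
    # Add missing columns via ALTER TABLE as record schemas evolve
    auto.evolve=true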
For both auto-creation and auto-evolution, the nullability of a column is based
on the optionality of the corresponding field in the schema, and default values
are also specified based on the default value of the corresponding field if
applicable. We use the following mapping from Connect schema types to
database-specific types.
Auto-creation or auto-evolution is not supported for databases not mentioned here.
For backwards-compatible table schema evolution, new fields in record
schemas must be optional or have a default value. If you need to delete a
field, the table schema should be manually altered to either drop the
corresponding column, assign it a default value, or make it nullable.
Identifier quoting and case sensitivity
When this connector consumes a record and the referenced database table does
not exist or is missing columns, it can issue a CREATE TABLE or ALTER TABLE
statement to create a table or add columns. The ability for the connector to
create a table or add columns depends on how you set the auto.create and
auto.evolve DDL support properties.
By default, CREATE TABLE and ALTER TABLE use the topic name for a
missing table and the record schema field name for a missing column. Also by
default, these statements attempt to preserve the case of the names by quoting
the table and column names.
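The derived table name can also be customized with the table.name.format property, as in this sketch; the prefix is illustrative:

    # ${topic} is substituted with the originating topic name
    table.name.format=kafka_${topic}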
You can use the
quote.sql.identifiers configuration to control the quoting
behavior. For example, when
quote.sql.identifiers=never, the connector never
uses quotes within any SQL DDL or DML statement it generates. The default for
this property is always.
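For example, a sketch that turns quoting off:

    # Never quote identifiers in generated SQL; names are then subject to
    # the database's default case folding
    quote.sql.identifiers=never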
Note that SQL standards define databases to be case insensitive for identifiers
and keywords unless they are quoted. This means that CREATE TABLE test_case
creates a table named TEST_CASE, and CREATE TABLE "test_case" creates a table
named test_case.