JDBC Sink Connector for Confluent Platform¶
The Kafka Connect JDBC Sink connector allows you to export data from Apache Kafka® topics to any relational database with a JDBC driver, and supports a wide variety of databases. The connector polls data from Kafka and writes it to the database based on the topic subscription. It is possible to achieve idempotent writes with upserts. Auto-creation of tables and limited auto-evolution are also supported.
Features¶
The JDBC Sink connector includes the following features:
- At least once delivery
- Dead Letter Queue
- Multiple tasks
- Data mapping
- Key handling
- Delete mode
- Idempotent writes
- Auto-creation and auto-evolution
- Identifier quoting and case sensitivity
- Table partitioning
- Custom Credentials Provider Support
At least once delivery¶
This connector guarantees that records are delivered at least once from the Kafka topic.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. Note the following when using this feature:
- The `max.retries` configuration property determines how many times the JDBC Sink connector tries to insert the data before it unwraps the batch and sends the errant records to the DLQ. Note that these retries happen only for an `SQLException`. If an exception occurs while creating or altering tables, the connector does not retry; it unwraps the batch and sends the errant records to the DLQ.
- The `connection.attempts` property determines how many times the connector attempts to connect to the database before the task is killed (in this case, no records are sent to the DLQ).
For more information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
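For example, a minimal sketch of the related sink properties might look like the following (the retry counts and the DLQ topic name are illustrative placeholders):
max.retries=10
retry.backoff.ms=3000
connection.attempts=3
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-jdbc-sink
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true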
Multiple tasks¶
The JDBC Sink connector supports running one or more tasks. You can specify the number of tasks in the `tasks.max` configuration parameter. Running multiple tasks can improve performance when the connector consumes from multiple topic partitions.
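For example, to run the connector with multiple tasks (the value shown is illustrative):
tasks.max=3
Keep in mind that tasks only scale out across the topic partitions being consumed; tasks beyond the partition count remain idle.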
Data mapping¶
The sink connector requires knowledge of schemas, so you should use a suitable converter, such as the Avro converter that comes with Schema Registry, the JSON converter with schemas enabled, or the Protobuf converter. Kafka record keys, if
present, can be primitive types or a Connect struct, and the record value
must be a Connect struct. Fields being selected from Connect structs must
be of primitive types. If the data in the topic is not of a compatible format,
implementing a custom Converter
may be necessary.
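For example, a minimal sketch of converter settings for Avro with Schema Registry (the Schema Registry URL is a placeholder):
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Or, for JSON with schemas enabled:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true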
Key handling¶
By default, primary keys are not extracted (`pk.mode` is set to `none`), which is not suitable for advanced usage such as upsert semantics or for cases where the connector is responsible for auto-creating the destination table. Other modes let you use fields from the Kafka record key, the Kafka record value, or the Kafka coordinates for the record.
Refer to the primary key configuration options for further detail.
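For example, a sketch of the properties that extract the primary key from a field of the record value (the field name is hypothetical):
pk.mode=record_value
pk.fields=id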
Delete mode¶
The connector can delete rows in a database table when it consumes a tombstone record, which is a Kafka record that has a non-null key and a null value. This behavior is disabled by default, meaning that any tombstone records will result in a failure of the connector, making it easy to upgrade the JDBC connector and keep prior behavior.
Deletes can be enabled with `delete.enabled=true`, but only when `pk.mode` is set to `record_key`. This is because deleting a row from the table requires the primary key to be used as the criteria.
Enabling delete mode does not affect the `insert.mode`.
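For example, a sketch of the properties that enable deletes on tombstone records:
delete.enabled=true
pk.mode=record_key
With `pk.mode=record_key`, you can also set `pk.fields` to control which key field (or fields) map to the primary key columns.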
Idempotent writes¶
The default `insert.mode` is `insert`. If it is configured as `upsert`, the connector will use upsert semantics rather than plain `INSERT` statements. Upsert semantics refer to atomically adding a new row or updating the existing row if there is a primary key constraint violation, which provides idempotence.
If there are failures, the Kafka offset used for recovery may not be up-to-date with what was committed as of the time of the failure, which can lead to re-processing during recovery. The upsert mode is highly recommended as it helps avoid constraint violations or duplicate data if records need to be re-processed.
Note that when a target table includes `CLOB` columns, `INSERT` or `UPSERT` performance may be degraded. Use `VARCHAR` or `VARCHAR2` instead where possible.
Aside from failure recovery, the source topic may also naturally contain multiple records over time with the same primary key, making upserts desirable.
As there is no standard syntax for upsert, the following table describes the database-specific DML (dialect) that is used.
Database | Upsert style |
---|---|
MySQL | INSERT .. ON DUPLICATE KEY UPDATE .. |
Oracle | MERGE .. |
PostgreSQL | INSERT .. ON CONFLICT .. DO UPDATE SET .. |
SQLite | INSERT OR REPLACE .. |
SQL Server | MERGE .. |
Sybase | MERGE .. |
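For example, a sketch of the settings that switch the connector to idempotent upserts (the field name is hypothetical):
insert.mode=upsert
pk.mode=record_value
pk.fields=id
With these settings, the connector generates the dialect-specific statement shown above (for example, INSERT .. ON CONFLICT .. DO UPDATE SET .. on PostgreSQL) instead of a plain INSERT.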
Auto-creation and auto-evolution¶
For auto-creation and auto-evolution, Confluent recommends you ensure the JDBC user has the appropriate permissions for DDL.
If auto.create
is enabled, the connector can CREATE the destination table
if it is found to be missing. The creation takes place online with records being
consumed from the topic, since the connector uses the record schema as a basis
for the table definition. Primary keys are specified based on the key
configuration settings.
If auto.evolve
is enabled, the connector can perform limited auto-evolution
by issuing ALTER on the destination table when it encounters a record for
which a column is found to be missing. Since data-type changes and removal of
columns can be dangerous, the connector does not attempt to perform such
evolutions on the table. Addition of primary key constraints is also not
attempted. In contrast, if `auto.evolve` is disabled, no evolution is performed and the connector task fails with an error stating the missing columns.
For both auto-creation and auto-evolution, the nullability of a column is based on the optionality of the corresponding field in the schema, and default values are also specified based on the default value of the corresponding field if applicable. We use the following mapping from Connect schema types to database-specific types:
Schema Type | MySQL | Oracle | PostgreSQL | SQLite | SQL Server | Vertica |
---|---|---|---|---|---|---|
INT8 | TINYINT | NUMBER(3,0) | SMALLINT | NUMERIC | TINYINT | INT |
INT16 | SMALLINT | NUMBER(5,0) | SMALLINT | NUMERIC | SMALLINT | INT |
INT32 | INT | NUMBER(10,0) | INT | NUMERIC | INT | INT |
INT64 | BIGINT | NUMBER(19,0) | BIGINT | NUMERIC | BIGINT | INT |
FLOAT32 | FLOAT | BINARY_FLOAT | REAL | REAL | REAL | FLOAT |
FLOAT64 | DOUBLE | BINARY_DOUBLE | DOUBLE PRECISION | REAL | FLOAT | FLOAT |
BOOLEAN | TINYINT | NUMBER(1,0) | BOOLEAN | NUMERIC | BIT | BOOLEAN |
STRING | TEXT | NCLOB | TEXT | TEXT | VARCHAR(MAX) | VARCHAR(1024) |
BYTES | VARBINARY(1024) | BLOB | BYTEA | BLOB | VARBINARY(MAX) | VARBINARY(1024) |
‘Decimal’ | DECIMAL(65,s) | NUMBER(*,s) | DECIMAL | NUMERIC | DECIMAL(38,s) | DECIMAL(18,s) |
‘Date’ | DATE | DATE | DATE | NUMERIC | DATE | DATE |
‘Time’ | TIME(3) | DATE | TIME | NUMERIC | TIME | TIME |
‘Timestamp’ | TIMESTAMP(3) | TIMESTAMP | TIMESTAMP | NUMERIC | DATETIME2 | TIMESTAMP |
Auto-creation and auto-evolution are not supported for databases not mentioned here. Also, it is important to note that for backward-compatible table schema evolution, new fields in record schemas must be optional or have a default value. If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.
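For example, a sketch of the properties that enable both behaviors, with the primary key taken from the record value (the field name is hypothetical):
auto.create=true
auto.evolve=true
pk.mode=record_value
pk.fields=id
For a value schema with a required int32 `id` field and an optional string `name` field, the DDL generated for PostgreSQL would look roughly like the following illustrative statement (the exact statement depends on the dialect and quoting settings; the table name comes from the topic):
CREATE TABLE "orders" (
  "id" INT NOT NULL,
  "name" TEXT,
  PRIMARY KEY ("id")
)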
Identifier quoting and case sensitivity¶
When this connector consumes a record and the referenced database table does not exist or is missing columns, it can issue a `CREATE TABLE` or `ALTER TABLE` statement to create a table or add columns. The ability of the connector to create a table or add columns depends on how you set the `auto.create` and `auto.evolve` DDL support properties.
By default, CREATE TABLE
and ALTER TABLE
use the topic name for a
missing table and the record schema field name for a missing column. Also by
default, these statements attempt to preserve the case of the names by quoting
the table and column names.
You can use the `quote.sql.identifiers` configuration to control the quoting behavior. For example, when `quote.sql.identifiers=never`, the connector never uses quotes within any SQL DDL or DML statement it generates. The default for this property is `always`.
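For example, a minimal sketch that disables quoting in generated statements:
quote.sql.identifiers=never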
Note that SQL standards define databases to be case insensitive for identifiers and keywords unless they are quoted. This means that `CREATE TABLE test_case` creates a table named `TEST_CASE`, and `CREATE TABLE "test_case"` creates a table named `test_case`.
For more information about identifier quoting, see Database Identifiers, Quoting, and Case Sensitivity.
Table partitioning¶
Important
This section applies only to the PostgreSQL dialect.
Versions 10.0.0, 10.0.1, 10.0.2, and 10.1.0 of this connector support the `PARTITIONED TABLE` table type out of the box. Versions 10.6.0 and later also support it, but you must specify "PARTITIONED TABLE" as a comma-separated value in the `table.types` parameter (for example, `"table.types": "PARTITIONED TABLE,TABLE"`). All other versions of this connector do not support the `PARTITIONED TABLE` table type. For more details about configuring table types, see the `table.types` configuration property.
Table truncation¶
When writing to a PostgreSQL database, the JDBC Sink connector shortens the names of tables it writes to if it determines that the table names exceed the maximum length permitted by the database. For example, with the default settings for PostgreSQL 14, the maximum length of a table name is 63 bytes. If you configure the connector with a `table.name.format` and a Kafka topic name that, when combined, exceed 63 characters, the connector uses only the first 63 characters of that table name.
Additionally, if table names are truncated and the connector receives records from different upstream topics, records may map to the same table name after truncation, resulting in table name collisions. Confluent recommends you avoid running the connector with very long Kafka topic names and table names.
Table parsing¶
This section describes how the JDBC Sink connector parses table information from topic names.
To begin, the connector splits the topic name on any table delimiter character that is not quoted.
For example, with a delimiter of `.` and an opening and closing quote of `"`, the topic name `foo.bar` would be split into the parts `foo` and `bar`, while the topic name `"foo.bar"` would be parsed as `foo.bar`. The delimiting and quoting characters the connector uses are dialect-dependent; the characters in this example are common, but not every dialect uses them.
After parsing the topic name, the connector reads information from those parts depending on how many are available and which dialect is in use:
- If there are no parts, the connector fails.
- If there is only one part, it is used as the table name, and no catalog or schema is used.
- If there are three parts, the first part is used as the catalog, the second part is used as the schema, and the third is used as the table name.
- Otherwise, the first part is used as either the catalog or schema (depending on the dialect in use), the second part is used as the table name, and no catalog is used.
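As an illustration, with the `.` delimiter from the example above, hypothetical topic names would be parsed as follows:
orders                 -> table: orders
warehouse.orders       -> catalog or schema (dialect-dependent): warehouse, table: orders
erp.warehouse.orders   -> catalog: erp, schema: warehouse, table: orders
"foo.bar"              -> table: foo.bar (quoted, so not split)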
Custom Credentials Provider Support¶
You can configure the JDBC connector to use a custom credentials provider, instead of the default one provided in the connector. To do this, you implement a custom credentials provider, build it as a JAR file, and deploy the JAR file to use the custom provider.
Complete the following steps to use a custom credentials provider:
- Set a custom credentials provider class: Set the `jdbc.credentials.provider.class` property to a class that implements the `io.confluent.connect.jdbc.util.JdbcCredentialsProvider` interface. Configure the property with the fully qualified name of your custom credentials provider class.
- Configure additional settings (optional): For additional configuration, prefix the configuration keys with `jdbc.credentials.provider.`. If your custom credentials provider needs to accept additional configuration, implement the `org.apache.kafka.common.Configurable` interface, which lets the connector pass it the configurations that are prefixed with `jdbc.credentials.provider.`.
- Ensure a public no-args constructor: Your custom credentials provider class must have a public no-argument constructor. This is necessary because the connector creates an instance of the provider using this constructor.
- Package your provider: Once your custom credentials provider class is implemented, package it into a JAR file.
- Copy the JAR file to the Connect workers: Copy the built JAR file to the `share/java/kafka-connect-jdbc` directory on all Connect workers. This step ensures that the JDBC connector can access and use your custom credentials provider.
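For example, a minimal sketch of the connector configuration for a hypothetical provider class that reads one additional, prefixed setting (both the class name and the extra setting are illustrative and not part of the connector):
jdbc.credentials.provider.class=com.example.VaultJdbcCredentialsProvider
jdbc.credentials.provider.vault.secret.path=secret/data/jdbc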
Limitations¶
If you configure the JDBC Sink connector to upsert into an Oracle Database with
a fixed-length CHAR
data type as the primary key, the upsert will fail with
the following error:
ORA-00001: unique constraint ($PK_FIELD) violated
To circumvent this limitation, use `VARCHAR2` as the primary key data type.
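For example, an illustrative Oracle table definition (table and column names are hypothetical) that avoids the problem by using VARCHAR2 for the primary key column:
CREATE TABLE orders (
  order_id VARCHAR2(36) NOT NULL PRIMARY KEY,
  product  VARCHAR2(100)
);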
License¶
This connector is available under the Confluent Community License.
Configuration properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for JDBC Sink Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
JSON and JSONB for PostgreSQL¶
PostgreSQL supports storing table data as JSON or JSONB (JSON binary format). Both the JDBC Source and Sink connectors support sourcing from or sinking to PostgreSQL tables containing data stored as JSON or JSONB.
The JDBC Source connector stores JSON or JSONB as STRING type in Kafka. For the JDBC Sink connector, JSON or JSONB should be stored as STRING type in Kafka and matching columns should be defined as JSON or JSONB in PostgreSQL.
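For example, an illustrative PostgreSQL table (table and column names are hypothetical) for sinking a STRING field that contains a JSON document; because auto-created tables map STRING to TEXT (see the type mapping above), a table like this is typically created up front and used with `auto.create=false`:
CREATE TABLE events (
  id      INT PRIMARY KEY,
  payload JSONB
);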
Install the JDBC Sink connector¶
The JDBC Sink connector is no longer bundled with Confluent Platform, so you must install it manually.
Prerequisites¶
- Confluent Platform. If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- SQLite is installed. You can also use another database. If you are using another database, be sure to adjust the `connection.url` setting. Confluent Platform includes JDBC drivers for SQLite and PostgreSQL, but if you’re using a different database you must also verify that the JDBC driver is available on the Kafka Connect process’s `CLASSPATH`.
- Kafka and Schema Registry are running locally on the default ports.
Note that if you are running a multi-node Connect cluster, the JDBC connector and JDBC driver JARs must be installed on every Connect worker in the cluster. For more information, see JDBC Connector Drivers for Confluent Platform.
Install the connector using the Confluent CLI¶
To install the latest
connector version using Confluent Hub Client, navigate to your Confluent Platform installation directory
and run the following command:
confluent connect plugin install confluentinc/kafka-connect-jdbc:latest
You can install a specific version by replacing latest
with a version number
as shown in the following example:
confluent connect plugin install confluentinc/kafka-connect-jdbc:10.8.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Quick start¶
To see the basic functionality of the connector, this quick start copies Avro data from a single topic to a local SQLite database.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Load the JDBC Sink connector¶
Load the JDBC Sink connector using the following command (assuming you installed the connector using Confluent Hub):
confluent local services connect connector load jdbc-sink --config $CONFLUENT_HOME/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/etc/sink-quickstart-sqlite.properties
If you installed the connector manually, the --config
flag will be different.
Produce a record in SQLite¶
Produce a record into the `orders` topic.
./bin/kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price", "type": "float"}]}'
The console producer waits for input.
Copy and paste the following record into the terminal and press Enter:
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
Query the SQLite database and you should see that the `orders` table was created and contains the record.
sqlite3 test.db
sqlite> SELECT * from orders;
foo|50.0|100|999
Database considerations¶
Keep the following issues in mind.
String type is mapped to CLOB when `auto.create=true`. For example, if you have the following Avro schema:
{
  "connect.name": "ksql.ratings",
  "fields": [
    { "name": "rating_id", "type": "long" },
    { "name": "user_id", "type": "int" },
    ...
    { "name": "channel", "type": "string" },
    { "name": "message", "type": "string" }
  ],
  "name": "ratings",
  "namespace": "ksql",
  "type": "record"
}
The values are mapped to CLOB in the table schema:
Name         Null?     Type
-----------  --------  ----------
rating_id    NOT NULL  NUMBER(19)
user_id      NOT NULL  NUMBER(10)
stars        NOT NULL  NUMBER(10)
route_id     NOT NULL  NUMBER(10)
rating_time  NOT NULL  NUMBER(19)
channel      NOT NULL  CLOB
message      NOT NULL  CLOB
Since String is mapped to CLOB when `auto.create=true`, a field of String type cannot be used as a primary key. If you want to use a String type field as a primary key, create the table in the database first and then use `auto.create=false`. If you do not, an exception occurs containing the following line:
... "stringValue": "Exception chain:\njava.sql.SQLException: ORA-02329: column of datatype LOB cannot be unique or a primary key ...
The table name and column names are case sensitive. For example, if you have the following Avro schema:
{ "connect.name": "ksql.pageviews", "fields": [ { "name": "viewtime", "type": "long" }, { "name": "userid", "type": "string" }, { "name": "pageid", "type": "string" } ], "name": "pageviews", "namespace": "ksql", "type": "record" }
A table named `PAGEVIEWS` is created, which causes the exception where `pageviews` is not found.
create table pageviews (
  userid VARCHAR(10) NOT NULL PRIMARY KEY,
  pageid VARCHAR(50),
  viewtime VARCHAR(50)
);

Table PAGEVIEWS created.

DESC pageviews;
Name      Null?     Type
--------  --------  ------------
USERID    NOT NULL  VARCHAR2(10)
PAGEID              VARCHAR2(50)
VIEWTIME            VARCHAR2(50)
An exception message similar to the following one will be in the DLQ:
{ "key": "__connect.errors.exception.message", "stringValue": "Table \"pageviews\" is missing and auto-creation is disabled" }
To resolve this issue, create a table in Oracle Database first and use `auto.create=false`.
create table "pageviews" (
  "userid" VARCHAR(10) NOT NULL PRIMARY KEY,
  "pageid" VARCHAR(50),
  "viewtime" VARCHAR(50)
);

Table "pageviews" created.

DESC "pageviews";
Name      Null?     Type
--------  --------  ------------
userid    NOT NULL  VARCHAR2(10)
pageid              VARCHAR2(50)
viewtime            VARCHAR2(50)
Note
The SQL standards define databases to be case insensitive for identifiers and keywords unless they are quoted. This means that `CREATE TABLE test_case` creates a table named `TEST_CASE` and `CREATE TABLE "test_case"` creates a table named `test_case`. This is also true of table column identifiers. For more information about identifier quoting, see Database Identifiers, Quoting, and Case Sensitivity.