Configuration Reference for JDBC Sink Connector for Confluent Platform
To use this connector, specify the name of the connector class in the connector.class
configuration property.
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
Connector-specific configuration properties are described below.
Database Connection Security
In the connector configuration, you will notice there are no security parameters. This is because SSL is not part of the JDBC standard; support depends on the JDBC driver in use. In general, you must configure SSL using the connection.url parameter. For example, with MySQL it would look similar to the following:
connection.url="jdbc:mysql://127.0.0.1:3306/sample?verifyServerCertificate=false&useSSL=true&requireSSL=true"
Confluent recommends you review your JDBC driver documentation for supported security options and configuration details.
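Putting the pieces together, a minimal sink configuration using this SSL-enabled MySQL URL might look similar to the following sketch; the connector name, topic, and credentials are placeholder values:

```json
{
  "name": "jdbc-sink-mysql",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "orders",
    "connection.url": "jdbc:mysql://127.0.0.1:3306/sample?verifyServerCertificate=false&useSSL=true&requireSSL=true",
    "connection.user": "db_user",
    "connection.password": "db_password"
  }
}
```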
Connection
The JDBC connector allows you to configure any driver parameter using the connection.* prefix. For example:
For Postgres, you can use the following:
"connection.loginTimeout": "10"
For Oracle, you can use the following:
"connection.oracle.jdbc.ReadTimeout": "45000"
For more details, log in to the Confluent Support Portal and view How to set JDBC driver configuration using JDBC connector.
connection.attempts
The maximum number of attempts to get a valid JDBC connection. The value must be a positive integer.
- Type: int
- Default: 3
- Importance: low
connection.backoff.ms
The backoff time in milliseconds between connection attempts.
- Type: long
- Default: 10000
- Importance: low
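For illustration, the two connection retry properties above can be tuned together. The following values are examples only, not recommendations:

```json
{
  "connection.attempts": "5",
  "connection.backoff.ms": "30000"
}
```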
connection.url
JDBC connection URL.
For example:
- jdbc:oracle:thin:@localhost:1521:orclpdb1
- jdbc:mysql://localhost/db_name
- jdbc:sqlserver://localhost;instance=SQLEXPRESS;databaseName=db_name
- Type: string
- Importance: high
connection.user
JDBC connection user.
- Type: string
- Default: null
- Importance: high
connection.password
JDBC connection password.
- Type: password
- Default: null
- Importance: high
jdbc.credentials.provider.class
Credentials provider class to use for authenticating to the database. By default, the connector uses DefaultJdbcCredentialsProvider. To use a custom provider, set this property to the fully qualified name of your credentials provider class.
- Type: class
- Default: io.confluent.connect.jdbc.util.DefaultJdbcCredentialsProvider
- Valid Values: Any class implementing the io.confluent.connect.jdbc.util.JdbcCredentialsProvider interface
- Importance: low
dialect.name
The name of the database dialect that should be used for this connector. By default this is empty, and the connector automatically determines the dialect based upon the JDBC connection URL. Use this if you want to override that behavior and use a specific dialect. All properly-packaged dialects in the JDBC connector plugin can be used.
- Type: string
- Default: “”
- Valid Values: [, Db2DatabaseDialect, MySqlDatabaseDialect, SybaseDatabaseDialect, GenericDatabaseDialect, OracleDatabaseDialect, SqlServerDatabaseDialect, PostgreSqlDatabaseDialect, SqliteDatabaseDialect, DerbyDatabaseDialect, SapHanaDatabaseDialect, MockDatabaseDialect, VerticaDatabaseDialect]
- Importance: low
Writes
insert.mode
The insertion mode to use.
- Type: string
- Default: insert
- Valid Values: [insert, upsert, update]
- Importance: high
The supported modes are as follows:
insert
Use standard SQL INSERT statements.
upsert
Use the appropriate upsert semantics for the target database if it is supported by the connector (for example, INSERT OR IGNORE). When using upsert mode, you must add and define the pk.mode and pk.fields properties in the connector configuration. For example:
{
  ...
  "pk.mode": "record_value",
  "pk.fields": "id"
  ...
}
In the previous example, pk.fields should contain your primary key.
update
Use the appropriate update semantics for the target database if it is supported by the connector (for example, UPDATE).
batch.size
Specifies how many records to attempt to batch together for insertion into the destination table, when possible. Note that if you set consumer.max.poll.records in the Connect worker properties to a value lower than batch.size, batching is effectively capped and the desired batch.size won't be reached. You can also configure the connector's underlying consumer's max.poll.records using consumer.override.max.poll.records in the connector configuration.
- Type: int
- Default: 3000
- Valid Values: [0,…]
- Importance: medium
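For example, to keep the consumer poll size from capping the batch size, you might set both properties together; the values shown are illustrative:

```json
{
  "batch.size": "3000",
  "consumer.override.max.poll.records": "3000"
}
```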
delete.enabled
Whether to treat null record values as deletes. Requires pk.mode to be record_key.
- Type: boolean
- Default: false
- Importance: medium
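A sketch of a configuration that turns tombstone (null-value) records into deletes; the pk.fields value id is a placeholder for your actual key field:

```json
{
  "delete.enabled": "true",
  "pk.mode": "record_key",
  "pk.fields": "id"
}
```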
Data Mapping
table.name.format
A format string for the destination table name, which may contain ${topic} as a placeholder for the originating topic name. For example, kafka_${topic} for the topic orders maps to the table name kafka_orders.
- Type: string
- Default: ${topic}
- Importance: medium
pk.mode
The primary key mode. Also refer to the pk.fields documentation for how the two properties interact. Supported modes are:
none
No keys utilized.
kafka
Apache Kafka® coordinates are used as the primary key.
Important
With some JDBC dialects (for example, the Oracle and MySQL dialects), an exception can occur if you set pk.mode to kafka and auto.create to true. The exception occurs because the connector maps STRING to a variable-length string (for example, TEXT) and not a fixed-length string (for example, VARCHAR(256)). A primary key must have a fixed length. To avoid this exception, consider the following:
- Do not set auto.create to true.
- Create the database table and primary key data type in advance.
record_key
Field(s) from the record key are used, which may be a primitive or a struct.
record_value
Field(s) from the record value are used, which must be a struct.
- Type: string
- Default: none
- Valid Values: [none, kafka, record_key, record_value]
- Importance: high
pk.fields
A list of comma-separated primary key field names. The runtime interpretation of this configuration property depends on pk.mode:
Important
If pk.mode is set to record_value, every pk.fields value must exist in every topic when loading data from different topics into different tables (that is, if multiple topics have their own primary key). If not, you must create distinct connector configurations.
none
Ignored, as no fields are used as primary key in this mode.
kafka
Must be a trio representing the Kafka coordinates; defaults to __connect_topic,__connect_partition,__connect_offset if empty. Custom field names set in this mode rename the default column names but keep the Kafka coordinates as the primary keys.
record_key
If empty, all fields from the key struct are used; otherwise, used to extract the desired fields. For a primitive key, only a single field name must be configured.
record_value
If empty, all fields from the value struct are used; otherwise, used to extract the desired fields.
- Type: list
- Default: none
- Importance: medium
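For instance, in kafka mode, custom field names rename the default coordinate columns while the Kafka coordinates remain the primary key; the column names below are examples:

```json
{
  "pk.mode": "kafka",
  "pk.fields": "source_topic,source_partition,source_offset"
}
```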
fields.whitelist
A comma-separated list of record value field names. If empty, all fields from the record value are used; otherwise, used to filter to the desired fields.
Note that pk.fields is applied independently in the context of which field(s) form the primary key columns in the destination database, while this configuration applies to the other columns.
- Type: list
- Default: “”
- Importance: medium
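As a sketch, the following combines fields.whitelist with pk.fields; the field names are hypothetical:

```json
{
  "pk.mode": "record_value",
  "pk.fields": "id",
  "fields.whitelist": "name,email"
}
```

Here id forms the primary key column, while fields.whitelist restricts the remaining columns to name and email.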
db.timezone
Name of the JDBC timezone that should be used in the connector when inserting time-based values. Defaults to UTC.
- Type: string
- Default: “UTC”
- Valid Values: Any valid JDK time zone
- Importance: medium
date.timezone
Name of the JDBC timezone that should be used in the connector when inserting DATE type values. Defaults to DB_TIMEZONE, which uses the timezone set in the db.timezone configuration (to maintain backward compatibility). It is recommended to set this to UTC to avoid conversion of DATE type values.
- Type: string
- Default: “DB_TIMEZONE”
- Valid Values: [DB_TIMEZONE, UTC]
- Importance: medium
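For example, to write time-based values in a local zone while keeping DATE values free of conversion, as recommended above (the zone name is illustrative):

```json
{
  "db.timezone": "America/New_York",
  "date.timezone": "UTC"
}
```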
table.types
A comma-separated list of database table types to which the sink connector can write. The default value is TABLE, but any combination of TABLE, PARTITIONED TABLE, and VIEW is allowed. Not all databases support writing to views. If a database supports writing to views, the sink connector will fail if the view definition does not match the records’ schema, regardless of the value set in auto.evolve.
- Type: list
- Default: TABLE
- Valid Values: TABLE, PARTITIONED TABLE, or VIEW
- Importance: low
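For example, to allow the connector to write to both tables and views:

```json
{
  "table.types": "TABLE,VIEW"
}
```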
DDL Support
auto.create
Whether to automatically create the destination table based on the record schema, if the table is found to be missing, by issuing CREATE.
- Type: boolean
- Default: false
- Importance: medium
Important
- Database performance could be adversely affected if Kafka Connect auto-creates a table and uses data types that are inefficient for the target database. Confluent recommends you review the data types used in conjunction with your database administrator, or pre-create the table before loading it.
- With some JDBC dialects (for example, the Oracle and MySQL dialects), an exception can occur if you set pk.mode to kafka and auto.create to true. The exception occurs because the connector maps STRING to a variable-length string (for example, TEXT) and not a fixed-length string (for example, VARCHAR(256)). A primary key must have a fixed length. To avoid this exception, consider the following:
  - Do not set auto.create to true.
  - Create the database table and primary key data type in advance.
auto.evolve
Whether to automatically add columns to the table schema when they are found to be missing relative to the record schema, by issuing ALTER.
- Type: boolean
- Default: false
- Importance: medium
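A sketch combining the two DDL properties; note it uses record_value rather than kafka for pk.mode, which avoids the fixed-length primary key exception described above, and the pk.fields value id is a placeholder:

```json
{
  "auto.create": "true",
  "auto.evolve": "true",
  "pk.mode": "record_value",
  "pk.fields": "id"
}
```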
quote.sql.identifiers
When to quote table names, column names, and other identifiers in SQL statements. For backward compatibility, the default is always.
- Type: string
- Default: always
- Importance: medium
mssql.use.merge.holdlock
Whether to use HOLDLOCK when performing a MERGE INTO upsert statement. Note that this configuration property is specific to Microsoft SQL Server.
- Type: boolean
- Default: true
- Importance: low
Retries
max.retries
The maximum number of times to retry on errors before failing the task.
- Type: int
- Default: 10
- Valid Values: [0,…]
- Importance: medium
retry.backoff.ms
The time in milliseconds to wait following an error before a retry attempt is made.
- Type: int
- Default: 3000
- Valid Values: [0,…]
- Importance: medium
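For illustration, a more patient retry policy than the defaults might look like the following; the values are examples only:

```json
{
  "max.retries": "20",
  "retry.backoff.ms": "5000"
}
```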