Configuration Reference for JDBC Sink Connector for Confluent Platform
To use this connector, specify the name of the connector class in the connector.class configuration property.
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
Connector-specific configuration properties are described below.
Database Connection Security
In the connector configuration you will notice there are no security parameters. This is because
SSL is not part of the JDBC standard and will depend on the JDBC driver in use. In general, you
must configure SSL using the connection.url parameter. For example, with MySQL it would
look similar to the following:
connection.url="jdbc:mysql://127.0.0.1:3306/sample?verifyServerCertificate=false&useSSL=true&requireSSL=true"
Confluent recommends you view your JDBC driver documentation for support and configuration.
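As another illustration, with PostgreSQL, enabling SSL through the driver's URL parameters might look similar to the following. The host, port, database name, and the exact parameter names (ssl, sslmode) are illustrative and depend on your PostgreSQL JDBC driver version:
connection.url="jdbc:postgresql://127.0.0.1:5432/sample?ssl=true&sslmode=require"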
Connection
The JDBC connector allows you to configure any driver parameter using the prefix
connection.*. For example:
For Postgres, you can use the following:
"connection.loginTimeout": "10"
For Oracle, you can use the following:
"connection.oracle.jdbc.ReadTimeout": "45000"
For more details, log in to the Confluent Support Portal and view How to set JDBC driver configuration using JDBC connector.
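As a sketch, a complete sink configuration that passes a driver parameter through the connection.* prefix might look like the following. The connector name, topic, database coordinates, and credentials are placeholders:
{
  "name": "jdbc-sink-example",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "orders",
    "connection.url": "jdbc:postgresql://127.0.0.1:5432/sample",
    "connection.user": "connect_user",
    "connection.password": "connect_password",
    "connection.loginTimeout": "10"
  }
}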
connection.attempts
The maximum number of attempts to get a valid JDBC connection. The value must be a positive integer.
Type: int
Default: 3
Importance: low
connection.backoff.ms
The backoff time in milliseconds between connection attempts.
Type: long
Default: 10000
Importance: low
connection.url
JDBC connection URL.
For example:
jdbc:oracle:thin:@localhost:1521:orclpdb1
jdbc:mysql://localhost/db_name
jdbc:sqlserver://localhost;instance=SQLEXPRESS;databaseName=db_name
Type: string
Importance: high
connection.user
JDBC connection user.
Type: string
Default: null
Importance: high
connection.password
JDBC connection password.
Type: password
Default: null
Importance: high
jdbc.credentials.provider.class
Credentials provider to use for authentication to the database. By default, the connector uses DefaultJdbcCredentialsProvider. Configure the class with the fully qualified name of your custom credentials provider class.
Type: class
Default: io.confluent.connect.jdbc.util.DefaultJdbcCredentialsProvider
Valid Values: Any class implementing the interface io.confluent.connect.jdbc.util.JdbcCredentialsProvider
Importance: low
dialect.name
The name of the database dialect that should be used for this connector. By default this is empty, and the connector automatically determines the dialect based upon the JDBC connection URL. Use this if you want to override that behavior and use a specific dialect. All properly-packaged dialects in the JDBC connector plugin can be used.
Type: string
Default: “”
Valid Values: [, Db2DatabaseDialect, MySqlDatabaseDialect, SybaseDatabaseDialect, GenericDatabaseDialect, OracleDatabaseDialect, SqlServerDatabaseDialect, PostgreSqlDatabaseDialect, SqliteDatabaseDialect, DerbyDatabaseDialect, SapHanaDatabaseDialect, MockDatabaseDialect, VerticaDatabaseDialect]
Importance: low
Writes
insert.mode
The insertion mode to use.
Type: string
Default: insert
Valid Values: [insert, upsert, update]
Importance: high
The supported modes are as follows:
insert
Use standard SQL INSERT statements.
upsert
Use the appropriate upsert semantics for the target database if it is supported by the connector, for example, INSERT OR IGNORE. When using upsert mode, you must add and define the pk.mode and pk.fields properties in the connector configuration. For example:
{
  ...
  "pk.mode": "record_value",
  "pk.fields": "id"
  ...
}
In the previous example, pk.fields should contain your primary key.
update
Use the appropriate update semantics for the target database if it is supported by the connector, for example, UPDATE.
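Putting this together, a sketch of an upsert configuration might combine these properties as follows (the field name id is illustrative only):
{
  ...
  "insert.mode": "upsert",
  "pk.mode": "record_value",
  "pk.fields": "id"
  ...
}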
batch.size
Specifies how many records to attempt to batch together for insertion into the destination table, when possible. Note that if you set consumer.max.poll.records in the Connect worker properties to a value lower than batch.size, batch processing will be lost and the desired batch.size won’t be reached. You can also configure the connector’s underlying consumer’s max.poll.records using consumer.override.max.poll.records in the connector configuration.
Type: int
Default: 3000
Valid Values: [0,…]
Importance: medium
delete.enabled
Whether to treat null record values as deletes. Requires pk.mode to be record_key.
Type: boolean
Default: false
Importance: medium
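For example, a minimal sketch that enables delete handling (which requires keys to come from the record key) might look like the following:
{
  ...
  "delete.enabled": "true",
  "pk.mode": "record_key"
  ...
}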
Data Mapping
table.name.format
A format string for the destination table name, which may contain ${topic} as a placeholder for the originating topic name.
For example, kafka_${topic} for the topic orders will map to the table name kafka_orders.
Type: string
Default: ${topic}
Importance: medium
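Expressed as configuration, the mapping described above corresponds to a setting such as the following (the kafka_ prefix is only an example):
{
  ...
  "table.name.format": "kafka_${topic}"
  ...
}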
pk.mode
The primary key mode. Also refer to the pk.fields documentation for how the two interact. Supported modes are:
none
No keys utilized.
kafka
Apache Kafka® coordinates are used as the primary key.
Important
With some JDBC dialects, for example the Oracle and MySQL dialects, an exception can occur if you set pk.mode to kafka and auto.create to true. The exception occurs because the connector maps STRING to a variable-length string (for example, TEXT) and not a fixed-length string (for example, VARCHAR(256)). A primary key must have a fixed length. To avoid this exception, consider the following:
Do not set auto.create to true.
Create the database table and primary key data type in advance.
record_key
Field(s) from the record key are used, which may be a primitive or a struct.
record_value
Field(s) from the record value are used, which must be a struct.
Type: string
Default: none
Valid Values: [none, kafka, record_key, record_value]
Importance: high
pk.fields
A list of comma-separated primary key field names. The runtime interpretation of this configuration property depends on pk.mode:
Important
If pk.mode is set to record_value, every pk.fields value must exist in every topic when loading data from different topics into different tables, that is, if multiple topics have their own primary key. If not, you must create distinct connector configurations.
none
Ignored, as no fields are used as the primary key in this mode.
kafka
Must be a trio representing the Kafka coordinates; defaults to __connect_topic, __connect_partition, __connect_offset if empty. Custom field names that are set in this mode will rename the default column names, but keep the Kafka coordinates as the primary keys.
record_key
If empty, all fields from the key struct will be used; otherwise used to extract the desired fields. For a primitive key, only a single field name must be configured.
record_value
If empty, all fields from the value struct will be used; otherwise used to extract the desired fields.
Type: list
Default: none
Importance: medium
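As an illustration of the kafka mode described above, the following sketch renames the default coordinate columns while keeping the Kafka coordinates as the primary key (the column names are placeholders):
{
  ...
  "pk.mode": "kafka",
  "pk.fields": "source_topic,source_partition,source_offset"
  ...
}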
fields.whitelist
List of comma-separated record value field names. If empty, all fields from the record value are utilized; otherwise used to filter to the desired fields.
Note that pk.fields is applied independently in the context of which field(s) form the primary key columns in the destination database, while this configuration is applicable for the other columns.
Type: list
Default: “”
Importance: medium
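For example, a sketch that writes only two hypothetical value fields as regular columns, with the primary key configured separately through pk.fields, might look like the following:
{
  ...
  "fields.whitelist": "first_name,last_name",
  "pk.mode": "record_value",
  "pk.fields": "id"
  ...
}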
db.timezone
Name of the JDBC timezone that should be used in the connector when inserting time-based values. Defaults to UTC.
Type: string
Default: “UTC”
Valid Values: Any valid JDK time zone
Importance: medium
date.timezone
Name of the JDBC timezone that should be used in the connector when inserting DATE type values. Defaults to DB_TIMEZONE, which uses the timezone set by the db.timezone configuration (to maintain backward compatibility). It is recommended to set this to UTC to avoid conversion of DATE type values.
Type: string
Default: “DB_TIMEZONE”
Valid Values: [DB_TIMEZONE, UTC]
Importance: medium
timestamp.precision.mode
The precision to which timestamps are converted. If set to microseconds, the timestamp will be converted to microsecond precision. If set to nanoseconds, the timestamp will be converted to nanosecond precision.
Note that microsecond and nanosecond precision are available only to the extent that the timestamp type in the respective database supports them.
Type: string
Default: “microseconds”
Valid Values: [microseconds, nanoseconds]
Importance: medium
timestamp.fields.list
List of comma-separated record value field names that should be converted to timestamps. These fields will be converted based on the precision mode specified in timestamp.precision.mode (microseconds or nanoseconds).
Note that the timestamp fields included here should be of Long or String type, and nested fields are not supported.
Type: list
Default: “”
Importance: medium
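For example, a sketch that converts two hypothetical Long-typed fields to microsecond precision might look like the following:
{
  ...
  "timestamp.precision.mode": "microseconds",
  "timestamp.fields.list": "created_at,updated_at"
  ...
}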
table.types
A comma-separated list of database table types to which the sink connector can write. The default value is TABLE, but any combination of TABLE, PARTITIONED TABLE, and VIEW is allowed. Not all databases support writing to views. If a database supports writing to views, the sink connector will fail if the view definition does not match the records’ schema, regardless of the value that is set in auto.evolve.
Type: list
Default: TABLE
Valid Values: TABLE, PARTITIONED TABLE, or VIEW
Importance: low
DDL Support
auto.create
Whether to automatically create the destination table based on the record schema, by issuing CREATE, if it is found to be missing.
Type: boolean
Default: false
Importance: medium
Important
Database performance could be adversely affected if Kafka Connect auto-creates a table and uses data types that are inefficient for the target database. Confluent recommends you review the data types used in conjunction with your database administrator, or pre-create the table before loading it.
With some JDBC dialects, for example, the Oracle and MySQL dialects, an exception can occur if you set pk.mode to kafka and auto.create to true. The exception occurs because the connector maps STRING to a variable-length string (for example, TEXT) and not a fixed-length string (for example, VARCHAR(256)). A primary key must have a fixed length. To avoid this exception, consider the following:
Do not set auto.create to true.
Create the database table and primary key data type in advance.
auto.evolve
Whether to automatically add columns to the table schema, by issuing ALTER, when they are found to be missing relative to the record schema.
Type: boolean
Default: false
Importance: medium
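Subject to the caveats above, a sketch that lets the connector both create and evolve the destination table might look like the following (the primary key field is a placeholder):
{
  ...
  "auto.create": "true",
  "auto.evolve": "true",
  "pk.mode": "record_value",
  "pk.fields": "id"
  ...
}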
quote.sql.identifiers
When to quote table names, column names, and other identifiers in SQL statements. For backward compatibility, the default is always.
Type: string
Default: always
Importance: medium
mssql.use.merge.holdlock
Whether to use HOLDLOCK when performing a MERGE INTO upsert statement. Note that this configuration property is specific to Microsoft SQL Server only.
Type: boolean
Default: true
Importance: low
Retries
max.retries
The maximum number of times to retry on errors before failing the task.
Type: int
Default: 10
Valid Values: [0,…]
Importance: medium
retry.backoff.ms
The time in milliseconds to wait following an error before a retry attempt is made.
Type: int
Default: 3000
Valid Values: [0,…]
Importance: medium
CSFLE Configuration
csfle.enabled
Accepts a boolean value. CSFLE (client-side field level encryption) is enabled for the connector if csfle.enabled is set to true.
Type: boolean
Default: false
auto.register.schemas
Specifies if the Serializer should attempt to register the Schema with Schema Registry.
Type: boolean
Default: true
Importance: medium
use.latest.version
Only applies when auto.register.schemas is set to false. If auto.register.schemas is set to false and use.latest.version is set to true, then instead of deriving a schema for the object passed to the client for serialization, Schema Registry uses the latest version of the schema in the subject for serialization.
Type: boolean
Default: true
Importance: medium
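As a sketch, a configuration that enables CSFLE and serializes against the latest registered schema version instead of auto-registering might include the following (this assumes Schema Registry and the relevant encryption rules are already configured):
{
  ...
  "csfle.enabled": "true",
  "auto.register.schemas": "false",
  "use.latest.version": "true"
  ...
}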