Configuration Reference for Google BigQuery Sink Connector for Confluent Platform (Legacy)
The BigQuery Sink connector can be configured using a variety of configuration properties.
Note
- These are properties for the self-managed connector. If you are using Confluent Cloud, see Google BigQuery Sink Connector for Confluent Cloud. 
- New tables and updated schemas take a few minutes to be detected by the Google Client Library. For more information, see the Google Cloud BigQuery API guide. 
- defaultDataset
- The default dataset to be used. - Type: string 
- Importance: high 
- Note: `defaultDataset` replaced the `datasets` parameter of older versions of this connector.
- project
- The BigQuery project to write to. - Type: string 
- Importance: high 
 
- topics
- A list of Kafka topics to read from. - Type: list 
- Importance: high 
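
For reference, a minimal self-managed connector configuration usually combines the three properties above with a connector class and credentials. The snippet below is an illustrative sketch only: the connector name, topic, project, dataset, and keyfile path are placeholders, and it assumes the `com.wepay.kafka.connect.bigquery.BigQuerySinkConnector` class used by this connector family.

```properties
# Illustrative sketch only -- name, topic, project, dataset, and path are placeholders.
name=bigquery-sink-example
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
topics=pageviews
project=my-gcp-project
defaultDataset=my_dataset
keyfile=/etc/kafka-connect/gcp-credentials.json
```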
 
- autoCreateTables
- Create BigQuery tables if they don’t already exist. This property should only be enabled for Schema Registry-based inputs: Avro, Protobuf, or JSON Schema (JSON_SR). Table creation is not supported for JSON input. - Type: boolean 
- Default: false 
- Importance: high 
 
- gcsBucketName
- The name of the bucket where Google Cloud Storage (GCS) blobs are located. These blobs are used to batch-load to BigQuery. This is applicable only if `enableBatchLoad` is configured. - Type: string 
- Default: “” 
- Importance: high 
 
- queueSize
- The maximum size (or -1 for no maximum size) of the worker queue for BigQuery write requests before all topics are paused. This is a soft limit; the size of the queue can go over this before topics are paused. All topics resume once a flush is triggered or the size of the queue drops under half of the maximum size. - Type: long 
- Default: -1 
- Valid Values: [-1,…] 
- Importance: high 
 
- bigQueryRetry
- The number of retry attempts made for a BigQuery request that fails with a backend error or a quota exceeded error. - Type: int 
- Default: 0 
- Valid Values: [0,…] 
- Importance: medium 
 
- bigQueryRetryWait
- The minimum amount of time, in milliseconds, to wait between retry attempts for a BigQuery backend or quota exceeded error. - Type: long 
- Default: 1000 
- Valid Values: [0,…] 
- Importance: medium 
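
As a sketch of how the two retry properties work together, the fragment below retries a failed BigQuery request up to five times, waiting at least ten seconds between attempts. The values are illustrative, not recommendations.

```properties
# Illustrative values only.
bigQueryRetry=5
# Minimum wait between retry attempts, in milliseconds (10 seconds here).
bigQueryRetryWait=10000
```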
 
- bigQueryMessageTimePartitioning
- Whether or not to use the message time when inserting records. By default, the connector processing time is used. - Type: boolean 
- Default: false 
- Importance: high 
 
- bigQueryPartitionDecorator
- Whether or not to append a partition decorator to the BigQuery table name when inserting records. Setting this to true appends the partition decorator to the table name (for example, table$yyyyMMdd). Setting this to false bypasses the logic to append the partition decorator and uses the raw table name for inserts. - Type: boolean 
- Default: true 
- Importance: high 
 
- timestampPartitionFieldName
- The name of the field in the value that contains the timestamp to partition by in BigQuery, which enables timestamp partitioning for each table. Leave this configuration blank to enable ingestion-time partitioning for each table. - Type: string 
- Default: null 
- Importance: low 
 
- clusteringPartitionFieldNames
- Comma-separated list of fields where data is clustered in BigQuery. - Type: list 
- Default: null 
- Importance: low 
 
- timePartitioningType
- The time partitioning type to use when creating tables. Existing tables will not be altered to use this partitioning type. - Type: string 
- Default: DAY 
- Valid Values: (case insensitive) [MONTH, YEAR, HOUR, DAY] 
- Importance: low 
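
The partitioning and clustering properties above are typically used together. The fragment below is a hypothetical example that partitions each table hourly on a timestamp field and clusters on two fields; the field names are placeholders, not part of this reference.

```properties
# Hypothetical field names; adjust to your record schema.
timestampPartitionFieldName=event_time
timePartitioningType=HOUR
clusteringPartitionFieldNames=customer_id,region
```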
 
- keySource
- Determines whether the `keyfile` configuration is the path to the credentials JSON file or to the JSON itself. Available values are `FILE` and `JSON`. This property is available in BigQuery sink connector version 1.3 (and later). - Type: string 
- Default: FILE 
- Importance: medium 
 
- keyfile
- `keyfile` can be either a string representation of the Google credentials file or the path to the Google credentials file itself. The string representation of the Google credentials file is supported in BigQuery sink connector version 1.3 (and later). - Type: string 
- Default: null 
- Importance: medium 
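
To illustrate the relationship between `keySource` and `keyfile`: with the default `FILE` source the connector reads credentials from a path, while `JSON` expects the credentials content itself in `keyfile`. A sketch with a placeholder path:

```properties
# keySource=FILE (the default) means keyfile is a path to the credentials file.
keySource=FILE
keyfile=/etc/kafka-connect/gcp-credentials.json
# With keySource=JSON, keyfile would instead hold the credentials JSON string itself.
```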
 
- sanitizeTopics
- Designates whether to automatically sanitize topic names before using them as table names. If not enabled, topic names are used as table names. - Type: boolean 
- Default: false 
- Importance: medium 
 
- schemaRetriever
- A class that can be used for automatically creating tables and/or updating schemas. Note that in version 2.0.0, the SchemaRetriever API changed to retrieve the schema from each SinkRecord, which helps support multiple schemas per topic. `SchemaRegistrySchemaRetriever` has been removed, as it retrieves the schema based on the topic. - Type: class 
- Default: `com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever` 
- Importance: medium 
 
- threadPoolSize
- The size of the BigQuery write thread pool. This establishes the maximum number of concurrent writes to BigQuery. - Type: int 
- Default: 10 
- Valid Values: [1,…] 
- Importance: medium 
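
`queueSize` and `threadPoolSize` are the main knobs for write throughput and back-pressure. The fragment below is an illustrative sketch, not a tuning recommendation: it caps the request queue and doubles the default write concurrency.

```properties
# Illustrative tuning values only.
queueSize=10000
threadPoolSize=20
```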
 
- allBQFieldsNullable
- If true, no fields in any produced BigQuery schema are REQUIRED. All non-nullable Avro fields are translated as `NULLABLE` (or `REPEATED`, for arrays). - Type: boolean 
- Default: false 
- Importance: low 
 
- avroDataCacheSize
- The size of the cache to use when converting schemas from Avro to Kafka Connect. - Type: int 
- Default: 100 
- Valid Values: [0,…] 
- Importance: low 
 
- batchLoadIntervalSec
- The interval, in seconds, in which to attempt to run GCS to BigQuery load jobs. Only relevant if `enableBatchLoad` is configured. - Type: int 
- Default: 120 
- Importance: low 
 
- convertDoubleSpecialValues
- Designates whether +Infinity is converted to Double.MAX_VALUE and whether -Infinity and NaN are converted to Double.MIN_VALUE to ensure successful delivery to BigQuery. - Type: boolean 
- Default: false 
- Importance: low 
 
- enableBatchLoad
- Beta feature; use with caution. The sublist of topics to be batch loaded through GCS. - Type: list 
- Default: “” 
- Importance: low 
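
Putting the GCS batch-load properties together, the following sketch batch-loads one topic through a GCS bucket every five minutes while other topics continue to stream; the topic and bucket names are placeholders, and this path relies on the beta `enableBatchLoad` feature described above.

```properties
# Beta feature; topic and bucket names are placeholders.
enableBatchLoad=clickstream
gcsBucketName=my-connect-batch-bucket
batchLoadIntervalSec=300
autoCreateBucket=true
```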
 
- includeKafkaData
- Whether to include an extra block containing the Kafka source topic, offset, and partition information in the resulting BigQuery rows. - Type: boolean 
- Default: false 
- Importance: low 
 
- upsertEnabled
- Enable upsert functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. Row matching is performed based on the contents of record keys. This feature won't work with SMTs that change the name of the topic, and it doesn't support JSON input. - Type: boolean 
- Default: false 
- Importance: low 
 
- deleteEnabled
- Enable delete functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. A delete is performed when a record with a null value (that is, a tombstone record) is read. This feature will not work with SMTs that change the name of the topic, and it doesn't support JSON input. - Type: boolean 
- Default: false 
- Importance: low 
 
- intermediateTableSuffix
- A suffix that will be appended to the names of destination tables to create the names for the corresponding intermediate tables. Multiple intermediate tables may be created for a single destination table, but their names will always start with the name of the destination table, followed by this suffix, and possibly followed by an additional suffix. - Type: string 
- Default: “tmp” 
- Importance: low 
 
- mergeIntervalMs
- How often (in milliseconds) to perform a merge flush, if upsert/delete is enabled. Can be set to `-1` to disable periodic flushing. - Type: long 
- Default: 60_000L 
- Importance: low 
 
- mergeRecordsThreshold
- How many records to write to an intermediate table before performing a merge flush, if upsert/delete is enabled. Can be set to `-1` to disable record count-based flushing. - Type: long 
- Default: -1 
- Importance: low 
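
As an illustrative sketch of the upsert/delete properties working together: upsert and delete rely on record keys for row matching, so a key field name is also set here (the field name and flush values are placeholders, not recommendations).

```properties
# Illustrative sketch; upsert/delete relies on record keys for row matching.
upsertEnabled=true
deleteEnabled=true
# Placeholder field name under which the record key is written (see kafkaKeyFieldName below).
kafkaKeyFieldName=kafkaKey
intermediateTableSuffix=tmp
# Merge flush every 60 seconds; disable record-count-based flushing.
mergeIntervalMs=60000
mergeRecordsThreshold=-1
```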
 
- autoCreateBucket
- Whether to automatically create the given bucket, if it does not exist. - Type: boolean 
- Default: true 
- Importance: medium 
 
- allowNewBigQueryFields
- If true, new fields can be added to BigQuery tables during subsequent schema updates. - Type: boolean 
- Default: false 
- Importance: medium 
 
- allowBigQueryRequiredFieldRelaxation
- If true, fields in the BigQuery schema can be changed from `REQUIRED` to `NULLABLE`. Note that `allowNewBigQueryFields` and `allowBigQueryRequiredFieldRelaxation` replaced the `autoUpdateSchemas` parameter of older versions of this connector. - Type: boolean 
- Default: false 
- Importance: medium 
 
- allowSchemaUnionization
- If true, the existing table schema (if one is present) will be unionized with new record schemas during schema updates. If false, the record of the last schema in a batch will be used for any necessary table creation and schema update attempts. 
- Note that setting `allowSchemaUnionization` to `false` and `allowNewBigQueryFields` and `allowBigQueryRequiredFieldRelaxation` to `true` is equivalent to setting `autoUpdateSchemas` to `true` in older (pre-2.0.0) versions of this connector. This should only be enabled for Schema Registry-based inputs: Avro, Protobuf, or JSON Schema (JSON_SR). Table schema updates are not supported for JSON input. 
- If you set `allowSchemaUnionization` to `false` and `allowNewBigQueryFields` and `allowBigQueryRequiredFieldRelaxation` to `true`, and BigQuery raises a schema validation exception or a table doesn't exist when writing a batch, the connector will try to remediate by relaxing required fields and/or adding new fields. 
- If `allowSchemaUnionization`, `allowNewBigQueryFields`, and `allowBigQueryRequiredFieldRelaxation` are `true`, the connector will create or update tables with a schema whose fields are a union of the existing table schema's fields and the ones present in all of the records of the current batch. 
- The key difference is that with unionization disabled, new record schemas have to be a superset of the table schema in BigQuery. 
- In general, `allowSchemaUnionization` is useful to make things work when enabled. For instance, if you'd like to remove fields from data upstream, the updated schemas still work in the connector. Similarly, it is useful when different tasks see records whose schemas contain different fields that are not in the table. However, note with caution that if `allowSchemaUnionization` is set and some bad records are in the topic, the BigQuery schema may be permanently changed. This presents two issues: first, since BigQuery doesn't allow columns to be dropped from tables, they'll add unnecessary noise to the schema. Second, since BigQuery doesn't allow column types to be modified, they could completely break pipelines down the road, where well-behaved records have schemas whose field names overlap with the accidentally added columns in the table, but use a different type. 
- Type: boolean 
- Default: false 
- Importance: medium 
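
For example, the combination below reproduces the pre-2.0.0 `autoUpdateSchemas=true` behavior described above (new fields allowed, required fields relaxable, no unionization), assuming a Schema Registry-based format such as Avro; the Schema Registry URL is a placeholder.

```properties
# Placeholder Schema Registry URL; requires a Schema Registry-based format (Avro shown).
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
autoCreateTables=true
allowNewBigQueryFields=true
allowBigQueryRequiredFieldRelaxation=true
allowSchemaUnionization=false
```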
 
- kafkaDataFieldName
- The Kafka data field name. The default value is null, which means the Kafka Data field will not be included. - Type: string 
- Default: null 
- Importance: low 
 
- kafkaKeyFieldName
- The Kafka key field name. The default value is null, which means the Kafka Key field will not be included. - Type: string 
- Default: null 
- Importance: low 
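
To sketch how the two field-name properties above surface Kafka metadata and the record key in the resulting BigQuery rows (the field names here are arbitrary placeholders):

```properties
# Placeholder field names for the Kafka metadata block and the record key.
kafkaDataFieldName=kafka_metadata
kafkaKeyFieldName=kafka_key
```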
 
- topic2TableMap
- Map of topics to tables (optional). Format: comma-separated tuples, for example, <topic-1>:<table-1>,<topic-2>:<table-2>,.. Note that the topic name should not be modified using a regex SMT while using this option. Also note that the `sanitizeTopics` setting (SANITIZE_TOPICS_CONFIG) is ignored if this config is set. Lastly, if the topic-to-table map doesn't contain the topic for a record, a table with the same name as the topic is created. - Type: string 
- Default: “” 
- Importance: low 
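
A hypothetical mapping that routes two topics to differently named tables (topic and table names are placeholders); any topic not listed falls back to a table named after the topic:

```properties
# Placeholder topic and table names.
topic2TableMap=orders:orders_raw,customers:customers_raw
```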
 
CSFLE configuration
csfle.enabled
Accepts a boolean value. CSFLE (client-side field level encryption) is enabled for the connector if csfle.enabled is set to true.
- Type: boolean 
- Default: false 
auto.register.schemas
Specifies whether the serializer should attempt to register the schema with Schema Registry.
- Type: boolean 
- Default: true 
- Importance: medium 
use.latest.version
Only applies when auto.register.schemas is set to false. If auto.register.schemas is set to false and use.latest.version is set to true, then instead of deriving a schema for the object passed to the client for serialization, Schema Registry uses the latest version of the schema in the subject for serialization.
- Type: boolean 
- Default: true 
- Importance: medium
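
A sketch of the CSFLE-related properties above, pinning serialization to the latest registered schema version instead of auto-registering schemas (values are illustrative):

```properties
# Illustrative values only.
csfle.enabled=true
auto.register.schemas=false
use.latest.version=true
```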