Google BigQuery Sink Connector for Confluent Platform (Legacy)¶
The Kafka Connect Google BigQuery Sink connector is used to stream data into BigQuery tables. When streaming data from Apache Kafka® topics that have registered schemas, the sink connector can create BigQuery tables with the appropriate BigQuery table schema. The BigQuery table schema is based upon information in the Kafka schema for the topic.
It is important to note the following:
- The BigQuery Sink V2 connector is not supported in Confluent Platform. It is only supported in Confluent Cloud.
- Version 2.x.x of the Google BigQuery Sink connector for Confluent Platform is not backward compatible with 1.x.x versions. For more information, see the Upgrading to 2.x.x section.
- The Google BigQuery Sink connector for Confluent Platform (legacy) remains supported until stated otherwise and should not be considered End of Support (EoS).
- If you want to use multiple connectors with a high number of tasks, ensure you review BigQuery rate limits.
Looking for the fully-managed Google BigQuery Sink Connector?
Check out the Google BigQuery Sink V2 Connector for Confluent Cloud and start ingesting data using the BigQuery Storage Write API.
Features¶
- At least once delivery
- Dead Letter Queue
- Multiple tasks
- Insert operations and duplicate detection
- Insertion of records one at a time
- Streaming from a list of topics
- Internal thread pool
At least once delivery¶
This connector guarantees that records are delivered at least once from the Kafka topic.
Dead Letter Queue¶
This connector supports routing invalid records to the Dead Letter Queue (DLQ). This includes any records for which BigQuery returns a 400 code (invalid error message). Note that DLQ routing does not work if allowSchemaUnionization is set to false, allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation are set to true (which is equivalent to setting autoUpdateSchemas to true in versions earlier than 2.0.0 of this connector), and the connector detects that the failure is due to a schema mismatch. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
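DLQ routing is enabled with the standard Kafka Connect error-handling properties in the connector configuration. The following fragment is a minimal sketch; the DLQ topic name and replication factor are placeholder values you should adjust for your cluster:
{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "dlq-bigquery-sink",
  "errors.deadletterqueue.topic.replication.factor": "1",
  "errors.deadletterqueue.context.headers.enable": "true"
}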
Multiple tasks¶
The Google BigQuery Sink connector supports running one or more tasks. You can specify the number of tasks with the tasks.max configuration parameter. Running multiple tasks can improve throughput when the connector must stream a large volume of records.
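For example, the following connector configuration fragment is a sketch that runs the sink with four parallel tasks; adjust the task count for your workload and keep BigQuery rate limits in mind:
{
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "tasks.max": "4",
  "topics": "kcbq-quickstart1"
}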
Insert operations and duplicate detection¶
The InsertAll API supports insert operations and tries to detect duplicates. For more information, see BigQuery troubleshooting.
Insertion of records one at a time¶
The connector uses the BigQuery insertAll streaming API, which inserts records one at a time. The inserted records are available in the table immediately for querying.
Streaming from a list of topics¶
The connector supports streaming from a list of topics into corresponding tables in BigQuery.
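For example, the following configuration fragment is a sketch with placeholder topic and dataset names that streams three topics into same-named tables in the dataset given by defaultDataset:
{
  "topics": "pageviews,orders,customers",
  "defaultDataset": "ConfluentDataSet",
  "sanitizeTopics": "true"
}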
Internal thread pool¶
Even though the BigQuery connector streams records one at a time by default (as opposed to running in batch mode), the connector is scalable because it contains an internal thread pool that allows it to stream records in parallel. Note that the internal thread pool defaults to 10 threads, which is configurable.
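For example, to raise the pool from the default of 10 to 20 threads, you could add the following fragment to the connector configuration. This is a sketch that assumes the threadPoolSize property exposed by the open source connector; confirm the property name in the Configuration Reference for your version:
{
  "threadPoolSize": "20"
}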
Limitations¶
The BigQuery Sink connector has the following limitations:
- The connector does not support schemas with recursion.
- The connector does not support schemas having float fields with NaN or +Infinity values.
- Auto schema update does not support removing columns.
- Auto schema update does not support recursive schemas.
- When the connector is configured with upsertEnabled or deleteEnabled, it does not support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:
  - io.debezium.transforms.ByLogicalTableRouter
  - io.debezium.transforms.outbox.EventRouter
  - org.apache.kafka.connect.transforms.RegexRouter
  - org.apache.kafka.connect.transforms.TimestampRouter
  - io.confluent.connect.transforms.MessageTimestampRouter
  - io.confluent.connect.transforms.ExtractTopic$Key
  - io.confluent.connect.transforms.ExtractTopic$Value
Note
When the connector is not configured with upsertEnabled or deleteEnabled, these SMTs can be used without any issue.
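For reference, the following configuration fragment is a sketch of an upsertEnabled/deleteEnabled setup that avoids the topic-renaming SMTs listed above. The kafkaKeyFieldName value and the other names shown are illustrative; check them against the Configuration Reference before use:
{
  "upsertEnabled": "true",
  "deleteEnabled": "true",
  "kafkaKeyFieldName": "kafkaKey",
  "topics": "customers",
  "defaultDataset": "ConfluentDataSet"
}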
Supported data types¶
The following table lists the supported BigQuery data types and the associated connector mapping. Note that this mapping applies when the connector creates a table automatically or updates schemas (that is, if either auto.create.tables or auto.update.schemas is set to true).
| BigQuery Data Type | Connector Mapping |
|---|---|
| STRING | String |
| INTEGER | INT8 |
| INTEGER | INT16 |
| INTEGER | INT32 |
| INTEGER | INT64 |
| FLOAT | FLOAT32 |
| FLOAT | FLOAT64 |
| BOOLEAN | Boolean |
| BYTES | Bytes |
| TIMESTAMP | Logical TIMESTAMP |
| TIME | Logical TIME |
| DATE | Logical DATE |
| FLOAT | Logical Decimal |
| DATE | Debezium Date |
| TIME | Debezium MicroTime |
| TIME | Debezium Time |
| TIMESTAMP | Debezium MicroTimestamp |
| TIMESTAMP | Debezium TIMESTAMP |
| TIMESTAMP | Debezium ZonedTimestamp |
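For example, assuming automatic table creation is enabled, a topic whose registered Avro value schema looks like the following hypothetical schema would produce a BigQuery table with columns url STRING, response_code INTEGER, bytes_sent INTEGER, is_mobile BOOLEAN, and view_time TIMESTAMP, per the mapping above:
{
  "type": "record",
  "name": "PageView",
  "fields": [
    {"name": "url", "type": "string"},
    {"name": "response_code", "type": "int"},
    {"name": "bytes_sent", "type": "long"},
    {"name": "is_mobile", "type": "boolean"},
    {"name": "view_time", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}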
Install the BigQuery connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- If you want to install the connector using Confluent Hub, you must install the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later
- Connect: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later
- Java 1.8
- Active Google Cloud Platform (GCP) account with authorization to create resources
Important
Streaming into BigQuery is not available with the Google Cloud free tier. If you try to use streaming without enabling billing, you receive the following error: BigQuery: Streaming insert is not allowed in the free tier. For more details, see Streaming data into BigQuery.
Install the connector using the Confluent CLI¶
To install the latest connector version using the Confluent CLI, navigate to your Confluent Platform installation directory and run the following command:
confluent connect plugin install wepay/kafka-connect-bigquery:latest
You can install a specific version by replacing latest with a version number, as shown in the following example:
confluent connect plugin install wepay/kafka-connect-bigquery:2.6.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
The Google Cloud BigQuery Sink Connector is an open source connector and does not require a Confluent Enterprise License.
Configuration properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Google BigQuery Sink Connector for Confluent Platform (Legacy).
Upgrading to 2.x.x¶
The following changes aren’t backward compatible in the BigQuery connector:
- datasets was removed and defaultDataset has been introduced. The connector now infers the dataset from the topic name if the topic is in the form <dataset>:<tableName>. If the topic name is in the form <tableName>, the connector defaults to defaultDataset.
- topicsToTables was removed. Use the RegexRouter SMT to route topics to tables, as shown in the sketch after this list.
- autoUpdateSchemas was replaced by allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation.
- value.converter.enhanced.avro.schema.support should be set to false or removed. If this property is not removed or set to false, you may receive the following error: Invalid field name "com.examples.project-super-important.v1.MyData". Fields must contain only letters, numbers, and underscores, start with a letter or underscore, and be at most 300 characters long.
Quick start¶
The Confluent BigQuery Sink connector can stream table records into BigQuery from Kafka topics. These records are streamed at high throughput rates to facilitate analytical queries in near real-time.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Install the connector¶
To install the BigQuery connector, complete the following steps. Note that to run the following steps, you must have Confluent Platform running locally.
Navigate to your Confluent Platform installation directory and enter the following command:
confluent connect plugin install wepay/kafka-connect-bigquery:latest
Adding a new connector plugin requires restarting Kafka Connect. Use the Confluent CLI to restart Connect.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local services connect stop && confluent local services connect start
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Verify that the BigQuery Sink connector plugin has been installed correctly and is recognized by the plugin loader:
curl -sS localhost:8083/connector-plugins | jq .[].class | grep BigQuerySinkConnector
"com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
Set up the Google Cloud BigQuery connector¶
Use the topics in this section to set up the BigQuery Sink connector for Confluent Platform.
Prerequisites¶
The following prerequisites are required before setting up the BigQuery connector.
- An active Google Cloud account with authorization to create resources.
- A BigQuery project. You can create the project using the Google Cloud Console.
- A BigQuery dataset in the project.
- A service account that can access the BigQuery project containing the dataset. You can create this service account in the Google Cloud Console. You create and download a key when creating a service account. You must download the key as a JSON file, as shown in the following example:
{
  "type": "service_account",
  "project_id": "confluent-842583",
  "private_key_id": "...omitted...",
  "private_key": "-----BEGIN PRIVATE ...omitted... =\n-----END PRIVATE KEY-----\n",
  "client_email": "confluent2@confluent-842583.iam.gserviceaccount.com",
  "client_id": "...omitted...",
  "auth_uri": "https://accounts.google.com/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/metadata/confluent2%40confluent-842583.iam.gserviceaccount.com"
}
According to GCP specifications (https://cloud.google.com/bigquery/docs/access-control), the service account needs either the BigQueryEditor primitive IAM role or the bigquery.dataEditor predefined IAM role. The minimum permissions are as follows:
bigquery.datasets.get
bigquery.tables.create
bigquery.tables.get
bigquery.tables.getData
bigquery.tables.list
bigquery.tables.update
bigquery.tables.updateData
Start the BigQuery Sink connector¶
To start the BigQuery Sink Connector, complete the following steps:
Create the file register-kcbd-connect-bigquery.json to store the connector configuration.
Connect Distributed REST quick start connector properties:
{
  "name": "kcbq-connect1",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "tasks.max": "1",
    "topics": "kcbq-quickstart1",
    "sanitizeTopics": "true",
    "autoCreateTables": "true",
    "allowNewBigQueryFields": "true",
    "allowBigQueryRequiredFieldRelaxation": "true",
    "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
    "project": "confluent-243016",
    "defaultDataset": "ConfluentDataSet",
    "keyfile": "/Users/titomccutcheon/dev/confluent_fork/kafka-connect-bigquery/kcbq-connector/quickstart/properties/confluent-243016-384a24e2de1a.json",
    "transforms": "RegexTransformation",
    "transforms.RegexTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.RegexTransformation.regex": "(kcbq_)(.*)",
    "transforms.RegexTransformation.replacement": "$2"
  }
}
Note the following about this configuration:
- The project key is the ID value of the BigQuery project in Google Cloud.
- For defaultDataset, the value ConfluentDataSet is the ID of the dataset entered by the user during Google Cloud dataset creation.
- keyfile is the service account key JSON file location.
- If you don’t want this connector to create a BigQuery table automatically, create a BigQuery table with Partitioning: Partition by ingestion time and a proper schema.
- The properties prefixed with transforms set up SMTs. The example above uses a RegexRouter SMT that strips kcbq_ from the topic name. Replace it with a relevant regex to replace the topic of each sink record with a destination dataset and table name in the format <dataset>:<tableName>, or with only the destination table name in the format <tableName>.
Start the connector.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-kcbd-connect-bigquery.json
Start your Kafka producer¶
To start your Kafka producer, complete the following steps:
Go to the Kafka bin folder and start a producer in a new terminal session.
Type the following command which waits on terminal input.
./kafka-avro-console-producer --broker-list localhost:9092 --topic kcbq-quickstart1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Enter text for two test records, and press Enter after typing each line.
{"f1":"Testing the Kafka-BigQuery Connector!"}
{"f1":"Testing the Kafka-BigQuery Connector for a second time!"}
Checking results in BigQuery¶
To check the results in BigQuery, complete the following steps:
Go to the BigQuery editor in Google Cloud.
Enter the following SQL SELECT statement:
SELECT * FROM ConfluentDataSet.quickstart1;
Note
ConfluentDataSet is the dataset ID and quickstart1 is the name of the BigQuery table taken from the Kafka topic. In this case, the SMT strips kcbq_ from the topic: the connector converts the topic name kcbq_quickstart1 to the table name quickstart1.
Clean up resources¶
To clean up resources in BigQuery, complete the following steps:
Delete the connector:
curl -X DELETE localhost:8083/connectors/kcbq-connect1
Stop Confluent services:
confluent local stop
Partition features¶
To use timestamp partitioning by field name, you must set bigQueryPartitionDecorator to false, and then set the timestampPartitionFieldName property to the field name that contains the partitioning timestamps (for example, timestampPartitionFieldName=f2). With the BigQuery console, you can use the following query to output a list of existing partitions:
SELECT f2 as pt, FORMAT_TIMESTAMP("%Y%m%d", f2) as partition_id
FROM `PROJECT_ID.DATASET_ID.TABLE_ID`
GROUP BY f2
ORDER BY f2
The output lists all rows that have different dates and an additional partition_id column. Records that have a timestamp within a day’s range have the same partition_id.
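For example, a connector configuration that partitions the destination table by the record field f2, as in the query above, might include the following fragment (a sketch using the property names described in this section):
{
  "bigQueryPartitionDecorator": "false",
  "timestampPartitionFieldName": "f2"
}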