Google BigQuery Sink Connector for Confluent Platform¶
Note
If you are using Confluent Cloud, see https://docs.confluent.io/cloud/current/connectors/cc-gcp-bigquery-sink.html for the cloud Quick Start.
The Kafka Connect Google BigQuery Sink Connector is used to stream data into BigQuery tables. When streaming data from Apache Kafka® topics (that have registered schemas), the sink connector can automatically create BigQuery tables with the appropriate BigQuery table schema. The BigQuery table schema is based upon information in the Kafka schema for the topic.
Features¶
The connector supports insert operations and attempts to detect duplicates. See BigQuery troubleshooting for additional information.
The connector uses the BigQuery insertAll streaming API, which inserts records one at a time. The records are immediately available in the table for querying.
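Duplicate detection through the insertAll API is best-effort: BigQuery remembers an insert ID for a short window and drops retries that reuse it. The following is a toy sketch of that idea, not the service's actual implementation:

```python
class DedupingTable:
    """Toy model of best-effort streaming dedup keyed by insert_id."""

    def __init__(self):
        self.rows = []
        self._seen = set()  # recently seen insert_ids

    def insert_all(self, rows):
        """Insert (insert_id, row) pairs, skipping retried insert_ids."""
        for insert_id, row in rows:
            if insert_id in self._seen:
                continue  # drop a retry of an already-applied insert
            self._seen.add(insert_id)
            self.rows.append(row)

table = DedupingTable()
table.insert_all([("id-1", {"f1": "a"}), ("id-2", {"f1": "b"})])
table.insert_all([("id-2", {"f1": "b"})])  # retried row is ignored
print(len(table.rows))  # 2
```

In the real service the dedup window is limited, so duplicates can still appear; downstream queries should tolerate them.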
The connector supports streaming from a list of topics into corresponding tables in BigQuery.
Even though the connector streams records one at a time by default (as opposed to running in batch mode), the connector is scalable because it contains an internal thread pool that allows it to stream records in parallel.
Note
The internal thread pool defaults to 10 threads. This is configurable.
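The pool size is tuned through the connector configuration. A minimal fragment, assuming the connector's threadPoolSize property and placeholder topic and connector names:

```json
{
  "name": "kcbq-sink",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "topics": "my-topic",
    "threadPoolSize": "20"
  }
}
```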
Note
Make sure to review BigQuery rate limits if you are planning to use multiple connectors with a high number of tasks.
Prerequisites¶
The following are required to run the Kafka Connect BigQuery Sink Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Java 1.8
- Active Google Cloud Platform (GCP) account with authorization to create resources
Important
Streaming into BigQuery is not available via the GCP free tier. If you attempt to use streaming without enabling billing, you receive the following error: BigQuery: Streaming insert is not allowed in the free tier.
See Streaming data into BigQuery for details.
Install the BigQuery Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
confluent-hub install wepay/kafka-connect-bigquery:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install wepay/kafka-connect-bigquery:1.6.1
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
The GCP BigQuery Sink Connector is an open source connector and does not require a Confluent Enterprise License.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Google BigQuery Sink Connector Configuration Properties.
Quick Start¶
The Confluent BigQuery Sink Connector can stream table records into BigQuery from Kafka topics. These records are streamed at high throughput rates to facilitate analytical queries in near real-time.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.
Install the Connector¶
For the following tutorial, you need to have Confluent Platform running locally.
Navigate to your Confluent Platform installation directory and enter the following command:
confluent-hub install wepay/kafka-connect-bigquery:latest
Adding a new connector plugin requires restarting Kafka Connect. Use the Confluent CLI to restart Connect.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.
confluent local stop connect && confluent local start connect
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Verify that the BigQuery sink connector plugin has been installed correctly and recognized by the plugin loader:
curl -sS localhost:8083/connector-plugins | jq .[].class | grep BigQuerySinkConnector
"com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
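The same check can be done programmatically by filtering the /connector-plugins response. A small sketch; the JSON below is a sample payload, not live output:

```python
import json

# Sample of what GET /connector-plugins returns (abbreviated, illustrative)
sample_response = '''[
  {"class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
   "type": "sink", "version": "1.6.1"},
  {"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
   "type": "sink", "version": "2.3.0"}
]'''

def find_plugin(payload, needle):
    """Return the classes of installed plugins whose name contains needle."""
    return [p["class"] for p in json.loads(payload) if needle in p["class"]]

print(find_plugin(sample_response, "BigQuerySinkConnector"))
```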
Setting up the GCP BigQuery Connector¶
Make sure you have the following prerequisites completed before setting up the connector.
Important
Streaming into BigQuery is not available via the GCP free tier. If you attempt to use streaming without enabling billing, you receive the following error: BigQuery: Streaming insert is not allowed in the free tier.
See Streaming data into BigQuery for details.
Prerequisites:
- An active GCP account with authorization to create resources.
- A BigQuery project. The project can be created using the Google Cloud Console.
- A BigQuery dataset in the project.
- A service account that can access the BigQuery project containing the dataset. You can create this service account in the Google Cloud Console.
You create and download a key when creating the service account. The key must be downloaded as a JSON file. It resembles the example below:
{
  "type": "service_account",
  "project_id": "confluent-842583",
  "private_key_id": "...omitted...",
  "private_key": "-----BEGIN PRIVATE ...omitted... =\n-----END PRIVATE KEY-----\n",
  "client_email": "confluent2@confluent-842583.iam.gserviceaccount.com",
  "client_id": "...omitted...",
  "auth_uri": "https://accounts.google.com/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/metadata/confluent2%40confluent-842583.iam.gserviceaccount.com"
}
According to GCP specifications, the service account must have either the BigQueryEditor primitive IAM role or the bigquery.dataEditor predefined IAM role. The minimum permissions are:
- bigquery.datasets.get
- bigquery.tables.create
- bigquery.tables.get
- bigquery.tables.getData
- bigquery.tables.list
- bigquery.tables.update
- bigquery.tables.updateData
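As a sketch, the service account, role binding, and key can also be provisioned with the gcloud CLI; the project and account names below are placeholders:

```shell
# Create a service account in the target project (names are examples)
gcloud iam service-accounts create kcbq-sink --project=my-project

# Grant the predefined BigQuery Data Editor role
gcloud projects add-iam-policy-binding my-project \
  --member="serviceAccount:kcbq-sink@my-project.iam.gserviceaccount.com" \
  --role="roles/bigquery.dataEditor"

# Download the JSON key referenced by the connector's keyfile setting
gcloud iam service-accounts keys create bigquery-key.json \
  --iam-account=kcbq-sink@my-project.iam.gserviceaccount.com
```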
Start the BigQuery sink connector¶
Create the file register-kcbd-connect-bigquery.json to store the connector configuration.
Connect Distributed REST quick start connector properties:
{
"name": "kcbq-connect1",
"config": {
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"tasks.max" : "1",
"topics" : "kcbq-quickstart1",
"sanitizeTopics" : "true",
"autoCreateTables" : "true",
"autoUpdateSchemas" : "true",
"schemaRetriever" : "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
"schemaRegistryLocation":"http://localhost:8081",
"bufferSize": "100000",
"maxWriteSize":"10000",
"tableWriteWait": "1000",
"project" : "confluent-243016",
"datasets" : ".*=ConfluentDataSet",
    "keyfile" : "/Users/titomccutcheon/dev/confluent_fork/kafka-connect-bigquery/kcbq-connector/quickstart/properties/confluent-243016-384a24e2de1a.json"
}
}
Note
The project key is the ID value of the BigQuery project in GCP. For datasets, the value ConfluentDataSet is the ID of the dataset entered by the user during GCP dataset creation; include .*= before the dataset ID. keyfile is the location of the service account key JSON file.
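The datasets value is a comma-separated list of regex=dataset pairs matched against topic names. A sketch of how that resolution works (not the connector's code):

```python
import re

def resolve_dataset(topic, mappings):
    """Return the dataset for the first regex=dataset pair whose regex
    matches the whole topic name, or None if nothing matches."""
    for pair in mappings.split(","):
        pattern, dataset = pair.split("=", 1)
        if re.fullmatch(pattern, topic):
            return dataset
    return None

# ".*" matches every topic, so all topics land in ConfluentDataSet
print(resolve_dataset("kcbq-quickstart1", ".*=ConfluentDataSet"))
```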
If you don’t want this connector to create a BigQuery table automatically,
create a BigQuery table with Partitioning: Partition by ingestion time
and a proper schema.
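For example, a matching table for the quick start records could be created with BigQuery DDL; the dataset, table, and field names below assume the quick start configuration:

```sql
CREATE TABLE ConfluentDataSet.kcbq_quickstart1 (
  f1 STRING
)
PARTITION BY DATE(_PARTITIONTIME);  -- ingestion-time partitioning
```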
Start the connector.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-kcbd-connect-bigquery.json
Start your Kafka producer¶
First, go to the Kafka bin folder and start a producer in a new terminal session. Type the following command, which waits on terminal input.
./kafka-avro-console-producer --broker-list localhost:9092 --topic kcbq-quickstart1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Next, enter the text for two test records. Press Enter after typing each line.
./kafka-avro-console-producer --broker-list localhost:9092 --topic kcbq-quickstart1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1":"Testing the Kafka-BigQuery Connector!"}
{"f1":"Testing the Kafka-BigQuery Connector for a second time!"}
Check the results in BigQuery¶
Go to the BigQuery query editor in GCP and enter the following SQL SELECT statement.
SELECT * FROM ConfluentDataSet.kcbq_quickstart1;
Note
ConfluentDataSet is the dataset ID and kcbq_quickstart1 is the name of the BigQuery table taken from the Kafka topic. The connector converts the topic kcbq-quickstart1 to the table name kcbq_quickstart1.
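With sanitizeTopics=true, characters that are illegal in BigQuery table names are replaced with underscores. A rough approximation of that behavior (the connector's exact rules may differ):

```python
import re

def sanitize_table_name(topic):
    """Replace characters BigQuery does not allow in table names with
    underscores; letters, digits, and underscores are kept."""
    return re.sub(r"[^a-zA-Z0-9_]", "_", topic)

print(sanitize_table_name("kcbq-quickstart1"))  # kcbq_quickstart1
```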
Clean up resources¶
Delete the connector and stop Confluent services.
curl -X DELETE localhost:8083/connectors/kcbq-connect1
confluent local stop
Partitioning Features¶
To use timestamp partitioning by field name, the bigQueryPartitionDecorator
needs to be set to false
. Then, set the timestampPartitionFieldName
property to the field name that contains the partitioning timestamps (for example, timestampPartitionFieldName=f2
).
In the BigQuery console, you can use the following query to output a list of existing partitions:
SELECT f2 as pt, FORMAT_TIMESTAMP("%Y%m%d", f2) as partition_id
FROM `PROJECT_ID.DATASET_ID.TABLE_ID`
GROUP BY f2
ORDER BY f2
The output lists all rows that have different dates, plus an additional partition_id column. Records with timestamps within the same day share the same partition_id.
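The partition_id grouping follows directly from formatting the timestamp as a date. A small sketch with made-up timestamps:

```python
from datetime import datetime, timezone

def partition_id(ts):
    """Format a timestamp the way FORMAT_TIMESTAMP("%Y%m%d", ...) does."""
    return ts.strftime("%Y%m%d")

rows = [
    datetime(2020, 5, 1, 9, 30, tzinfo=timezone.utc),
    datetime(2020, 5, 1, 17, 45, tzinfo=timezone.utc),  # same day, same partition
    datetime(2020, 5, 2, 0, 5, tzinfo=timezone.utc),
]
print([partition_id(t) for t in rows])  # ['20200501', '20200501', '20200502']
```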