Kafka Connect GCP BigQuery Sink Connector

The GCP BigQuery Sink Connector streams data from Apache Kafka® topics into Google BigQuery tables. When streaming from topics that have registered schemas, the connector can automatically create BigQuery tables with an appropriate table schema based on the registered Kafka schema for the topic.

  • The connector supports insert operations and attempts to detect duplicates. See the BigQuery troubleshooting documentation for details.
  • The connector uses the BigQuery insertAll streaming API, which inserts records one at a time. Records are immediately available in the table for querying.
  • The connector supports streaming from a list of topics into corresponding tables in BigQuery.
  • Although the connector streams records one at a time by default (as opposed to running in batch mode), it scales because it maintains an internal thread pool for streaming records in parallel.

Note

The internal thread pool defaults to 10 threads. This is configurable.
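For example, assuming the connector's threadPoolSize configuration property (the property name is taken from the connector's configuration reference; verify against your connector version), you could raise the pool to 20 threads by adding one line to the connector configuration:

```
"threadPoolSize": "20"
```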

Prerequisites

The following are required to run the Kafka Connect BigQuery Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Java 1.8
  • Active Google Cloud Platform (GCP) account with authorization to create resources

Install the BigQuery Connector

You can install this connector by using the Confluent Hub client (recommended) or by manually downloading the ZIP file.

confluent-hub install wepay/kafka-connect-bigquery:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install wepay/kafka-connect-bigquery:1.1.2

License

The GCP BigQuery Sink Connector is an open source connector and does not require a Confluent Enterprise License.

Quick Start

The Confluent BigQuery Sink Connector can stream table records into BigQuery from Kafka topics. These records are streamed at high throughput rates to facilitate analytical queries in near real-time.

Install the Connector

For the following tutorial, you need to have Confluent Platform running locally.

Navigate to your Confluent Platform installation directory and enter the following command:

confluent-hub install wepay/kafka-connect-bigquery:latest

Adding a new connector plugin requires restarting Kafka Connect. Use the Confluent CLI to restart Connect:

confluent stop connect && confluent start connect
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]

Verify that the BigQuery sink connector plugin has been installed correctly and recognized by the plugin loader:

curl -sS localhost:8083/connector-plugins | jq .[].class | grep BigQuerySinkConnector
"com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"

Setting up the GCP BigQuery Connector

You need the following before starting the connector:

  • An active GCP account with authorization to create resources.
  • A BigQuery project. The project may be created using the GCP Console.
  • A BigQuery dataset, created in that project.
  • A service account that can access the BigQuery project containing the dataset. You can create this service account in the Google Cloud Console.
  • A service account key. When you create the service account, create and download the key as a JSON file. It resembles the example below:

Example service account key file:

{
 "type": "service_account",
 "project_id": "confluent-243016",
 "private_key_id": "c386effb7fadb7bce56c249b28e0a9865ca3c595",
 "private_key": "-----BEGIN PRIVATE KEY-----\n ...deleted for brevity... =\n-----END PRIVATE KEY-----\n",
 "client_email": "confluent2@confluent-243016.iam.gserviceaccount.com",
 "client_id": "111348633408307923943",
 "auth_uri": "https://accounts.google.com/o/oauth2/auth",
 "token_uri": "https://oauth2.googleapis.com/token",
 "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
 "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/confluent2%40confluent-243016.iam.gserviceaccount.com"
}
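As a quick sanity check of the downloaded key (a sketch; it assumes jq is installed and uses a placeholder filename), you can confirm the file parses as valid JSON and print the service account email:

```shell
# service-account-key.json is a placeholder for your downloaded key file
jq -r '.client_email' service-account-key.json
```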

Start the BigQuery sink connector

Create the file register-kcbd-connect-bigquery.json to store the connector configuration.

Connect Distributed REST quick start connector properties:

 {
   "name": "kcbq-connect1",
   "config": {
     "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
     "tasks.max" : "1",
     "topics" : "kcbq-quickstart1",
     "sanitizeTopics" : "true",
     "autoCreateTables" : "true",
     "autoUpdateSchemas" : "true",
     "schemaRetriever" : "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
     "schemaRegistryLocation":"http://localhost:8081",
     "bufferSize": "100000",
     "maxWriteSize":"10000",
     "tableWriteWait": "1000",
     "project" : "confluent-243016",
     "datasets" : ".*=ConfluentDataSet",
     "keyfile" : "/Users/titomccutcheon/dev/confluent_fork/kafka-connect-bigquery/kcbq-connector/quickstart/properties/confluent-243016-384a24e2de1a.json"
   }
 }

Note

The project key is the ID of the BigQuery project in GCP. For datasets, the value ConfluentDataSet is the ID of the dataset entered during GCP dataset creation; the .*= prefix before the dataset ID is required and maps all topics to that dataset. The keyfile property is the location of the service account key JSON file.

If you don’t want this connector to create a BigQuery table automatically, create a BigQuery table with Partitioning: Partition by ingestion time and a proper schema.
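For example, assuming the bq command-line tool and the quickstart names used below, a matching single-field table could be created like this (a sketch; not needed when autoCreateTables is enabled):

```
bq mk --table --time_partitioning_type=DAY ConfluentDataSet.kcbq_quickstart1 f1:STRING
```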

Start the connector.

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-kcbd-connect-bigquery.json
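To confirm the connector is running (assuming the default Connect REST port used elsewhere in this quick start), query its status endpoint:

```
curl -sS localhost:8083/connectors/kcbq-connect1/status | jq .
```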

Start your Kafka producer

In a new terminal session, go to the Kafka bin folder and start a producer. Enter the following command, which waits for input from the terminal.

./kafka-avro-console-producer --broker-list localhost:9092 --topic kcbq-quickstart1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Next, enter the text for two test records. Press Enter after typing each line.

./kafka-avro-console-producer --broker-list localhost:9092 --topic kcbq-quickstart1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1":"Testing the Kafka-BigQuery Connector!"}
{"f1":"Testing the Kafka-BigQuery Connector for a second time!"}

Check the results in BigQuery

Go to the BigQuery query editor in GCP and enter the following SQL SELECT statement.

SELECT * FROM ConfluentDataSet.kcbq_quickstart1;

Note

ConfluentDataSet is the dataset ID and kcbq_quickstart1 is the name of the BigQuery table derived from the Kafka topic. The connector converts the topic name kcbq-quickstart1 to the table name kcbq_quickstart1.
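The rename happens because the quick start configuration sets "sanitizeTopics": "true". A minimal sketch of the idea (an assumption for illustration, not the connector's exact implementation) replaces characters that are invalid in BigQuery table names with underscores:

```shell
# replace every character outside [a-zA-Z0-9_] with an underscore
sanitize() { printf '%s' "$1" | tr -c 'a-zA-Z0-9_' '_'; }
sanitize "kcbq-quickstart1"   # kcbq_quickstart1
```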

Clean up resources

Delete the connector and stop Confluent services.

curl -X DELETE localhost:8083/connectors/kcbq-connect1
confluent stop

Additional Documentation