Google BigQuery Sink Connector for Confluent Platform

Note

If you’re using Confluent Cloud, see Google BigQuery Sink connector for Confluent Cloud for the Confluent Cloud Quick Start.

The Kafka Connect Google BigQuery Sink connector is used to stream data into BigQuery tables. When streaming data from Apache Kafka® topics that have registered schemas, the sink connector can create BigQuery tables with the appropriate BigQuery table schema. The BigQuery table schema is based upon information in the Kafka schema for the topic.

Important

Google BigQuery Sink connector version 2.0.0 is not backward compatible with 1.x.x versions. See the Upgrading to 2.0.0 section for more information.

Features

Note

If you’re planning to use multiple connectors with a high number of tasks, be sure to review BigQuery rate limits.

At least once delivery

This connector guarantees that records are delivered at least once from the Kafka topic.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The Google BigQuery Sink Connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve throughput when the connector reads from many topic partitions.

Insert operations and duplicate detection

The connector supports insert operations and tries to detect duplicates. See BigQuery troubleshooting for more information.

Insertion of records one at a time

The connector uses the BigQuery insertAll streaming API, which inserts records one at a time. The records are available immediately in the table for querying.

Streaming from a list of topics

The connector supports streaming from a list of topics into corresponding tables in BigQuery.

Internal thread pool

Even though the BigQuery connector streams records one at a time by default (as opposed to running in batch mode), the connector is scalable because it contains an internal thread pool that allows it to stream records in parallel.

Note

The internal thread pool defaults to 10 threads, which is configurable.
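
To illustrate the idea, the following Python sketch fans records out over a fixed-size pool of worker threads. It is not the connector's implementation: stream_in_parallel and the send callback are hypothetical names, and the pool size of 10 only mirrors the default mentioned above.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, TypeVar

R = TypeVar("R")
T = TypeVar("T")


def stream_in_parallel(
    records: Iterable[R],
    send: Callable[[R], T],
    pool_size: int = 10,
) -> List[T]:
    """Call send(record) for each record using up to pool_size worker threads.

    Results are returned in input order, although the individual calls
    may run concurrently and finish in any order.
    """
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        return list(pool.map(send, records))
```

Each record is still sent individually. The pool only allows several of those sends to be in flight at the same time.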

Prerequisites

The following prerequisites are required to run the Kafka Connect BigQuery Sink Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Java 1.8
  • Active Google Cloud Platform (GCP) account with authorization to create resources

Important

Streaming data into BigQuery is not available using the GCP free tier. If you try to use streaming without enabling billing, you will receive the following error: BigQuery: Streaming insert is not allowed in the free tier. See Streaming data into BigQuery for more details.

Install the BigQuery Sink Connector

You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.

Prerequisites

Note

You must install the connector on every machine where Connect will run.

  • An installation of the Confluent Hub Client. This is installed by default with Confluent Enterprise.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent-hub install wepay/kafka-connect-bigquery:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent-hub install wepay/kafka-connect-bigquery:2.0.0
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

The GCP BigQuery Sink connector is an open source connector and does not require a Confluent Enterprise License.

Configuration Properties

For a complete list of configuration properties for this connector, see Google BigQuery Sink Connector Configuration Properties.

Upgrading to 2.0.0

The following changes aren’t backward compatible in the BigQuery connector:

  • datasets was removed and defaultDataset has been introduced. The connector now infers the dataset from the topic name if the topic is in the form <dataset>:<tableName>. If the topic name is in the form <tablename>, the connector defaults to defaultDataset.
  • topicsToTables was removed. SMTs should be used to route topics to tables.
  • autoUpdateSchemas was replaced by allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation.
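
The dataset inference rule in the first bullet can be sketched in Python as follows. This is an illustration of the rule as described here, not the connector's code. The function name resolve_destination is hypothetical, and the sketch assumes the topic name has already passed through any SMTs.

```python
from typing import Tuple


def resolve_destination(topic: str, default_dataset: str) -> Tuple[str, str]:
    """Return (dataset, table) for a topic name.

    A topic in the form <dataset>:<tableName> names its own dataset.
    A topic in the form <tableName> falls back to default_dataset.
    """
    if ":" in topic:
        dataset, table = topic.split(":", 1)
        return dataset, table
    return default_dataset, topic
```

For example, resolve_destination("sales:orders", "ConfluentDataSet") selects the dataset sales, while resolve_destination("orders", "ConfluentDataSet") falls back to ConfluentDataSet. The names sales and orders are made up for this example.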

Quick Start

The Confluent BigQuery Sink connector can stream table records into BigQuery from Kafka topics. These records are streamed at high throughput rates to facilitate analytical queries in near real-time.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Installing the Connector

To install the BigQuery connector, complete the following steps:

Note

For the following tutorial, you must have Confluent Platform running locally.

  1. Navigate to your Confluent Platform installation directory and enter the following command:

    confluent-hub install wepay/kafka-connect-bigquery:latest
    
  2. Restart Connect using the Confluent CLI. Kafka Connect must be restarted before it can load a newly added connector plugin.

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

confluent local services connect stop && confluent local services connect start
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
  3. Verify the BigQuery Sink connector plugin has been installed correctly and recognized by the plugin loader:

    curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep BigQuerySinkConnector
    "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
    

Setting up the GCP BigQuery Connector

Important

Streaming into BigQuery is not available using the GCP free tier. If you attempt to use streaming without enabling billing, you receive the following error: BigQuery: Streaming insert is not allowed in the free tier. See Streaming data into BigQuery for details.

Prerequisites

The following prerequisites are required before setting up the BigQuery connector.

  • An active GCP account with authorization to create resources.

  • A BigQuery project. You can create the project using the Google Cloud Console.

  • A BigQuery dataset in the project.

  • A service account that can access the BigQuery project containing the dataset. You can create this service account in the Google Cloud Console.

  • A key for the service account. You create and download the key when you create the service account. You must download the key as a JSON file, as shown in the following example:

      {
         "type": "service_account",
         "project_id": "confluent-842583",
         "private_key_id": "...omitted...",
         "private_key": "-----BEGIN PRIVATE ...omitted... =\n-----END PRIVATE KEY-----\n",
         "client_email": "confluent2@confluent-842583.iam.gserviceaccount.com",
         "client_id": "...omitted...",
         "auth_uri": "https://accounts.google.com/oauth2/auth",
         "token_uri": "https://oauth2.googleapis.com/token",
         "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/certs",
         "client_x509_cert_url": "https://www.googleapis.com/robot/metadata/confluent2%40confluent-842583.iam.gserviceaccount.com"
      }
    
    According to GCP specifications (https://cloud.google.com/bigquery/docs/access-control), the service account will either need the BigQueryEditor primitive IAM role or the bigquery.dataEditor predefined IAM role. The minimum permissions are as follows:

      bigquery.datasets.get
      bigquery.tables.create
      bigquery.tables.get
      bigquery.tables.getData
      bigquery.tables.list
      bigquery.tables.update
      bigquery.tables.updateData
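
Before pointing the connector at a downloaded key, it can help to check the file locally. The following Python sketch is one way to do that. The helper name key_file_problems is hypothetical, and treating these four fields as required is an assumption based on the example key shown above, not a rule taken from the GCP documentation.

```python
import json
from typing import List

# Fields taken from the example key above. Treating them as required
# is an assumption of this sketch.
EXPECTED_FIELDS = ("type", "project_id", "private_key", "client_email")


def key_file_problems(key_json: str) -> List[str]:
    """Return a list of problems found in a service account key (empty if none)."""
    key = json.loads(key_json)
    problems = [
        f"missing field: {name}" for name in EXPECTED_FIELDS if not key.get(name)
    ]
    # The example key uses the type "service_account".
    if key.get("type") and key["type"] != "service_account":
        problems.append(f"unexpected type: {key['type']}")
    return problems
```

The function takes the file contents as a string, so a typical call reads the key file first and passes the text in. It checks only the shape of the file and cannot tell whether the account has the permissions listed above.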
    

Starting the BigQuery Sink Connector

To start the BigQuery Sink connector, complete the following steps:

  1. Create the file register-kcbd-connect-bigquery.json to store the connector configuration.

    Connect Distributed REST quick start connector properties:

    {
      "name": "kcbq-connect1",
      "config": {
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "tasks.max": "1",
        "topics": "kcbq-quickstart1",
        "sanitizeTopics": "true",
        "autoCreateTables": "true",
        "allowNewBigQueryFields": "true",
        "allowBigQueryRequiredFieldRelaxation": "true",
        "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
        "project": "confluent-243016",
        "defaultDataset": "ConfluentDataSet",
        "keyfile": "/Users/titomccutcheon/dev/confluent_fork/kafka-connect-bigquery/kcbq-connector/quickstart/properties/confluent-243016-384a24e2de1a.json",
        "transforms": "RegexTransformation",
        "transforms.RegexTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.RegexTransformation.regex": "(kcbq[-_])(.*)",
        "transforms.RegexTransformation.replacement": "$2"
      }
    }
    

    Note

    The project key is the ID of the BigQuery project in GCP. For defaultDataset, the value ConfluentDataSet is the ID of the dataset entered by the user during GCP dataset creation. The keyfile key is the location of the service account key JSON file.

    If you don’t want this connector to create a BigQuery table automatically, create a BigQuery table with Partitioning: Partition by ingestion time and a proper schema.

    Note

    The properties prefixed with transforms are used to set up SMTs. The example above uses a regex router SMT that strips a kcbq_ prefix from the topic name. Replace the regex with one that maps the topic of each sink record to the destination dataset and table name in the format <dataset>:<tableName>, or to only the destination table name in the format <tableName>.

  2. Start the connector.

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-kcbd-connect-bigquery.json
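
The topic renaming done by the regex router SMT in step 1 can be approximated in Python, which makes it possible to try a pattern before deploying it. This sketch only mimics the behavior of org.apache.kafka.connect.transforms.RegexRouter as understood here: the whole topic name must match the pattern, otherwise the name passes through unchanged. The helper route_topic is hypothetical, and Java and Python regex syntax differ in some corner cases.

```python
import re


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Approximate RegexRouter: rename the topic only if the whole name matches."""
    match = re.fullmatch(regex, topic)
    if match is None:
        # No full match: the topic name is left as it is.
        return topic
    # Translate Java-style group references ($1, $2) to Python's \g<N> form.
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(py_replacement)
```

With the pattern (kcbq_)(.*) and the replacement $2, route_topic("kcbq_quickstart1", r"(kcbq_)(.*)", "$2") returns quickstart1. A name that does not match the pattern, for example one that uses a hyphen where the pattern expects an underscore, is returned unchanged.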
    

Starting your Kafka producer

To start your Kafka producer, complete the following steps:

  1. Go to the Kafka bin folder and start a producer in a new terminal session.

  2. Enter the following command, which waits for terminal input.

    ./kafka-avro-console-producer --broker-list localhost:9092 --topic kcbq-quickstart1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    
  3. Enter text for two test records, and press Enter after typing each line.

    ./kafka-avro-console-producer --broker-list localhost:9092 --topic kcbq-quickstart1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    {"f1":"Testing the Kafka-BigQuery Connector!"}
    {"f1":"Testing the Kafka-BigQuery Connector for a second time!"}
    

Checking results in BigQuery

To check the results in BigQuery, complete the following steps:

  1. Go to the BigQuery editor in GCP.

  2. Enter the following SQL SELECT statement.

    SELECT * FROM ConfluentDataSet.quickstart1;
    

    Note

    ConfluentDataSet is the dataset ID and quickstart1 is the name of the BigQuery table taken from the Kafka topic. In this case, the SMT strips kcbq_ from the topic: the Connector converts this topic name kcbq_quickstart1 to table name quickstart1.

Cleaning up resources

To clean up resources in BigQuery, complete the following steps:

  1. Delete the connector:

    curl -X DELETE localhost:8083/connectors/kcbq-connect1
    
  2. Stop Confluent services:

    confluent local services stop
    

Partitioning Features

To use timestamp partitioning by field name, you must set the bigQueryPartitionDecorator to false, and then set the timestampPartitionFieldName property to the field name that contains the partitioning timestamps (for example, timestampPartitionFieldName=f2). With the BigQuery console, you can use the following query to output a list of existing partitions:

SELECT f2 as pt, FORMAT_TIMESTAMP("%Y%m%d", f2) as partition_id
FROM `PROJECT_ID.DATASET_ID.TABLE_ID`
GROUP BY f2
ORDER BY f2

The output lists all rows that have different dates and an additional partition_id column. Records that have a timestamp within a day’s range will have the same partition_id.
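
The same day-level grouping can be reproduced outside BigQuery. The following Python sketch computes the %Y%m%d value for a timestamp, assuming the query is evaluated in UTC, which is the FORMAT_TIMESTAMP default when no time zone argument is given. The function name partition_id is hypothetical, and the input must be a timezone-aware datetime.

```python
from datetime import datetime, timezone


def partition_id(ts: datetime) -> str:
    """Return the day-level partition ID (YYYYMMDD, in UTC) for an aware datetime."""
    if ts.tzinfo is None:
        raise ValueError("partition_id expects a timezone-aware datetime")
    return ts.astimezone(timezone.utc).strftime("%Y%m%d")
```

Two timestamps that fall on the same UTC day yield the same ID. A timestamp just after midnight UTC yields the next day's ID, even if it is still the previous day in the producer's local time zone.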
