.. _bigquery-connector:

|kconnect-long| |gcp| BigQuery Sink Connector
---------------------------------------------

The |gcp| BigQuery Sink Connector is a sink connector that streams data into Google
BigQuery tables. When streaming data from |ak-tm| topics that have registered schemas,
the sink connector can automatically create BigQuery tables with an appropriate
BigQuery table schema, based on the information in the |ak| schema for the topic.

* The connector supports insert operations and attempts to detect duplicates. See
  `BigQuery troubleshooting <https://cloud.google.com/bigquery/streaming-data-into-bigquery#troubleshooting>`_ for details.
* The connector uses the BigQuery `insertAll streaming API <https://googleapis.dev/java/google-cloud-clients/latest/com/google/cloud/bigquery/InsertAllRequest.html>`_,
  which inserts records one at a time. The records are immediately available in the
  table for querying.
* The connector supports streaming from a list of topics into corresponding tables in
  BigQuery.
* Even though the connector streams records one at a time by default (as opposed to
  running in batch mode), the connector is scalable because it contains an internal
  thread pool for streaming records in parallel.

.. note:: The internal thread pool defaults to 10 threads. This is configurable.

--------------
Prerequisites
--------------

The following are required to run the |kconnect-long| BigQuery Sink Connector:

* |ak| Broker: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* |kconnect|: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* Java 1.8
* An active Google Cloud Platform (GCP) account with authorization to create resources

-------------------------------
Install the BigQuery Connector
-------------------------------

.. include:: ../includes/connector-install.rst

.. codewithvars:: bash

   confluent-hub install wepay/kafka-connect-bigquery:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install wepay/kafka-connect-bigquery:1.1.2

--------
License
--------

The |gcp| BigQuery Sink Connector is an open source connector and does not require a
Confluent Enterprise License.

-----------
Quick Start
-----------

The Confluent BigQuery Sink Connector can stream table records into BigQuery from |ak|
topics. These records are streamed at high throughput rates to facilitate analytical
queries in near real time.

Install the Connector
^^^^^^^^^^^^^^^^^^^^^

For the following tutorial, you need to have |cp| running locally. Navigate to your
|cp| installation directory and enter the following command:

.. codewithvars:: bash

   confluent-hub install wepay/kafka-connect-bigquery:latest

Adding a new connector plugin requires restarting |kconnect-long|. Use the Confluent
CLI to restart |kconnect|:

.. codewithvars:: bash

   |confluent_stop| connect && |confluent_start| connect
   Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
   Starting zookeeper
   zookeeper is [UP]
   Starting kafka
   kafka is [UP]
   Starting schema-registry
   schema-registry is [UP]
   Starting kafka-rest
   kafka-rest is [UP]
   Starting connect
   connect is [UP]

Verify that the BigQuery sink connector plugin has been installed correctly and is
recognized by the plugin loader:

.. codewithvars:: bash

   curl -sS localhost:8083/connector-plugins | jq .[].class | grep BigQuerySinkConnector
   "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"

Setting up the |gcp| BigQuery Connector
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You need the following before starting the connector:

* An active |gcp| account with authorization to create resources.
* A BigQuery project. The project can be created using the |gcp| Console.
* A BigQuery dataset, created in the project.
* A service account that can access the BigQuery project containing the dataset. You
  can create this service account in the Google Cloud Console. The service account
  must have access to the BigQuery project containing the dataset.
* A service account key. You create and download a key when creating the service
  account. The key can be downloaded as a JSON file and resembles the example below:

**Example service account key file:**

.. code-block:: json

   {
     "type": "service_account",
     "project_id": "confluent-243016",
     "private_key_id": "c386effb7fadb7bce56c249b28e0a9865ca3c595",
     "private_key": "-----BEGIN PRIVATE deleted for brevity =\n-----END PRIVATE KEY-----\n",
     "client_email": "confluent2@confluent-243016.iam.gserviceaccount.com",
     "client_id": "111348633408307923943",
     "auth_uri": "https://accounts.google.com/o/oauth2/auth",
     "token_uri": "https://oauth2.googleapis.com/token",
     "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
     "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/confluent2%40confluent-243016.iam.gserviceaccount.com"
   }
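If you prefer to create these |gcp| resources from the command line instead of the
Console, the following sketch shows one possible approach using the ``gcloud`` and
``bq`` tools. The project ID ``confluent-243016``, service account name ``confluent2``,
and dataset ``ConfluentDataSet`` are the example values used throughout this quick
start; substitute your own values, and note that the IAM role shown is an assumption
(your project may require a different role).

.. code-block:: bash

   # Create a service account in the example project.
   gcloud iam service-accounts create confluent2 \
       --project confluent-243016 \
       --display-name "BigQuery sink connector"

   # Grant the service account permission to create tables and stream rows into
   # BigQuery. roles/bigquery.dataEditor is one role that allows this; adjust as
   # needed for your environment.
   gcloud projects add-iam-policy-binding confluent-243016 \
       --member "serviceAccount:confluent2@confluent-243016.iam.gserviceaccount.com" \
       --role "roles/bigquery.dataEditor"

   # Download a JSON key file for the service account. This is the file that the
   # connector's keyfile property points to.
   gcloud iam service-accounts keys create ~/confluent-243016-key.json \
       --iam-account confluent2@confluent-243016.iam.gserviceaccount.com

   # Create the dataset the connector writes to.
   bq mk --dataset confluent-243016:ConfluentDataSet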
Start the BigQuery sink connector
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Create the file ``register-kcbd-connect-bigquery.json`` to store the connector
configuration.

**Connect Distributed REST quick start connector properties:**

.. code-block:: json
   :emphasize-lines: 15,16,17

   {
     "name": "kcbq-connect1",
     "config": {
       "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
       "tasks.max": "1",
       "topics": "kcbq-quickstart1",
       "sanitizeTopics": "true",
       "autoCreateTables": "true",
       "autoUpdateSchemas": "true",
       "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
       "schemaRegistryLocation": "http://localhost:8081",
       "bufferSize": "100000",
       "maxWriteSize": "10000",
       "tableWriteWait": "1000",
       "project": "confluent-243016",
       "datasets": ".*=ConfluentDataSet",
       "keyfile": "/Users/titomccutcheon/dev/confluent_fork/kafka-connect-bigquery/kcbq-connector/quickstart/properties/confluent-243016-384a24e2de1a.json"
     }
   }

.. note:: The ``project`` key is the ``id`` value of the BigQuery project in |gcp|.
   For ``datasets``, the value ``ConfluentDataSet`` is the ID of the dataset entered by
   the user during |gcp| dataset creation; include ``.*=`` before the dataset ID.
   ``keyfile`` is the location of the service account key JSON file.

   If you don't want this connector to create a BigQuery table automatically, create a
   BigQuery table with ``Partitioning: Partition by ingestion time`` and a proper schema
   before starting the connector.

Start the connector:

::

   curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-kcbd-connect-bigquery.json
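Optionally, confirm that the connector and its task are running by querying the
|kconnect| REST API status endpoint. The connector name ``kcbq-connect1`` matches the
``name`` field in the configuration above.

::

   curl -sS localhost:8083/connectors/kcbq-connect1/status | jq

A healthy connector reports the ``RUNNING`` state for both the connector and its task.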
Start your |ak| producer
^^^^^^^^^^^^^^^^^^^^^^^^

First, go to the |ak| bin folder and start a producer in a new terminal session. Type
the following command, which waits for terminal input:

::

   ./kafka-avro-console-producer --broker-list localhost:9092 --topic kcbq-quickstart1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Next, enter the text for two test records. Press Enter after typing each line.

::

   ./kafka-avro-console-producer --broker-list localhost:9092 --topic kcbq-quickstart1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
   {"f1":"Testing the Kafka-BigQuery Connector!"}
   {"f1":"Testing the Kafka-BigQuery Connector for a second time!"}

Check the results in BigQuery
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Go to the BigQuery query editor in |gcp| and enter the following SQL SELECT statement:

::

   SELECT * FROM ConfluentDataSet.kcbq_quickstart1;

.. note:: ``ConfluentDataSet`` is the dataset ID and ``kcbq_quickstart1`` is the name of
   the BigQuery table taken from the |ak| topic. Because ``sanitizeTopics`` is enabled,
   the connector converts the topic name ``kcbq-quickstart1`` to the table name
   ``kcbq_quickstart1``, replacing characters that are not valid in BigQuery table names.

Clean up resources
^^^^^^^^^^^^^^^^^^

Delete the connector and stop Confluent services.

.. codewithvars:: bash

   curl -X DELETE localhost:8083/connectors/kcbq-connect1
   |confluent_stop|

------------------------
Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   kafka_connect_bigquery_config