.. _bigquery-connector:

|kconnect-long| |gcp| BigQuery Sink Connector
---------------------------------------------

The |gcp| BigQuery Sink Connector is a sink connector that streams data into Google
BigQuery tables. When streaming data from |ak-tm| topics that have registered schemas,
the sink connector can automatically create BigQuery tables with an appropriate
BigQuery table schema, based on the information in the |ak| schema for the topic.

* The connector supports insert operations and attempts to detect duplicates. See
  `BigQuery troubleshooting <https://cloud.google.com/bigquery/streaming-data-into-bigquery#troubleshooting>`_ for details.
* The connector uses the BigQuery `insertAll streaming API <https://googleapis.dev/java/google-cloud-clients/latest/com/google/cloud/bigquery/InsertAllRequest.html>`_,
  which inserts records one at a time. The records are immediately available in the
  table for querying.
* The connector supports streaming from a list of topics into corresponding tables in
  BigQuery.
* Even though the connector streams records one at a time by default (as opposed to
  running in batch mode), the connector is scalable because it contains an internal
  thread pool for streaming records in parallel.

.. note:: The internal thread pool defaults to 10 threads. This is configurable.

--------------
Prerequisites
--------------

The following are required to run the |kconnect-long| BigQuery Sink Connector:

* |ak| Broker: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* |kconnect|: |cp| 3.3.0 or above, or |ak| 0.11.0 or above
* Java 1.8
* An active Google Cloud Platform (GCP) account with authorization to create resources

-------------------------------
Install the BigQuery Connector
-------------------------------

.. include:: ../includes/connector-install.rst

.. codewithvars:: bash

   confluent-hub install wepay/kafka-connect-bigquery:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install wepay/kafka-connect-bigquery:1.1.2

--------
License
--------

The |gcp| BigQuery Sink Connector is an open source connector and does not require a
Confluent Enterprise License.

-----------
Quick Start
-----------

The Confluent BigQuery Sink Connector can stream table records into BigQuery from |ak|
topics. These records are streamed at high throughput rates to facilitate analytical
queries in near real time.

Install the Connector
^^^^^^^^^^^^^^^^^^^^^

For the following tutorial, you need to have |cp| running locally. Navigate to your
|cp| installation directory and enter the following command:

.. codewithvars:: bash

   confluent-hub install wepay/kafka-connect-bigquery:latest

Adding a new connector plugin requires restarting |kconnect-long|. Use the Confluent
CLI to restart |kconnect|:

.. codewithvars:: bash

   |confluent_stop| connect && |confluent_start| connect
   Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
   Starting zookeeper
   zookeeper is [UP]
   Starting kafka
   kafka is [UP]
   Starting schema-registry
   schema-registry is [UP]
   Starting kafka-rest
   kafka-rest is [UP]
   Starting connect
   connect is [UP]

Verify that the BigQuery sink connector plugin has been installed correctly and is
recognized by the plugin loader:

.. codewithvars:: bash

   curl -sS localhost:8083/connector-plugins | jq .[].class | grep BigQuerySinkConnector
   "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"

Setting up the |gcp| BigQuery Connector
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You need the following before starting the connector:

* An active |gcp| account with authorization to create resources.
* A BigQuery project. The project can be created using the |gcp| Console.
* A BigQuery dataset, created in the project.
* A service account that can access the BigQuery project containing the dataset. You
  can create this service account in the Google Cloud Console. The service account
  must have access to the BigQuery project containing the dataset.
* A service account key. You create and download a key when creating the service
  account. The key can be downloaded as a JSON file and resembles the example below:

**Example service account key file:**

.. code-block:: json

   {
     "type": "service_account",
     "project_id": "confluent-243016",
     "private_key_id": "c386effb7fadb7bce56c249b28e0a9865ca3c595",
     "private_key": "-----BEGIN PRIVATE deleted for brevity =\n-----END PRIVATE KEY-----\n",
     "client_email": "confluent2@confluent-243016.iam.gserviceaccount.com",
     "client_id": "111348633408307923943",
     "auth_uri": "https://accounts.google.com/o/oauth2/auth",
     "token_uri": "https://oauth2.googleapis.com/token",
     "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
     "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/confluent2%40confluent-243016.iam.gserviceaccount.com"
   }
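If you prefer to create these |gcp| resources from the command line instead of the
Console, the following sketch shows one possible approach using the ``gcloud`` and
``bq`` tools. The project ID ``confluent-243016``, service account name ``confluent2``,
and dataset ``ConfluentDataSet`` are the example values used throughout this quick
start; substitute your own values, and note that the IAM role shown is an assumption
(your project may require a different role).

.. code-block:: bash

   # Create a service account in the example project.
   gcloud iam service-accounts create confluent2 \
       --project confluent-243016 \
       --display-name "BigQuery sink connector"

   # Grant the service account permission to create tables and stream rows into
   # BigQuery. roles/bigquery.dataEditor is one role that allows this; adjust as
   # needed for your environment.
   gcloud projects add-iam-policy-binding confluent-243016 \
       --member "serviceAccount:confluent2@confluent-243016.iam.gserviceaccount.com" \
       --role "roles/bigquery.dataEditor"

   # Download a JSON key file for the service account. This is the file that the
   # connector's keyfile property points to.
   gcloud iam service-accounts keys create ~/confluent-243016-key.json \
       --iam-account confluent2@confluent-243016.iam.gserviceaccount.com

   # Create the dataset the connector writes to.
   bq mk --dataset confluent-243016:ConfluentDataSet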
Start the BigQuery sink connector
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Create the file ``register-kcbd-connect-bigquery.json`` to store the connector
configuration.

**Connect Distributed REST quick start connector properties:**

.. code-block:: json
   :emphasize-lines: 15,16,17

   {
     "name": "kcbq-connect1",
     "config": {
       "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
       "tasks.max": "1",
       "topics": "kcbq-quickstart1",
       "sanitizeTopics": "true",
       "autoCreateTables": "true",
       "autoUpdateSchemas": "true",
       "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
       "schemaRegistryLocation": "http://localhost:8081",
       "bufferSize": "100000",
       "maxWriteSize": "10000",
       "tableWriteWait": "1000",
       "project": "confluent-243016",
       "datasets": ".*=ConfluentDataSet",
       "keyfile": "/Users/titomccutcheon/dev/confluent_fork/kafka-connect-bigquery/kcbq-connector/quickstart/properties/confluent-243016-384a24e2de1a.json"
     }
   }

.. note:: The ``project`` key is the ``id`` value of the BigQuery project in |gcp|.
   For ``datasets``, the value ``ConfluentDataSet`` is the ID of the dataset entered by
   the user during |gcp| dataset creation; include ``.*=`` before the dataset ID.
   ``keyfile`` is the location of the service account key JSON file.

   If you don't want this connector to create a BigQuery table automatically, create a
   BigQuery table with ``Partitioning: Partition by ingestion time`` and a proper schema
   before starting the connector.

Start the connector:

::

   curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-kcbd-connect-bigquery.json
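Optionally, confirm that the connector and its task are running by querying the
|kconnect| REST API status endpoint. The connector name ``kcbq-connect1`` matches the
``name`` field in the configuration above.

::

   curl -sS localhost:8083/connectors/kcbq-connect1/status | jq

A healthy connector reports the ``RUNNING`` state for both the connector and its task.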
Start your |ak| producer
^^^^^^^^^^^^^^^^^^^^^^^^

First, go to the |ak| bin folder and start a producer in a new terminal session. Type
the following command, which waits for terminal input:

::

   ./kafka-avro-console-producer --broker-list localhost:9092 --topic kcbq-quickstart1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Next, enter the text for two test records. Press Enter after typing each line.

::

   ./kafka-avro-console-producer --broker-list localhost:9092 --topic kcbq-quickstart1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
   {"f1":"Testing the Kafka-BigQuery Connector!"}
   {"f1":"Testing the Kafka-BigQuery Connector for a second time!"}

Check the results in BigQuery
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Go to the BigQuery query editor in |gcp| and enter the following SQL SELECT statement:

::

   SELECT * FROM ConfluentDataSet.kcbq_quickstart1;

.. note:: ``ConfluentDataSet`` is the dataset ID and ``kcbq_quickstart1`` is the name of
   the BigQuery table taken from the |ak| topic. Because ``sanitizeTopics`` is enabled,
   the connector converts the topic name ``kcbq-quickstart1`` to the table name
   ``kcbq_quickstart1``, replacing characters that are not valid in BigQuery table names.

Clean up resources
^^^^^^^^^^^^^^^^^^

Delete the connector and stop Confluent services.

.. codewithvars:: bash

   curl -X DELETE localhost:8083/connectors/kcbq-connect1
   |confluent_stop|

------------------------
Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   kafka_connect_bigquery_config