.. _connect_bigtable:

Google Cloud BigTable Sink Connector for |cp|
=============================================

The |kconnect-long| BigTable Sink Connector allows moving data from |ak-tm| to
Google Cloud BigTable. It writes data from a topic in |ak| to a table in the
specified BigTable instance. Auto-creation of tables and auto-creation of
column families are also supported.

Limitations
-----------

* The connector is subject to all quotas enforced by `Google Bigtable `__.
* The connector does not support batched ``insert`` operations; hence, the
  throughput on inserts is expected to be lower.
* BigTable does not support ``update`` operations.
* The connector does not support ``delete`` operations.

Install the BigTable Sink Connector
-----------------------------------

.. include:: ../includes/connector-install.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-gcp-bigtable:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-gcp-bigtable:1.0.0-preview

Install Connector Manually
^^^^^^^^^^^^^^^^^^^^^^^^^^

`Download and extract the ZIP file `_ for your connector and then follow the
manual connector installation :ref:`instructions `.

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`bigtable-sink-connector-license-config` for license properties and
:ref:`bigtable-sink-license-topic-configuration` for information about the
license topic.

.. include:: ../includes/bigtable-index.rst

Authorization Failures
^^^^^^^^^^^^^^^^^^^^^^

The BigTable connector must authenticate with a BigTable instance and
establish a connection. If a connection fails because of authentication, the
connector stops immediately. These errors may require changes in your Google
Cloud account, which may include creating service account keys. Try rerunning
your connector after you make the account changes. See
`service account keys `__ for more information.

Quota Failures
^^^^^^^^^^^^^^

The connector might fail because it exceeds some of the `BigTable Quotas `__.
The following are commonly seen quota errors:

* The connector might fail because it exceeds a quota defined ``per user per
  100 seconds``. In this case, make sure that ``retry.timeout.ms`` is set high
  enough that the connector is able to retry the operation after the quota
  resets (see the example configuration at the end of this section). The
  following shows an example stack trace:

  ::

     Caused by: org.apache.kafka.connect.errors.ConnectException: ...
     ... ERROR Could not complete RPC. Failure #0, got: Status{code=RESOURCE_EXHAUSTED, description=Quota exceeded for quota group 'TablesWriteGroup' and limit 'USER-100s' of service 'bigtableadmin.googleapis.com' for consumer 'project_number: ..

* Occasionally, the connector might exceed quotas defined ``per project per
  day``. In this case, restarting the connector will not fix the error.

* Some quota errors may be related to excessive column family creation
  (BigTable caps column families at 100 per table). Consider revising the
  table schema so the connector is not trying to create too many column
  families. See `BigTable schema design `__ for additional information.
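For the ``per user per 100 seconds`` case, a higher retry timeout gives the
connector a chance to succeed once the quota window resets. The snippet below
is only an illustration of the ``retry.timeout.ms`` property mentioned above,
added to the sink connector's properties file; the value shown is an
assumption for demonstration, not a recommendation, so size it for your own
workload.

::

   # Illustrative value only: allow up to two minutes of retries so a write
   # can be retried after a 100-second quota window resets.
   retry.timeout.ms=120000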
Enabling Debug Logging
----------------------

The |kconnect| worker log configuration controls how much detail is included
in the logs. By default, the worker logs include enough detail to identify
basic functionality. Enable DEBUG logs in the |kconnect| worker's log
configuration to include more details.

This change must be made on each worker and only takes effect upon worker
startup. After you change the log configuration as outlined below on each
|kconnect| worker, restart all of the |kconnect| workers. A rolling restart
can be used if necessary.

.. note:: Trace-level logging is verbose, contains many more details, and may
   be useful for solving certain failures. Trace-level logging is enabled the
   same way as debug-level logging, except ``TRACE`` is used instead of
   ``DEBUG``.

On-Premises Installation
^^^^^^^^^^^^^^^^^^^^^^^^

For local or on-premises installations of |cp|, the
``etc/kafka/connect-log4j.properties`` file defines the logging configuration
of the |kconnect| worker process. To enable DEBUG on just the BigTable
connector, modify the ``etc/kafka/connect-log4j.properties`` file to include
the following line:

::
   :name: connect-log4j.properties

   log4j.logger.io.confluent.gcp.bigtable=DEBUG

To enable DEBUG on all of the |kconnect| worker's code, including all
connectors, change the ``log4j.rootLogger=`` line to use ``DEBUG`` instead of
``INFO``. For example, the default log configuration for |kconnect| includes
this line:

::
   :name: connect-log4j.properties

   log4j.rootLogger=INFO, stdout

Change this line to the following to enable DEBUG on all of the |kconnect|
worker code:

::
   :name: connect-log4j.properties

   log4j.rootLogger=DEBUG, stdout

.. note:: This setting may generate a large amount of log messages from
   ``org.apache.kafka.clients`` packages, which can be suppressed by setting
   ``log4j.logger.org.apache.kafka.clients=ERROR``.

Quick Start
-----------

In this quick start, the BigTable sink connector is used to export data
produced by the Avro console producer to a table in a BigTable instance.

Prerequisites
^^^^^^^^^^^^^

Cloud BigTable Prerequisites

- `Google Cloud Platform (GCP) Account `_
- A GCP project with billing enabled, `steps here `__. Step 3 at this link is optional.
- Set up the Cloud SDK and cbt using `these steps `__.

Confluent Prerequisites

- :ref:`Confluent Platform `
- :ref:`Confluent CLI ` (requires separate installation)

Set up Credentials
^^^^^^^^^^^^^^^^^^

Create a service account and service account key under the GCP project.

#. Open the **IAM & Admin** page in the GCP Console.
#. Select your project and click **Continue**.
#. In the left nav, click **Service accounts**.
#. In the top toolbar, click **Create Service Account**.
#. Enter the service account name and description; for example,
   ``test-service-account``.
#. Click **Create** and on the next page select the role
   ``BigTable Administrator`` under ``Cloud BigTable``.
#. On the next page, click **Create Key** and download the JSON file.
#. For this quick start, save the file under your ``$home`` directory and
   name it ``bigtable-test-credentials.json``.

More information on service account keys can be found `here `_.
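If you prefer the command line to the console, the same service account, role
binding, and key file can also be created with the ``gcloud`` CLI. This is a
sketch that assumes the Cloud SDK is installed and authenticated;
``YOUR-PROJECT-ID`` is a placeholder, and ``roles/bigtable.admin`` is the role
ID corresponding to ``BigTable Administrator``.

.. sourcecode:: bash

   # Create the service account used by this quick start.
   gcloud iam service-accounts create test-service-account \
     --display-name="test-service-account"

   # Grant the BigTable Administrator role to the service account.
   gcloud projects add-iam-policy-binding YOUR-PROJECT-ID \
     --member="serviceAccount:test-service-account@YOUR-PROJECT-ID.iam.gserviceaccount.com" \
     --role="roles/bigtable.admin"

   # Download a JSON key for the connector to use.
   gcloud iam service-accounts keys create ~/bigtable-test-credentials.json \
     --iam-account="test-service-account@YOUR-PROJECT-ID.iam.gserviceaccount.com"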
Create a BigTable Instance
^^^^^^^^^^^^^^^^^^^^^^^^^^

Create a test instance named ``test-instance`` in BigTable using the console.
See `detailed steps `_ for creating an instance.

Install and Load the Connector
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

#. Install the connector through the :ref:`confluent_hub_client`.

   .. codewithvars:: bash

      # run from your CP installation directory
      confluent-hub install confluentinc/kafka-connect-gcp-bigtable:latest

   .. tip:: By default, it will install the plugin into
      ``share/confluent-hub-components`` and add the directory to the plugin
      path.

#. Adding a new connector plugin requires restarting |kconnect|. Use the
   Confluent CLI to restart Connect.

   .. codewithvars:: bash

      |confluent_stop| connect && |confluent_start| connect

#. Configure your connector by adding the file
   ``etc/kafka-connect-gcp-bigtable/sink-quickstart-bigtable.properties``,
   with the following properties:

   ::

      name=BigTableSinkConnector
      topics=stats
      tasks.max=1
      connector.class=io.confluent.connect.gcp.bigtable.BigtableSinkConnector
      gcp.bigtable.credentials.path=$home/bigtable-test-credentials.json
      gcp.bigtable.project.id=YOUR-PROJECT-ID
      gcp.bigtable.instance.id=test-instance
      auto.create.tables=true
      auto.create.column.families=true
      table.name.format=example_table
      # The following define the Confluent license stored in Kafka, so we need the Kafka bootstrap addresses.
      # `replication.factor` may not be larger than the number of Kafka brokers in the destination cluster,
      # so here we set this to '1' for demonstration purposes. Always use at least '3' in production configurations.
      confluent.license=
      confluent.topic.bootstrap.servers=localhost:9092
      confluent.topic.replication.factor=1

   .. note:: Make sure to replace ``YOUR-PROJECT-ID`` with the project ID you
      created in the prerequisites portion of this quick start, and replace
      ``$home`` with your home directory path or any other path where the
      credentials file was saved.

#. Start the BigTable sink connector by loading the connector's configuration
   with the following command:

   .. codewithvars:: bash

      |confluent_load| bigtable|dash| -d etc/kafka-connect-gcp-bigtable/sink-quickstart-bigtable.properties

   Your output should resemble the following:

   .. sourcecode:: json

      {
        "name": "bigtable",
        "config": {
          "topics": "stats",
          "tasks.max": "1",
          "connector.class": "io.confluent.connect.gcp.bigtable.BigtableSinkConnector",
          "gcp.bigtable.credentials.path": "$home/bigtable-test-credentials.json",
          "gcp.bigtable.instance.id": "test-instance",
          "gcp.bigtable.project.id": "YOUR-PROJECT-ID",
          "auto.create.tables": "true",
          "auto.create.column.families": "true",
          "table.name.format": "example_table",
          "confluent.license": "",
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "confluent.topic.replication.factor": "1",
          "name": "bigtable"
        },
        "tasks": [
          {
            "connector": "bigtable",
            "task": 0
          }
        ],
        "type": "sink"
      }

#. Check the status of the connector to confirm that it is in a ``RUNNING``
   state.

   .. codewithvars:: bash

      |confluent_status| bigtable

   Your output should resemble the following:

   .. sourcecode:: bash

      {
        "name": "bigtable",
        "connector": {
          "state": "RUNNING",
          "worker_id": "10.200.7.192:8083"
        },
        "tasks": [
          {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "10.200.7.192:8083"
          }
        ],
        "type": "sink"
      }

Send Data to |ak|
^^^^^^^^^^^^^^^^^

#. To produce some records into the ``stats`` topic, first start a |ak|
   producer.

   .. sourcecode:: bash

      bin/kafka-avro-console-producer \
        --broker-list localhost:9092 --topic stats \
        --property parse.key=true \
        --property key.separator=, \
        --property key.schema='{"type" : "string", "name" : "id"}' \
        --property value.schema='{"type":"record","name":"myrecord", "fields":[{"name":"users","type":{"name": "columnfamily", "type":"record","fields":[{"name": "name", "type": "string"}, {"name": "friends", "type": "string"}]}}]}'

#. The console producer is now waiting for input, so you can go ahead and
   insert some records into the topic.

   ::

      "simple-key-1", {"users": {"name":"Bob","friends": "1000"}}
      "simple-key-2", {"users": {"name":"Jess","friends": "10000"}}
      "simple-key-3", {"users": {"name":"John","friends": "10000"}}
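Optionally, before checking BigTable, you can confirm that the records reached
the ``stats`` topic by reading them back with the Avro console consumer. This
is a sketch only; it assumes Schema Registry is reachable at
``http://localhost:8081``, its default listener in a local |cp| installation.

.. sourcecode:: bash

   # Read the Avro records (and their keys) back from the stats topic.
   bin/kafka-avro-console-consumer \
     --bootstrap-server localhost:9092 --topic stats \
     --from-beginning \
     --property print.key=true \
     --property schema.registry.url=http://localhost:8081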
Check BigTable for Data
^^^^^^^^^^^^^^^^^^^^^^^

Use cbt to verify that the data has been written to BigTable.

::

   cbt read example_table

You should see output resembling the example below:

::

   simple-key-1
     user:name     @ 2019/09/10-14:51:01.365000
       Bob
     user:friends  @ 2019/09/10-14:51:01.365000
       1000
   simple-key-2
     user:name     @ 2019/09/10-14:51:01.365000
       Jess
     user:friends  @ 2019/09/10-14:51:01.365000
       10000
   simple-key-3
     user:name     @ 2019/09/10-14:51:01.365000
       John
     user:friends  @ 2019/09/10-14:51:01.365000
       10000

Clean up resources
^^^^^^^^^^^^^^^^^^

#. Delete the table.

   ::

      cbt deletetable example_table

#. Delete the test instance.

   #. Click ``Instance details`` on the left sidebar.
   #. Click **Delete Instance** on the top toolbar and type the instance name
      to verify deletion.

#. Delete the service account credentials used for the test.

   #. Open the **IAM & Admin** page in the GCP Console.
   #. Select your project and click **Continue**.
   #. In the left nav, click **Service accounts**.
   #. Locate the ``test-service-account`` and click the **More** button under
      actions.
   #. Click **Delete** and confirm deletion.

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   configuration_options
   changelog