Google Cloud BigTable Sink Connector for Confluent Platform

The Kafka Connect BigTable Sink connector moves data from Apache Kafka® to Google Cloud BigTable. It writes data from a Kafka topic to a table in the specified BigTable instance. The connector can also create tables and column families automatically.

Features

At-least-once delivery

This connector guarantees that records from the Kafka topic are delivered at least once.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The BigTable Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. Running multiple tasks can improve throughput when the connector consumes from multiple topic partitions.

Column mapping

Write operations require a column family, a column, and a row key for each cell in the table. This connector expects Kafka record values to be formatted as two-level structs so that it can infer a column family and a column for each value. Specifically, each Kafka record value must fit the following schema:

{
  "column family name 1": {
    "column name 1": "value",
    "column name 2": "value",
    "...": "..."
  },
  "column family name 2": {
    "column name 3": "value",
    "column name 4": "value",
    "...": "..."
  },
  "...": "..."
}

Important

The BigTable Sink connector requires a non-null value for every field if the input is a struct or primitive, because writing a null value into a cell causes the connector to delete that cell. To prevent this behavior, ensure one of the following:

  • The fields have a non-null default.
  • Every incoming record has non-null values for all fields. If the values are primitives, they should be non-null as well.

For example, consider the following Kafka record value:

{
  "usage_stats": {
    "daily_active_users": "10m",
    "churn_rate": "5%"
  },
  "sales": {
    "Jan": "10k",
    "Feb": "12k"
  }
}

If this record is written to an empty table, it would look like the example below:

  Row key           | usage_stats                     | sales
                    | daily_active_users | churn_rate | Jan   | Feb
  "example_row_key" | "10m"              | "5%"       | "10k" | "12k"

In this layout, the first header row lists the column families and the second lists the columns within each family.

If the record does not conform to this two-level struct schema, the connector attempts to handle the following cases gracefully:

  • If the record is a struct but some of its top-level fields are not structs, the values of these fields are mapped to the default column family.

    As an example of this case, consider the following Kafka record value:

    {
      "usage_stats": {
        "daily_active_users": "10m",
        "churn_rate": "5%"
      },
      "sales": "10"
    }
    

    If this record is written to an empty table, the table would look like the example below:

      Row key           | usage_stats                     | default_column_family
                        | daily_active_users | churn_rate | sales
      "example_row_key" | "10m"              | "5%"       | "10"

Note that the default column family is the topic name and the default column name is KAFKA_VALUE.

  • If the record value is not a struct, the connector writes the entire value as a byte array to the default column and default column family.

    If such a value were to be written to an empty table, the table would look like:

      Row key           | default_column_family
                        | default_column
      "example_row_key" | <Kafka record value>
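The mapping rules above can be summarized in a short sketch. It is written in Python for illustration only and is not the connector's implementation. It models Connect structs as plain dictionaries, uses the topic name as the default column family and KAFKA_VALUE as the default column (as described above), and skips the byte-array serialization step. The function name to_cells is hypothetical.

```python
from typing import Any

# Defaults described above: the topic name is the default column family and
# KAFKA_VALUE is the default column. Structs are modeled as dicts (assumption).
DEFAULT_COLUMN = "KAFKA_VALUE"


def to_cells(value: Any, topic: str) -> list[tuple[str, str, Any]]:
    """Hypothetical helper: map a record value to (family, column, value) cells."""
    if not isinstance(value, dict):
        # Not a struct: the whole value goes to the default family and column.
        return [(topic, DEFAULT_COLUMN, value)]
    cells = []
    for field, inner in value.items():
        if isinstance(inner, dict):
            # Second level: the field names the family, its keys name the columns.
            for column, cell_value in inner.items():
                cells.append((field, column, cell_value))
        else:
            # Top-level non-struct field: keep the field name as the column,
            # but place it in the default column family.
            cells.append((topic, field, inner))
    return cells


record = {
    "usage_stats": {"daily_active_users": "10m", "churn_rate": "5%"},
    "sales": "10",
}
for family, column, cell_value in to_cells(record, topic="stats"):
    print(f"{family}:{column} = {cell_value}")
# usage_stats:daily_active_users = 10m
# usage_stats:churn_rate = 5%
# stats:sales = 10
```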

Row key construction

This connector supports the construction of a row key from the Kafka record key.

Fields within the key can be concatenated to form a row key. For more information, see the Configuration Reference for Google Cloud BigTable Sink Connector for Confluent Platform. For more complex row key construction, consider using a Single Message Transform (SMT) to format the record key as desired.
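As a rough illustration, the sketch below concatenates selected key fields, in a configured order, with a delimiter. The function name, the field list, and the "#" default delimiter are hypothetical placeholders. Check the Configuration Reference for the connector's actual row key properties and defaults.

```python
from typing import Any, Mapping, Sequence


def build_row_key(key: Mapping[str, Any], fields: Sequence[str], delimiter: str = "#") -> str:
    """Hypothetical helper: join the chosen key fields, in order, into a row key.

    `fields` and `delimiter` stand in for the connector's row key settings;
    the "#" default here is an assumption, not the connector's documented default.
    """
    missing = [name for name in fields if name not in key]
    if missing:
        raise KeyError(f"record key is missing field(s): {missing}")
    return delimiter.join(str(key[name]) for name in fields)


record_key = {"region": "eu-west", "user_id": 42, "session": "abc"}
print(build_row_key(record_key, ["region", "user_id"]))  # eu-west#42
```

Listing the fields explicitly keeps the row key order stable even if the key struct gains new fields later.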

Data types

Data from Kafka records is serialized into byte arrays before it is written. This connector uses the HBase Bytes utility class for serialization. The following table shows how each Kafka record type is serialized.

Kafka record type                                            | Byte array serialization
INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING | HBase Bytes
BYTES                                                        | Used as is
DATE, TIMESTAMP                                              | Serialized as a Long (through HBase Bytes)
ARRAY, MAP, STRUCT                                           | Serialized as a JSON string
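To make the table concrete, the following Python sketch approximates the byte layouts. It relies on the fact that HBase Bytes.toBytes encodes numeric types in big-endian order, strings as UTF-8, and booleans as a single 0xFF or 0x00 byte. Two details are assumptions rather than documented behavior: DATE and TIMESTAMP values are treated as milliseconds since the Unix epoch, and the exact JSON formatting (spacing, key order) of complex types may differ. INT8 is omitted because its exact encoding is not specified here.

```python
import json
import struct
from datetime import datetime, timedelta, timezone
from typing import Any

# Big-endian struct formats, matching HBase Bytes.toBytes for these types.
NUMERIC_FORMATS = {
    "INT16": ">h",
    "INT32": ">i",
    "INT64": ">q",
    "FLOAT32": ">f",
    "FLOAT64": ">d",
}
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def serialize(kafka_type: str, value: Any) -> bytes:
    """Approximate the connector's byte encoding for one Kafka Connect value."""
    if kafka_type in NUMERIC_FORMATS:
        return struct.pack(NUMERIC_FORMATS[kafka_type], value)
    if kafka_type == "BOOLEAN":
        # HBase Bytes writes true as 0xFF and false as 0x00.
        return b"\xff" if value else b"\x00"
    if kafka_type == "STRING":
        return value.encode("utf-8")
    if kafka_type == "BYTES":
        return bytes(value)  # used as is
    if kafka_type in ("DATE", "TIMESTAMP"):
        # Assumption: the Long is milliseconds since the Unix epoch.
        millis = (value - EPOCH) // timedelta(milliseconds=1)
        return struct.pack(">q", millis)
    if kafka_type in ("ARRAY", "MAP", "STRUCT"):
        # Assumption: the connector's exact JSON formatting may differ.
        return json.dumps(value).encode("utf-8")
    raise ValueError(f"type not covered by this sketch: {kafka_type}")


print(serialize("INT32", 10_000_000).hex())  # 00989680
print(serialize("BOOLEAN", True).hex())      # ff
print(serialize("MAP", {"Jan": "10k"}))      # b'{"Jan": "10k"}'
```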

Auto table creation and auto column family creation

If auto.create.tables is enabled, the connector can create the destination table in cases where the table is missing.

If auto.create.column.families is enabled, the connector can create column families that the record schema requires but that are missing from the table.

Columns are created as needed if they don’t already exist in the table, regardless of the aforementioned settings.

Proxy settings

When proxy.url is configured, the connector sets the https.proxyHost and https.proxyPort system properties globally, so they apply to the entire JVM.
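System properties are process-wide, so these proxy settings can affect other connectors and HTTP clients running in the same Connect worker, not only this connector. The sketch below only illustrates how a proxy URL could map to those two properties. It assumes proxy.url has the form scheme://host:port and ignores any credentials. The function name is hypothetical.

```python
from urllib.parse import urlsplit


def proxy_system_properties(proxy_url: str) -> dict[str, str]:
    """Hypothetical helper: derive https.proxyHost/https.proxyPort from a proxy URL.

    Assumes the form scheme://host:port; credentials, if any, are ignored.
    """
    parts = urlsplit(proxy_url)
    if not parts.hostname or parts.port is None:
        raise ValueError("expected a URL of the form scheme://host:port")
    return {"https.proxyHost": parts.hostname, "https.proxyPort": str(parts.port)}


print(proxy_system_properties("http://proxy.example.com:3128"))
```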

Input data formats

The BigTable Sink connector supports Avro, JSON Schema, and Protobuf input data.

Limitations

  • The connector is subject to all quotas enforced by Google Cloud BigTable.
  • The connector does not support batched insert operations, so insert throughput is expected to be lower.
  • BigTable does not support update operations.
  • The connector does not support delete operations.

Install the BigTable Sink Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.
  • Confluent CLI (requires separate installation)
  • An installation of the latest connector version.

Install the connector using the Confluent CLI

To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-gcp-bigtable:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-gcp-bigtable:2.0.4

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

For license properties, see Confluent Platform license. For information about the license topic, see License topic configuration.

Configuration properties

For a complete list of configuration properties for this connector, see Configuration Reference for Google Cloud BigTable Sink Connector for Confluent Platform.

Troubleshooting and task failures

You can use the Kafka Connect REST interface to check the status of the connectors and tasks. If a task or connector has failed, the trace field includes a reason and a stack trace. Most errors thrown by this connector fall into two categories:

  • Record-level failures
  • Connector-level failures
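To triage failures programmatically, you can query the Kafka Connect REST endpoint GET /connectors/<name>/status and look for entries whose state is FAILED. The following Python sketch uses only the standard library. The helper names, the worker URL http://localhost:8083, and the sample status (including its truncated trace) are illustrative assumptions.

```python
import json
from urllib.request import urlopen


def fetch_status(worker_url: str, connector: str) -> dict:
    """GET /connectors/<connector>/status from a Kafka Connect worker's REST API."""
    with urlopen(f"{worker_url}/connectors/{connector}/status") as response:
        return json.load(response)


def failures(status: dict) -> list[str]:
    """Return one line per FAILED connector or task, with the first line of its trace."""
    entries = [("connector", status.get("connector", {}))]
    entries += [(f"task {task['id']}", task) for task in status.get("tasks", [])]
    found = []
    for label, entry in entries:
        if entry.get("state") == "FAILED":
            trace = entry.get("trace") or "(no trace)"
            found.append(f"{label}: {trace.splitlines()[0]}")
    return found


# Illustrative status with a made-up, truncated trace. Against a live worker you
# would call: failures(fetch_status("http://localhost:8083", "bigtable"))
sample_status = {
    "name": "bigtable",
    "connector": {"state": "RUNNING", "worker_id": "10.200.7.192:8083"},
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "10.200.7.192:8083",
            "trace": "org.apache.kafka.connect.errors.ConnectException: ...\n\tat ...",
        }
    ],
    "type": "sink",
}
print(failures(sample_status))
```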

Table creation errors

Table creation can take a long time, and the connector can sometimes fail while attempting to create a table. In such cases, consider increasing retry.timeout.ms.

Errors related to table creation can surface not only during table creation but also during inserts. The following stack traces are examples of these errors.

Caused by: org.apache.kafka.connect.errors.ConnectException: Error with inserting to table with
table name example_table: Failed to perform operation. Operation='checkAndPut', projectId='123',
tableName='example_table', rowKey='simple-key-4'
...
Caused by: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Table currently being created

Caused by: org.apache.kafka.connect.errors.ConnectException: Error with inserting to table with
table name example_table: Failed to perform operation. Operation='checkAndPut', projectId='123',
tableName='example_table', rowKey='simple-key-4'
...
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Table not found:

retry.timeout.ms defaults to 90 seconds (90000 ms) and specifies the maximum time, in milliseconds, allowed for retrying database operations. If auto.create.tables is enabled, consider keeping the default or increasing it, because table creation generally takes at least a minute or two.
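The effect of retry.timeout.ms can be seen with a generic deadline-bounded retry loop. The sketch below is not the connector's retry code, whose backoff schedule is not described here. It uses exponential backoff with hypothetical helper names and simulated time, and assumes a table that becomes writable 120 seconds after creation starts. With the 90-second default, the retries run out before the table is ready. With a larger budget, the insert succeeds.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_until_deadline(
    operation: Callable[[], T],
    retry_timeout_ms: int = 90_000,
    initial_backoff_ms: int = 100,
    max_backoff_ms: int = 10_000,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> T:
    """Retry `operation` with exponential backoff until retry_timeout_ms elapses."""
    deadline = clock() + retry_timeout_ms / 1000
    backoff_ms = initial_backoff_ms
    while True:
        try:
            return operation()
        except Exception:
            remaining = deadline - clock()
            if remaining <= 0:
                raise  # budget exhausted: surface the last error
            sleep(min(backoff_ms / 1000, remaining))
            backoff_ms = min(backoff_ms * 2, max_backoff_ms)


class SimulatedClock:
    """Fake time source so the demo finishes instantly."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


def demo(retry_timeout_ms: int, table_ready_after_s: float = 120.0) -> str:
    """Run a hypothetical insert that fails until the new table becomes writable."""
    clock = SimulatedClock()

    def insert() -> str:
        if clock.time() < table_ready_after_s:
            raise RuntimeError("FAILED_PRECONDITION: Table currently being created")
        return "inserted"

    try:
        return retry_until_deadline(insert, retry_timeout_ms, sleep=clock.sleep, clock=clock.time)
    except RuntimeError as err:
        return f"gave up after {clock.time():.0f} s: {err}"


print(demo(90_000))   # default budget: gives up before the table is ready
print(demo(300_000))  # larger budget: the insert eventually succeeds
```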

Schema errors

If auto.create.column.families is not enabled, many record-level failures can occur because the connector may attempt to write to a column family that does not exist. This is likely if the connector receives a record value that is not a two-level struct and then writes the data to the default column family (named after the Kafka topic). If this happens, either use a Single Message Transform (SMT) to reshape the record to fit the connector's expectations or enable auto.create.column.families.
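Before choosing between an SMT and auto.create.column.families, it can help to check which column families a batch of sample records would need. This Python sketch applies the mapping rules from the Column mapping section, with structs modeled as dictionaries and non-struct values falling back to a family named after the topic. The function names and sample records are hypothetical.

```python
from typing import Any, Iterable


def required_families(value: Any, topic: str) -> set[str]:
    """Column families one record value writes to; the default family is the topic name."""
    if not isinstance(value, dict):
        return {topic}
    return {field if isinstance(inner, dict) else topic for field, inner in value.items()}


def missing_families(records: Iterable[Any], topic: str, existing: set[str]) -> set[str]:
    """Families the records need that the table does not have yet."""
    needed: set[str] = set()
    for value in records:
        needed |= required_families(value, topic)
    return needed - existing


sample_records = [
    {"users": {"name": "Bob", "friends": "1000"}},
    {"users": {"name": "Jess", "friends": "10000"}, "score": 7},  # "score" uses the default family
    "not a struct",  # the whole value uses the default family
]
print(sorted(missing_families(sample_records, topic="stats", existing={"users"})))  # ['stats']
```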

Authorization failures

The BigTable connector must authenticate with a BigTable instance and establish a connection. If the connection fails because of authentication, the connector stops immediately. Fixing these errors may require changes to your Google Cloud account, such as creating service account keys. Rerun the connector after you make the account changes. See service account keys for more information.

Quota failures

The connector might fail because it exceeds BigTable quotas.

Here are some commonly seen quota errors:

  • The connector might fail because it exceeds the per-user, per-100-seconds message quota. In this case, set retry.timeout.ms high enough that the connector can retry the operation after the quota resets.

    The following shows an example stack trace:

    Caused by: org.apache.kafka.connect.errors.ConnectException: ...
    ...
    ERROR Could not complete RPC. Failure #0, got:
    Status{code=RESOURCE_EXHAUSTED, description=Quota exceeded for quota
    group 'TablesWriteGroup' and limit 'USER-100s' of service
    'bigtableadmin.googleapis.com' for consumer 'project_number: ..
    
  • Occasionally, the connector might exceed quotas defined per project per day. In this case, restarting the connector will not fix the error.

  • Some quota errors may be related to excessive column family creation (BigTable caps column families at 100 per table). Consider revising the table schema so the connector is not trying to create too many column families. See BigTable schema design for additional information.

Enabling debug logging

The Connect worker log configuration controls how much detail is included in the logs. By default, the worker logs include enough detail to identify basic functionality. To include more detail, enable DEBUG logs in each Connect worker's log configuration as outlined below. Because the change only takes effect at worker startup, restart all of the Connect workers afterward; a rolling restart can be used if necessary.

Trace-level logging includes even more detail and may be useful when troubleshooting certain failures. You enable it the same way as debug-level logging, except that you use TRACE instead of DEBUG.

On-premises installation

For local or on-premises installations of Confluent Platform, the etc/kafka/connect-log4j.properties file defines the logging configuration of the Connect worker process. To enable DEBUG on just the BigTable connector, modify the etc/kafka/connect-log4j.properties file to include the following line:

log4j.logger.io.confluent.connect.gcp.bigtable=DEBUG

To enable DEBUG on all of the Connect worker’s code, including all connectors, change the log4j.rootLogger= line to use DEBUG instead of INFO. For example, the default log configuration for Connect includes this line:

log4j.rootLogger=INFO, stdout

Change this line to the following to enable DEBUG on all of the Connect worker code:

log4j.rootLogger=DEBUG, stdout

Note that this setting may generate a large number of logs from org.apache.kafka.clients packages, which can be suppressed by setting log4j.logger.org.apache.kafka.clients=ERROR.

Quick start

In this quick start, the BigTable Sink connector is used to export data produced by the Avro console producer to a table in a BigTable instance.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Cloud BigTable prerequisites

Before running this quick start, complete the following prerequisites.

Set up credentials

Create a service account and service account key in your GCP project.

  1. Open the IAM & Admin page in the GCP Console.
  2. Select your project and click Continue.
  3. In the left navigation, click Service accounts.
  4. In the top toolbar, click Create Service Account.
  5. Enter a service account name (for example, test-service-account) and a description.
  6. Click Create and on the next page select the role BigTable Administrator under Cloud BigTable.
  7. On the next page click Create Key and download the JSON file.
  8. For this quick start, save the file under your $home directory and name it bigtable-test-credentials.json.

For more information on service account keys, see Create and delete service account keys in the Google Cloud documentation.

Create a BigTable instance

Create a test instance named test-instance in BigTable using the console. For help with creating an instance, see detailed steps in the Google Cloud documentation.

Install and load the connector

  1. Install the connector using the Confluent CLI.

    # run from your CP installation directory
    confluent connect plugin install confluentinc/kafka-connect-gcp-bigtable:latest
    

    By default, this installs the plugin into share/confluent-hub-components and adds that directory to the plugin path.

  2. Adding a new connector plugin requires restarting Connect. Use the Confluent CLI to restart Kafka Connect.

    confluent local services connect stop && confluent local services connect start
    
  3. Configure your connector by adding the file etc/kafka-connect-gcp-bigtable/sink-quickstart-bigtable.properties, with the following properties:

    name=BigTableSinkConnector
    topics=stats
    tasks.max=1
    connector.class=io.confluent.connect.gcp.bigtable.BigtableSinkConnector
    
    gcp.bigtable.credentials.path=$home/bigtable-test-credentials.json
    gcp.bigtable.project.id=YOUR-PROJECT-ID
    gcp.bigtable.instance.id=test-instance
    auto.create.tables=true
    auto.create.column.families=true
    table.name.format=example_table
    
    # The following define the Confluent license stored in Kafka, so we need the Kafka bootstrap addresses.
    # `replication.factor` may not be larger than the number of Kafka brokers in the destination cluster,
    # so here we set this to '1' for demonstration purposes. Always use at least '3' in production configurations.
    confluent.license=
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    

    Ensure you replace YOUR-PROJECT-ID with the project ID you created in the prerequisite portion of this quick start. You should also replace $home with your home directory path, or any other path where the credentials file was saved.

  4. Start the BigTable Sink connector by loading the connector’s configuration using the following command:

    confluent local services connect connector load bigtable --config etc/kafka-connect-gcp-bigtable/sink-quickstart-bigtable.properties
    

    Your output should resemble the following:

    {
      "name": "bigtable",
      "config": {
        "topics": "stats",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.gcp.bigtable.BigtableSinkConnector",
        "gcp.bigtable.credentials.path": "$home/bigtable-test-credentials.json",
        "gcp.bigtable.instance.id": "test-instance",
        "gcp.bigtable.project.id": "YOUR-PROJECT-ID",
        "auto.create.tables": "true",
        "auto.create.column.families": "true",
        "table.name.format": "example_table",
        "confluent.license": "",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "name": "bigtable"
      },
      "tasks": [
        {
          "connector": "bigtable",
          "task": 0
        }
      ],
      "type": "sink"
    }
    
  5. Check the status of the connector to confirm it’s in a RUNNING state.

    confluent local services connect connector status bigtable
    

    Your output should resemble the following:

    {
      "name": "bigtable",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.200.7.192:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.200.7.192:8083"
        }
      ],
      "type": "sink"
    }
    

Send data to Kafka

  1. To produce records to the stats topic, start the Avro console producer.

    bin/kafka-avro-console-producer \
      --broker-list localhost:9092 --topic stats \
      --property parse.key=true \
      --property key.separator=, \
      --property key.schema='{"type" : "string", "name" : "id"}' \
      --property value.schema='{"type":"record","name":"myrecord",
      "fields":[{"name":"users","type":{"name": "columnfamily",
      "type":"record","fields":[{"name": "name", "type": "string"},
      {"name": "friends", "type": "string"}]}}]}'
    
  2. The console producer is now waiting for input. Enter the following records into the topic:

    "simple-key-1", {"users": {"name":"Bob","friends": "1000"}}
    "simple-key-2", {"users": {"name":"Jess","friends": "10000"}}
    "simple-key-3", {"users": {"name":"John","friends": "10000"}}
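Editing the nested value schema inside shell quotes is error-prone. As an optional aid, the Python sketch below builds the same schema as a dictionary, lists the family:column pairs it maps to, rejects columns declared as a union with null (a conservative check based on the null-value warning in the Column mapping section), and prints compact JSON that you could paste into --property value.schema. The helper name leaf_fields is hypothetical.

```python
import json

# Same value schema as step 1: the outer field "users" becomes the column family,
# and the inner fields "name" and "friends" become columns.
value_schema = {
    "type": "record",
    "name": "myrecord",
    "fields": [
        {
            "name": "users",
            "type": {
                "type": "record",
                "name": "columnfamily",
                "fields": [
                    {"name": "name", "type": "string"},
                    {"name": "friends", "type": "string"},
                ],
            },
        }
    ],
}


def leaf_fields(schema: dict) -> list[tuple[str, str, object]]:
    """Hypothetical helper: list (family, column, Avro type) for nested record fields."""
    leaves = []
    for family in schema["fields"]:
        family_type = family["type"]
        if isinstance(family_type, dict) and family_type.get("type") == "record":
            for column in family_type["fields"]:
                leaves.append((family["name"], column["name"], column["type"]))
    return leaves


# Conservative check: a union containing "null" could let null values through.
nullable = [f"{fam}:{col}" for fam, col, avro_type in leaf_fields(value_schema)
            if isinstance(avro_type, list) and "null" in avro_type]
if nullable:
    raise ValueError(f"nullable columns may cause cell deletes: {nullable}")

print([f"{fam}:{col}" for fam, col, _ in leaf_fields(value_schema)])  # ['users:name', 'users:friends']
print(json.dumps(value_schema, separators=(",", ":")))  # paste into --property value.schema='...'
```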
    

Check BigTable for data

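Use the cbt CLI to verify that the data has been written to BigTable. cbt reads the project and instance from ~/.cbtrc; alternatively, pass them with the -project and -instance flags.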

cbt read example_table

You should see output resembling the example below:

simple-key-1
  users:name                          @ 2019/09/10-14:51:01.365000
    Bob
  users:friends                       @ 2019/09/10-14:51:01.365000
    1000
simple-key-2
  users:name                          @ 2019/09/10-14:51:01.365000
    Jess
  users:friends                       @ 2019/09/10-14:51:01.365000
    10000
simple-key-3
  users:name                          @ 2019/09/10-14:51:01.365000
    John
  users:friends                       @ 2019/09/10-14:51:01.365000
    10000

Clean up resources

  1. Delete the table.

    cbt deletetable example_table

  2. Delete the test instance.

    1. Click Instance details on the left sidebar.
    2. Click Delete Instance on the top toolbar and type the instance name to verify deletion.
  3. Delete the service account credentials used for the test.

    1. Open the IAM & Admin page in the GCP Console.
    2. Select your project and click Continue.
    3. In the left navigation, click Service accounts.
    4. Locate the test-service-account and click the More button under Actions.
    5. Click Delete and confirm deletion.