Google Cloud BigTable Sink Connector for Confluent Platform
The Kafka Connect BigTable Sink Connector allows moving data from Apache Kafka® to
Google Cloud BigTable. It writes data from a topic in Kafka to a table in the
specified BigTable instance. Auto-creation of tables and column families is also
supported.
Features
At least once delivery
This connector guarantees that records from the Kafka topic are delivered at
least once.
Column mapping
Write operations require the specification of a column family, a column, and a
row key for each cell in the table. This connector expects Kafka record values
to be formatted as two-level structs so that it can infer a column family and a
column for each value. Specifically, each Kafka record value must fit the
following schema:
{
"column family name 1": {
"column name 1": "value",
"column name 2": "value",
"...": "...",
},
"column family name 2": {
"column name 3": "value",
"column name 4": "value",
"...": "...",
},
"...": "..."
}
For example, consider the following Kafka record value:
{
"usage_stats": {
"daily_active_users": "10m",
"churn_rate": "5%"
},
"sales": {
"Jan": "10k",
"Feb": "12k"
}
}
If this record is written to an empty table, it would look like the example below:
|                   | usage_stats        |            | sales |       |
|                   | daily_active_users | churn_rate | Jan   | Feb   |
| “example_row_key” | “10m”              | “5%”       | “10k” | “12k” |
The first row represents the column families and the second row represents the
columns.
If the record does not conform to this two-level struct schema, the connector
attempts to handle the following cases gracefully:
If the record is a struct but some of the top-level fields are not structs, the values of those fields are mapped to a default column family.
As an example of this case, consider the following Kafka record value:
{
"usage_stats": {
"daily_active_users": "10m",
"churn_rate": "5%"
},
"sales": "10"
}
If this record is written to an empty table, the table would look like the example below:
|                   | usage_stats        |            | default_column_family |
|                   | daily_active_users | churn_rate | sales                 |
| “example_row_key” | “10m”              | “5%”       | “10”                  |
Note
The default column family is the topic name and the default column name is
KAFKA_VALUE.
If the record value is not a struct, the connector writes the entire value as a byte array to the default column and default column family.
If such a value were to be written to an empty table, the table would look like:
|                   | default_column_family |
|                   | default_column        |
| “example_row_key” | kafka value           |
Row key construction
This connector supports the construction of a row key from the Kafka record key.
Fields within the key can be concatenated together to form a row key. See the
Apache HBase Sink Connector Configuration Properties for additional
information.
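For example, a minimal sketch of a row key configuration, assuming the row.key.definition and row.key.delimiter properties described with the Apache HBase Sink Connector; the field names used here are illustrative only:
# Concatenate the "org" and "user" fields of the Kafka record key, separated by "#".
row.key.definition=org,user
row.key.delimiter=#
# A record key of {"org": "confluent", "user": "bob"} would yield the row key "confluent#bob".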
Data types
Data from Kafka records is serialized into byte arrays before being written.
This connector uses the HBase Bytes class to handle serialization. The
following table shows how Kafka record types are serialized by this connector.
| Kafka Record Type                                            | Byte Array Serialization                   |
| INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING | HBase Bytes                                |
| BYTES                                                        | Used as is                                 |
| DATE, TIMESTAMP                                              | Serialized as a Long (through HBase Bytes) |
| ARRAY, MAP, STRUCT                                           | Serialized as a stringified JSON object    |
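As an illustration of the complex-type case (the field names and exact JSON formatting below are assumptions), a record value containing an array such as:
{
  "interests": {
    "tags": ["kafka", "bigtable"]
  }
}
would be written to the tags column in the interests column family as the bytes of the JSON string ["kafka","bigtable"].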
Auto table creation and auto column family creation
If auto.create.tables is enabled, the connector can create the destination
table in cases where the table is missing.
If auto.create.column.families is enabled, the connector can create missing
column families in the table, relative to the record schema.
Note
Because BigTable is a sparse data store, columns are created on the fly if
they don’t already exist in the table, regardless of these settings.
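A minimal sketch of enabling both settings in the connector configuration (the same properties appear in the quick start later in this document):
auto.create.tables=true
auto.create.column.families=true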
Proxy settings
Note
When the proxy.url proxy settings are configured, the system property
variables (https.proxyHost and https.proxyPort) are set globally for the
entire JVM.
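A hedged example of the proxy configuration (the host and port shown are placeholders):
# Setting this also sets https.proxyHost and https.proxyPort for the whole JVM.
proxy.url=https://proxy.example.com:8080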
Limitations
- The connector is subject to all quotas enforced by Google BigTable.
- The connector does not support batched insert operations, so insert throughput is expected to be lower.
- BigTable does not support update operations.
- The connector does not support delete operations.
Install the BigTable Sink Connector
You can install this connector by using the Confluent Hub client installation
instructions or by manually
downloading the ZIP file.
Prerequisites
Important
You must install the connector on every machine where Connect will run.
An installation of the Confluent Hub Client.
Note
This is installed by default with Confluent Enterprise.
An installation of the latest (latest) connector version.
To install the latest
connector version, navigate to your Confluent Platform
installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-gcp-bigtable:latest
You can install a specific version by replacing latest
with a version
number as shown in the following example:
confluent-hub install confluentinc/kafka-connect-gcp-bigtable:1.0.0-preview
Troubleshooting and Task Failures
You can use the Kafka Connect REST Interface to check the status of the
connectors and tasks. If a task or connector has failed, the trace field will
include a reason and a stack trace. The vast majority of the errors thrown by
this connector fall into two categories:
- Record-level failures
- Connector-level failures
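For example, a quick way to inspect a connector's status and trace field through the REST interface; the connector name bigtable and the default worker port 8083 are assumptions based on the quick start later in this document:
curl -s http://localhost:8083/connectors/bigtable/status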
Table creation errors
Table creation can be a time-intensive task and sometimes the connector can fail
while attempting to create a table. In such cases, consider increasing
retry.timeout.ms.
Errors related to table creation might not only bubble up during table creation,
but also when trying to insert. The following are example stack traces for
these errors.
Caused by: org.apache.kafka.connect.errors.ConnectException: Error with inserting to table with
table name example_table: Failed to perform operation. Operation='checkAndPut', projectId='123',
tableName='example_table', rowKey='simple-key-4'
...
Caused by: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Table currently being created
Caused by: org.apache.kafka.connect.errors.ConnectException: Error with inserting to table with
table name example_table: Failed to perform operation. Operation='checkAndPut', projectId='123',
tableName='example_table', rowKey='simple-key-4'
...
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Table not found:
Note
retry.timeout.ms defaults to 90 seconds (90000 ms) and specifies the maximum
time in milliseconds allocated for retrying database operations. If
auto.create.tables is enabled, consider leaving this configuration as is, or
making it higher, as table creation generally takes at least a minute or two.
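A hedged example of raising the retry timeout (the value shown is illustrative):
# Allow up to three minutes for retries, leaving room for slow table creation.
retry.timeout.ms=180000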
Schema errors
If auto.create.column.families is not enabled, many record-level failures can
occur because the connector may attempt to write to a column family that does
not exist. This is likely to occur if the connector does not receive a
two-level struct record value and then attempts to write the data to the
default column family (the Kafka topic name). If this happens, consider using a
Single Message Transformation to reshape the record to fit the connector’s
expectation, or enable auto.create.column.families.
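As one possible sketch (not the only approach), the built-in HoistField transformation can wrap a flat record value into the expected two-level struct. The transform aliases and field names below are illustrative:
# Wrap the original value under a column name, then wrap that struct under a column family name.
transforms=wrapColumn,wrapFamily
transforms.wrapColumn.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapColumn.field=kafka_value
transforms.wrapFamily.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapFamily.field=stats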
Authorization failures
The BigTable connector must authenticate with a BigTable instance and establish
a connection. If a connection fails because of authentication, the connector
will stop immediately. These errors may require changes in your Google Cloud
account, which may include creating service account keys. Try to rerun your
connector after you make the account changes. See service account keys for
more information.
Quota failures
The connector might fail due to exceeding some of the BigTable quotas.
Here are some commonly seen quota errors:
The connector might fail because it exceeds the message quota defined per user
per 100 seconds. In this case, set retry.timeout.ms high enough that the
connector is able to retry the operation after the quota resets.
The following shows an example stack trace:
Caused by: org.apache.kafka.connect.errors.ConnectException: ...
...
ERROR Could not complete RPC. Failure #0, got:
Status{code=RESOURCE_EXHAUSTED, description=Quota exceeded for quota
group 'TablesWriteGroup' and limit 'USER-100s' of service
'bigtableadmin.googleapis.com' for consumer 'project_number: ..
Occasionally, the connector might exceed quotas defined per project per day. In
this case, restarting the connector will not fix the error.
Some quota errors may be related to excessive column family creation (BigTable caps column families at 100 per table). Consider revising the table schema so the connector does not try to create too many column families. See BigTable schema design for additional information.
Enabling Debug Logging
The Connect worker log configuration controls how much detail is included in
the logs. By default, the worker logs include enough detail to identify basic
functionality. Enable DEBUG logs in the Connect worker’s log configuration to
include more details. This change must be made on each worker and only takes
effect upon worker startup. After you change the log configuration as outlined
below on each Connect worker, restart all of the Connect workers. A
rolling restart can be used if necessary.
Note
Trace-level logging is verbose, contains many more details, and may be useful
for diagnosing certain failures. Trace-level logging is enabled the same way as
debug-level logging, except that TRACE is used instead of DEBUG.
On-premises installation
For local or on-premises installations of Confluent Platform, the
etc/kafka/connect-log4j.properties file defines the logging configuration of
the Connect worker process. To enable DEBUG on just the BigTable connector,
modify the etc/kafka/connect-log4j.properties file to include the following
line:
log4j.logger.io.confluent.gcp.bigtable=DEBUG
To enable DEBUG on all of the Connect worker’s code, including all connectors,
change the log4j.rootLogger= line to use DEBUG instead of INFO. For example,
the default log configuration for Connect includes this line:
log4j.rootLogger=INFO, stdout
Change this line to the following to enable DEBUG on all of the Connect worker code:
log4j.rootLogger=DEBUG, stdout
Note
This setting may generate a large number of logs from
org.apache.kafka.clients packages, which can be suppressed by setting
log4j.logger.org.apache.kafka.clients=ERROR.
Quick Start
In this quick start, the BigTable sink connector is used to export data produced by the Avro
console producer to a table in a BigTable instance.
Prerequisites
- Cloud BigTable Prerequisites
- Confluent Prerequisites
Set up credentials
Create a service account and service account key under the GCP project.
- Open the IAM & Admin page in the GCP Console.
- Select your project and click Continue.
- In the left nav, click Service accounts.
- In the top toolbar, click Create Service Account.
- Enter the service account name and description; for example, test-service-account.
- Click Create and on the next page select the role BigTable Administrator under Cloud BigTable.
- On the next page, click Create Key and download the JSON file.
- For this quick start, save the file under your $home directory and name it bigtable-test-credentials.json.
More information on service account keys can be found here.
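If you prefer the command line over the console, a roughly equivalent sequence is sketched below; the project ID and file path are placeholders, and flags may vary between gcloud versions:
# Create the service account, grant it the BigTable Administrator role, and download a key file.
gcloud iam service-accounts create test-service-account
gcloud projects add-iam-policy-binding YOUR-PROJECT-ID \
  --member="serviceAccount:test-service-account@YOUR-PROJECT-ID.iam.gserviceaccount.com" \
  --role="roles/bigtable.admin"
gcloud iam service-accounts keys create $HOME/bigtable-test-credentials.json \
  --iam-account="test-service-account@YOUR-PROJECT-ID.iam.gserviceaccount.com"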
Create a BigTable instance
Create a test instance named test-instance
in BigTable using the console.
See detailed steps for creating an instance.
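If you prefer the command line, a small instance can also be created with gcloud; the cluster ID and zone below are placeholders, and flags may differ between gcloud versions:
gcloud bigtable instances create test-instance \
  --display-name="test-instance" \
  --cluster-config=id=test-cluster,zone=us-east1-b,nodes=1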
Install and load the connector
Install the connector through the Confluent Hub Client.
# run from your CP installation directory
confluent-hub install confluentinc/kafka-connect-gcp-bigtable:latest
Tip
By default, it will install the plugin into share/confluent-hub-components
and add the directory to the plugin path.
Adding a new connector plugin requires restarting Connect. Use the Confluent CLI to restart Connect.
confluent local services connect stop && confluent local services connect start
Configure your connector by creating the file
etc/kafka-connect-gcp-bigtable/sink-quickstart-bigtable.properties with the
following properties:
name=BigTableSinkConnector
topics=stats
tasks.max=1
connector.class=io.confluent.connect.gcp.bigtable.BigtableSinkConnector
gcp.bigtable.credentials.path=$home/bigtable-test-credentials.json
gcp.bigtable.project.id=YOUR-PROJECT-ID
gcp.bigtable.instance.id=test-instance
auto.create.tables=true
auto.create.column.families=true
table.name.format=example_table
# The following define the Confluent license stored in Kafka, so we need the Kafka bootstrap addresses.
# `replication.factor` may not be larger than the number of Kafka brokers in the destination cluster,
# so here we set this to '1' for demonstration purposes. Always use at least '3' in production configurations.
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Note
Make sure to replace YOUR-PROJECT-ID with the project ID you created in the
prerequisite portion of this quick start. Make sure to replace $home with your
home directory path, or any other path where the credentials file was saved.
Start the BigTable sink connector by loading the connector’s configuration with the following command:
confluent local services connect connector load bigtable --config etc/kafka-connect-gcp-bigtable/sink-quickstart-bigtable.properties
Your output should resemble the following:
{
"name": "bigtable",
"config": {
"topics": "stats",
"tasks.max": "1",
"connector.class": "io.confluent.connect.gcp.bigtable.BigtableSinkConnector",
"gcp.bigtable.credentials.path": "$home/bigtable-test-credentials.json",
"gcp.bigtable.instance.id": "test-instance",
"gcp.bigtable.project.id": "YOUR-PROJECT-ID",
"auto.create.tables": "true",
"auto.create.column.families": "true",
"table.name.format": "example_table",
"confluent.license": "",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"name": "bigtable"
},
"tasks": [
{
"connector": "bigtable",
"task": 0
}
],
"type": "sink"
}
Check the status of the connector to confirm that it is in a RUNNING
state.
confluent local services connect connector status bigtable
Your output should resemble the following:
{
"name": "bigtable",
"connector": {
"state": "RUNNING",
"worker_id": "10.200.7.192:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "10.200.7.192:8083"
}
],
"type": "sink"
}
Send data to Kafka
To produce some records into the stats
topic, first start a Kafka producer.
bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic stats \
--property parse.key=true \
--property key.separator=, \
--property key.schema='{"type" : "string", "name" : "id"}' \
--property value.schema='{"type":"record","name":"myrecord",
"fields":[{"name":"users","type":{"name": "columnfamily",
"type":"record","fields":[{"name": "name", "type": "string"},
{"name": "friends", "type": "string"}]}}]}'
The console producer is now waiting for input, so you can go ahead and insert some records into the topic.
"simple-key-1", {"users": {"name":"Bob","friends": "1000"}}
"simple-key-2", {"users": {"name":"Jess","friends": "10000"}}
"simple-key-3", {"users": {"name":"John","friends": "10000"}}
Check BigTable for data
Use cbt to verify that the data has been written to BigTable.
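For example, assuming the cbt CLI is installed and configured with your project, and using the instance and table names from this quick start:
cbt -instance test-instance read example_table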
You should see output resembling the example below:
simple-key-1
  users:name @ 2019/09/10-14:51:01.365000
    Bob
  users:friends @ 2019/09/10-14:51:01.365000
    1000
simple-key-2
  users:name @ 2019/09/10-14:51:01.365000
    Jess
  users:friends @ 2019/09/10-14:51:01.365000
    10000
simple-key-3
  users:name @ 2019/09/10-14:51:01.365000
    John
  users:friends @ 2019/09/10-14:51:01.365000
    10000
Clean up resources
Delete the table.
cbt deletetable example_table
Delete the test instance.
- Click Instance details on the left sidebar.
- Click Delete Instance on the top toolbar and type the instance name to verify deletion.
Delete the service account credentials used for the test.
- Open the IAM & Admin page in the GCP Console.
- Select your project and click Continue.
- In the left nav, click Service accounts.
- Locate the test-service-account and click the More button under Actions.
- Click Delete and confirm deletion.