Kafka Connect Google Cloud Spanner Sink Connector¶
The Cloud Spanner Sink Connector allows moving data from Kafka to a Google Cloud Spanner database. It writes data from a topic in Kafka to a table in the specified Spanner database. Auto-creation of tables and limited auto-evolution are also supported.
Prerequisites¶
The following are required to run the Kafka Connect GCP Spanner Sink Connector:
- Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
- Java 1.8
- At minimum, `roles/spanner.databaseUser` is required for this connector. See Access control for Cloud Spanner.
Limitations¶
- Cloud Spanner has table size and query limitations that apply to the connector.
- The performance limitations of instances apply to the connector.
- The connector does not support creation of interleaved tables.
- When `auto.evolve` is enabled, if a new column with a default value is added, that default value is only used for new records. Existing records will have `NULL` as the value for the new column.
Features¶
Data mapping¶
The sink connector requires knowledge of schemas, so you should use a suitable
converter, like the Avro converter that comes with Schema Registry or the JSON converter
with schemas enabled. Kafka record keys, if present, can be primitive types or a
Connect struct, and the record value must be a Connect struct. You may need to
implement a custom Converter
if the data in the topic is not in a compatible
format.
Data Types¶
Mapping from Kafka Record types to Spanner data types is detailed below.
| Kafka Record Type | Spanner Type |
|---|---|
| INT8, INT16, INT32, INT64 | INT64 |
| FLOAT32, FLOAT64 | FLOAT64 |
| BOOLEAN | BOOL |
| STRING | STRING(MAX) |
| BYTES | BYTES(MAX) |
| DATE | DATE |
| TIMESTAMP | TIMESTAMP |
| ARRAY&lt;INT8&gt;, ARRAY&lt;INT16&gt;, ARRAY&lt;INT32&gt;, ARRAY&lt;INT64&gt; | ARRAY&lt;INT64&gt; |
| ARRAY&lt;FLOAT32&gt;, ARRAY&lt;FLOAT64&gt; | ARRAY&lt;FLOAT64&gt; |
| ARRAY&lt;BOOLEAN&gt; | ARRAY&lt;BOOL&gt; |
| ARRAY&lt;STRING&gt; | ARRAY&lt;STRING(MAX)&gt; |
| NOT SUPPORTED | ARRAY&lt;BYTES(MAX)&gt; |
| NOT SUPPORTED | ARRAY&lt;DATE&gt; |
| NOT SUPPORTED | ARRAY&lt;TIMESTAMP&gt; |
| MAP | NOT SUPPORTED |
| STRUCT | NOT SUPPORTED |
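The table above can be captured as a small lookup. The following sketch is illustrative only (it is not connector source code); it resolves a Connect schema type to its Spanner column type and rejects the unsupported cases:

```python
# Illustrative lookup of the Kafka Connect -> Spanner type mapping above.
# This mirrors the documentation table; it is not connector source code.
CONNECT_TO_SPANNER = {
    "INT8": "INT64", "INT16": "INT64", "INT32": "INT64", "INT64": "INT64",
    "FLOAT32": "FLOAT64", "FLOAT64": "FLOAT64",
    "BOOLEAN": "BOOL",
    "STRING": "STRING(MAX)",
    "BYTES": "BYTES(MAX)",
    "DATE": "DATE",
    "TIMESTAMP": "TIMESTAMP",
}

# Element types with a supported ARRAY<...> mapping, per the table above.
ARRAY_SUPPORTED = {"INT8", "INT16", "INT32", "INT64",
                   "FLOAT32", "FLOAT64", "BOOLEAN", "STRING"}

def spanner_type(connect_type: str) -> str:
    """Return the Spanner type for a Connect type, or raise if unsupported."""
    if connect_type.startswith("ARRAY<") and connect_type.endswith(">"):
        element = connect_type[6:-1]
        if element in ARRAY_SUPPORTED:
            return f"ARRAY<{CONNECT_TO_SPANNER[element]}>"
        raise ValueError(f"unsupported array element type: {element}")
    if connect_type in CONNECT_TO_SPANNER:
        return CONNECT_TO_SPANNER[connect_type]
    raise ValueError(f"unsupported Connect type: {connect_type}")
```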
Auto-creation and Auto-evolution¶
If `auto.create` is enabled, the connector can create the destination table if
it is found to be missing. The connector uses the record schema as a basis for
the table definition, so the creation takes place online with records consumed
from the topic.
If `auto.evolve` is enabled, the connector can perform limited auto-evolution when it encounters a record whose schema contains a field with no corresponding table column.
Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table.
Important
For backward-compatible table schema evolution, new fields in record schemas must be optional or have a default value.
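As an illustration of the idea behind `auto.create`, the sketch below derives a `CREATE TABLE` statement from a record schema using the mapping in the Data Types section. The Kafka metadata columns match those shown in the quick start, but the primary-key choice and exact DDL shape are assumptions for illustration; the connector's actual DDL generation may differ.

```python
# Hypothetical sketch of how auto.create might derive a table definition
# from a record schema. The connector's actual DDL may differ.
TYPE_MAP = {"STRING": "STRING(MAX)", "FLOAT32": "FLOAT64",
            "INT32": "INT64", "BOOLEAN": "BOOL"}

def create_table_ddl(table: str, fields: dict) -> str:
    """Build a CREATE TABLE statement from field-name -> Connect-type pairs."""
    cols = ", ".join(f"{name} {TYPE_MAP[ftype]}" for name, ftype in fields.items())
    # Spanner tables require a primary key; keying on the Kafka coordinates
    # (topic, partition, offset) is an illustrative assumption here.
    return (
        f"CREATE TABLE {table} ({cols}, "
        "connect_topic__ STRING(MAX), connect_partition__ INT64, "
        "connect_offset__ INT64) "
        "PRIMARY KEY (connect_topic__, connect_partition__, connect_offset__)"
    )
```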
Install the Cloud Spanner Connector¶
You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.
```
confluent-hub install confluentinc/kafka-connect-gcp-spanner:latest
```
You can install a specific version by replacing `latest` with a version number. For example:
```
confluent-hub install confluentinc/kafka-connect-gcp-spanner:1.0.1
```
Install Connector Manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and License topic configuration for information about the license topic.
Quick Start¶
In this quick start, the Spanner sink connector is used to export data produced by the Avro console producer to a database in a Spanner instance.
Prerequisites¶
- Cloud Spanner Prerequisites
- Google Cloud Platform (GCP) Account
- A GCP project and billing enabled. For instructions, see the Cloud Spanner Quickstart.
- Confluent Prerequisites
- Confluent Platform
- Confluent CLI (requires separate installation)
Setup Credentials¶
Create a service account and service account key under the GCP project.
- Open the IAM & Admin page in the GCP Console.
- Select your project and click Continue.
- In the left nav, click Service accounts.
- In the top toolbar, click Create Service Account.
- Enter the service account name and description; for example `test-service-account`.
- Click Create and on the next page select the role Cloud Spanner Database Admin under Cloud Spanner.
- On the next page click Create Key and download the JSON file.
- For this quickstart, save the file under your `$home` directory and name it `spanner-test-credentials.json`.
For more information on service account keys, see Creating and managing service account keys.
Create a Spanner Instance and Database¶
- Create a test instance named `test-instance` in Spanner using the console. See the Cloud Spanner Quickstart for instructions.
- Create a database named `example-db` under `test-instance`. See the Cloud Spanner Quickstart for instructions.
Install and Load the Connector¶
Install the connector through the Confluent Hub Client.
```
# run from your CP installation directory
confluent-hub install confluentinc/kafka-connect-gcp-spanner:latest
```
Tip
By default, the plugin is installed into `share/confluent-hub-components` and that directory is added to the plugin path.

Adding a new connector plugin requires restarting Kafka Connect. Use the Confluent CLI to restart Connect.
```
confluent stop connect && confluent start connect
```
Configure your connector by adding the file `spanner-sink.properties` with the following properties:
```
name=SpannerSinkConnector
topics=products
tasks.max=1
connector.class=io.confluent.connect.gcp.spanner.SpannerSinkConnector
gcp.spanner.credentials.path=$home/spanner-test-credentials.json
gcp.spanner.instance.id=test-instance
gcp.spanner.database.id=example-db
auto.create=true
table.name.format=kafka_${topic}
# The following define the Confluent license stored in Kafka, so we need the Kafka bootstrap addresses.
# `replication.factor` may not be larger than the number of Kafka brokers in the destination cluster,
# so here we set this to '1' for demonstration purposes. Always use at least '3' in production configurations.
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
```
Note

Make sure to replace `$home` with your home directory path, or any other path where the credentials file was saved.

Start the Spanner sink connector by loading the connector's configuration with the following command:
```
confluent load spanner -d spanner-sink.properties
```
Your output should resemble the following:
```
{
  "name": "spanner",
  "config": {
    "topics": "products",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.gcp.spanner.SpannerSinkConnector",
    "gcp.spanner.credentials.path": "$home/spanner-test-credentials.json",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "example-db",
    "auto.create": "true",
    "table.name.format": "kafka_${topic}",
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "name": "spanner"
  },
  "tasks": [
    {
      "connector": "spanner",
      "task": 0
    }
  ],
  "type": "sink"
}
```
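If you prefer the Connect REST API over the Confluent CLI, the same configuration can be submitted with `POST /connectors`. The sketch below builds the request with Python's standard library; it assumes a Connect worker listening on `localhost:8083`.

```python
import json
from urllib import request

# Sketch: submit the same connector config through the Connect REST API
# (POST /connectors). Assumes a Connect worker on localhost:8083.
config = {
    "topics": "products",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.gcp.spanner.SpannerSinkConnector",
    # Replace $home with the real path to the credentials file.
    "gcp.spanner.credentials.path": "$home/spanner-test-credentials.json",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "example-db",
    "auto.create": "true",
    "table.name.format": "kafka_${topic}",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
}
payload = json.dumps({"name": "spanner", "config": config}).encode()
req = request.Request("http://localhost:8083/connectors", data=payload,
                      headers={"Content-Type": "application/json"})
# request.urlopen(req)  # uncomment with a running Connect worker
```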
Check the status of the connector to confirm that it is in a `RUNNING` state.
```
confluent status spanner
```
Your output should resemble the following:
```
{
  "name": "spanner",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.200.7.192:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.200.7.192:8083"
    }
  ],
  "type": "sink"
}
```
Send Data to Kafka¶
To produce some records into the `products` topic, first start a Kafka producer.
```
kafka-avro-console-producer \
  --broker-list localhost:9092 --topic products \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"}, {"name":"price", "type": "float"}, {"name":"quantity", "type": "int"}]}'
```
The console producer is now waiting for input, so you can go ahead and insert some records into the topic.
```
{"name": "scissors", "price": 2.75, "quantity": 3}
{"name": "tape", "price": 0.99, "quantity": 10}
{"name": "notebooks", "price": 1.99, "quantity": 5}
```
Check Spanner Database for Data¶
To verify that the data has been written to Spanner, you can use the GCP console (https://console.cloud.google.com/spanner/instances/test-instance/databases).
Under databases you should see the `kafka_products` table. Clicking on the table and then DATA will show the three rows that have just been inserted.
| connect_topic__ | connect_partition__ | connect_offset__ | quantity | price | name |
|---|---|---|---|---|---|
| products | 0 | 0 | 3 | 2.75 | scissors |
| products | 0 | 1 | 10 | 0.99 | tape |
| products | 0 | 2 | 5 | 1.99 | notebooks |
Deleting Unnecessary Resources¶
Delete the test instance.
- Click `test-instance` on the left sidebar.
- Click Delete Instance on the top toolbar and type the instance name to verify deletion.
Delete the service account credentials used for the test.
- Open the IAM & Admin page in the GCP Console.
- Select your project and click Continue.
- In the left nav, click Service accounts.
- Locate the `test-service-account` and click the More button under Actions.
- Click Delete and confirm deletion.
Troubleshooting Connector and Task Failures¶
You can use the Connect REST API to check the status of the connectors and tasks. If a task or connector has failed, the `trace` field will include a reason and a stack trace.
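The status payload returned by `GET /connectors/<name>/status` can be scanned for failures with a few lines of Python. This is a convenience sketch, and the sample payload below is a made-up example shaped like the quick start output:

```python
def failed_traces(status: dict) -> list:
    """Collect trace strings from any FAILED connector or task in a
    Connect REST API status payload (GET /connectors/<name>/status)."""
    traces = []
    if status.get("connector", {}).get("state") == "FAILED":
        traces.append(status["connector"].get("trace", ""))
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            traces.append(task.get("trace", ""))
    return traces

# Made-up payload shaped like the REST API status response:
sample = {
    "name": "spanner",
    "connector": {"state": "RUNNING", "worker_id": "10.200.7.192:8083"},
    "tasks": [{"id": 0, "state": "FAILED",
               "worker_id": "10.200.7.192:8083", "trace": "stack trace text"}],
    "type": "sink",
}
print(failed_traces(sample))
```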
Authorization Failures¶
The Cloud Spanner connector must authenticate with a Spanner instance and establish a connection. If a connection fails because of authentication, the connector stops immediately. These errors may require changes in your Google Cloud account, such as creating service account keys. Try rerunning the connector after you make the account changes. For more information on service account keys, see Creating and managing service account credentials.
Spanner Error Codes¶
Whenever the connector or task fails, it captures a message that includes the Spanner error code and error message. The message may also include a failure reason or suggested fix. See the Spanner error codes for error code details.
Repeated Connection Failures¶
If the connector stops frequently because of connection timeouts, consider changing the following connector configuration properties and restarting the connector:
- `request.timeout.ms` defaults to 6 seconds (6000 milliseconds) and dictates the maximum amount of time that the connector should wait for a single Spanner operation to succeed. If this time is exceeded, the connector will attempt to retry.
- `retry.timeout.ms` defaults to 1 minute (60000 milliseconds) and specifies the maximum amount of time that the connector should continue to retry a failed request. The actual duration of the delay is random and grows exponentially with each retry. If all retries take longer than this value, the connector task will fail. This does not apply to initial connection attempts, but it does apply to subsequent requests to reconnect. Only retryable errors from Spanner are retried.
Quota Failures¶
If the connector fails because write limits were exceeded, consider changing the `max.batch.size` configuration property and restarting the connector. The `max.batch.size` property defaults to 1000 and specifies the maximum number of rows to batch together for a single Spanner operation (INSERT, UPDATE, UPSERT). If the row sizes are large, consider lowering the maximum batch size.
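The effect of `max.batch.size` is essentially chunking: buffered rows are split into groups of at most that many rows per Spanner write. A minimal sketch, not connector code:

```python
def batches(rows: list, max_batch_size: int = 1000):
    """Yield successive chunks of at most max_batch_size rows, mirroring
    how a sink might group rows per Spanner operation. Illustrative only."""
    for i in range(0, len(rows), max_batch_size):
        yield rows[i:i + max_batch_size]
```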
Connector Error Mode¶
By default, the `error.mode` for the connector is `FAIL`. This means that if there is an error when writing a batch of records to Spanner, the connector fails. In some cases it may be convenient to set `error.mode` to `WARN` or `INFO` instead, so that the connector keeps running even after a particular batch fails.
Proxy Settings¶
Note the following about proxy settings.

Note

When the `gcp.spanner.proxy.url` proxy settings are configured, the system property variables (`https.proxyHost` and `https.proxyPort`) are set globally for the entire JVM.
Enabling Debug Logging¶
The Connect worker log configuration controls how much detail is included in the logs. By default, the worker logs include enough detail to identify basic functionality. Enable DEBUG logs in the Connect worker's log configuration to include more details. This change must be made on each worker and only takes effect upon worker startup. After you change the log configuration as outlined below on each Connect worker, restart all of the Connect workers. A rolling restart can be used if necessary.
Note
Trace-level logging is verbose, contains many more details, and may be useful for diagnosing certain failures. Trace-level logging is enabled the same way as debug-level logging, except `TRACE` is used instead of `DEBUG`.
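For example, to enable trace logging for just the Spanner connector, the logger line in `etc/kafka/connect-log4j.properties` would use `TRACE` in place of `DEBUG`:

```
log4j.logger.io.confluent.gcp.spanner=TRACE
```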
On-Premise Installation¶
For local or on-premise installations of Confluent Platform, the `etc/kafka/connect-log4j.properties` file defines the logging configuration of the Connect worker process. To enable DEBUG on just the Spanner connector, modify the `etc/kafka/connect-log4j.properties` file to include the following line:
```
log4j.logger.io.confluent.gcp.spanner=DEBUG
```
To enable DEBUG on all of the Connect worker's code, including all connectors, change the `log4j.rootLogger=` line to use `DEBUG` instead of `INFO`. For example, the default log configuration for Connect includes this line:
```
log4j.rootLogger=INFO, stdout
```
Change this line to the following to enable DEBUG on all of the Connect worker code:
```
log4j.rootLogger=DEBUG, stdout
```
Note

This setting may generate a large amount of log output from `org.apache.kafka.clients` packages, which can be suppressed by setting `log4j.logger.org.apache.kafka.clients=ERROR`.