Google Cloud Spanner Sink Connector for Confluent Platform¶
The Google Cloud Spanner Sink connector moves data from Apache Kafka® to a Google Cloud Spanner database. It writes data from a topic in Kafka to a table in the specified Spanner database. Table auto-creation and limited auto-evolution are also supported.
Features¶
- At least once delivery
- Dead Letter Queue
- Multiple tasks
- Data mapping
- Data types
- Auto-creation and auto-evolution
At least once delivery¶
This connector guarantees that records from the Kafka topic are delivered at least once.
Dead Letter Queue¶
This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.
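As a minimal sketch, assuming a single-broker development cluster and a hypothetical DLQ topic name, the standard Kafka Connect error-handling properties can be added to the connector configuration like this:

# Tolerate record-level failures instead of stopping the task.
errors.tolerance=all
# Send failed records to a dead letter queue topic (hypothetical name).
errors.deadletterqueue.topic.name=dlq-spanner-sink
# A replication factor of 1 is for single-broker development clusters only.
errors.deadletterqueue.topic.replication.factor=1
# Add the failure reason and stack trace to the DLQ record headers.
errors.deadletterqueue.context.headers.enable=true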
Multiple tasks¶
The Google Cloud Spanner Sink connector supports running one or more tasks. You can specify the number of tasks in the `tasks.max` configuration parameter. Running multiple tasks can improve performance when the connector consumes from multiple topic partitions.
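For example, a connector configuration might raise the task count as follows (the value is illustrative; tasks beyond the number of topic partitions remain idle):

# Run up to three sink tasks in parallel across the Connect workers (illustrative value).
tasks.max=3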
Data mapping¶
The sink connector requires knowledge of schemas, so you should use a suitable converter, like the Avro converter that comes with Schema Registry or the JSON converter with schemas enabled. Kafka record keys, if present, can be primitive types or a Connect struct, and the record value must be a Connect struct. You may need to implement a custom Converter if the data in the topic is not in a compatible format.
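As a sketch, the converters can be set per connector (or as worker defaults); the Schema Registry URL below is an assumption for a local setup:

# Avro converters backed by Schema Registry (assumed to be at localhost:8081).
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Alternative: JSON converter with embedded schemas enabled.
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=true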
Data types¶
Mapping from Kafka record types to Spanner data types is detailed below.
| Kafka Record Type | Spanner Type |
|---|---|
| INT8, INT16, INT32, INT64 | INT64 |
| FLOAT32, FLOAT64 | FLOAT64 |
| BOOLEAN | BOOL |
| STRING | STRING(MAX) |
| BYTES | BYTES(MAX) |
| DATE | DATE |
| TIMESTAMP | TIMESTAMP |
| ARRAY<INT8>, ARRAY<INT16>, ARRAY<INT32>, ARRAY<INT64> | ARRAY<INT64> |
| ARRAY<FLOAT32>, ARRAY<FLOAT64> | ARRAY<FLOAT64> |
| ARRAY<BOOLEAN> | ARRAY<BOOL> |
| ARRAY<STRING> | ARRAY<STRING(MAX)> |
| NOT SUPPORTED | ARRAY<BYTES(MAX)> |
| NOT SUPPORTED | ARRAY<DATE> |
| NOT SUPPORTED | ARRAY<TIMESTAMP> |
| MAP | NOT SUPPORTED |
| STRUCT | NOT SUPPORTED |
Auto-creation and auto-evolution¶
If `auto.create` is enabled, the connector can create the destination table if it is found to be missing. The connector uses the record schema as a basis for the table definition, so the creation takes place online with records consumed from the topic.

If `auto.evolve` is enabled, the connector can perform limited auto-evolution when it encounters a record for which a column is found to be missing. Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table.
Important
For backward-compatible table schema evolution, new fields in record schemas must be optional or have a default value.
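As a sketch, the following connector properties enable both behaviors; the table name format shown mirrors the quick start later on this page:

# Create missing destination tables from the record schema.
auto.create=true
# Add new, optional columns when the record schema gains fields.
auto.evolve=true
# Name destination tables after the Kafka topic (as used in the quick start).
table.name.format=kafka_${topic}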
Limitations¶
- The connector does not support PostgreSQL dialect.
- Cloud Spanner has table size and query limitations that apply to the connector.
- The performance limitations of instances apply to the connector.
- The connector does not support creation of interleaved tables.
- When `auto.evolve` is enabled, if a new column with a default value is added, that default value is only used for new records. Existing records will have `<NULL>` as the value for the new column.
- The connector does not currently support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:
  - `io.debezium.transforms.ByLogicalTableRouter`
  - `io.debezium.transforms.outbox.EventRouter`
  - `org.apache.kafka.connect.transforms.RegexRouter`
  - `org.apache.kafka.connect.transforms.TimestampRouter`
  - `io.confluent.connect.transforms.MessageTimestampRouter`
  - `io.confluent.connect.transforms.ExtractTopic$Key`
  - `io.confluent.connect.transforms.ExtractTopic$Value`
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys and enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and Confluent License Properties for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Google Cloud Spanner Sink Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Install the Cloud Spanner Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later. For installation help, see Confluent Platform.
- Confluent CLI (requires separate installation).
- A GCP project with billing enabled. For instructions, see the Cloud Spanner Quickstart.
- Java 1.8.
- At minimum, `roles/spanner.databaseUser` is required for this connector. For details, see Access control for Cloud Spanner.
- An installation of the latest (`latest`) connector version.

To install the `latest` connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-gcp-spanner:latest

You can install a specific version by replacing `latest` with a version number, as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-gcp-spanner:1.0.8
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Quick Start¶
In this quick start, the Spanner sink connector is used to export data produced by the Avro console producer to a database in a Spanner instance.
Setup credentials¶
Create a service account and service account key under the GCP project.

- Open the IAM & Admin page in the GCP Console.
- Select your project and click Continue.
- In the left nav, click Service accounts.
- In the top toolbar, click Create Service Account.
- Enter the service account name and description; for example, `test-service-account`.
- Click Create and, on the next page, select the role Cloud Spanner Database Admin under Cloud Spanner.
- On the next page, click Create Key and download the JSON file.
- For this quick start, save the file under your `$home` directory and name it `spanner-test-credentials.json`.
For more information on service account keys, see Creating and managing service account keys.
Create a Spanner Instance and Database¶
- Create a test instance named `test-instance` in Spanner using the console. See the Cloud Spanner Quickstart for instructions.
- Create a database named `example-db` under the `test-instance`. See the Cloud Spanner Quickstart for instructions.
Install and load the connector¶
Install the connector using the following CLI command:
# run from your CP installation directory
confluent connect plugin install confluentinc/kafka-connect-gcp-spanner:latest
Adding a new connector plugin requires restarting Kafka Connect. Use the following command to restart Connect.
confluent local services connect stop && confluent local services connect start
Configure your connector by adding the file `spanner-sink.properties` with the following properties:

name=SpannerSinkConnector
topics=products
tasks.max=1
connector.class=io.confluent.connect.gcp.spanner.SpannerSinkConnector
gcp.spanner.credentials.path=$home/spanner-test-credentials.json
gcp.spanner.instance.id=test-instance
gcp.spanner.database.id=example-db
auto.create=true
table.name.format=kafka_${topic}

# The following define the Confluent license stored in Kafka, so we need the Kafka bootstrap addresses.
# `replication.factor` may not be larger than the number of Kafka brokers in the destination cluster,
# so here we set this to '1' for demonstration purposes. Always use at least '3' in production configurations.
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Note
Be sure to replace `$home` with your home directory path, or with the path where you saved the credentials file.

Start the Spanner Sink connector by loading the connector’s configuration with the following command:
confluent local services connect connector load spanner --config spanner-sink.properties
Your output should resemble the following:
{ "name": "spanner", "config": { "topics": "products", "tasks.max": "1", "connector.class": "io.confluent.connect.gcp.spanner.SpannerSinkConnector", "gcp.spanner.credentials.path": "$home/spanner-test-credentials.json", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "example-db", "auto.create": "true", "table.name.format": "kafka_${topic}", "confluent.license": "", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "name": "spanner" }, "tasks": [ { "connector": "spanner", "task": 0 } ], "type": "sink" }
Check the status of the connector to confirm that it is in a `RUNNING` state.

confluent local services connect connector status spanner
Your output should resemble the following:
{ "name": "spanner", "connector": { "state": "RUNNING", "worker_id": "10.200.7.192:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.200.7.192:8083" } ], "type": "sink" }
Send data to Kafka¶
To produce some records into the `products` topic, first start a Kafka producer.

kafka-avro-console-producer \
  --broker-list localhost:9092 --topic products \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"}, {"name":"price", "type": "float"}, {"name":"quantity", "type": "int"}]}'
The console producer is now waiting for input, so you can go ahead and insert some records into the topic.
{"name": "scissors", "price": 2.75, "quantity": 3} {"name": "tape", "price": 0.99, "quantity": 10} {"name": "notebooks", "price": 1.99, "quantity": 5}
Check Spanner database for data¶
To verify that the data has been written to Spanner, you can use the GCP console (https://console.cloud.google.com/spanner/instances/test-instance/databases).
Under databases, you should see the `kafka_products` table. Clicking the table and then DATA will show the three rows that have just been inserted.
| connect_topic__ | connect_partition__ | connect_offset__ | quantity | price | name |
|---|---|---|---|---|---|
| products | 0 | 0 | 3 | 2.75 | scissors |
| products | 0 | 1 | 10 | 0.99 | tape |
| products | 0 | 2 | 5 | 1.99 | notebooks |
Deleting unnecessary resources¶
Delete the test instance.

- Click `test-instance` on the left sidebar.
- Click Delete Instance on the top toolbar and type the instance name to verify deletion.

Delete the service account credentials used for the test.

- Open the IAM & Admin page in the GCP Console.
- Select your project and click Continue.
- In the left nav, click Service accounts.
- Locate the `test-service-account` and click the More button under Actions.
- Click Delete and confirm deletion.
Troubleshooting Connector and Task Failures¶
You can use the Connect REST API to check the status of the connectors and tasks. If a task or connector has failed, the `trace` field will include a reason and a stack trace.
Authorization failures¶
The Cloud Spanner connector must authenticate with a Spanner instance and establish a connection. If a connection fails because of authentication, the connector stops immediately. These errors may require changes in your Google Cloud account, which may include creating service account keys. Try rerunning your connector after you make the account changes. For more information on service account keys, see Creating and managing service account keys.
Spanner error codes¶
Whenever the connector or task fails, it captures a message that includes the Spanner error code and error message. The message may also include a failure reason or suggested fix. See the Spanner error codes for error code details.
Repeated connection failures¶
If the connector stops frequently because of connection timeouts, consider changing the following connector configuration properties and restarting the connector (a configuration sketch with illustrative values follows the list):

- `request.timeout.ms` defaults to 6 seconds (6000 milliseconds) and dictates the maximum amount of time that the connector should wait for a single Spanner operation to succeed. If this time is exceeded, the connector attempts to retry.
- `retry.timeout.ms` defaults to 1 minute (60000 milliseconds) and specifies the maximum amount of time that the connector should continue to retry a failed request. The actual duration of the delay is random and grows exponentially with each retry. If all retries take longer than this value, the connector task fails. This does not apply to initial connection attempts, but it does apply to subsequent requests to reconnect. Only retryable errors from Spanner are retried.
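A sketch with illustrative (assumed) values for both properties:

# Wait up to 30 seconds for a single Spanner operation (default is 6000 ms).
request.timeout.ms=30000
# Keep retrying a failed request for up to 5 minutes (default is 60000 ms).
retry.timeout.ms=300000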
Quota failures¶
If the connector fails due to write limits being exceeded, consider changing the `max.batch.size` configuration property and restarting the connector. The `max.batch.size` defaults to 1000 and specifies the maximum number of rows to batch together for a Spanner operation (INSERT, UPDATE, UPSERT). If the row sizes are large, consider lowering the maximum batch size.
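For example, a lower batch size might be configured as follows (the value is illustrative):

# Batch fewer rows per Spanner operation than the default of 1000.
max.batch.size=500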
Connector error mode¶
By default, the `error.mode` for the connector is `FAIL`. This means that if an error occurs when writing a batch of records to Spanner, the connector fails. It may be convenient in some cases to set the `error.mode` to `WARN` or `INFO` instead, so that the connector keeps running even after a particular batch fails.
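A sketch of the more lenient setting:

# Log a warning for a failed batch instead of failing the connector.
error.mode=WARN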
Proxy settings¶
When the `gcp.spanner.proxy.url` proxy settings are configured, the system property variables (`https.proxyHost` and `https.proxyPort`) are set globally for the entire JVM.
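For example, with a placeholder proxy address:

# Route Spanner requests through an HTTPS proxy (placeholder address);
# this also sets https.proxyHost and https.proxyPort for the whole JVM.
gcp.spanner.proxy.url=https://proxy.example.com:8080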
Enabling debug logging¶
The Connect worker log configuration controls how much detail is included in the logs. By default, the worker logs include enough detail to identify basic functionality. Enable DEBUG logs in the Connect worker’s log configuration to include more details. This change must be made on each worker and only takes effect upon worker startup. After you change the log configuration as outlined below on each Connect worker, restart all of the Connect workers. A rolling restart can be used if necessary.
Note
Trace-level logging is verbose, contains many more details, and may be useful for solving certain failures. Trace-level logging is enabled in the same way as debug-level logging, except `TRACE` is used instead of `DEBUG`.
On-premises installation¶
For local or on-premises installations of Confluent Platform, the `etc/kafka/connect-log4j.properties` file defines the logging configuration of the Connect worker process. To enable DEBUG on just the Spanner connector, modify the `etc/kafka/connect-log4j.properties` file to include the following line:
log4j.logger.io.confluent.gcp.spanner=DEBUG
To enable DEBUG on all of the Connect worker’s code, including all connectors, change the `log4j.rootLogger=` line to use `DEBUG` instead of `INFO`. For example, the default log configuration for Connect includes this line:
log4j.rootLogger=INFO, stdout
Change this line to the following to enable DEBUG on all of the Connect worker code:
log4j.rootLogger=DEBUG, stdout
Note
This setting may generate a large amount of logs from `org.apache.kafka.clients` packages, which can be suppressed by setting `log4j.logger.org.apache.kafka.clients=ERROR`.