Google Cloud Spanner Sink Connector for Confluent Platform

The Google Cloud Spanner Sink connector moves data from Apache Kafka® to a Google Cloud Spanner database. It writes data from a topic in Kafka to a table in the specified Spanner database. Table auto-creation and limited auto-evolution are also supported.

Features

At least once delivery

This connector guarantees that records from the Kafka topic are delivered at least once.

Dead Letter Queue

This connector supports the Dead Letter Queue (DLQ) functionality. For information about accessing and using the DLQ, see Confluent Platform Dead Letter Queue.

Multiple tasks

The Google Cloud Spanner Sink connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when multiple files need to be parsed.

Data mapping

The sink connector requires knowledge of schemas, so you should use a suitable converter, like the Avro converter that comes with Schema Registry or the JSON converter with schemas enabled. Kafka record keys, if present, can be primitive types or a Connect struct, and the record value must be a Connect struct. You may need to implement a custom Converter if the data in the topic is not in a compatible format.

Data types

Mapping from Kafka record types to Spanner data types is detailed below.

Kafka Record Type Spanner Type
INT8, INT16, INT32, INT64 INT64
FLOAT32, FLOAT64 FLOAT64
BOOLEAN BOOL
STRING STRING(MAX)
BYTES BYTES(MAX)
DATE DATE
TIMESTAMP TIMESTAMP
ARRAY<INT8>, ARRAY<INT16>, ARRAY<INT32>, ARRAY<INT64> ARRAY<INT64>
ARRAY<FLOAT32>, ARRAY<FLOAT64> ARRAY<FLOAT64>
ARRAY<BOOLEAN> ARRAY<BOOL>
ARRAY<STRING> ARRAY<STRING(MAX)>
NOT SUPPORTED ARRAY<BYTES(MAX)>
NOT SUPPORTED ARRAY<DATE>
NOT SUPPORTED ARRAY<TIMESTAMP>
MAP NOT SUPPORTED
STRUCT NOT SUPPORTED

Auto-creation and auto-evolution

If auto.create is enabled, the connector can create the destination table if it is found to be missing. The connector uses the record schema as a basis for the table definition, so the creation takes place online with records consumed from the topic.

If auto.evolve is enabled, the connector can perform limited auto-evolution when it encounters a record for which a column is found to be missing. Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table.

Important

For backward-compatible table schema evolution, new fields in record schemas must be optional or have a default value.

Limitations

  • The connector does not support PostgreSQL dialect.
  • Cloud Spanner has table size and query limitations that apply to the connector.
  • The performance limitations of instances apply to the connector.
  • The connector does not support creation of interleaved tables.
  • When auto.evolve is enabled, if a new column with a default value is added, that default value is only used for new records. Existing records will have <NULL> as the value for the new column.
  • The connector does not currently support Single Message Transformations (SMTs) that modify the topic name. Additionally, the following transformations are not allowed:
    • io.debezium.transforms.ByLogicalTableRouter
    • io.debezium.transforms.outbox.EventRouter
    • org.apache.kafka.connect.transforms.RegexRouter
    • org.apache.kafka.connect.transforms.TimestampRouter
    • io.confluent.connect.transforms.MessageTimestampRouter
    • io.confluent.connect.transforms.ExtractTopic$Key
    • io.confluent.connect.transforms.ExtractTopic$Value

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and Confluent License Properties for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Google Cloud Spanner Sink Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Install the Cloud Spanner Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later. For installation help, see Confluent Platform.

  • Confluent CLI (requires separate installation).

  • Google Cloud Platform (GCP) Account.

  • A GCP project and billing enabled. For instructions, see the Cloud Spanner Quickstart.

  • Java 1.8.

  • At minimum, roles/spanner.databaseUser is required for this connector. For details, see Access control for Cloud Spanner.

  • An installation of the latest (latest) connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-gcp-spanner:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-gcp-spanner:1.0.8
    

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

Quick Start

In this quick start, the Spanner sink connector is used to export data produced by the Avro console producer to a database in a Spanner instance.

Setup credentials

Create a service account and service account key under the GCP project.

  1. Open the IAM & Admin page in the GCP Console.
  2. Select your project and click Continue.
  3. In the left nav, click Service accounts.
  4. In the top toolbar, click Create Service Account.
  5. Enter the service account name and description; for example test-service-account.
  6. Click Create and on the next page select the role Cloud Spanner Database Admin under Cloud Spanner.
  7. On the next page click Create Key and download the JSON file.
  8. For this quick start, save the file under your $home directory and name it spanner-test-credentials.json.

For more information on service account keys, see Creating and managing service account keys.

Create a Spanner Instance and Database

  1. Create a test instance named test-instance in Spanner using the console. See the Cloud Spanner Quickstart for instructions.
  2. Create a database named example-db under the test-instance. See the Cloud Spanner Quickstart for instructions.

Install and load the connector

  1. Install the connector using the following CLI command:

    # run from your CP installation directory
    confluent connect plugin install confluentinc/kafka-connect-gcp-spanner:latest
    
  2. Adding a new connector plugin requires restarting Kafka Connect. Use the following command to restart Connect.

    confluent local services connect stop && confluent local services connect start
    
  3. Configure your connector by adding the file spanner-sink.properties, with the following properties:

    name=SpannerSinkConnector
    topics=products
    tasks.max=1
    connector.class=io.confluent.connect.gcp.spanner.SpannerSinkConnector
    
    gcp.spanner.credentials.path=$home/spanner-test-credentials.json
    gcp.spanner.instance.id=test-instance
    gcp.spanner.database.id=example-db
    auto.create=true
    table.name.format=kafka_${topic}
    
    # The following define the Confluent license stored in Kafka, so we need the Kafka bootstrap addresses.
    # `replication.factor` may not be larger than the number of Kafka brokers in the destination cluster,
    # so here we set this to '1' for demonstration purposes. Always use at least '3' in production configurations.
    confluent.license=
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    

    Note

    Ensure to replace the $home with your home directory path, or any other path where the credentials file was saved.

  4. Start the Spanner Sink connector by loading the connector’s configuration with the following command:

    confluent local services connect connector load spanner --config spanner-sink.properties
    

    Your output should resemble the following:

    {
      "name": "spanner",
      "config": {
        "topics": "products",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.gcp.spanner.SpannerSinkConnector",
        "gcp.spanner.credentials.path": "$home/spanner-test-credentials.json",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "example-db",
        "auto.create": "true",
        "table.name.format": "kafka_${topic}",
        "confluent.license": "",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "name": "spanner"
      },
      "tasks": [
        {
          "connector": "spanner",
          "task": 0
        }
      ],
      "type": "sink"
    }
    
  5. Check the status of the connector to confirm that it is in a RUNNING state.

    confluent local services connect connector status spanner
    

    Your output should resemble the following:

    {
      "name": "spanner",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.200.7.192:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.200.7.192:8083"
        }
      ],
      "type": "sink"
    }
    

Send data to Kafka

  1. To produce some records into the products topic, first start a Kafka producer.

    kafka-avro-console-producer \
    --broker-list localhost:9092 --topic products \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},
    {"name":"price", "type": "float"}, {"name":"quantity", "type": "int"}]}'
    
  2. The console producer is now waiting for input, so you can go ahead and insert some records into the topic.

    {"name": "scissors", "price": 2.75, "quantity": 3}
    {"name": "tape", "price": 0.99, "quantity": 10}
    {"name": "notebooks", "price": 1.99, "quantity": 5}
    

Check Spanner database for data

To verify that the data has been written to Spanner, you can use the GCP console (https://console.cloud.google.com/spanner/instances/test-instance/databases).

Under databases you should see the kafka_products table. Clicking on the table and then DATA will show the three rows the have just been inserted.

connect_topic__ connect_partition__ connect_offset__ quantity price name
products 0 0 3 2.75 scissors
products 0 1 10 0.99 tape
products 0 2 5 1.99 notebooks

Deleting unnecessary resources

Delete the test instance.

  1. Click test-instance on the left sidebar.
  2. Click Delete Instance on the top toolbar and type the instance name to verify deletion.

Delete the service account credentials used for the test.

  1. Open the IAM & Admin page in the GCP Console.
  2. Select your project and click Continue.
  3. In the left nav, click Service accounts.
  4. Locate the test-service-account and click the More button under Actions.
  5. Click Delete and confirm deletion.

Troubleshooting Connector and Task Failures

You can use the Connect REST API to check the status of the connectors and tasks. If a task or connector has failed, the trace field will include a reason and a stack trace.

Authorization failures

The Cloud Spanner connector must authenticate with a Spanner instance and establish a connection. If a connection fails because of authentication, the connector will stop immediately. These errors may require changes in your Google Cloud account which may include creating service account keys. Try to rerun your connector after you make the account changes. More information on service account keys, see Creating and managing service account credentials.

Spanner error codes

Whenever the connector or task fails, it captures a message that includes the Spanner error code and error message. The message may also include a failure reason or suggested fix. See the Spanner error codes for error code details.

Repeated connection failures

If the connector stops frequently because of connection timeouts, consider changing the following connector configuration properties and restarting the connector:

  • The ​request.timeout.ms defaults to 6 seconds (6000 milliseconds) and dictates the maximum amount of time that the connector should wait for a single spanner operation to succeed. If this time is exceeded the connector will attempt to retry.
  • The ​retry.timeout.ms defaults to 1 minutes (60000 milliseconds) and specifies the maximum amount of time that the connector should continue to retry a failed request. The actual duration of the delay is random and it grows exponentially with each retry. If all retries take longer than this value, the connector task will fail. This does not apply to initial connection attempts, but it does apply to subsequent requests to reconnect. Only retry-able errors from spanner will be retried.

Quota failures

If the connector fails due to write limits being exceeded, consider changing the max.batch.size configuration property and restarting the connector. The max.batch.size defaults to 1000 and specifies the maximum amount of rows to batch together for a Spanner operation (INSERT, UPDATE, UPSERT). If the row sizes are large, consider lowering the max batch size.

Connector error mode

By default, the error.mode for the connector is FAIL. This means if there is an error when writing a batch of records to Spanner the connector fails. It may be convenient in some cases to set the error.mode to WARN or INFO instead so that the connector keeps running even after a particular batch fails.

Proxy settings

When the gcp.spanner.proxy.url proxy settings are configured, the system property variables (https.proxyHost and https.proxyPort) are set globally for the entire JVM.

Enabling debug logging

The Connect worker log configuration controls how much detail is included in the logs. By default, the worker logs include enough detail to identify basic functionality. Enable DEBUG logs in the Connect worker’s log configuration to include more details. This change must be made on each worker and only takes affect upon worker startup. After you change the log configuration as outlined below on each Connect worker, restart all of the Connect workers. A rolling restart can be used if necessary.

Note

Trace level logging is verbose and contains many more details, and may be useful to solve certain failures. Trace level logging is enabled like debug level logging is enabled, except TRACE is used instead of DEBUG.

On-premises installation

For local or on-premises installations of Confluent Platform, the etc/kafka/connect-log4j.properties file defines the logging configuration of the Connect worker process. To enable DEBUG on just the Spanner connector, modify the etc/kafka/connect-log4j.properties file to include the following line:

log4j.logger.io.confluent.gcp.spanner=DEBUG

To enable DEBUG on all of the Connect worker’s code, including all connectors, change the log4j.rootLogger= line to use DEBUG instead of INFO. For example, the default log configuration for Connect includes this line:

log4j.rootLogger=INFO, stdout

Change this line to the following to enable DEBUG on all of the Connect worker code:

log4j.rootLogger=DEBUG, stdout

Note

This setting causes may generate a large amount of logs from org.apache.kafka.clients packages, which can be suppressed by setting log4j.logger.org.apache.kafka.clients=ERROR.