Important

You are viewing documentation for an older version of Confluent Platform. For the latest, click here.

Kafka Connect Google Cloud Spanner Sink Connector

The Cloud Spanner Sink Connector allows moving data from Kafka to a Google Cloud Spanner database. It writes data from a topic in Kafka to a table in the specified Spanner database. Auto-creation of tables and limited auto-evolution are also supported.

Prerequisites

The following are required to run the Kafka Connect GCP Spanner Sink Connector:

  • Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
  • Java 1.8
  • At minimum, roles/spanner.databaseUser is required for this connector. See Access control for Cloud Spanner.

Limitations

  • Cloud Spanner has table size and query limitations that apply to the connector.
  • The performance limitations of instances apply to the connector.
  • The connector does not support creation of interleaved tables.
  • When auto.evolve is enabled, if a new column with a default value is added, that default value is only used for new records. Existing records will have <NULL> as the value for the new column.

Features

Data mapping

The sink connector requires knowledge of schemas, so you should use a suitable converter, like the Avro converter that comes with Schema Registry or the JSON converter with schemas enabled. Kafka record keys, if present, can be primitive types or a Connect struct, and the record value must be a Connect struct. You may need to implement a custom Converter if the data in the topic is not in a compatible format.

Data Types

Mapping from Kafka Record types to Spanner data types is detailed below.

Kafka Record Type Spanner Type
INT8, INT16, INT32, INT64 INT64
FLOAT32, FLOAT64 FLOAT64
BOOLEAN BOOL
STRING STRING(MAX)
BYTES BYTES(MAX)
DATE DATE
TIMESTAMP TIMESTAMP
ARRAY<INT8>, ARRAY<INT16>, ARRAY<INT32>, ARRAY<INT64> ARRAY<INT64>
ARRAY<FLOAT32>, ARRAY<FLOAT64> ARRAY<FLOAT64>
ARRAY<BOOLEAN> ARRAY<BOOL>
ARRAY<STRING> ARRAY<STRING(MAX)>
NOT SUPPORTED ARRAY<BYTES(MAX)>
NOT SUPPORTED ARRAY<DATE>
NOT SUPPORTED ARRAY<TIMESTAMP>
MAP NOT SUPPORTED
STRUCT NOT SUPPORTED

Auto-creation and Auto-evolution

If auto.create is enabled, the connector can create the destination table if it is found to be missing. The connector uses the record schema as a basis for the table definition, so the creation takes place online with records consumed from the topic.

If auto.evolve is enabled, the connector can perform limited auto-evolution when it encounters a record for which a column is found to be missing. Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table.

Important

For backward-compatible table schema evolution, new fields in record schemas must be optional or have a default value.

Install the Cloud Spanner Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

confluent-hub install confluentinc/kafka-connect-gcp-spanner:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-gcp-spanner:1.0.1

Install Connector Manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Quick Start

In this quick start, the Spanner sink connector is used to export data produced by the Avro console producer to a database in a Spanner instance.

Prerequisites

Cloud Spanner Prerequisites
Confluent Prerequisites

Setup Credentials

Create a service account and service account key under the GCP project.

  1. Open the IAM & Admin page in the GCP Console.
  2. Select your project and click Continue.
  3. In the left nav, click Service accounts.
  4. In the top toolbar, click Create Service Account.
  5. Enter the service account name and description; for example test-service-account.
  6. Click Create and on the next page select the role Cloud Spanner Database Admin under Cloud Spanner.
  7. On the next page click Create Key and download the JSON file.
  8. For this quickstart save the file under your $home directory and name it spanner-test-credentials.json.

For more information on service account keys, see Creating and managing service account keys.

Create a Spanner Instance and Database

  1. Create a test instance named test-instance in Spanner using the console. See the Cloud Spanner Quickstart for instructions.
  2. Create a database named example-db under the test-instance. See the Cloud Spanner Quickstart for instructions.

Install and Load the Connector

  1. Install the connector through the Confluent Hub Client.

    # run from your CP installation directory
    confluent-hub install confluentinc/kafka-connect-gcp-spanner:latest
    

    Tip

    By default, it will install the plugin into share/confluent-hub-components and add the directory to the plugin path.

  2. Adding a new connector plugin requires restarting Kafka Connect. Use the Confluent CLI to restart Connect.

    confluent stop connect && confluent start connect
    
  3. Configure your connector by adding the file spanner-sink.properties, with the following properties:

    name=SpannerSinkConnector
    topics=products
    tasks.max=1
    connector.class=io.confluent.connect.gcp.spanner.SpannerSinkConnector
    
    gcp.spanner.credentials.path=$home/spanner-test-credentials.json
    gcp.spanner.instance.id=test-instance
    gcp.spanner.database.id=example-db
    auto.create=true
    table.name.format=kafka_${topic}
    
    # The following define the Confluent license stored in Kafka, so we need the Kafka bootstrap addresses.
    # `replication.factor` may not be larger than the number of Kafka brokers in the destination cluster,
    # so here we set this to '1' for demonstration purposes. Always use at least '3' in production configurations.
    confluent.license=
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    

    Note

    Make sure to replace the $home with your home directory path, or any other path where the credentials file was saved.

  4. Start the Spanner sink connector by loading the connector’s configuration with the following command:

    confluent load spanner​ -d spanner-sink.properties
    

    Your output should resemble the following:

    {
      "name": "spanner",
      "config": {
        "topics": "products",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.gcp.spanner.SpannerSinkConnector",
        "gcp.spanner.credentials.path": "$home/spanner-test-credentials.json",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "example-db",
        "auto.create": "true",
        "table.name.format": "kafka_${topic}",
        "confluent.license": "",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "name": "spanner"
      },
      "tasks": [
        {
          "connector": "spanner",
          "task": 0
        }
      ],
      "type": "sink"
    }
    
  5. Check the status of the connector to confirm that it is in a RUNNING state.

    confluent status spanner
    

    Your output should resemble the following:

    {
      "name": "spanner",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.200.7.192:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.200.7.192:8083"
        }
      ],
      "type": "sink"
    }
    

Send Data to Kafka

  1. To produce some records into the products topic, first start a Kafka producer.

    kafka-avro-console-producer \
    --broker-list localhost:9092 --topic products \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"},
    {"name":"price", "type": "float"}, {"name":"quantity", "type": "int"}]}'
    
  2. The console producer is now waiting for input, so you can go ahead and insert some records into the topic.

    {"name": "scissors", "price": 2.75, "quantity": 3}
    {"name": "tape", "price": 0.99, "quantity": 10}
    {"name": "notebooks", "price": 1.99, "quantity": 5}
    

Check Spanner Database for Data

To verify that the data has been written to Spanner, you can use the GCP console (https://console.cloud.google.com/spanner/instances/test-instance/databases).

Under databases you should see the kafka_products table. Clicking on the table and then DATA will show the three rows the have just been inserted.

connect_topic__ connect_partition__ connect_offset__ quantity price name
products 0 0 3 2.75 scissors
products 0 1 10 0.99 tape
products 0 2 5 1.99 notebooks

Deleting Unnecessary Resources

Delete the test instance.

  1. Click test-instance on the left sidebar.
  2. Click Delete Instance on the top toolbar and type the instance name to verify deletion.

Delete the service account credentials used for the test.

  1. Open the IAM & Admin page in the GCP Console.
  2. Select your project and click Continue.
  3. In the left nav, click Service accounts.
  4. Locate the test-service-account and click the More button under actions.
  5. Click Delete and confirm deletion.

Troubleshooting Connector and Task Failures

You can use the Connect REST API to check the status of the connectors and tasks. If a task or connector has failed, the trace field will include a reason and a stack trace.

Authorization Failures

The Cloud Spanner connector must authenticate with a Spanner instance and establish a connection. If a connection fails because of authentication, the connector will stop immediately. These errors may require changes in your Google Cloud account which may include creating service account keys. Try to rerun your connector after you make the account changes. More information on service account keys, see Creating and managing service account credentials.

Spanner Error Codes

Whenever the connector or task fails, it captures a message that includes the Spanner error code and error message. The message may also include a failure reason or suggested fix. See the Spanner error codes for error code details.

Repeated Connection Failures

If the connector stops frequently because of connection timeouts, consider changing the following connector configuration properties and restarting the connector:

  • The ​request.timeout.ms defaults to 6 seconds (6000 milliseconds) and dictates the maximum amount of time that the connector should wait for a single spanner operation to succeed. If this time is exceeded the connector will attempt to retry.
  • The ​retry.timeout.ms defaults to 1 minutes (60000 milliseconds) and specifies the maximum amount of time that the connector should continue to retry a failed request. The actual duration of the delay is random and it grows exponentially with each retry. If all retries take longer than this value, the connector task will fail. This does not apply to initial connection attempts, but it does apply to subsequent requests to reconnect. Only retry-able errors from spanner will be retried.

Quota Failures

If the connector fails due to write limits being exceeded, consider changing the max.batch.size configuration property and restarting the connector. The max.batch.size defaults to 1000 and specifies the maximum amount of rows to batch together for a Spanner operation (INSERT, UPDATE, UPSERT). If the row sizes are large, consider lowering the max batch size.

Connector Error Mode

By default, the error.mode for the connector is FAIL. This means if there is an error when writing a batch of records to Spanner the connector fails. It may be convenient in some cases to set the error.mode to WARN or INFO instead so that the connector keeps running even after a particular batch fails.

Proxy Settings

Note the following about proxy settings.

Note

When the gcp.spanner.proxy.url proxy settings are configured, the system property variables (https.proxyHost and https.proxyPort) are set globally for the entire JVM.

Enabling Debug Logging

The Connect worker log configuration controls how much detail is included in the logs. By default, the worker logs include enough detail to identify basic functionality. Enable DEBUG logs in the Connect worker’s log configuration to include more details. This change must be made on each worker and only takes affect upon worker startup. After you change the log configuration as outlined below on each Connect worker, restart all of the Connect workers. A rolling restart can be used if necessary.

Note

Trace level logging is verbose and contains many more details, and may be useful to solve certain failures. Trace level logging is enabled like debug level logging is enabled, except TRACE is used instead of DEBUG.

On-Premise Installation

For local or on-premise installations of Confluent Platform, the etc/kafka/connect-log4j.properties file defines the logging configuration of the Connect worker process. To enable DEBUG on just the Spanner connector, modify the etc/kafka/connect-log4j.properties file to include the following line:

:name: connect-log4j.properties

log4j.logger.io.confluent.gcp.spanner=DEBUG

To enable DEBUG on all of the Connect worker’s code, including all connectors, change the log4j.rootLogger= line to use DEBUG instead of INFO. For example, the default log configuration for Connect includes this line:

:name: connect-log4j.properties

log4j.rootLogger=INFO, stdout

Change this line to the following to enable DEBUG on all of the Connect worker code:

:name: connect-log4j.properties

log4j.rootLogger=DEBUG, stdout

Note

This setting causes may generate a large amount of logs from org.apache.kafka.clients packages, which can be suppressed by setting log4j.logger.org.apache.kafka.clients=ERROR.

Additional Documentation