Generalized Google Cloud Storage Source Connector for Confluent Platform

The Generalized Kafka Connect Google Cloud Storage (GCS) Source connector can read files under a GCS bucket regardless of their naming convention; the filenames do not have to follow a specific format. As long as the files are in one of the supported formats (for example, JSON, Avro, or Byte Array), the connector can read them.

Features

The Generalized GCS Source connector includes the following features:

At least once delivery

In the event of a task failure, the connector guarantees that no messages are lost, although the last few messages may be processed again.

Multiple tasks

The Generalized GCS Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when multiple files need to be parsed.
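
For example, the following is a minimal fragment of a connector configuration that runs three tasks; the value here is illustrative and should be sized to the number of files you expect the connector to process in parallel:

    "tasks.max": "3"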

Limitations

The Generalized GCS Source connector has the following limitations:

  • The connector won’t reload data in the following scenarios:
    • Renaming a file that the connector has already read.
    • Uploading a newer version of a file with new records.

You should also be aware of the following connector actions:

  • The connector ignores any GCS object with a name that does not start with the configured topics.dir directory. This name is topics/ by default.
  • The connector stores offsets that track how much of the bucket it has processed, keyed by the connector name. If you delete a connector and create a new one with the same name, the new connector does not reprocess the bucket from the beginning; it resumes from the progress of the original connector unless the corresponding entry in the offset topic is cleared. You can inspect or clear these offsets through the Connect REST API, as shown in the example below.
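
For example, on Connect workers running Kafka 3.6 or later, you can inspect and clear a connector's stored offsets through the Connect REST API before reusing its name. The connector name gcs-source and the worker address below are placeholders, and the connector must be stopped before its offsets can be deleted:

    # Inspect the offsets the connector has stored for the bucket
    curl -s http://localhost:8083/connectors/gcs-source/offsets

    # Stop the connector, then delete its stored offsets
    curl -s -X PUT http://localhost:8083/connectors/gcs-source/stop
    curl -s -X DELETE http://localhost:8083/connectors/gcs-source/offsets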

Install the Generalized GCS Source Connector

You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.

Prerequisites

  • You must install the connector on every machine where Connect will run.

  • Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.

  • Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.

  • Java 1.8.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent connect plugin install confluentinc/kafka-connect-gcs-source:latest
    

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent connect plugin install confluentinc/kafka-connect-gcs-source:2.6.0
    

Permissions

The following permissions are required for the Generalized GCS Source connector:

  • storage.buckets.get
  • storage.objects.get
  • storage.objects.list

Alternatively, you can grant the service account the following roles on the bucket (a sample gsutil command follows the list):

  • Storage Object Viewer
  • Storage Legacy Bucket Reader
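
For example, assuming the gsutil CLI is installed and authenticated, one way to grant these roles is shown below; the service account address and bucket name are placeholders:

    gsutil iam ch serviceAccount:<service-account>@<project>.iam.gserviceaccount.com:roles/storage.objectViewer gs://<bucket-name>
    gsutil iam ch serviceAccount:<service-account>@<project>.iam.gserviceaccount.com:roles/storage.legacyBucketReader gs://<bucket-name>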

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, you must purchase a connector subscription which includes Confluent enterprise license keys to subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and Confluent License Properties for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Configuration Reference for Google Cloud Storage Source Connector for Confluent Platform.

For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.

Quick Start

In the following example, the GCS Source connector reads all of the data under a specific GCS bucket and loads it into a Kafka topic. It does not matter what file naming convention was used when writing data to the GCS bucket.

  1. Upload the following data under a folder named quickstart within the targeted GCS bucket. This example uses the JSON format, which supports line-delimited JSON, concatenated JSON, and a JSON array of records.

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
    
  2. Install the connector through the Confluent Hub Client by running the following command from your Confluent Platform installation directory:

    confluent connect plugin install confluentinc/kafka-connect-gcs-source:latest
    

    Tip

    By default, the command installs the plugin into the share/confluent-hub-components directory and adds that directory to the plugin path. For the plugin path change to take effect, you must restart the Connect worker.
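
    For example, with a local Confluent Platform installation managed by the Confluent CLI, one way to restart the Connect worker is:

    confluent local services connect stop
    confluent local services connect start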

  3. Create a quickstart-gcssource-generalized.properties file with the following contents:

    {
       "name": "quickstart-gcs-source",
       "config": {
       "connector.class": "io.confluent.connect.gcs.GcsSourceConnector",
       "tasks.max": "1",
       "value.converter": "org.apache.kafka.connect.json.JsonConverter",
       "mode": "GENERIC",
       "topics.dir": "quickstart",
       "topic.regex.list": "quick-start-topic:.*",
       "format.class": "io.confluent.connect.gcs.format.json.JsonFormat",
       "gcs.bucket.name": "<bucket-name>",
       "gcs.credentials.path": "</full/path/to/credentials/keys.json>",
       "value.converter.schemas.enable": "false"
      }
    }
    
  4. Load the Generalized GCS Source connector by running the following command:

    confluent local services connect connector load quickstart-gcs-source --config quickstart-gcssource-generalized.properties
    
  5. Verify the connector is in a RUNNING state:

    confluent local services connect connector status quickstart-gcs-source
    
  6. Verify the messages are being sent to Kafka:

    kafka-console-consumer \
       --bootstrap-server localhost:9092 \
       --topic quick-start-topic \
       --from-beginning
    
  7. You should see output similar to the following:

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
    

Property-based example

  1. Create a gcs-source-connector.properties file with the following contents. This file is included with the connector in etc/kafka-connect-gcs/gcs-source-connector.properties. This configuration is typically used with standalone workers:

    {
       "name" : "GCSSourceConnector",
       "config" : {
         "format.class": "io.confluent.connect.gcs.format.avro.AvroFormat",
         "connector.class" : "io.confluent.connect.gcs.GcsSourceConnector",
         "gcs.bucket.name" : "<bucket-name>",
         "gcs.credentials.path" : "</full/path/to/credentials/keys.json>",
         "tasks.max" : "1",
         "confluent.topic.bootstrap.servers" : "localhost:9092",
         "confluent.topic.replication.factor" : "1",
         "confluent.license" : ""
       }
     }
    

    Tip

    The confluent.topic.* properties define where the Confluent license is stored in Kafka, so you need the Kafka bootstrap addresses. confluent.topic.replication.factor may not be larger than the number of Kafka brokers in the destination cluster, so it is set to ‘1’ here for demonstration purposes. Always use at least ‘3’ in production configurations.

  2. Edit the gcs-source-connector.properties file to add the following properties:

    {
       "transforms" : "AddPrefix",
       "transforms.AddPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
       "transforms.AddPrefix.regex" : ".*",
       "transforms.AddPrefix.replacement" : "copy_of_$0"
    }
    

    Adding the previous properties prefixes the output topic with copy_of_ (for example, copy_of_gcs_topic), which prevents a continuous feedback loop of messages.

  3. Load the Generalized GCS Source connector.

    Caution

    You must include a double dash (--) between the connector name and your flag.

    confluent local services connect connector load gcs-source --config gcs-source-connector.properties
    

    Important

    Don’t use the confluent local commands in a production environment. For more information about local CLI commands, see confluent local.

  4. Verify the connector is in a RUNNING state.

    confluent local services connect connector status gcs-source
    
  5. Verify messages are being sent to Kafka.

    kafka-avro-console-consumer \
        --bootstrap-server localhost:9092 \
        --property schema.registry.url=http://localhost:8081 \
        --topic copy_of_gcs_topic \
        --from-beginning | jq '.'
    
  6. The response should be 9 records as shown in the following example:

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
    

REST-based example

  1. Use this setting with distributed workers. Write the following JSON, which reads all of the data listed directly under a GCS bucket, to config.json, configure all of the required values, and use the command in the next step to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST API documentation.

    {
       "name": "gcs-source-generalized",
       "config": {
       "connector.class": "io.confluent.connect.gcs.GcsSourceConnector",
       "tasks.max": "1",
       "value.converter": "org.apache.kafka.connect.json.JsonConverter",
       "mode": "GENERIC",
       "topics.dir": " ",
       "topic.regex.list": "mytopic:.",
       "format.class": "io.confluent.connect.gcs.format.json.JsonFormat",
       "gcs.bucket.name": "<bucket-name>",
       "gcs.credentials.path": "</full/path/to/credentials/keys.json>",
       "value.converter.schemas.enable": "false",
       "confluent.topic.bootstrap.servers" : "localhost:9092",
       "confluent.topic.replication.factor" : "1",
       "confluent.license" : " Omit to enable trial mode ",
       "transforms" : "AddPrefix",
       "transforms.AddPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
       "transforms.AddPrefix.regex" : ".",
       "transforms.AddPrefix.replacement" : "copy_of_$0"
      }
    }
    

    Note

    Change the confluent.topic.bootstrap.servers property to include your broker address(es), and change the confluent.topic.replication.factor to 3 for staging or production use.

  2. Use curl to post a configuration to one of the Kafka Connect Workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect worker(s).

    curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
    
  3. Use the following command to update the configuration for an existing connector.

    curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/gcs-source-generalized/config
    
  4. To consume records written by the connector to the configured Kafka topic, run the following command:

    kafka-console-consumer --bootstrap-server localhost:9092 --topic copy_of_mytopic --from-beginning
    

Generalized GCS Source Connector Data Formats

The Generalized GCS Source connector supports the following data formats (a configuration sketch follows the list):

  • Avro: To use the Avro format, set the format.class property to io.confluent.connect.gcs.format.avro.AvroFormat.
  • JSON: To use the JSON format, set the format.class property to io.confluent.connect.gcs.format.json.JsonFormat.
  • Raw Bytes: To use the Raw Bytes format, set the format.class property to io.confluent.connect.gcs.format.bytearray.ByteArrayFormat.
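
For example, the following is a minimal sketch of the format-related properties for the Raw Bytes case; pairing the format with org.apache.kafka.connect.converters.ByteArrayConverter is an assumption based on typical byte-array setups, not a connector requirement:

    "format.class": "io.confluent.connect.gcs.format.bytearray.ByteArrayFormat",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"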

Custom Credentials Provider

You can configure the GCS connector to use a custom credentials provider instead of the default one. To do this, implement a custom credentials provider class, build it as a JAR file, and deploy the JAR file so the connector can use it.

Complete the following steps to use a custom credentials provider:

  1. Set a custom credentials provider class: Set the gcs.credentials.provider.class property to the fully qualified name of a class that implements the com.google.api.gax.core.CredentialsProvider interface.
  2. Configure additional settings (optional): If your custom credentials provider needs to accept additional configuration, prefix those configuration keys with gcs.credentials.provider. and implement the org.apache.kafka.common.Configurable interface so the provider receives the configurations that carry that prefix (see the sketch following these steps).
  3. Ensure a public no-args constructor: Your custom credentials provider class must have a public no-argument constructor. This is necessary because the connector creates an instance of the provider using this constructor.
  4. Package your provider: Once your custom credentials provider class is implemented, package it into a JAR file.
  5. Copy the JAR file to Connect Worker: Copy the built JAR file to the share/java/kafka-connect-gcs directory on all Connect workers. This step ensures that the GCS connector can access and use your custom credentials provider.
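
The following is a minimal sketch of such a provider, assuming a hypothetical class name and a hypothetical key.path setting; it is illustrative only and not the connector's own implementation:

    import com.google.api.gax.core.CredentialsProvider;
    import com.google.auth.Credentials;
    import com.google.auth.oauth2.GoogleCredentials;
    import org.apache.kafka.common.Configurable;

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Map;

    public class FileKeyCredentialsProvider implements CredentialsProvider, Configurable {

      private String keyPath;

      // A public no-argument constructor is required because the connector
      // instantiates the provider reflectively.
      public FileKeyCredentialsProvider() {
      }

      @Override
      public void configure(Map<String, ?> configs) {
        // Assumption: settings supplied as gcs.credentials.provider.key.path are
        // visible here under the "key.path" key; exact key handling depends on
        // the connector version.
        this.keyPath = (String) configs.get("key.path");
      }

      @Override
      public Credentials getCredentials() throws IOException {
        // Build Google credentials from the configured service-account key file.
        try (FileInputStream in = new FileInputStream(keyPath)) {
          return GoogleCredentials.fromStream(in);
        }
      }
    }

In the connector configuration, you would then set gcs.credentials.provider.class to the fully qualified name of this class (for example, com.example.FileKeyCredentialsProvider) and supply any prefixed settings such as gcs.credentials.provider.key.path; both names are illustrative.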

Troubleshooting Connector and Task Failures

Stack Trace

You can use the Connect REST API to check the status of the connectors and tasks. If a task or connector has failed, the trace field will include a reason and a stack trace.
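
For example, you can check a connector's status and surface any stack trace with the following command; the connector name and worker address are placeholders, and jq is optional:

    # Show the status of the connector and its tasks, including any trace field
    curl -s http://localhost:8083/connectors/gcs-source/status | jq '.'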