Google Cloud Storage (GCS) Source Connector for Confluent Platform

The Kafka Connect GCS Source Connector provides the capability to read data exported to GCS by the Kafka Connect GCS Sink connector and publish it back to a Kafka topic. Depending on the format and partitioner used to write the data to GCS, this connector can write to the destination topic using the same partitions as the original messages exported to GCS and maintain the same message order. The connector selects folders based on the partitioner configuration and reads each folders GCS objects in alphabetical order. Each record is read based on the format selected. Configuration is designed to mirror the Kafka Connect GCS Sink connector and should be possible to create source connector configs with only minor changes to the original sink configuration.

Important

The recommended practice is to create topics manually in the destination Kafka cluster with the correct number of partitions before running the source connector. If the topics do not exist, Connect relies on Auto Topic Creation and the number of partitions are based upon the Kafka broker defaults. If there are more partitions in the destination cluster, the extra partitions are not used. If there are fewer partitions in the destination cluster, the connector task throws an exception and stops the moment it tries to write to a Kafka partition that does not exist.

Be aware of the following connector actions:

  • The connector ignores any GCS object with a name that does not start with the configured topics directory. This name is /topics/ by default.
  • The connector ignores any GCS object that is below the topics directory but has an extension that does not match the configured format. For example, a JSON file is ignored when format.class is set for Avro files.
  • The connector stops and fails if the GCS object’s name does not match the expected format or is in an unexpected location.
  • The connector ignores any GCS object that is below the topics directory but has a partitioner structure that does not match the configured class. For example, DefaultPartitioner structure is ignored when partitioner.class is set for TimeBasedPartitioner.

Avoid the following configuration issues:

  • A file with the correct extension and a valid name format e.g. <topic>+<partition>+<offset>.<extension>, placed in a folder of a different topic will be read normally and written to whatever topic as defined by its filename.
  • If a field partitioner is incorrectly configured to match the expected folder, it can break the ordering guarantees of GCS sink that used a deterministic sink partitioner.

Features

The GCS Source Connector offers a variety of features:

  • Pluggable Data Format with or without Schema: Out of the box, the connector supports reading data from GCS in Avro and JSON format. Besides records with schema, the connector supports importing plain JSON records without schema in text files, one record per line. In general, the connector may accept any format that provides an implementation of the Format interface.
  • At Least Once Delivery: In the event of a task failure the connector guarantees no messages are lost, although the last few messages may be processed again.
  • Matching Source Partitioning: Messages will be put back on to the same Kafka partition for that topic when it was written.
  • Source Partition Ordering: The connector will read records back in time order in each topic-source partition if the DefaultPartitioner or a TimeBasedPartitioner is used. If a FieldPartitioner is used it isn’t possible to guarantee the order of these messages.
  • Pluggable Partitioner: The connector comes out of the box with partitioners that support default partitioning based on Kafka partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class. Additionally, you can customize time based partitioning by extending the TimeBasedPartitioner class.

Important

All partitioners will notice new topic folders with the inbuilt task reconfiguration thread. The DefaultPartitioner will detect new partition folders. The FieldPartitioner will notice new folders for the fields specified. However, the TimeBasedPartititoner will not currently detect new files for a new time period.

Important

Be careful when both the Connect GCS sink connector and the GCS Source Connector use the same Kafka cluster, since this results in the source connector writing to the same topic being consumed by the sink connector. This causes a continuous feedback loop that creates an ever-increasing number of duplicate Kafka records and GCS objects. It is possible to avoid this feedback loop by writing to a different topic than the one being consumed by the sink connector. Use the RegexRouter with the source connector to change the names of the topics where the records are written. Or, use the Extract Topic SMT with the source connector to change the topic name based upon a field in each message.

Prerequisites

The following are required to run the Kafka Connect GCS Source Connector:

  • Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
  • Java 1.8

Install the GCS Source Connector

You can install this connector by using the Confluent Hub client (recommended) or you can manually download the ZIP file.

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.

confluent-hub install confluentinc/kafka-connect-gcs-source:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-gcs-source:1.0.0-preview

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see Google Cloud Storage Source Connector Configuration Properties.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster in Connect Kafka Connect to Confluent Cloud.

Quick Start

The following uses the GcsSinkConnector to write a file from the Kafka topic named gcs_topic to GCS. Then, the GcsSourceConnector loads that Avro file from GCS to the Kafka topic named copy_of_gcs_topic.

  1. Follow the instructions from the GCS Sink Connector quick start to set up the data to use below.

  2. Install the connector through the Confluent Hub Client.

    # run from your Confluent Platform installation directory
    confluent-hub install confluentinc/kafka-connect-gcs-source:latest
    

    Tip

    By default, the plugin is installed into share/confluent-hub-components and the directory is added to the plugin path. If this is the first connector you have installed, you may need to restart the connect server for the plugin path change to take effect.

Property-based example

  1. Create a gcs-source-connector.properties file with the following contents. This file is included with the connector in etc/kafka-connect-gcs/gcs-source-connector.properties. This configuration is used typically along with standalone workers.:

    name=gcs-source
    tasks.max=1
    connector.class=io.confluent.connect.gcs.GcsSourceConnector
    
    # enter the bucket name and GCS credentials here
    gcs.bucket.name=<bucket-name>
    gcs.credentials.path=</full/path/to/credentials/keys.json>
    
    format.class=io.confluent.connect.gcs.format.avro.AvroFormat
    
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    
    # for production environments, enter the Confluent license here
    # confluent.license=
    

    Tip

    The following define the Confluent license stored in Kafka, so we need the Kafka bootstrap addresses. replication.factor may not be larger than the number of Kafka brokers in the destination cluster, so here we set this to ‘1’ for demonstration purposes. Always use at least ‘3’ in production configurations.

  2. Edit the gcs-source-connector.properties to add the following properties:

    transforms=AddPrefix
    transforms.AddPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
    transforms.AddPrefix.regex=.*
    transforms.AddPrefix.replacement=copy_of_$0
    

    Important

    Adding this renames the output of topic of the messages to copy_of_gcs_topic. This prevents a continuous feedback loop of messages.

  3. Load the GCS Source Connector.

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    confluent local services connect connector load gcs-source --config gcs-source-connector.properties
    

    Important

    Don’t use the Confluent CLI in production environments.

  4. Confirm that the connector is in a RUNNING state.

    confluent local services connect connector status gcs-source
    
  5. Confirm that the messages are being sent to Kafka.

    kafka-avro-console-consumer \
        --bootstrap-server localhost:9092 \
        --property schema.registry.url=http://localhost:8081 \
        --topic copy_of_gcs_topic \
        --from-beginning | jq '.'
    
  6. The response should be 9 records as follows.

    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
    

REST-based example

  1. Use this setting with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the following command to post the configuration to one of the distributed connect workers. Check here for more information about the Kafka Connect REST API.

    {
      "name" : "GCSSourceConnector",
      "config" : {
        "format.class": "io.confluent.connect.gcs.format.avro.AvroFormat",
        "connector.class" : "io.confluent.connect.gcs.GcsSourceConnector",
        "gcs.bucket.name" : "confluent-kafka-connect-gcs-testing",
        "gcs.credentials.path" : "#/path/to/credentials/keys.json",
        "tasks.max" : "1",
        "confluent.topic.bootstrap.servers" : "localhost:9092",
        "confluent.topic.replication.factor" : "1",
        "confluent.license" : " Omit to enable trial mode ",
        "transforms" : "AddPrefix",
        "transforms.AddPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.AddPrefix.regex" : ".*",
        "transforms.AddPrefix.replacement" : "copy_of_$0"
      }
    }
    

    Note

    Change the confluent.topic.bootstrap.servers property to include your broker address(es), and change the confluent.topic.replication.factor to 3 for staging or production use.

  2. Use curl to post a configuration to one of the Kafka Connect Workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect worker(s).

    curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
    
  3. Use the following command to update the configuration of existing connector.

    curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/GCSSourceConnector/config
    
  4. To consume records written by connector to the configured Kafka topic, run the following command:

    kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081  --topic copy_of_gcs_topic --from-beginning
    

Google Cloud Storage Source Connector Partitions

The connector supports a variety of partitions. The GCS Source connector’s partitioner determines how records read from Blob Items are partitioned into Kafka topics. Messages will be put back on to the same Kafka partition for that topic when it was written.

The partitioner is specified in the connector configuration with the partitioner.class configuration property. The GCS Source connector comes with the following partitioners:

  • Default Partitioner: For supporting Default Partitioner You must configure the partitioner.class = io.confluent.connect.gcs.source.partitioner.DefaultPartitioner.
  • Field Partitioner: A partitioner that uses record values of the configured partition.field.name to determine partitions. For supporting Field Partitioner You must configure the partitioner.class = io.confluent.connect.gcs.source.partitioner.FieldPartitioner.
  • Time Based Partitioner: For supporting Time Based Partitioner You must configure the partitioner.class = io.confluent.connect.gcs.source.partitioner.TimeBasedPartitioner.
  • Daily Partitioner: A subclass of the TimeBasedPartitioner. For supporting Daily Partitioner You must configure the partitioner.class = io.confluent.connect.gcs.source.partitioner.DailyPartitioner.
  • Hourly Partitioner: A subclass of the TimeBasedPartitioner. For supporting Hourly Partitioner You must configure the partitioner.class = io.confluent.connect.gcs.source.partitioner.HourlyPartitioner.

Google Cloud Storage Source Connector Data Formats

GCS source connector supports several data formats:

  • Avro Format: For supporting Avro Format. You must configure the format.class = io.confluent.connect.gcs.format.avro.AvroFormat.
  • JSON Format: For supporting JSON Format. You must configure the format.class = io.confluent.connect.gcs.format.json.JsonFormat.
  • Raw Bytes Format: For supporting Raw Bytes Format. You must configure the format.class = io.confluent.connect.gcs.format.bytearray.ByteArrayFormat.

Troubleshooting Connector and Task Failures

Stack Trace

You can use the Connect REST API to check the status of the connectors and tasks. If a task or connector has failed, the trace field will include a reason and a stack trace.

Fewer Partitions than in the Source Topic

If there are fewer partitions in the destination cluster than are in the source topic, the connector task throws an exception and immmediately stops before it tries to write to a Kafka partition that does not exist. You will see the following error messages in the Connect worker log. The recommended practice is to create topics manually in the destination Kafka cluster with the correct number of partitions before running the source connector.

INFO WorkerSourceTask{id=gcs-source-0} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask:409)
INFO WorkerSourceTask{id=gcs-source-0} flushing 1 outstanding messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask:426)
ERROR WorkerSourceTask{id=gcs-source-0} Failed to flush, timed out while waiting
for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:431)
ERROR WorkerSourceTask{id=gcs-source-0} Failed to commit offsets
(org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)

Error Handling

The following behavior.on.error configuration properties set how the connector handles errors.

  • fail : The connector stops processing when an error occurs. The full batch of records will not be sent to Kafka if any record in the batch is corrupted.
  • ignore : The corrupted record is ignored. The connector continues processing the next record. For Avro, the connector ignores the file containing a corrupted record and continues processing records for the next file.
  • log : Logs an error message and continues processing the next record. For Avro, the connector ignores the file containing a corrupted record and continues processing records for the next file.

Note

The connector always ignores a file which is not in <topic>+<partition>+<offset>.<extension> format.