Backup and Restore Google Cloud Storage Source Connector for Confluent Platform¶
The Kafka Connect Backup and Restore Google Cloud Storage (GCS) Source connector reads data exported to GCS by the Kafka Connect GCS Sink connector and publishes it back to a Kafka topic. Depending on the format and partitioner used to write the data to GCS, this connector can write to the destination topic using the same partitions as the original messages exported to GCS and maintain the same message order. The connector selects folders based on the partitioner configuration and reads each folder's GCS objects in alphabetical order. Each record is read based on the format selected. The configuration is designed to mirror the Kafka Connect GCS Sink connector, so you should be able to create a source connector configuration with only minor changes to the original sink configuration.
Important
The recommended practice is to create topics manually in the destination Kafka cluster with the correct number of partitions before running the source connector. If the topics do not exist, Connect relies on auto topic creation for source connectors, and the number of partitions is based on the Kafka broker defaults. If the destination cluster has more partitions than the original topic, the extra partitions are not used. If it has fewer partitions, the connector task continuously checks for the presence of the required partition; it does not produce records but remains in the running state.
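For example, a minimal sketch of creating the destination topic up front with the standard kafka-topics CLI. The topic name, partition count, and replication factor below are illustrative; match them to the original exported topic and your cluster:

kafka-topics --bootstrap-server localhost:9092 \
  --create \
  --topic copy_of_gcs_topic \
  --partitions 1 \
  --replication-factor 1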
Be aware of the following connector actions:
- The connector ignores any GCS object with a name that does not start with the configured topics directory. This name is /topics/ by default.
- The connector ignores any GCS object that is below the topics directory but has an extension that does not match the configured format. For example, a JSON file is ignored when format.class is set for Avro files.
- The connector stops and fails if the GCS object's name does not match the expected format or is in an unexpected location.
- The connector ignores any GCS object that is below the topics directory but has a partitioner structure that does not match the configured class. For example, DefaultPartitioner structure is ignored when partitioner.class is set for TimeBasedPartitioner.
Avoid the following configuration issues:
- A file with the correct extension and a valid name format, for example <topic>+<partition>+<offset>.<extension>, that is placed in the folder of a different topic is read normally and written to the topic defined by its filename (see the example layout after this list).
- Do not configure a field partitioner to match the expected folder. Doing so can break the ordering that was reflected when a GCS sink exported the records using a deterministic sink partitioner.
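To make the naming convention concrete, the following is an illustrative object layout, assuming the default topics directory of /topics/, the DefaultPartitioner folder structure, and Avro files. The actual layout in your bucket depends on the partitioner and format the sink connector used:

topics/gcs_topic/partition=0/gcs_topic+0+0000000000.avro
topics/gcs_topic/partition=0/gcs_topic+0+0000000003.avro
topics/gcs_topic/partition=1/gcs_topic+1+0000000000.avro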
Features¶
The Backup and Restore GCS Source connector includes a variety of features:
- At least once delivery
- Multiple tasks
- Pluggable data format with or without schema
- Matching source partitioning
- Source partition ordering
- Pluggable partitioner
At least once delivery¶
In the event of a task failure, the connector guarantees that no messages are lost, although the last few messages may be processed again.
Multiple tasks¶
The Backup and Restore GCS Source connector supports running one or more tasks. You can specify the number of tasks in the tasks.max configuration parameter. This can lead to performance gains when multiple files need to be parsed.
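For example, a one-line snippet of the connector configuration; the value 4 is arbitrary and should be tuned to your workload and worker capacity:

tasks.max=4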
Pluggable data format with or without schema¶
Out of the box, the connector supports reading data from GCS in Avro and JSON format. Besides records with schema, the connector supports importing plain JSON records without schema in text files, one record per line. In general, the connector may accept any format that provides an implementation of the Format interface.
Matching source partitioning¶
Messages are put back onto the same Kafka partition for the topic that they were originally written to.
Source partition ordering¶
The connector reads records back in time order in each topic-source partition if the DefaultPartitioner or a TimeBasedPartitioner is used. If a FieldPartitioner is used, it isn't possible to guarantee the order of these messages.
Pluggable partitioner¶
The connector comes out of the box with partitioners that support default partitioning based on Kafka partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class. Additionally, you can customize time-based partitioning by extending the TimeBasedPartitioner class.
All partitioners detect new topic folders with the built-in task reconfiguration thread. The DefaultPartitioner detects new partition folders. The FieldPartitioner detects new folders for the fields specified. However, the TimeBasedPartitioner does not currently detect new files for a new time period.
Be careful when both the Connect GCS Sink connector and the GCS Source connector use the same Kafka cluster, since this results in the source connector writing to the same topic being consumed by the sink connector. This causes a continuous feedback loop that creates an ever-increasing number of duplicate Kafka records and GCS objects. It is possible to avoid this feedback loop by writing to a different topic than the one being consumed by the sink connector. Use the RegexRouter with the source connector to change the names of the topics where the records are written. Or, use the Extract topic SMT with the source connector to change the topic name based upon a field in each message.
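A RegexRouter example appears in the quick start later on this page. As an illustration of the second approach, the following sketch assumes the Confluent ExtractTopic SMT is installed on the worker and that every record value carries a field named target_topic (a hypothetical field name) holding the destination topic:

transforms=RouteByField
transforms.RouteByField.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.RouteByField.field=target_topic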
Tip
By default, connectors inherit the partitioner used for the Kafka topic. You can create a custom partitioner for a connector, which you must place in the connector's /lib folder.
You can also put partitioners in a common location of choice. If you choose this option, you must add a symlink to the location from each connector's /lib folder. For example, you would place a custom partitioner in the path share/confluent-hub-components/partitioners and then add the symlink share/confluent-hub-components/kafka-connect-s3/lib/partitioners -> ../../partitioners.
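As a concrete sketch of that layout, the following shell commands are illustrative only; the JAR name my-custom-partitioner.jar is a placeholder, and the paths are relative to your Confluent Platform installation directory:

mkdir -p share/confluent-hub-components/partitioners
cp my-custom-partitioner.jar share/confluent-hub-components/partitioners/
ln -s ../../partitioners share/confluent-hub-components/kafka-connect-s3/lib/partitioners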
Limitations¶
For the TimeBasedPartitioner
, the capacity to scale the connector across
various time ranges is limited in Backup and Restore mode. Currently, the
connector does not support processing data that spans several years.
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
For license properties, see Confluent Platform license. For information about the license topic, see Confluent License Properties.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Google Cloud Storage Source Connector for Confluent Platform.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Install the Backup and Restore GCS Source Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
You must install the connector on every machine where Connect will run.
- Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
- Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.
- Java 1.8.
- An installation of the latest (latest) connector version.

To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

confluent connect plugin install confluentinc/kafka-connect-gcs-source:latest

You can install a specific version by replacing latest with a version number as shown in the following example:

confluent connect plugin install confluentinc/kafka-connect-gcs-source:2.6.0
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Quick Start¶
The following uses the GcsSinkConnector to write a file from the Kafka topic named gcs_topic to GCS. Then, the GcsSourceConnector loads that Avro file from GCS to the Kafka topic named copy_of_gcs_topic.
Follow the instructions from the GCS Sink connector quick start to set up the data to use below.
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory
confluent connect plugin install confluentinc/kafka-connect-gcs-source:latest
Tip
By default, the plugin is installed into the share/confluent-hub-components directory and that directory is added to the plugin path. For the plugin path change to take effect, you must restart the Connect worker.
Property-based example¶
Create a gcs-source-connector.properties file with the following contents. This file is included with the connector in etc/kafka-connect-gcs/gcs-source-connector.properties. This configuration is typically used with standalone workers:

name=gcs-source
tasks.max=1
connector.class=io.confluent.connect.gcs.GcsSourceConnector

# enter the bucket name and GCS credentials here
gcs.bucket.name=<bucket-name>
gcs.credentials.path=</full/path/to/credentials/keys.json>
format.class=io.confluent.connect.gcs.format.avro.AvroFormat

confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# for production environments, enter the Confluent license here
# confluent.license=
Tip
The confluent.topic.bootstrap.servers and confluent.topic.replication.factor properties define where the Confluent license is stored in Kafka, which is why the Kafka bootstrap addresses are required. confluent.topic.replication.factor may not be larger than the number of Kafka brokers in the destination cluster, so it is set to 1 here for demonstration purposes. Always use at least 3 in production configurations.

Edit the gcs-source-connector.properties file to add the following properties:

transforms=AddPrefix
transforms.AddPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.AddPrefix.regex=.*
transforms.AddPrefix.replacement=copy_of_$0
Important
Adding this transform renames the output topic of the messages to copy_of_gcs_topic. This prevents a continuous feedback loop of messages.

Load the Backup and Restore GCS Source connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.

confluent local services connect connector load gcs-source --config gcs-source-connector.properties
Important
Don’t use the Confluent CLI in production environments.
Confirm that the connector is in a RUNNING state.

confluent local services connect connector status gcs-source
Confirm that the messages are being sent to Kafka.
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic copy_of_gcs_topic \
  --from-beginning | jq '.'
The response should be 9 records as follows.
{"f1": "value1"} {"f1": "value2"} {"f1": "value3"} {"f1": "value4"} {"f1": "value5"} {"f1": "value6"} {"f1": "value7"} {"f1": "value8"} {"f1": "value9"}
REST-based example¶
Use this setting with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the following command to post the configuration to one of the distributed connect workers. Check here for more information about the Kafka Connect REST API.
{ "name" : "GCSSourceConnector", "config" : { "format.class": "io.confluent.connect.gcs.format.avro.AvroFormat", "connector.class" : "io.confluent.connect.gcs.GcsSourceConnector", "gcs.bucket.name" : "confluent-kafka-connect-gcs-testing", "gcs.credentials.path" : "#/path/to/credentials/keys.json", "tasks.max" : "1", "confluent.topic.bootstrap.servers" : "localhost:9092", "confluent.topic.replication.factor" : "1", "confluent.license" : " Omit to enable trial mode ", "transforms" : "AddPrefix", "transforms.AddPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter", "transforms.AddPrefix.regex" : ".*", "transforms.AddPrefix.replacement" : "copy_of_$0" } }
Note
Change the confluent.topic.bootstrap.servers property to include your broker address(es), and change the confluent.topic.replication.factor to 3 for staging or production use.

Use curl to post a configuration to one of the Kafka Connect Workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect worker(s).

curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
Use the following command to update the configuration of an existing connector.
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/GCSSourceConnector/config
To consume records written by the connector to the configured Kafka topic, run the following command:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic copy_of_gcs_topic --from-beginning
Backup and Restore GCS Source Connector Partitions¶
The Backup and Restore GCS Source connector supports a variety of partitioners. The connector's partitioner determines how records that are read from blob items are partitioned into Kafka topics. Messages are put back onto the same Kafka partition for the topic that they were originally written to.
The partitioner is specified in the connector configuration with the
partitioner.class
configuration property. The Backup and Restore GCS Source connector comes
with the following partitioners:
- DefaultPartitioner: To use the DefaultPartitioner, you must set the partitioner.class property to io.confluent.connect.storage.partitioner.DefaultPartitioner.
- FieldPartitioner: A partitioner that uses record values of the configured partition.field.name to determine partitions. To use the FieldPartitioner, you must set the partitioner.class property to io.confluent.connect.storage.partitioner.FieldPartitioner.
- TimeBasedPartitioner: To use the TimeBasedPartitioner, you must set the partitioner.class property to io.confluent.connect.storage.partitioner.TimeBasedPartitioner.
- DailyPartitioner: A subclass of the TimeBasedPartitioner. To use the DailyPartitioner, you must set the partitioner.class property to io.confluent.connect.storage.partitioner.DailyPartitioner.
- HourlyPartitioner: A subclass of the TimeBasedPartitioner. To use the HourlyPartitioner, you must set the partitioner.class property to io.confluent.connect.storage.partitioner.HourlyPartitioner.
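For example, a minimal properties sketch for reading back data whose layout matches one of these partitioners. The field name region is hypothetical and must match what the sink connector actually used; some partitioners may require additional partitioner-specific properties:

# matches data exported by a sink that used the DefaultPartitioner
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

# alternative: data exported with a FieldPartitioner on a field named "region" (hypothetical)
# partitioner.class=io.confluent.connect.storage.partitioner.FieldPartitioner
# partition.field.name=region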
Backup and Restore GCS Source Connector Data Formats¶
The Backup and Restore GCS Source connector supports several data formats:
- Avro: For supporting Avro format. To use Avro format, you must set the format.class property to io.confluent.connect.gcs.format.avro.AvroFormat.
- JSON: For supporting JSON format. To use JSON format, you must set the format.class property to io.confluent.connect.gcs.format.json.JsonFormat.
- Raw Bytes: For supporting Raw Bytes format. To use the Raw Bytes format, you must set the format.class property to io.confluent.connect.gcs.format.bytearray.ByteArrayFormat.
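For example, to read back JSON files that the sink connector exported, set the format line of the connector configuration as follows; this must match the format the sink actually wrote:

format.class=io.confluent.connect.gcs.format.json.JsonFormat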
Custom Credentials Provider¶
You can configure the GCS connector to use a custom credentials provider, instead of the default one provided by the GCS connector. To do this, you implement a custom credentials provider, build it as a JAR file, and deploy the JAR file to use the custom provider.
Complete the following steps to use a custom credentials provider:
- Set a custom credentials provider class: Set the gcs.credentials.provider.class property to a class that implements the com.google.api.gax.core.CredentialsProvider interface. Configure the property with the fully qualified name of your custom credentials provider class (see the sketch after this list).
- Configure additional settings (optional): For additional configuration, prefix the configuration keys with gcs.credentials.provider.. If your custom credentials provider needs to accept additional configuration, implement the org.apache.kafka.common.Configurable interface, which lets the connector pass it the configurations prefixed with gcs.credentials.provider..
- Ensure a public no-args constructor: Your custom credentials provider class must have a public no-argument constructor. This is necessary because the connector creates an instance of the provider using this constructor.
- Package your provider: Once your custom credentials provider class is implemented, package it into a JAR file.
- Copy the JAR file to the Connect workers: Copy the built JAR file to the share/java/kafka-connect-gcs directory on all Connect workers. This step ensures that the GCS connector can access and use your custom credentials provider.
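The following is a minimal, illustrative sketch of such a provider, not the connector's own implementation. The package and class names, the key.path property, and the assumption that the connector strips the gcs.credentials.provider. prefix before calling configure() are all assumptions made for the example; it loads a service account key file with GoogleCredentials.fromStream():

package com.example.gcs; // hypothetical package

import com.google.api.gax.core.CredentialsProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import org.apache.kafka.common.Configurable;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;

public class MyCredentialsProvider implements CredentialsProvider, Configurable {

    private String keyPath;

    // Public no-argument constructor so the connector can instantiate the provider.
    public MyCredentialsProvider() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Assumes the connector passes "gcs.credentials.provider."-prefixed settings
        // with the prefix stripped; "key.path" is a hypothetical property name.
        Object value = configs.get("key.path");
        this.keyPath = value == null ? null : value.toString();
    }

    @Override
    public Credentials getCredentials() throws IOException {
        // Load a service account key file into a Credentials object.
        try (FileInputStream stream = new FileInputStream(keyPath)) {
            return GoogleCredentials.fromStream(stream);
        }
    }
}

The corresponding connector configuration for this hypothetical provider would look like:

gcs.credentials.provider.class=com.example.gcs.MyCredentialsProvider
gcs.credentials.provider.key.path=/path/to/keys.json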
Troubleshooting Connector and Task Failures¶
Stack Trace¶
You can use the Connect REST API
to check the status of the connectors and tasks. If a task or connector has
failed, the trace
field will include a reason and a stack trace.
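For example, to inspect the status of the connector created in the property-based example above, query the Connect REST API status endpoint; adjust the worker address and connector name for your deployment:

curl -s http://localhost:8083/connectors/gcs-source/status | jq '.'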
Error Handling¶
The behavior.on.error configuration property sets how the connector handles errors. It accepts the following values:
- fail: The connector stops processing when an error occurs. The full batch of records is not sent to Kafka if any record in the batch is corrupted.
- ignore: The corrupted record is ignored and the connector continues processing the next record. For Avro, the connector ignores the file containing a corrupted record and continues processing records for the next file.
- log: Logs an error message and continues processing the next record. For Avro, the connector ignores the file containing a corrupted record and continues processing records for the next file.
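For example, to skip corrupted records instead of failing the task, add the following line to the connector configuration:

behavior.on.error=ignore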
Note
The connector always ignores a file which is not in <topic>+<partition>+<offset>.<extension> format.