GCS Connector

The GCS connector, currently available as a sink, allows you to export data from Kafka topics to GCS objects in various formats. In addition, for certain data layouts, the GCS connector exports data while guaranteeing exactly-once delivery semantics to consumers of the GCS objects it produces.

Relation to S3

The GCS connector is the Google Cloud counterpart of the S3 cloud storage sink connector.

See the S3 Connector documentation for details on generic properties of cloud storage sink connectors, such as exactly-once delivery, pluggable data formats, schema evolution, basic configuration, writing raw message values, and pluggable partitioners.
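
For example, the pluggable partitioners come from the same shared storage libraries as the S3 connector, so a time-based object layout can be configured in the same way. A minimal sketch (the property values below are illustrative, not defaults):

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en-US
timezone=UTC
timestamp.extractor=Record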

Installation

Installation is recommended via the Confluent Hub command-line tool, which comes packaged with Confluent Platform.

Tip

If not already in your PATH, add Confluent’s bin directory by running: export PATH=<path-to-confluent>/bin:$PATH

Use the confluent-hub command-line tool:

confluent-hub install confluentinc/kafka-connect-gcs:latest

The interactive command-line tool will guide you to download and install the GCS connector to the appropriate path.
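
If you are scripting the installation or want to pin a specific connector version, the tool can also be run non-interactively; a sketch (the --no-prompt flag and the version tag are assumptions about your confluent-hub release, so adjust them to match your platform):

confluent-hub install --no-prompt confluentinc/kafka-connect-gcs:5.0.0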

Quickstart

In this Quickstart, we use the GCS connector to export data produced by the Avro console producer to GCS. It is assumed that the connector was installed using the confluent-hub command-line tool as described in the previous section.

Preparing a Bucket

Before you begin, you will need to create a GCS destination bucket. Navigate to the Google Cloud Platform console website. Use the web interface to create a new bucket.

To download credentials, navigate to “APIs & Services” -> “Credentials.” From the Credentials page, you can “Create credentials,” then select “Service Account Key.” Select the appropriate account, and download the JSON file. The location of the downloaded JSON file will be used in your GCS Connector configuration.
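
If you prefer the command line over the web console, a rough equivalent using the gcloud and gsutil tools is sketched below (the bucket name, location, key path, and service account email are placeholders, and the tools must already be installed and authenticated):

gsutil mb -l us-central1 gs://my-gcs-quickstart-bucket

gcloud iam service-accounts keys create ~/gcs-credentials.json \
  --iam-account=my-service-account@my-project.iam.gserviceaccount.com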

Start Confluent

Next, start the services with a single command using the Confluent command-line tool:

confluent start

Every service will start in order, printing a message with its status:

Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql
ksql is [UP]
Starting control-center
control-center is [UP]
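
At any point you can re-check the services, and later tail the Connect worker log, with the same CLI used above (assuming the legacy confluent CLI shown in this quickstart):

confluent status
confluent log connect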

To import a few records with a simple schema into Kafka, start the Avro console producer as follows:

kafka-avro-console-producer --broker-list localhost:9092 --topic gcs_topic \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Then, in the console producer, type in:

{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
{"f1": "value4"}
{"f1": "value5"}
{"f1": "value6"}
{"f1": "value7"}
{"f1": "value8"}
{"f1": "value9"}

The nine records entered are published to the Kafka topic gcs_topic in Avro format.
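
Optionally, you can verify that the records landed in the topic by consuming them back with the Avro console consumer, assuming the local defaults used in this quickstart (press Ctrl+C to stop):

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic gcs_topic --from-beginning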

Starting the Connector

Before starting the connector, create a configuration file. For example, save the following properties file as quickstart-gcs.properties, or edit the file that is included in the connector archive (under the “etc” directory):

name=gcs-sink
connector.class=io.confluent.connect.gcs.GcsSinkConnector
tasks.max=1
topics=gcs_topic

gcs.bucket.name=#bucket-name
gcs.part.size=5242880
flush.size=3

gcs.credentials.path=#/path/to/credentials/keys.json

storage.class=io.confluent.connect.gcs.storage.GcsStorage
format.class=io.confluent.connect.gcs.format.avro.AvroFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

schema.compatibility=NONE

confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

Fill in appropriate values for gcs.bucket.name and gcs.credentials.path; absolute paths are recommended. Note that with flush.size=3, the connector commits a new GCS object after every three records, which is why three files appear later in this quickstart.

Then start the GCS connector by loading its configuration with the following command:

 confluent load gcs -d quickstart-gcs.properties
 {
  "name": "gcs",
  "config": {
    "connector.class": "io.confluent.connect.gcs.GcsSinkConnector",
    "tasks.max": "1",
    "topics": "gcs_topic",
    "gcs.bucket.name": "#bucket-name",
    "gcs.part.size": "5242880",
    "flush.size": "3",
    "gcs.credentials.path": "#/path/to/credentials/keys.json",
    "storage.class": "io.confluent.connect.gcs.storage.GcsStorage",
    "format.class": "io.confluent.connect.gcs.format.avro.AvroFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "name": "gcs"
  },
  "tasks": [],
  "type": null
}
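
To confirm that the connector and its task are running, you can query the Connect REST API directly (the worker listens on port 8083 by default):

curl -s localhost:8083/connectors/gcs/status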

Towards the end of the log you should see that the connector starts, logs a few messages, and then uploads data from Kafka to GCS. Once the connector has ingested some records, check that the data is available in GCS, for instance by viewing the bucket in the GCS web console.

You should see three objects with keys:

topics/gcs_topic/partition=0/gcs_topic+0+0000000000.avro
topics/gcs_topic/partition=0/gcs_topic+0+0000000003.avro
topics/gcs_topic/partition=0/gcs_topic+0+0000000006.avro

Each file name is encoded as <topic>+<kafkaPartition>+<startOffset>.<format>.
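
The uploaded objects can also be listed from the command line, assuming gsutil is installed (replace <your-bucket> with the bucket you created earlier):

gsutil ls gs://<your-bucket>/topics/gcs_topic/partition=0/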

To verify the contents, use avro-tools-1.8.2.jar (available in Apache mirrors) to print the records:

java -jar avro-tools-1.8.2.jar tojson gcs_topic+0+0000000000.avro
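
This assumes the object has first been copied to the local directory, for example with gsutil:

gsutil cp gs://<your-bucket>/topics/gcs_topic/partition=0/gcs_topic+0+0000000000.avro .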

For the file above, you should see the following output:

{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}

with the rest of the records contained in the other two files.
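
If you only want to remove the connector while keeping the other services running, the same CLI provides an unload command (again assuming the legacy confluent CLI used throughout this quickstart):

confluent unload gcs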

Finally, stop the Connect worker along with the rest of the Confluent services by running:

confluent stop

or stop all the services and additionally wipe out any data generated during this quickstart by running:

confluent destroy