Kafka Connect GCS

The Google Cloud Storage (GCS) connector, currently available as a sink, allows you to export data from Kafka topics to GCS objects in various formats. In addition, for certain data layouts, the GCS connector exports data with exactly-once delivery semantics to consumers of the GCS objects it produces.

Relation to S3

The GCS connector is the counterpart of the S3 cloud storage sink connector in Google Cloud.

See the S3 Connector documentation for details on generic properties of cloud storage sink connectors, such as exactly once delivery, pluggable data formats, schema evolution, basic configuration, writing raw message values, and pluggable partitioners.

Install GCS Connector

You can install this connector by using the Confluent Hub client (recommended) or by manually downloading the ZIP file.

confluent-hub install confluentinc/kafka-connect-gcs:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-gcs:5.0.0

Download

Download the ZIP file and extract it into a directory that is listed on the plugin path of the Connect worker configuration properties (e.g. plugin.path=/usr/local/share/kafka/plugins). This must be done on each of the installations where Connect will be run. For more information, see Installing Plugins.
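As a sketch of the manual route, assuming the ZIP was downloaded to the current directory and the plugins directory is /usr/local/share/kafka/plugins (the archive name and paths below are illustrative):

# Extract the connector archive into a directory on the worker's plugin path
mkdir -p /usr/local/share/kafka/plugins
unzip confluentinc-kafka-connect-gcs-5.0.0.zip -d /usr/local/share/kafka/plugins

# Make sure the Connect worker configuration includes that directory, for example:
# plugin.path=/usr/local/share/kafka/plugins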

Quick Start

In this quick start, we use the GCS connector to export data produced by the Avro console producer to GCS. It is assumed that the connector was installed with the confluent-hub command-line tool as described in the previous section.

Prepare a Bucket

Before you begin, you will need to create a GCS destination bucket.

To use the web interface, navigate to the Google Cloud Platform Console and use the GUI to create a bucket. To download credentials for the newly created bucket, navigate to APIs & Services -> Credentials. From the Credentials page, click Create credentials and select Service Account Key. Select the appropriate account and download the JSON file. The location of the downloaded JSON file is used in your GCS connector configuration.

To use the CLI instead, create the bucket with the gsutil mb command. Installation and usage documentation is available on the Google Cloud SDK page.
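A minimal sketch of the CLI route, assuming the Cloud SDK is installed and authenticated (the bucket and service account names below are placeholders):

# Create the destination bucket (name is a placeholder)
gsutil mb gs://my-gcs-quickstart-bucket

# Download a JSON key for an existing service account (account and file names are placeholders)
gcloud iam service-accounts keys create ~/keys.json \
  --iam-account my-service-account@my-project.iam.gserviceaccount.com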

Start Confluent Platform

Next, start the services with one command using the Confluent command-line tool:

confluent start

Every service will start in order, printing a message with its status:

Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
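
If you want to confirm later that the services are still running, the same command-line tool also provides a status check:

confluent status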

To import a few records with a simple schema into Kafka, start the Avro console producer as follows:

kafka-avro-console-producer --broker-list localhost:9092 --topic gcs_topic \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Then, in the console producer, type in:

{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
{"f1": "value4"}
{"f1": "value5"}
{"f1": "value6"}
{"f1": "value7"}
{"f1": "value8"}
{"f1": "value9"}

The nine records entered are published to the Kafka topic gcs_topic in Avro format.
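
Optionally, you can confirm that the records reached the topic before the connector reads them, for example with the Avro console consumer (this extra check is not required by the quick start; the Schema Registry URL below assumes the default local setup):

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic gcs_topic \
--from-beginning --max-messages 9 \
--property schema.registry.url=http://localhost:8081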

Start the Connector

Before starting the connector, create a config file. For example, save the following properties file as quickstart-gcs.properties, or edit the file that is included in the connector archive (under the “etc” directory):

name=gcs-sink
connector.class=io.confluent.connect.gcs.GcsSinkConnector
tasks.max=1
topics=gcs_topic

gcs.bucket.name=#bucket-name
gcs.part.size=5242880
flush.size=3

gcs.credentials.path=#/path/to/credentials/keys.json

storage.class=io.confluent.connect.gcs.storage.GcsStorage
format.class=io.confluent.connect.gcs.format.avro.AvroFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

schema.compatibility=NONE

confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

confluent.license=""

Fill in appropriate values for gcs.bucket.name and gcs.credentials.path. It is recommended to use absolute paths.

Then start the GCS connector by loading its configuration with the following command:

 confluent load gcs -d quickstart-gcs.properties
 {
  "name": "gcs",
  "config": {
    "connector.class": "io.confluent.connect.gcs.GcsSinkConnector",
    "tasks.max": "1",
    "topics": "gcs_topic",
    "gcs.bucket.name": "#bucket-name",
    "gcs.part.size": "5242880",
    "flush.size": "3",
    "gcs.credentials.path": "#/path/to/credentials/keys.json",
    "storage.class": "io.confluent.connect.gcs.storage.GcsStorage",
    "format.class": "io.confluent.connect.gcs.format.avro.AvroFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "name": "gcs"
  },
  "tasks": [],
  "type": null
}
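
You can also follow the connector through the Connect worker log managed by the CLI, or query its status through the Connect REST API (the default REST port 8083 is assumed here):

# View the Connect worker log
confluent log connect

# Check the status of the connector and its task
curl -s http://localhost:8083/connectors/gcs/status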

Towards the end of the log, you should see that the connector starts, logs a few messages, and then uploads data from Kafka to GCS. Once the connector has ingested some records, check that the data is available in GCS, for instance by viewing the bucket in the GCS web console.
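
Alternatively, assuming the gsutil CLI is installed, you can list the uploaded objects from the command line (bucket name is a placeholder):

# List everything the connector has written under the topics/ prefix
gsutil ls -r gs://my-gcs-quickstart-bucket/topics/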

You should see three objects with keys:

topics/gcs_topic/partition=0/gcs_topic+0+0000000000.avro
topics/gcs_topic/partition=0/gcs_topic+0+0000000003.avro
topics/gcs_topic/partition=0/gcs_topic+0+0000000006.avro

Each file name is encoded as <topic>+<kafkaPartition>+<startOffset>.<format>.
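
To inspect one of the files locally, first download it from the bucket, for instance with gsutil (bucket name is a placeholder):

gsutil cp gs://my-gcs-quickstart-bucket/topics/gcs_topic/partition=0/gcs_topic+0+0000000000.avro .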

To verify the contents, use avro-tools-1.8.2.jar (available from Apache mirrors) to print the records:

java -jar avro-tools-1.8.2.jar tojson gcs_topic+0+0000000000.avro

For the file above, you should see the following output:

{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}

with the rest of the records contained in the other two files.

Finally, stop the Connect worker as well as all the rest of the Confluent services by running:

confluent stop

or stop all the services and additionally wipe out any data generated during this quick start by running:

confluent destroy