GCS Connector¶
The GCS connector, currently available as a sink, allows you to export data from Kafka topics to GCS objects in various formats. In addition, for certain data layouts, the GCS connector exports data while guaranteeing exactly-once delivery semantics to consumers of the GCS objects it produces.
Relation to S3¶
The GCS connector is the counterpart of the S3 cloud storage sink connector in Google Cloud.
See the S3 Connector documentation for details on generic properties of cloud storage sink connectors, such as exactly once delivery, pluggable data formats, schema evolution, basic configuration, writing raw message values, and pluggable partitioners.
Installation¶
The recommended way to install the connector is with the Confluent Hub command-line tool, which is packaged with Confluent Platform.
Tip
If it is not already in your PATH, add Confluent’s bin directory by running: export PATH=<path-to-confluent>/bin:$PATH
Use the confluent-hub command-line tool:
confluent-hub install confluentinc/kafka-connect-gcs:latest
The interactive command-line tool will guide you to download and install the GCS connector to the appropriate path.
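If you prefer a non-interactive installation, confluent-hub also accepts flags that skip the prompts; a sketch, where the component directory shown is a placeholder for your Connect plugin path:
confluent-hub install confluentinc/kafka-connect-gcs:latest --no-prompt --component-dir /usr/share/confluent-hub-components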
Quickstart¶
In this Quickstart, we use the GCS connector to export data produced by the Avro console producer to GCS.
It is assumed that the connector was installed using the confluent-hub command-line tool as described in the previous section.
Preparing a Bucket¶
Before you begin, you will need to create a GCS destination bucket. Navigate to the Google Cloud Platform console website. Use the web interface to create a new bucket.
To download credentials, navigate to “APIs & Services” -> “Credentials.” On the Credentials page, click “Create credentials” and select “Service Account Key.” Select the appropriate service account and download the JSON key file. The path to the downloaded JSON file is used in your GCS connector configuration.
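If you prefer the command line, the bucket and key can also be created with the Google Cloud SDK; a sketch, where the bucket name, location, and service account are placeholders:
gsutil mb -l us-east1 gs://my-gcs-quickstart-bucket
gcloud iam service-accounts keys create ~/keys.json --iam-account my-connect-sa@my-project.iam.gserviceaccount.com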
Start Confluent¶
Next, start the services with one command using the Confluent command-line tool:
confluent start
Every service will start in order, printing a message with its status:
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql
ksql is [UP]
Starting control-center
control-center is [UP]
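To re-check the state of the services at any point, the same CLI provides a status command:
confluent status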
To import a few records with a simple schema into Kafka, start the Avro console producer as follows:
kafka-avro-console-producer --broker-list localhost:9092 --topic gcs_topic \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Then, in the console producer, type in:
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
{"f1": "value4"}
{"f1": "value5"}
{"f1": "value6"}
{"f1": "value7"}
{"f1": "value8"}
{"f1": "value9"}
The nine records entered are published to the Kafka topic gcs_topic in Avro format.
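Optionally, you can confirm that the records reached the topic with the Avro console consumer (press Ctrl-C to stop it); this sketch assumes Schema Registry is running on its default port:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic gcs_topic --from-beginning \
  --property schema.registry.url=http://localhost:8081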
Starting the Connector¶
Before starting the connector, create a config file. For example, save the following properties file as quickstart-gcs.properties, or edit the file that is included in the connector archive (under the “etc” directory):
name=gcs-sink
connector.class=io.confluent.connect.gcs.GcsSinkConnector
tasks.max=1
topics=gcs_topic
gcs.bucket.name=#bucket-name
gcs.part.size=5242880
flush.size=3
gcs.credentials.path=#/path/to/credentials/keys.json
storage.class=io.confluent.connect.gcs.storage.GcsStorage
format.class=io.confluent.connect.gcs.format.avro.AvroFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Fill in appropriate values for gcs.bucket.name and gcs.credentials.path. It is recommended to use absolute paths.
Then start the GCS connector by loading its configuration with the following command:
confluent load gcs -d quickstart-gcs.properties
{
"name": "gcs",
"config": {
"connector.class": "io.confluent.connect.gcs.GcsSinkConnector",
"tasks.max": "1",
"topics": "gcs_topic",
"gcs.bucket.name": "#bucket-name",
"gcs.part.size": "5242880",
"flush.size": "3",
"gcs.credentials.path": "#/path/to/credentials/keys.json",
"storage.class": "io.confluent.connect.gcs.storage.GcsStorage",
"format.class": "io.confluent.connect.gcs.format.avro.AvroFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"name": "gcs"
},
"tasks": [],
"type": null
}
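To confirm that the connector is running, you can also query the Kafka Connect REST API; a sketch, assuming the Connect worker listens on the default port 8083 and the connector was loaded under the name gcs:
curl -s http://localhost:8083/connectors/gcs/status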
Towards the end of the log you should see that the connector starts, logs a few messages, and then uploads data from Kafka to GCS. Once the connector has ingested some records, check that the data is available in GCS, for instance by viewing the bucket in the GCS web console.
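Alternatively, you can list the bucket from the command line with gsutil (the bucket name is a placeholder):
gsutil ls -r gs://my-gcs-quickstart-bucket/topics/gcs_topic/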
You should see three objects with keys:
topics/gcs_topic/partition=0/gcs_topic+0+0000000000.avro
topics/gcs_topic/partition=0/gcs_topic+0+0000000003.avro
topics/gcs_topic/partition=0/gcs_topic+0+0000000006.avro
Each file name is encoded as <topic>+<kafkaPartition>+<startOffset>.<format>.
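To download one of the objects for local inspection, you can again use gsutil (the bucket name is a placeholder):
gsutil cp gs://my-gcs-quickstart-bucket/topics/gcs_topic/partition=0/gcs_topic+0+0000000000.avro .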
To verify the contents, use avro-tools-1.8.2.jar (available in Apache mirrors) to print the records:
java -jar avro-tools-1.8.2.jar tojson gcs_topic+0+0000000000.avro
For the file above, you should see the following output:
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
with the rest of the records contained in the other two files.
Finally, stop the Connect worker as well as all the rest of the Confluent services by running:
confluent stop
or stop all the services and additionally wipe out any data generated during this quickstart by running:
confluent destroy