.. _connect_gcs:

|kconnect-long| GCS Sink Connector
==================================

The Google Cloud Storage (GCS) connector, currently available as a sink, allows you to export data from |ak-tm| topics to GCS objects in various formats. In addition, for certain data layouts, the GCS connector exports data while guaranteeing exactly-once delivery semantics to consumers of the GCS objects it produces.

Relation to S3
--------------

The GCS connector is the counterpart of the S3 cloud storage sink connector in Google Cloud. See the :ref:`S3 Connector documentation <connect_s3>` for details on generic properties of cloud storage sink connectors, such as exactly once delivery, pluggable data formats, schema evolution, basic configuration, writing raw message values, and pluggable partitioners.

Install GCS Connector
---------------------

.. include:: ../includes/connector-install.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-gcs:latest

.. include:: ../includes/connector-install-version.rst

.. codewithvars:: bash

   confluent-hub install confluentinc/kafka-connect-gcs:5.0.1

--------------------------
Install Connector Manually
--------------------------

`Download and extract the ZIP file <https://www.confluent.io/connector/kafka-connect-gcs/#download>`_ for your connector and then follow the manual connector installation :ref:`instructions <connect_install_connectors>`.

.. include:: ../includes/demo-kinesis.rst

.. _gcs-connector-license-key:

License
-------

.. include:: ../includes/enterprise-license.rst

See :ref:`gcs-sink-connector-license-config` for license properties and :ref:`gcs_license-topic-configuration` for information about the license topic.

.. _gcs_mapping_records:

Mapping Records to GCS Objects
------------------------------

The GCS connector consumes records from the specified topics, organizes them into different partitions, writes batches of records in each partition to a file, and then uploads those files to the GCS bucket. It uses GCS object paths that include the |ak| topic and partition, the computed partition, and the filename. The GCS connector offers several ways to customize this behavior, including:

* :ref:`Controlling the names of the GCS objects <gcs_object_names>`
* :ref:`Determining how records are partitioned into GCS objects <gcs_partitioners>`
* :ref:`The format used to serialize sets of records into GCS objects <gcs_formats>`
* :ref:`When to upload GCS objects <gcs_rotations>`

.. _gcs_object_names:

----------------
GCS Object Names
----------------

The GCS data model is a flat structure: each bucket stores objects, and the name of each GCS object serves as the unique key. However, a logical hierarchy can be inferred when the GCS object names use directory delimiters, such as ``/``. The GCS connector allows you to customize the names of the GCS objects it uploads to the GCS bucket.

In general, the names of the GCS objects uploaded by the GCS connector follow this format:

.. codewithvars:: bash

   <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>

where:

* ``<prefix>`` is specified with the connector's ``topics.dir`` configuration property, which defaults to the literal value ``topics`` and helps create uniquely named GCS objects that don't clash with existing GCS objects in the same bucket.
* ``<topic>`` corresponds to the name of the |ak| topic from which the records in this GCS object were read.
* ``<encodedPartition>`` is generated by the GCS connector's partitioner (see :ref:`gcs_partitioners`).
* ``<kafkaPartition>`` is the |ak| partition number from which the records in this GCS object were read.
* ``<startOffset>`` is the |ak| offset of the first record written to this GCS object.
* ``<format>`` is the extension identifying the format in which the records are serialized in this GCS object.

If desired, the ``/`` and ``+`` characters can be changed using the connector's ``directory.delim`` and ``file.delim`` configuration properties.

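
For example, with the defaults shown below and a hypothetical topic named ``pageviews``, the connector produces object names that follow the pattern shown in the comment (the topic name is illustrative only):

.. sourcecode:: properties

   # Default prefix for all objects written by the connector
   topics.dir=topics
   # Default delimiters used when building object names
   directory.delim=/
   file.delim=+

   # Resulting object names follow the pattern:
   #   topics/pageviews/<encodedPartition>/pageviews+<kafkaPartition>+<startOffset>.<format>
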
.. _gcs_partitioners:

-------------------------------------
Partitioning Records into GCS Objects
-------------------------------------

The GCS connector's *partitioner* determines how records read from a |ak| topic are partitioned into GCS objects. The partitioner determines the ``<encodedPartition>`` portion of the GCS object names (see :ref:`gcs_object_names`).

The partitioner is specified in the connector configuration with the ``partitioner.class`` configuration property. The GCS connector comes with the following partitioners:

* **Default (|ak|) Partitioner**: The ``io.confluent.connect.storage.partitioner.DefaultPartitioner`` preserves the same topic partitions as in |ak|, and records from each topic partition ultimately end up in GCS objects with names that include the |ak| topic and |ak| partitions. The ``<encodedPartition>`` is always ``<topicName>/partition=<kafkaPartition>``, resulting in GCS object names such as ``<prefix>/<topic>/partition=<kafkaPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>``.

* **Field Partitioner**: The ``io.confluent.connect.storage.partitioner.FieldPartitioner`` determines the partition from the field within each record identified by the connector's ``partition.field.name`` configuration property, which has no default. This partitioner requires ``STRUCT`` record type values. The ``<encodedPartition>`` is always ``<topicName>/<fieldName>=<fieldValue>``, resulting in GCS object names of the form ``<prefix>/<topic>/<fieldName>=<fieldValue>/<topic>+<kafkaPartition>+<startOffset>.<format>``.

* **Time Based Partitioner**: The ``io.confluent.connect.storage.partitioner.TimeBasedPartitioner`` determines the partition from the year, month, day, hour, minutes, and/or seconds. This partitioner requires the following connector configuration properties:

  * The ``path.format`` configuration property specifies the pattern used for the ``<encodedPartition>`` portion of the GCS object name. For example, when ``path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH``, GCS object names will have the form ``<prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>``.
  * The ``partition.duration.ms`` configuration property defines the maximum granularity of the GCS objects within a single encoded partition directory. For example, setting ``partition.duration.ms=600000`` (10 minutes) will result in each GCS object in that directory having no more than 10 minutes of records.
  * The ``locale`` configuration property specifies the JDK's locale used for formatting dates and times. For example, use ``en-US`` for US English, ``en-GB`` for UK English, and ``fr-FR`` for French (in France). These may vary by Java version; see the `available locales <http://www.localeplanet.com/java/>`__.
  * The ``timezone`` configuration property specifies the current timezone in which the dates and times will be treated.
    Use standard short names for timezones such as ``UTC`` or (without daylight savings) ``PST``, ``EST``, and ``ECT``, or longer standard names such as ``America/Los_Angeles``, ``America/New_York``, and ``Europe/Paris``. These may vary by Java version; see the `available timezones within each locale <http://www.localeplanet.com/java>`__, such as `those within the "en_US" locale <http://www.localeplanet.com/java/en-US/index.html>`__.
  * The ``timestamp.extractor`` configuration property determines how to obtain a timestamp from each record. Values can include ``Wallclock`` (the default) to use the system time when the record is processed, ``Record`` to use the timestamp of the |ak| record denoting when it was produced or stored by the broker, or ``RecordField`` to extract the timestamp from one of the fields in the record's value as specified by the ``timestamp.field`` configuration property.

* **Daily Partitioner**: The ``io.confluent.connect.storage.partitioner.DailyPartitioner`` is equivalent to the TimeBasedPartitioner with ``path.format='year'=YYYY/'month'=MM/'day'=dd`` and ``partition.duration.ms=86400000`` (one day, for one GCS object in each daily directory). This partitioner always results in GCS object names of the form ``<prefix>/<topic>/year=YYYY/month=MM/day=dd/<topic>+<kafkaPartition>+<startOffset>.<format>``. This partitioner requires the following connector configuration properties:

  * The ``locale`` configuration property specifies the JDK's locale used for formatting dates and times. For example, use ``en-US`` for US English, ``en-GB`` for UK English, and ``fr-FR`` for French (in France). These may vary by Java version; see the `available locales <http://www.localeplanet.com/java/>`__.
  * The ``timezone`` configuration property specifies the current timezone in which the dates and times will be treated. Use standard short names for timezones such as ``UTC`` or (without daylight savings) ``PST``, ``EST``, and ``ECT``, or longer standard names such as ``America/Los_Angeles``, ``America/New_York``, and ``Europe/Paris``. These may vary by Java version; see the `available timezones within each locale <http://www.localeplanet.com/java>`__, such as `those within the "en_US" locale <http://www.localeplanet.com/java/en-US/index.html>`__.
  * The ``timestamp.extractor`` configuration property determines how to obtain a timestamp from each record. Values can include ``Wallclock`` (the default) to use the system time when the record is processed, ``Record`` to use the timestamp of the |ak| record denoting when it was produced or stored by the broker, or ``RecordField`` to extract the timestamp from one of the fields in the record's value as specified by the ``timestamp.field`` configuration property.

* **Hourly Partitioner**: The ``io.confluent.connect.storage.partitioner.HourlyPartitioner`` is equivalent to the TimeBasedPartitioner with ``path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH`` and ``partition.duration.ms=3600000`` (one hour, for one GCS object in each hourly directory). This partitioner always results in GCS object names of the form ``<prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>``. This partitioner requires the following connector configuration properties:

  * The ``locale`` configuration property specifies the JDK's locale used for formatting dates and times. For example, use ``en-US`` for US English, ``en-GB`` for UK English, and ``fr-FR`` for French (in France). These may vary by Java version; see the `available locales <http://www.localeplanet.com/java/>`__.
  * The ``timezone`` configuration property specifies the current timezone in which the dates and times will be treated. Use standard short names for timezones such as ``UTC`` or (without daylight savings) ``PST``, ``EST``, and ``ECT``, or longer standard names such as ``America/Los_Angeles``, ``America/New_York``, and ``Europe/Paris``. These may vary by Java version; see the `available timezones within each locale <http://www.localeplanet.com/java>`__, such as `those within the "en_US" locale <http://www.localeplanet.com/java/en-US/index.html>`__.
  * The ``timestamp.extractor`` configuration property determines how to obtain a timestamp from each record. Values can include ``Wallclock`` (the default) to use the system time when the record is processed, ``Record`` to use the timestamp of the |ak| record denoting when it was produced or stored by the broker, or ``RecordField`` to extract the timestamp from one of the fields in the record's value as specified by the ``timestamp.field`` configuration property. As noted below, the choice of ``timestamp.extractor`` affects whether the GCS connector can support exactly once delivery.

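
For example, a connector that uses the time-based partitioner to produce hourly directories might include the following properties (a minimal sketch; the values shown are illustrative rather than defaults):

.. sourcecode:: properties

   partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
   # Encode the partition as year/month/day/hour directories
   path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
   # At most one hour of records per encoded partition directory
   partition.duration.ms=3600000
   locale=en-US
   timezone=UTC
   # Use the timestamp stored in each Kafka record rather than wallclock time
   timestamp.extractor=Record
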
You can also choose to use a custom partitioner by implementing the ``io.confluent.connect.storage.partitioner.Partitioner`` interface, packaging your implementation into a JAR file, and then:

#. Place the JAR file into the ``share/java/kafka-connect-gcs`` directory of your |cp| installation **on each worker node**.
#. Restart all of the |kconnect| worker nodes.
#. Configure GCS connectors to use your fully-qualified partitioner class name.

.. _gcs_formats:

------------------
GCS Object Formats
------------------

The GCS connector can serialize multiple records into each GCS object using a number of formats. The connector's ``format.class`` configuration property identifies the name of the Java class that implements the ``io.confluent.connect.storage.format.Format`` interface. The GCS connector comes with several implementations:

* **Avro**: Use ``format.class=io.confluent.connect.gcs.format.avro.AvroFormat`` to write the GCS object as an Avro container file that includes the Avro schema, followed by one or more records. The connector's ``avro.codec`` configuration property specifies the Avro compression codec, and values can be ``null`` (the default) for no Avro compression, ``deflate`` to use the deflate algorithm as specified in `RFC 1951 <http://www.isi.edu/in-notes/rfc1951.txt>`__, ``snappy`` to use Google's `Snappy <http://code.google.com/p/snappy/>`__ compression library, and ``bzip2`` for BZip2 compression. Optionally set ``enhanced.avro.schema.support=true`` to enable enum symbol preservation and package name awareness.

* **JSON**: Use ``format.class=io.confluent.connect.gcs.format.json.JsonFormat`` to write the GCS object as a single JSON array containing a JSON object for each record. The connector's ``gcs.compression.type`` configuration property can be set to ``none`` (the default) for no compression or ``gzip`` for GZip compression.

* **Raw Bytes**: Use ``format.class=io.confluent.connect.gcs.format.bytearray.ByteArrayFormat`` to write the raw serialized record values delimited with the JDK's line separator to the GCS object. This requires using the ``value.converter=org.apache.kafka.connect.converters.ByteArrayConverter`` with the connector. Use a different delimiter by specifying the connector's ``format.bytearray.separator`` configuration property.

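
For example, a connector that writes Snappy-compressed Avro container files might use the following settings (a sketch; ``avro.codec`` and ``enhanced.avro.schema.support`` are optional):

.. sourcecode:: properties

   # Serialize each batch of records as an Avro container file
   format.class=io.confluent.connect.gcs.format.avro.AvroFormat
   # Compress the Avro data blocks with Snappy
   avro.codec=snappy
   # Preserve enum symbols and package names in the Avro schema
   enhanced.avro.schema.support=true
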
You can also choose to use a custom format by implementing the ``io.confluent.connect.storage.format.Format`` interface, packaging your implementation into a JAR file, and then:

#. Place the JAR file into the ``share/java/kafka-connect-gcs`` directory of your |cp| installation **on each worker node**.
#. Restart all of the |kconnect| worker nodes.
#. Configure GCS connectors with ``format.class`` set to the fully-qualified class name of your format implementation.

.. _gcs_rotations:

------------------
GCS Object Uploads
------------------

As the GCS connector processes each record, it uses the partitioner to determine into which encoded partition that record should be written. This continues for each partition until the connector determines that a partition has enough records and should be uploaded to the GCS bucket using the GCS object name for that partition. This technique of knowing when to flush a partition file and upload it to GCS is called the *rotation strategy*, and there are a number of ways to control this behavior:

* **Maximum number of records**: The connector's ``flush.size`` configuration property specifies the maximum number of records that should be written to a single GCS object. There is no default for this setting.

* **Maximum span of record time**: The connector's ``rotate.interval.ms`` specifies the maximum timespan in milliseconds a file can remain open and ready for additional records. The timestamp for each file starts with the *record timestamp* of the first record written to the file, as determined by the partitioner's ``timestamp.extractor``. As long as the next record's timestamp fits within the timespan specified by ``rotate.interval.ms``, the record will be written to the file; if a record's timestamp does not fit within the timespan of the file, the connector will flush the file, upload it to GCS, commit the offsets of the records in that file, and *then* create a new file with a timespan that starts with the timestamp of that record, and write the record to the new file.

* **Scheduled rotation**: The connector's ``rotate.schedule.interval.ms`` specifies the maximum timespan in milliseconds a file can remain open and ready for additional records. Unlike with ``rotate.interval.ms``, with scheduled rotation the timestamp for each file starts with the **system time** that the first record is written to the file. As long as a record is processed within the timespan specified by ``rotate.schedule.interval.ms``, the record will be written to the file. As soon as a record is processed *after* the timespan for the current file, the file is flushed, uploaded to GCS, and the offsets of the records in the file are committed. A new file is created with a timespan that starts with the current system time, and the record is written to the file. The commit will be performed at the scheduled time, regardless of the previous commit time or number of messages. This configuration is useful when you have to commit your data based on current server time, for example at the beginning of every hour. The default value ``-1`` means that this feature is disabled.

These strategies can be combined as needed, and rotation occurs whenever any of the strategies signals a rotation. The first strategy will cause a rotation as soon as enough records have been written to the file, and this condition can be evaluated *after* each record has been written to the file. In other words, the file can be closed and uploaded to GCS as soon as it is full.

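
For example, the following sketch combines a record-count limit with an hourly scheduled rotation based on system time (the values are illustrative):

.. sourcecode:: properties

   # Upload an object once it contains 10000 records...
   flush.size=10000
   # ...or once an hour of system time has elapsed for the open file, whichever comes first
   rotate.schedule.interval.ms=3600000
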
When using ``rotate.interval.ms``, the connector only closes and uploads a file to GCS when the next record does not belong in the file, based upon that record's timestamp. In other words, if the connector has no more records to process, the connector may keep the file open for a significant period of time -- until the connector can process another record. Scheduled rotation uses ``rotate.schedule.interval.ms`` to close the file and upload to GCS on a regular basis using the current time, rather than the record time. Even if the connector has no more records to process, |kconnect| will still call the connector at least every ``offset.flush.interval.ms`` as defined in the |kconnect| worker's configuration file. And every time this occurs, the connector uses the current time to determine if the currently opened file should be closed and uploaded to GCS.

.. note:: Not all rotation strategies are compatible with the GCS connector's ability to deliver GCS objects exactly once with eventual consistency. See the :ref:`Exactly Once section <gcs_exactly_once>` below for details.

The GCS object uploaded by the connector can be quite large, and the connector supports using a multi-part upload mechanism. The ``gcs.part.size`` configuration property defaults to ``26214400`` bytes (25MB), and specifies the maximum size of each GCS object part used to upload a single GCS object.

Additionally, the ``schema.compatibility`` setting (see :ref:`Schema Evolution <gcs_schema_evolution>`) will also affect when one file is closed and uploaded to a GCS object. If a record cannot be written to one file because its schema has changed relative to the records already in the file, the connector will rotate by closing the file, uploading it to GCS, committing offsets for the records in the file, creating a new file, and writing the new record.

.. _gcs_exactly_once:

Exactly-once delivery on top of eventual consistency
----------------------------------------------------

The GCS connector is able to provide exactly-once semantics to consumers of the objects it exports to GCS, under the condition that the connector is supplied with a deterministic partitioner. Currently, out of the available partitioners, the default and field partitioners are always deterministic. ``TimeBasedPartitioner`` can be deterministic with some configurations, discussed below. This implies that, when any of these partitioners is used, splitting of files always happens at the same offsets for a given set of Kafka records. These partitioners take into account ``flush.size`` and ``schema.compatibility`` to decide when to roll and save a new file to GCS.

The connector always delivers files in GCS that contain the same records, even under the presence of failures. If a connector task fails before an upload completes, the file does not become visible to GCS. If, on the other hand, a failure occurs after the upload has completed but before the corresponding offset is committed to |ak| by the connector, then a re-upload will take place. However, such a re-upload is transparent to the user of the GCS bucket, who at any time will have access to the same records made eventually available by successful uploads to GCS.

To guarantee exactly-once semantics with the ``TimeBasedPartitioner``, the connector must be configured to use a deterministic implementation of ``TimestampExtractor`` and a deterministic rotation strategy. The deterministic timestamp extractors are Kafka records (``timestamp.extractor=Record``) or record fields (``timestamp.extractor=RecordField``). The deterministic rotation strategy configuration is ``rotate.interval.ms`` (setting ``rotate.schedule.interval.ms`` is nondeterministic and will invalidate exactly-once guarantees).

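
Put together, a time-based configuration that preserves exactly-once delivery might look like the following sketch (the interval value is illustrative; ``path.format``, ``partition.duration.ms``, ``locale``, and ``timezone`` are omitted for brevity):

.. sourcecode:: properties

   partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
   # Deterministic timestamp extractor: use the timestamp stored in each record
   timestamp.extractor=Record
   # Deterministic rotation strategy
   rotate.interval.ms=600000
   # Do not set rotate.schedule.interval.ms; scheduled rotation is nondeterministic
   # and invalidates the exactly-once guarantees
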
.. _gcs_schema_evolution:

Schema Evolution
----------------

The GCS connector supports schema evolution and reacts to schema changes of data according to the ``schema.compatibility`` configuration. This section describes how the connector reacts to schema evolution under different values of ``schema.compatibility``. The ``schema.compatibility`` can be set to ``NONE``, ``BACKWARD``, ``FORWARD``, and ``FULL``, which means NO compatibility, BACKWARD compatibility, FORWARD compatibility, and FULL compatibility, respectively.

* **NO Compatibility**: By default, the ``schema.compatibility`` is set to ``NONE``. In this case, the connector ensures that each file written to GCS has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with the new schema in new files.

* **BACKWARD Compatibility**: If a schema is evolved in a backward compatible way, we can always use the latest schema to query all the data uniformly. For example, removing fields is a backward compatible change to a schema, since when we encounter records written with the old schema that contain these fields, we can just ignore them. Adding a field with a default value is also backward compatible.

  If ``BACKWARD`` is specified in the ``schema.compatibility``, the connector keeps track of the latest schema used in writing data to GCS, and if a data record with a schema version larger than the current latest schema arrives, the connector commits the current set of files and writes the data record with the new schema to new files. For data records arriving at a later time with a schema of an earlier version, the connector projects the data record to the latest schema before writing to the same set of files in GCS.

* **FORWARD Compatibility**: If a schema is evolved in a forward compatible way, we can always use the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema will use the default value when the field is missing. If ``FORWARD`` is specified in the ``schema.compatibility``, the connector projects the data to the oldest schema before writing to the same set of files in GCS.

* **FULL Compatibility**: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema. If ``FULL`` is specified in the ``schema.compatibility``, the connector performs the same action as ``BACKWARD``.

Schema evolution in the GCS connector works in the same way as in the :ref:`HDFS connector <connect_hdfs>` and :ref:`S3 connector <connect_s3>`.

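
For example, a connector that rolls to new files when a record with a newer schema version arrives, and projects records with older schemas to the latest schema, would set (a one-line sketch):

.. sourcecode:: properties

   # Track the latest schema and project records with older schemas to it
   schema.compatibility=BACKWARD
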
.. _gcs_automatic_retries:

Automatic Retries
-----------------

The GCS connector may experience problems writing to the GCS bucket, due to network partitions, interruptions, or even GCS throttling limits. In many cases, the connector will retry the request a number of times before failing. To prevent further overloading the network or GCS service, the connector uses an exponential backoff technique to give the network and/or service time to recover. The technique adds randomness, called jitter, to the calculated backoff times to prevent a thundering herd, where large numbers of requests from many tasks are submitted concurrently and overwhelm the service. Randomness spreads out the retries from many tasks and should reduce the overall time required to complete all outstanding requests compared to simple exponential backoff. The goal is to spread out the requests to GCS as much as possible.

The maximum number of retry attempts is dictated by the ``gcs.part.retries`` GCS connector configuration property, which defaults to three attempts. The delay for retries is dependent upon the connector's ``gcs.retry.backoff.ms`` configuration property, which defaults to 200 milliseconds. The actual delay is randomized, but the maximum delay can be calculated as a function of the number of retry attempts with ``${gcs.retry.backoff.ms} * 2 ^ (retry-1)``, where ``retry`` is the number of attempts taken so far in the current iteration. In order to keep the maximum delay within a reasonable duration, it is capped at 24 hours. For example, the following table shows the possible wait times before submitting each of the three retry attempts.

.. table:: Range of backoff times for each retry using the default configuration
   :widths: auto

   ===== ===================== ===================== ==============================================
   Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
   ===== ===================== ===================== ==============================================
   1     0.0                   0.2                   0.2
   2     0.0                   0.4                   0.6
   3     0.0                   0.8                   1.4
   ===== ===================== ===================== ==============================================

Increasing the maximum number of retries adds more backoff:

.. table:: Range of backoff times for additional retries
   :widths: auto

   ===== ===================== ===================== ==============================================
   Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
   ===== ===================== ===================== ==============================================
   4     0.0                   1.6                   3.0
   5     0.0                   3.2                   6.2
   6     0.0                   6.4                   12.6
   7     0.0                   12.8                  25.4
   8     0.0                   25.6                  51.0
   9     0.0                   51.2                  102.2
   10    0.0                   102.4                 204.6
   ===== ===================== ===================== ==============================================

At some point, the maximum backoff time reaches saturation and is capped at 24 hours. As the table below shows, all attempts starting with retry 20 have a maximum backoff time of 24 hours.

.. table:: Range of backoff times when reaching the cap of 24 hours
   :widths: auto

   ===== ===================== ===================== ==============================================
   Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
   ===== ===================== ===================== ==============================================
   15    0.0                   3276.8                6553.4
   16    0.0                   6553.6                13107.0
   17    0.0                   13107.2               26214.2
   18    0.0                   26214.4               52428.6
   19    0.0                   52428.8               104857.4
   20    0.0                   86400.0               191257.4
   21    0.0                   86400.0               277657.4
   ===== ===================== ===================== ==============================================

It's not advised to set ``gcs.part.retries`` too high since making more attempts after reaching a cap of 24 hours isn't practical. You can adjust both the ``gcs.part.retries`` and ``gcs.retry.backoff.ms`` connector configuration properties to achieve the desired retry and backoff characteristics.

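
For example, the following sketch allows up to ten retry attempts while keeping the default initial backoff (the values are illustrative):

.. sourcecode:: properties

   # Retry a failed request up to 10 times (the default is 3)
   gcs.part.retries=10
   # Initial maximum backoff of 200 ms, doubling with each subsequent retry
   gcs.retry.backoff.ms=200
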
.. _gcs_sink_quick_start:

Quick Start
-----------

In this quick start, we use the GCS connector to export data produced by the Avro console producer to GCS. It is assumed that the connector was installed using the ``confluent-hub`` command line tool as described in the previous section.

----------------
Prepare a Bucket
----------------

Before you begin, you will need to create a GCS destination bucket. In order to use the web interface, navigate to the Google Cloud Platform console website, and use the GUI to create a bucket. To download credentials for the newly created bucket, navigate to **APIs & Services -> Credentials**. From the Credentials page, you can **Create credentials**, then select **Service Account Key**. Select the appropriate account, and download the JSON file. The location of the downloaded JSON file will be used in your GCS Connector configuration.

To use the CLI, use the ``gsutil mb`` command. Installation and usage documentation is available at the `Google Cloud SDK page <https://cloud.google.com/sdk/>`_.

----------
Start |cp|
----------

Next, start the services with one command using the Confluent command-line tool:

.. codewithvars:: bash

   |confluent_start|

Every service will start in order, printing a message with its status:

.. include:: ../../includes/cli.rst
   :start-after: CE_CLI_startup_output
   :end-before: COS_CP_CLI_startup_output

To import a few records with a simple schema into |ak|, start the Avro console producer as follows:

.. codewithvars:: bash

   kafka-avro-console-producer --broker-list localhost:9092 --topic gcs_topic \
   --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Then, in the console producer, type in:

.. codewithvars:: bash

   {"f1": "value1"}
   {"f1": "value2"}
   {"f1": "value3"}
   {"f1": "value4"}
   {"f1": "value5"}
   {"f1": "value6"}
   {"f1": "value7"}
   {"f1": "value8"}
   {"f1": "value9"}

The nine records entered are published to the |ak| topic ``gcs_topic`` in Avro format.

-------------------
Start the Connector
-------------------

Before starting the connector, create a config file. For example, save the following properties file as ``quickstart-gcs.properties``, or edit the file that is included in the connector archive (under the "etc" directory):

.. sourcecode:: properties

   name=gcs-sink
   connector.class=io.confluent.connect.gcs.GcsSinkConnector
   tasks.max=1
   topics=gcs_topic
   gcs.bucket.name=#bucket-name
   gcs.part.size=5242880
   flush.size=3
   gcs.credentials.path=#/path/to/credentials/keys.json
   storage.class=io.confluent.connect.gcs.storage.GcsStorage
   format.class=io.confluent.connect.gcs.format.avro.AvroFormat
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   schema.compatibility=NONE
   confluent.topic.bootstrap.servers=localhost:9092
   confluent.topic.replication.factor=1
   # Uncomment and insert license for production use
   # confluent.license=

Fill in appropriate values for ``gcs.bucket.name`` and ``gcs.credentials.path``. It is recommended to use absolute paths.

Then start the GCS connector by loading its configuration with the following command.

.. include:: ../../includes/confluent-local-consume-limit.rst

.. codewithvars:: bash

   |confluent_load| gcs|dash| -d quickstart-gcs.properties
   {
     "name": "gcs",
     "config": {
       "connector.class": "io.confluent.connect.gcs.GcsSinkConnector",
       "tasks.max": "1",
       "topics": "gcs_topic",
       "gcs.bucket.name": "#bucket-name",
       "gcs.part.size": "5242880",
       "flush.size": "3",
       "gcs.credentials.path": "#/path/to/credentials/keys.json",
       "storage.class": "io.confluent.connect.gcs.storage.GcsStorage",
       "format.class": "io.confluent.connect.gcs.format.avro.AvroFormat",
       "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
       "schema.compatibility": "NONE",
       "confluent.topic.bootstrap.servers": "localhost:9092",
       "confluent.topic.replication.factor": "1",
       "name": "gcs"
     },
     "tasks": [],
     "type": null
   }

Towards the end of the log you should see that the connector starts, logs a few messages, and then uploads data from |ak| to GCS.

Once the connector has ingested some records, check that the data is available in GCS, for instance by viewing the bucket in the GCS web browser console. You should see three objects with keys:

.. codewithvars:: bash

   topics/gcs_topic/partition=0/gcs_topic+0+0000000000.avro
   topics/gcs_topic/partition=0/gcs_topic+0+0000000003.avro
   topics/gcs_topic/partition=0/gcs_topic+0+0000000006.avro

Each file name is encoded as ``<topic>+<kafkaPartition>+<startOffset>.<format>``.

To verify the contents, use ``avro-tools-1.8.2.jar`` (available in `Apache mirrors <http://mirror.metrocast.net/apache/avro/avro-1.8.2/java/avro-tools-1.8.2.jar>`_) to print the records:

.. codewithvars:: bash

   java -jar avro-tools-1.8.2.jar tojson gcs_topic+0+0000000000.avro

For the file above, you should see the following output:

.. codewithvars:: bash

   {"f1":"value1"}
   {"f1":"value2"}
   {"f1":"value3"}

with the rest of the records contained in the other two files.

Finally, stop the |kconnect| worker as well as all the rest of the Confluent services by running:

.. codewithvars:: bash

   |confluent_stop|

or stop all the services and additionally wipe out any data generated during this quick start by running:

.. codewithvars:: bash

   |confluent_destroy|

Additional Documentation
------------------------

.. toctree::
   :maxdepth: 1

   configuration_options