.. _connect_azure_blob_storage: |kconnect-long| |az| Blob Storage Sink Connector ================================================ .. note:: If you are using |ccloud|, see https://docs.confluent.io/cloud/current/connectors/cc-azure-blob-sink.html for the cloud Quick Start. .. include:: includes/introduction.rst Features -------- The |az-long| Blob Storage connector offers a variety of features: * **Exactly Once Delivery**: Records that are exported using a deterministic partitioner are delivered with exactly-once semantics. * **Pluggable Data Format with or without Schema**: Out of the box, the connector supports writing data to |az| Blob Storage in Avro and JSON formats. Besides records with schema, the connector supports exporting plain JSON or byte array records without schema in text files, with one record per line. In general, the connector may accept any format that provides an implementation of the ``Format`` interface. * **Schema Evolution**: When schemas are used, the connector supports schema evolution based on schema compatibility modes. The available modes are ``NONE``, ``BACKWARD``, ``FORWARD``, and ``FULL``, and a selection can be made by setting the property ``schema.compatibility`` in the connector's configuration. When the connector observes a schema change, it decides whether to roll the file or project the record to the proper schema according to the ``schema.compatibility`` configuration in use. * **Pluggable Partitioner**: The connector comes out of the box with partitioners that support default partitioning based on |ak-tm| partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the ``Partitioner`` class. Additionally, you can customize time-based partitioning by extending the ``TimeBasedPartitioner`` class. Install |az| Blob Storage Connector ----------------------------------- .. include:: ../includes/connector-install-hub.rst .. codewithvars:: bash confluent-hub install confluentinc/kafka-connect-azure-blob-storage:latest .. include:: ../includes/connector-install-version.rst .. codewithvars:: bash confluent-hub install confluentinc/kafka-connect-azure-blob-storage:1.1.1 -------------------------- Install Connector Manually -------------------------- `Download and extract the ZIP file `_ for your connector and follow the manual connector installation :ref:`instructions `. .. _azure-blob-connect-license: License ------- .. include:: ../includes/enterprise-license.rst See :ref:`azure-blob-connector-license-config` for license properties and :ref:`azure-blob-license-topic-configuration` for information about the license topic. .. _azure_blob_storage_mapping_records: Mapping Records to |az| Blob Storage Objects -------------------------------------------- The |az| Blob Storage connector consumes records from the specified topics, organizes them into different partitions, writes batches of records in each partition to a file, and then uploads those files to the |az| Blob Storage container. It uses |az| Blob Storage object paths that include the |ak| topic and partition, the computed partition, and the filename. The |az| Blob Storage connector offers several ways to customize this behavior, including: * :ref:`Controlling the names of the |az| Blob Storage objects ` * :ref:`Determining how records are partitioned into |az| Blob Storage objects ` * :ref:`The format used to serialize sets of records into |az| Blob Storage objects ` * :ref:`When to upload |az| Blob Storage objects `
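As a quick orientation, the sketch below shows where each of these customizations surfaces in the connector configuration. The values are illustrative only (for example, ``flush.size=1000`` is not a default), and every property shown here is described in the subsections that follow.

.. codewithvars:: properties

   # Object naming (see the Object Names subsection)
   topics.dir=topics
   directory.delim=/
   file.delim=+

   # Partitioning of records into objects
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

   # Serialization format of each uploaded object
   format.class=io.confluent.connect.azure.blob.format.avro.AvroFormat

   # When to close a file and upload it as an object
   flush.size=1000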
.. _azure_blob_storage_object_names: ------------------------------ |az| Blob Storage Object Names ------------------------------ The |az| Blob Storage data model is a flat structure: each container stores objects, and the name of each |az| Blob Storage object serves as the unique key. However, a logical hierarchy can be inferred when the |az| Blob Storage object names use directory delimiters, such as ``/``. The |az| Blob Storage connector allows you to customize the names of the |az| Blob Storage objects it uploads to the |az| Blob Storage container. In general, the names of the |az| Blob Storage objects uploaded by the |az| Blob Storage connector follow this format: .. codewithvars:: bash <prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format> where: * ``<prefix>`` is specified with the connector's ``topics.dir`` configuration property, which defaults to the literal value ``topics`` and helps create uniquely named |az| Blob Storage objects that don't clash with existing |az| Blob Storage objects in the same container. * ``<topic>`` corresponds to the name of the |ak| topic from which the records in this |az| Blob Storage object were read. * ``<encodedPartition>`` is generated by the |az| Blob Storage connector's partitioner (see :ref:`azure_blob_storage_partitioners`). * ``<kafkaPartition>`` is the |ak| partition number from which the records in this |az| Blob Storage object were read. * ``<startOffset>`` is the |ak| offset of the first record written to this |az| Blob Storage object. * ``<format>`` is the extension identifying the format in which the records are serialized in this |az| Blob Storage object. If desired, the ``/`` and ``+`` characters can be changed using the connector's ``directory.delim`` and ``file.delim`` configuration properties. .. _azure_blob_storage_partitioners: --------------------------------------------------- Partitioning Records into |az| Blob Storage Objects --------------------------------------------------- The |az| Blob Storage connector's *partitioner* determines how records read from a |ak| topic are partitioned into |az| Blob Storage objects. The partitioner determines the ``<encodedPartition>`` portion of the |az| Blob Storage object names (see :ref:`azure_blob_storage_object_names`). The partitioner is specified in the connector configuration with the ``partitioner.class`` configuration property. The |az| Blob Storage connector comes with the following partitioners: * **Default (|ak|) Partitioner**: The ``io.confluent.connect.storage.partitioner.DefaultPartitioner`` preserves the same topic partitions as in |ak|, and records from each topic partition ultimately end up in |az| Blob Storage objects with names that include the |ak| topic and |ak| partitions. The ``<encodedPartition>`` is always ``<topicName>/partition=<kafkaPartition>``, resulting in |az| Blob Storage object names such as ``<prefix>/<topic>/partition=<kafkaPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>``. * **Field Partitioner**: The ``io.confluent.connect.storage.partitioner.FieldPartitioner`` determines the partition from the field within each record identified by the connector's ``partition.field.name`` configuration property, which has no default. This partitioner requires ``STRUCT`` record type values. The ``<encodedPartition>`` is always ``<topicName>/<fieldName>=<fieldValue>``, resulting in |az| Blob Storage object names of the form ``<prefix>/<topic>/<fieldName>=<fieldValue>/<topic>+<kafkaPartition>+<startOffset>.<format>``. * **Time Based Partitioner**: The ``io.confluent.connect.storage.partitioner.TimeBasedPartitioner`` determines the partition from the year, month, day, hour, minutes, and/or seconds. This partitioner requires the following connector configuration properties: * The ``path.format`` configuration property specifies the pattern used for the ``<encodedPartition>`` portion of the |az| Blob Storage object name.
For example, when ``path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH``, |az| Blob Storage object names will have the form ``<prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>``. * The ``partition.duration.ms`` configuration property defines the maximum granularity of the |az| Blob Storage objects within a single encoded partition directory. For example, setting ``partition.duration.ms=600000`` (10 minutes) will result in each |az| Blob Storage object in that directory having no more than 10 minutes of records. * The ``locale`` configuration property specifies the JDK's locale used for formatting dates and times. For example, use ``en-US`` for US English, ``en-GB`` for UK English, and ``fr-FR`` for French (in France). These may vary by Java version; see the `available locales `__. * The ``timezone`` configuration property specifies the current timezone in which the dates and times will be treated. Use standard short names for timezones such as ``UTC`` or (without daylight savings) ``PST``, ``EST``, and ``ECT``, or longer standard names such as ``America/Los_Angeles``, ``America/New_York``, and ``Europe/Paris``. These may vary by Java version; see the `available timezones within each locale `__, such as `those within the "en_US" locale `__. * The ``timestamp.extractor`` configuration property determines how to obtain a timestamp from each record. Values can include ``Wallclock`` (the default) to use the system time when the record is processed, ``Record`` to use the timestamp of the |ak| record denoting when it was produced or stored by the broker, or ``RecordField`` to extract the timestamp from one of the fields in the record's value as specified by the ``timestamp.field`` configuration property. * **Daily Partitioner**: The ``io.confluent.connect.storage.partitioner.DailyPartitioner`` is equivalent to the ``TimeBasedPartitioner`` with ``path.format='year'=YYYY/'month'=MM/'day'=dd`` and ``partition.duration.ms=86400000`` (one day, for one |az| Blob Storage object in each daily directory). This partitioner always results in |az| Blob Storage object names of the form ``<prefix>/<topic>/year=YYYY/month=MM/day=dd/<topic>+<kafkaPartition>+<startOffset>.<format>``. This partitioner requires the following connector configuration properties: * The ``locale`` configuration property specifies the JDK's locale used for formatting dates and times. For example, use ``en-US`` for US English, ``en-GB`` for UK English, and ``fr-FR`` for French (in France). These may vary by Java version; see the `available locales `__. * The ``timezone`` configuration property specifies the current timezone in which the dates and times will be treated. Use standard short names for timezones such as ``UTC`` or (without daylight savings) ``PST``, ``EST``, and ``ECT``, or longer standard names such as ``America/Los_Angeles``, ``America/New_York``, and ``Europe/Paris``. These may vary by Java version; see the `available timezones within each locale `__, such as `those within the "en_US" locale `__. * The ``timestamp.extractor`` configuration property determines how to obtain a timestamp from each record. Values can include ``Wallclock`` (the default) to use the system time when the record is processed, ``Record`` to use the timestamp of the |ak| record denoting when it was produced or stored by the broker, or ``RecordField`` to extract the timestamp from one of the fields in the record's value as specified by the ``timestamp.field`` configuration property.
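To make this concrete, the Daily Partitioner described above behaves like the following ``TimeBasedPartitioner`` sketch (the ``locale``, ``timezone``, and ``timestamp.extractor`` values are illustrative choices, not defaults):

.. codewithvars:: properties

   partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
   path.format='year'=YYYY/'month'=MM/'day'=dd
   partition.duration.ms=86400000
   locale=en-US
   timezone=UTC
   timestamp.extractor=Record

Alternatively, set ``partitioner.class=io.confluent.connect.storage.partitioner.DailyPartitioner`` directly and supply only the ``locale``, ``timezone``, and ``timestamp.extractor`` properties.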
* **Hourly Partitioner**: The ``io.confluent.connect.storage.partitioner.HourlyPartitioner`` is equivalent to the ``TimeBasedPartitioner`` with ``path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH`` and ``partition.duration.ms=3600000`` (one hour, for one |az| Blob Storage object in each hourly directory). This partitioner always results in |az| Blob Storage object names of the form ``<prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>``. This partitioner requires the following connector configuration properties: * The ``locale`` configuration property specifies the JDK's locale used for formatting dates and times. For example, use ``en-US`` for US English, ``en-GB`` for UK English, and ``fr-FR`` for French (in France). These may vary by Java version; see the `available locales `__. * The ``timezone`` configuration property specifies the current timezone in which the dates and times will be treated. Use standard short names for timezones such as ``UTC`` or (without daylight savings) ``PST``, ``EST``, and ``ECT``, or longer standard names such as ``America/Los_Angeles``, ``America/New_York``, and ``Europe/Paris``. These may vary by Java version; see the `available timezones within each locale `__, such as `those within the "en_US" locale `__. * The ``timestamp.extractor`` configuration property determines how to obtain a timestamp from each record. Values can include ``Wallclock`` (the default) to use the system time when the record is processed, ``Record`` to use the timestamp of the |ak| record denoting when it was produced or stored by the broker, or ``RecordField`` to extract the timestamp from one of the fields in the record's value as specified by the ``timestamp.field`` configuration property. As noted below, the choice of ``timestamp.extractor`` affects whether the |az| Blob Storage connector can support exactly-once delivery. You can also choose to use a custom partitioner by implementing the ``io.confluent.connect.storage.partitioner.Partitioner`` interface, packaging your implementation into a JAR file, and then: #. Place the JAR file into the ``share/java/kafka-connect-azure_blob_storage`` directory of your |cp| installation **on each worker node**. #. Restart all of the |kconnect| worker nodes. #. Configure |az| Blob Storage connectors to use your fully-qualified partitioner class name. .. _azure_blob_storage_formats: -------------------------------- |az| Blob Storage Object Formats -------------------------------- The |az| Blob Storage connector can serialize multiple records into each |az| Blob Storage object using a number of formats. The connector's ``format.class`` configuration property identifies the name of the Java class that implements the ``io.confluent.connect.storage.format.Format`` interface, and the |az| Blob Storage connector comes with several implementations: * **Avro**: Use ``format.class=io.confluent.connect.azure.blob.format.avro.AvroFormat`` to write the |az| Blob Storage object as an Avro container file that includes the Avro schema followed by one or more records. The connector's ``avro.codec`` configuration property specifies the Avro compression codec, and values can be ``null`` (the default) for no Avro compression, ``deflate`` to use the deflate algorithm as specified in `RFC 1951 `__, ``snappy`` to use Google's `Snappy `__ compression library, and ``bzip2`` for BZip2 compression. Optionally set ``enhanced.avro.schema.support=true`` to enable enum symbol preservation and package name awareness.
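For example, a sketch of the Avro-related properties described above might look like the following (the ``snappy`` codec is an illustrative choice; the default is ``null``):

.. codewithvars:: properties

   format.class=io.confluent.connect.azure.blob.format.avro.AvroFormat
   avro.codec=snappy
   enhanced.avro.schema.support=true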
* **JSON**: Use ``format.class=io.confluent.connect.azure.blob.format.json.JsonFormat`` to write the |az| Blob Storage object as a single JSON array containing a JSON object for each record. The connector's ``azblob.compression.type`` configuration property can be set to ``none`` (the default) for no compression or ``gzip`` for GZip compression. * **Raw Bytes**: Use ``format.class=io.confluent.connect.azure.blob.format.bytearray.ByteArrayFormat`` to write the raw serialized record values delimited with the JDK's line separator to the |az| Blob Storage object. This requires using the ``value.converter=org.apache.kafka.connect.converters.ByteArrayConverter`` with the connector. Use a different delimiter by specifying the connector's ``format.bytearray.separator`` configuration property. You can also choose to use a custom format by implementing the ``io.confluent.connect.storage.format.Format`` interface, packaging your implementation into a JAR file, and then: #. Place the JAR file into the ``share/java/kafka-connect-azure_blob_storage`` directory of your |cp| installation **on each worker node**. #. Restart all of the |kconnect| worker nodes. #. Configure |az| Blob Storage connectors with ``format.class`` set to the fully-qualified class name of your format implementation. .. _azure_blob_storage_rotations: -------------------------------- |az| Blob Storage Object Uploads -------------------------------- As the |az| Blob Storage connector processes each record, it uses the partitioner to determine into which encoded partition that record should be written. This continues for each partition until the connector determines that a partition has enough records and should be uploaded to the |az| Blob Storage container using the |az| Blob Storage object name for that partition. This technique of knowing when to flush a partition file and upload it to |az| Blob Storage is called the *rotation strategy*, and there are a number of ways to control this behavior: * **Maximum number of records**: The connector's ``flush.size`` configuration property specifies the maximum number of records that should be written to a single |az| Blob Storage object. There is no default for this setting. * **Maximum span of record time**: The connector's ``rotate.interval.ms`` specifies the maximum timespan in milliseconds a file can remain open and ready for additional records. The timestamp for each file starts with the *record timestamp* of the first record written to the file, as determined by the partitioner's ``timestamp.extractor``. As long as the next record's timestamp fits within the timespan specified by ``rotate.interval.ms``, the record will be written to the file; if a record's timestamp does not fit within the timespan of the file, the connector will flush the file, upload it to |az| Blob Storage, commit the offsets of the records in that file, and *then* create a new file whose timespan starts with that record's timestamp and write the record to the new file. * **Scheduled rotation**: The connector's ``rotate.schedule.interval.ms`` specifies the maximum timespan in milliseconds a file can remain open and ready for additional records. Unlike with ``rotate.interval.ms``, with scheduled rotation the timestamp for each file starts with the **system time** that the first record is written to the file. As long as a record is processed within the timespan specified by ``rotate.schedule.interval.ms``, the record will be written to the file.
As soon as a record is processed *after* the timespan for the current file, the file is flushed, uploaded to |az| Blob Storage, and the offsets of the records in the file are committed. A new file is created with a timespan that starts with the current system time, and the record is written to the file. The commit will be performed at the scheduled time, regardless of the previous commit time or number of messages. This configuration is useful when you have to commit your data based on current server time, for example at the beginning of every hour. The default value ``-1`` means that this feature is disabled. These strategies can be combined as needed, and rotation occurs whenever any of the strategies signals a rotation. The first strategy will cause a rotation as soon as enough records have been written to the file, and this condition can be checked *after* each record has been written to the file. In other words, the file can be closed and uploaded to |az| Blob Storage as soon as it is full. When using ``rotate.interval.ms``, the connector only closes and uploads a file to |az| Blob Storage when the next record does not belong in the file based upon that record's timestamp. In other words, if the connector has no more records to process, the connector may keep the file open for a significant period of time -- until the connector can process another record. Scheduled rotation uses ``rotate.schedule.interval.ms`` to close the file and upload to |az| Blob Storage on a regular basis using the current time, rather than the record time. Even if the connector has no more records to process, |kconnect| will still call the connector at least every ``offset.flush.interval.ms`` as defined in the |kconnect| worker's configuration file. And every time this occurs, the connector uses the current time to determine if the currently opened file should be closed and uploaded to |az| Blob Storage. .. note:: Not all rotation strategies are compatible with the |az| Blob Storage connector's ability to deliver |az| Blob Storage objects exactly once with eventual consistency. See the :ref:`Exactly Once section ` below for details. The |az| Blob Storage objects uploaded by the connector can be quite large, and the connector uploads each object in blocks. The ``azblob.block.size`` configuration property defaults to ``26214400`` bytes (25 MB) and specifies the maximum size of each block used to upload a single |az| Blob Storage object. Additionally, the ``schema.compatibility`` setting (see :ref:`Schema Evolution `) will also affect when one file is closed and uploaded to an |az| Blob Storage object. If a record cannot be written to one file because its schema has changed relative to the records already in the file, the connector will rotate by closing the file, uploading it to |az| Blob Storage, committing offsets for the records in the file, creating a new file, and writing the new record.
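To illustrate how these rotation strategies can be combined, the following sketch (values are illustrative, not defaults) closes and uploads a file after 1,000 records, after the file spans ten minutes of record time, or every ten minutes of wall-clock time, whichever happens first:

.. codewithvars:: properties

   # Rotate after this many records have been written to a file
   flush.size=1000
   # ...or after the file spans this much record time, as determined by the partitioner's timestamp.extractor
   rotate.interval.ms=600000
   # ...or on a fixed wall-clock schedule, regardless of record timestamps
   rotate.schedule.interval.ms=600000

Keep in mind that, as described in the next section, ``rotate.schedule.interval.ms`` is a nondeterministic rotation strategy and therefore forfeits the connector's exactly-once guarantees.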
.. _azure_blob_storage_exactly_once: Exactly-once delivery on top of eventual consistency ---------------------------------------------------- The |az| Blob Storage connector is able to provide exactly-once semantics to consumers of the objects it exports to |az| Blob Storage, under the condition that the connector is supplied with a deterministic partitioner. Currently, out of the available partitioners, the default and field partitioners are always deterministic. ``TimeBasedPartitioner`` can be deterministic with some configurations, discussed below. This implies that, when any of these partitioners is used, splitting of files always happens at the same offsets for a given set of |ak| records. These partitioners take into account ``flush.size`` and ``schema.compatibility`` to decide when to roll and save a new file to |az| Blob Storage. The connector always delivers files in |az| Blob Storage that contain the same records, even in the presence of failures. If a connector task fails before an upload completes, the file does not become visible to |az| Blob Storage. If, on the other hand, a failure occurs after the upload has completed but before the corresponding offset is committed to |ak| by the connector, then a re-upload will take place. However, such a re-upload is transparent to the user of the |az| Blob Storage container, who at any time will have access to the same records made eventually available by successful uploads to |az| Blob Storage. To guarantee exactly-once semantics with the ``TimeBasedPartitioner``, the connector must be configured to use a deterministic implementation of ``TimestampExtractor`` and a deterministic rotation strategy. The deterministic timestamp extractors are |ak| record timestamps (``timestamp.extractor=Record``) or record fields (``timestamp.extractor=RecordField``). The deterministic rotation strategy configuration is ``rotate.interval.ms`` (setting ``rotate.schedule.interval.ms`` is nondeterministic and will invalidate exactly-once guarantees). .. _azure_blob_storage_schema_evolution: Schema Evolution ---------------- The |az| Blob Storage connector supports schema evolution and reacts to schema changes of data according to the ``schema.compatibility`` configuration. This section describes how the connector reacts to schema evolution under different values of ``schema.compatibility``. The ``schema.compatibility`` can be set to ``NONE``, ``BACKWARD``, ``FORWARD``, and ``FULL``, which mean NO compatibility, BACKWARD compatibility, FORWARD compatibility, and FULL compatibility, respectively. * **NO Compatibility**: By default, the ``schema.compatibility`` is set to ``NONE``. In this case, the connector ensures that each file written to |az| Blob Storage has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with the new schema in new files. * **BACKWARD Compatibility**: If a schema is evolved in a backward compatible way, we can always use the latest schema to query all the data uniformly. For example, removing fields is a backward compatible change to a schema, since when we encounter records written with the old schema that contain these fields we can just ignore them. Adding a field with a default value is also backward compatible. If ``BACKWARD`` is specified in the ``schema.compatibility``, the connector keeps track of the latest schema used in writing data to |az| Blob Storage, and if a data record with a schema version larger than the current latest schema arrives, the connector commits the current set of files and writes the data record with the new schema to new files. For data records arriving at a later time with a schema of an earlier version, the connector projects the data record to the latest schema before writing to the same set of files in |az| Blob Storage. * **FORWARD Compatibility**: If a schema is evolved in a forward compatible way, we can always use the oldest schema to query all the data uniformly.
Removing a field that had a default value is forward compatible, since the old schema will use the default value when the field is missing. If ``FORWARD`` is specified in the ``schema.compatibility``, the connector projects the data to the oldest schema before writing to the same set of files in |az| Blob Storage. * **FULL Compatibility**: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema. If ``FULL`` is specified in the ``schema.compatibility``, the connector performs the same action as ``BACKWARD``. Schema evolution in the |az| Blob Storage connector works in the same way as in the :ref:`HDFS connector ` and :ref:`S3 connector `. .. _azure_blob_storage_automatic_retries: Automatic Retries ----------------- The |az| Blob Storage connector may experience problems writing to the |az| Blob Storage container due to network partitions, interruptions, or even throttling limits imposed by the service. In many cases, the connector will retry the request a number of times before failing. To avoid further overloading the network or the |az| Blob Storage service, the connector uses an exponential backoff technique to give the network and/or service time to recover. The technique adds randomness, called jitter, to the calculated backoff times to prevent a thundering herd, where large numbers of requests from many tasks are submitted concurrently and overwhelm the service. Randomness spreads out the retries from many tasks and should reduce the overall time required to complete all outstanding requests compared to simple exponential backoff. The goal is to spread out the requests to |az| Blob Storage as much as possible. The maximum number of retry attempts is dictated by the ``azblob.part.retries`` |az| Blob Storage connector configuration property, which defaults to three attempts. The delay for retries is dependent upon the connector's ``azblob.retry.backoff.ms`` configuration property, which defaults to 200 milliseconds. The actual delay is randomized, but the maximum delay can be calculated as a function of the number of retry attempts with ``${azblob.retry.backoff.ms} * 2 ^ (retry-1)``, where ``retry`` is the number of attempts taken so far in the current iteration. In order to keep the maximum delay within a reasonable duration, it is capped at 24 hours. For example, the following table shows the possible wait times before submitting each of the three retry attempts.

.. table:: Range of backoff times for each retry using the default configuration
   :widths: auto

   ===== ===================== ===================== ==============================================
   Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
   ===== ===================== ===================== ==============================================
   1     0.0                   0.2                   0.2
   2     0.0                   0.4                   0.6
   3     0.0                   0.8                   1.4
   ===== ===================== ===================== ==============================================

Increasing the maximum number of retries adds more backoff:
.. table:: Range of backoff times for additional retries
   :widths: auto

   ===== ===================== ===================== ==============================================
   Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
   ===== ===================== ===================== ==============================================
   4     0.0                   1.6                   3.0
   5     0.0                   3.2                   6.2
   6     0.0                   6.4                   12.6
   7     0.0                   12.8                  25.4
   8     0.0                   25.6                  51.0
   9     0.0                   51.2                  102.2
   10    0.0                   102.4                 204.6
   ===== ===================== ===================== ==============================================

At some point, the maximum backoff time will reach saturation and will be capped at 24 hours. In the example below, all retries starting with retry 20 have a maximum backoff time of 24 hours.

.. table:: Range of backoff times when reaching the cap of 24 hours
   :widths: auto

   ===== ===================== ===================== ==============================================
   Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
   ===== ===================== ===================== ==============================================
   15    0.0                   3276.8                6553.4
   16    0.0                   6553.6                13107.0
   17    0.0                   13107.2               26214.2
   18    0.0                   26214.4               52428.6
   19    0.0                   52428.8               104857.4
   20    0.0                   86400.0               191257.4
   21    0.0                   86400.0               277657.4
   ===== ===================== ===================== ==============================================

It's not advised to set ``azblob.part.retries`` too high since making more attempts after reaching a cap of 24 hours isn't practical. You can adjust both the ``azblob.part.retries`` and ``azblob.retry.backoff.ms`` connector configuration properties to achieve the desired retry and backoff characteristics. .. _azure_blob_storage_sink_quick_start: Quick Start ----------- In this quick start, the |az| Blob Storage connector is used to export data produced by the Avro console producer to |az| Blob Storage. Before you begin, create an |az| Blob Storage destination container and grant **write access** to the user or IAM role completing these procedures. See `Create a block blob storage account `_ for additional information. Also, if you are using Shared Key access instead of account keys, see `Configure Azure Storage connection strings `_ for additional information. By default, all resources in |az| Storage are secured and only available to the account owner. Install the connector through the :ref:`confluent_hub_client`. .. codewithvars:: bash # run from your Confluent Platform installation directory confluent-hub install confluentinc/kafka-connect-azure-blob-storage:latest .. tip:: By default, the plugin is installed into ``share/confluent-hub-components`` and the directory is added to the plugin path. If this is the first connector you have installed, you may need to restart the |kconnect| worker for the plugin path change to take effect. Also see `Azure Blob Storage CLI `_ for setting up and using the CLI. Start the services using the :ref:`confluent_local`. .. include:: ../../includes/cli-new.rst .. codewithvars:: bash |confluent_start| Every service starts in order, printing a message with its status. .. include:: ../../includes/cli.rst :start-after: CE_CLI_startup_output :end-before: COS_CP_CLI_startup_output .. note:: Make sure the |az| Blob Storage connector has write access to the |az| Blob Storage container shown in ``azblob.container.name`` and can deploy credentials successfully. To import a few records with a simple schema in |ak|, start the Avro console producer as follows: ..
codewithvars:: bash

   ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic blob_topic \
   --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Then, in the console producer, enter the following: .. codewithvars:: bash

   {"f1": "value1"}
   {"f1": "value2"}
   {"f1": "value3"}
   {"f1": "value4"}
   {"f1": "value5"}
   {"f1": "value6"}
   {"f1": "value7"}
   {"f1": "value8"}
   {"f1": "value9"}

The nine records entered are published to the |ak| topic ``blob_topic`` in Avro format. Create a ``blob.properties`` file with the following contents: .. codewithvars:: properties

   name=blob-sink
   connector.class=io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector
   tasks.max=1
   topics=blob_topic
   flush.size=3
   azblob.account.name=your-account
   azblob.account.key=your-key
   azblob.container.name=confluent-kafka-connect-azure-blob-storage-testing
   format.class=io.confluent.connect.azure.blob.format.avro.AvroFormat
   confluent.topic.bootstrap.servers=localhost:9092
   confluent.topic.replication.factor=1

Before starting the connector, make sure that the configurations in ``blob.properties`` are properly set for your |az| Blob Storage account. For this example, make sure that ``azblob.container.name`` points to your container, ``azblob.account.name`` is set to your account, and ``azblob.account.key`` is set to your key. Then start the |az| Blob Storage connector by loading its configuration with the following command. .. include:: ../../includes/confluent-local-consume-limit.rst .. codewithvars:: bash

   |confluent_load| blob-sink|dash| -d blob.properties
   {
     "name": "blob-sink",
     "config": {
       "name": "blob-sink",
       "connector.class": "io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector",
       "tasks.max": "1",
       "topics": "blob_topic",
       "flush.size": "3",
       "azblob.account.name": "your-account",
       "azblob.account.key": "your-key",
       "azblob.container.name": "confluent-kafka-connect-azure-blob-storage-testing",
       "format.class": "io.confluent.connect.azure.blob.format.avro.AvroFormat",
       "confluent.topic.bootstrap.servers": "localhost:9092",
       "confluent.topic.replication.factor": "1"
     },
     "tasks": []
   }

Check that the connector started successfully. Review the Connect worker's log by entering the following: .. codewithvars:: bash |confluent_log| connect Towards the end of the log you should see that the connector starts, logs a few messages, and then uploads data from |ak| to |az| Blob Storage. Once the connector has ingested some records, check that the data is available in |az| Blob Storage. Use the following |az| CLI command: .. codewithvars:: bash az storage blob list --container-name confluent-kafka-connect-azure-blob-storage-testing --output table You should see three objects with the following keys: .. codewithvars:: bash

   topics/blob_topic/partition=0/blob_topic+0+0000000000.avro
   topics/blob_topic/partition=0/blob_topic+0+0000000003.avro
   topics/blob_topic/partition=0/blob_topic+0+0000000006.avro

Each file name is encoded as ``<topic>+<kafkaPartition>+<startOffset>.<format>``. To verify the contents, copy each file from |az| Blob Storage to your local filesystem. Use the following |az| CLI command: .. codewithvars:: bash az storage blob download --container-name confluent-kafka-connect-azure-blob-storage-testing --name topics/blob_topic/partition=0/blob_topic+0+0000000000.avro --file ~/blob_topic+0+0000000000.avro Use ``avro-tools-1.8.2.jar`` (available in `Apache mirrors `_) to print the records. ..
codewithvars:: bash java -jar avro-tools-1.8.2.jar tojson blob_topic+0+0000000000.avro For the file above, you should see the following output: .. codewithvars:: bash {"f1":"value1"} {"f1":"value2"} {"f1":"value3"} The rest of the records are contained in the other two files. Finally, stop the Connect worker and all other Confluent services by running: .. codewithvars:: bash |confluent_stop| Your output should resemble: .. include:: ../../includes/cli.rst :start-after: ce_cli_stop_output_start :end-before: ce_cli_stop_output_stop You can stop all services and remove any data generated during this quick start by entering the following command: .. codewithvars:: bash |confluent_destroy| Your output should resemble: .. include:: ../../includes/cli.rst :start-after: ce_cli_stop_destroy_output_start :end-before: ce_cli_stop_destroy_output_stop Exactly-once delivery --------------------- The |az| Blob Storage connector is able to provide exactly-once semantics to consumers of the objects it exports to |az| Blob Storage, under the condition that the connector is supplied with a deterministic partitioner. Currently, out of the available partitioners, the default and field partitioners are always deterministic. ``TimeBasedPartitioner`` can be deterministic with some configurations, discussed below. This implies that, when any of these partitioners is used, splitting of files always happens at the same offsets for a given set of |ak| records. These partitioners take into account ``flush.size`` and ``schema.compatibility`` to decide when to roll and save a new file to |az| Blob Storage. The connector always delivers files in |az| Blob Storage that contain the same records, even in the presence of failures. If a connector task fails before an upload completes, the file does not become visible to |az| Blob Storage. If, on the other hand, a failure occurs after the upload has completed but before the corresponding offset is committed to |ak| by the connector, then a re-upload will take place. However, such a re-upload is transparent to the user of the |az| Blob Storage container, who at any time will have access to the same records made eventually available by successful uploads to |az| Blob Storage. To guarantee exactly-once semantics with the ``TimeBasedPartitioner``, the connector must be configured to use a deterministic implementation of ``TimestampExtractor`` and a deterministic rotation strategy. The deterministic timestamp extractors are |ak| record timestamps (``timestamp.extractor=Record``) or record fields (``timestamp.extractor=RecordField``). The deterministic rotation strategy configuration is ``rotate.interval.ms`` (setting ``rotate.schedule.interval.ms`` is nondeterministic and will invalidate exactly-once guarantees). .. image:: ../../images/connect-s3-eos.png Schema Evolution ---------------- The |az| Blob Storage connector supports schema evolution and reacts to schema changes of data according to the ``schema.compatibility`` configuration. This section explains how the connector reacts to schema evolution under different values of ``schema.compatibility``. The ``schema.compatibility`` can be set to ``NONE``, ``BACKWARD``, ``FORWARD``, and ``FULL``, which mean NO compatibility, BACKWARD compatibility, FORWARD compatibility, and FULL compatibility, respectively. * **NO Compatibility**: By default, the ``schema.compatibility`` is set to ``NONE``. In this case, the connector ensures that each file written to |az| Blob Storage has the proper schema.
When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with the new schema in new files. * **BACKWARD Compatibility**: If a schema is evolved in a backward compatible way, the connector can always use the latest schema to query all the data uniformly. For example, removing fields is a backward compatible change to a schema, since when the connector encounters records written with the old schema that contain these fields, the connector can just ignore them. Adding a field with a default value is also backward compatible. If ``BACKWARD`` is specified in the ``schema.compatibility``, the connector keeps track of the latest schema used in writing data to |az| Blob Storage, and if a data record with a schema version larger than the current latest schema arrives, the connector commits the current set of files and writes the data record with the new schema to new files. For data records arriving at a later time with a schema of an earlier version, the connector projects the data record to the latest schema before writing to the same set of files in |az| Blob Storage. * **FORWARD Compatibility**: If a schema is evolved in a forward compatible way, the connector can always use the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema will use the default value when the field is missing. If ``FORWARD`` is specified in the ``schema.compatibility``, the connector projects the data to the oldest schema before writing to the same set of files in |az| Blob Storage. * **FULL Compatibility**: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema. If ``FULL`` is specified in the ``schema.compatibility``, the connector performs the same action as ``BACKWARD``. Schema evolution in the |az| Blob Storage connector works the same way as the :ref:`s3_schema_evolution`. Automatic Retries ----------------- The |az| Blob Storage connector may experience intermittent problems writing to the |az| Blob Storage container. This is because of network partitioning, interruptions, or throttling limits. In many cases, the connector retries the request a number of times before failing. To prevent overloading the network or |az| Blob Storage service, the connector uses an exponential backoff technique to give the network and service time to recover. Retries occur on requests that return an HTTP status code greater than 500, excluding 501 and 505. The maximum number of retry attempts is dictated by the ``azblob.retry.retries`` |az| Blob Storage connector configuration property. This property defaults to three retry attempts (that is, one initial attempt followed by up to three retries, for a total of four attempts). The delay for retries is dependent upon the connector's ``azblob.retry.backoff.ms`` configuration property, which defaults to 4000 milliseconds. The maximum delay between retries is capped by ``azblob.retry.max.backoff.ms``, which defaults to 120000 milliseconds. The actual delay is calculated by taking the lower value between ``${azblob.retry.backoff.ms} * (-1 + 2 ^ (attempt-1))`` and ``azblob.retry.max.backoff.ms``, where ``attempt`` is the number of attempts taken so far in the current iteration. For example, the following table shows the possible wait times before submitting each attempt:
.. table:: Range of backoff times for each retry using the default configuration
   :widths: auto

   ======= ======================== =====================
   Attempt Calculated Backoff (sec) Actual Backoff (sec)
   ======= ======================== =====================
   1       0.0                      0.0
   2       4.0                      4.0
   3       12.0                     12.0
   4       28.0                     28.0
   ======= ======================== =====================

The maximum backoff time eventually reaches saturation and is capped at ``azblob.retry.max.backoff.ms``. In the example below, all attempts starting with attempt six have the maximum backoff time of 120 seconds:

.. table:: Range of backoff times when reaching the default cap of two minutes
   :widths: auto

   ======= ======================== =====================
   Attempt Calculated Backoff (sec) Actual Backoff (sec)
   ======= ======================== =====================
   4       28.0                     28.0
   5       60.0                     60.0
   6       124.0                    120.0
   7       252.0                    120.0
   8       508.0                    120.0
   9       1020.0                   120.0
   10      2044.0                   120.0
   ======= ======================== =====================

If a secondary host is provided, the behavior is slightly different: after a failed attempt against your normal ``store.url``, the connector delays by ``(0.1 second * random(0.8, 1.2))`` before calling the ``azblob.retry.secondary.host``. It's not advised to set ``azblob.retry.max.backoff.ms`` too high, since waiting longer between attempts is rarely practical. You can adjust the ``azblob.retry.retries``, ``azblob.retry.backoff.ms``, ``azblob.retry.max.backoff.ms``, and ``azblob.retry.secondary.host`` connector configuration properties to achieve the desired retry and backoff characteristics. Also, the timeout of a single HTTP request can be configured with ``azblob.connection.timeout.ms``. .. note:: When transferring large amounts of data, the default TryTimeout is probably not sufficient. You should override this value based on the bandwidth available to the host machine and proximity to the Storage service. A good starting point may be roughly 60 seconds per MB of anticipated payload size. .. _azure-blob-storage-properties-file: ------------------------------------------------ Write JSON message values into |az| Blob Storage ------------------------------------------------ The example settings file is shown below: .. codewithvars:: bash

   name=blob-sink
   connector.class=io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector
   tasks.max=1
   topics=blob_topic
   flush.size=100
   # Required configuration
   azblob.account.name=account
   azblob.account.key=accountkey
   # The following define the information used to validate the license stored in Kafka
   confluent.license=
   confluent.topic.bootstrap.servers=localhost:9092

The first few settings are common to most connectors. ``topics`` specifies the topics to export data from, in this case ``blob_topic``. The property ``flush.size`` specifies the number of records per partition the connector needs to write before completing a `multiblock upload `_ to |az| Blob Storage. The ``azblob.account.name`` and ``azblob.account.key`` are your required |az| credentials. This is a licensed Confluent connector; for more on this, see :ref:`the Azure Blob Storage Licensing section`. Enter the following for testing purposes: .. codewithvars:: bash

   azblob.container.name=confluent-kafka-connect-azure-blob-storage-testing
   azblob.block.size=5242880

The next settings are specific to |az| Blob Storage. A mandatory setting is the name of your |az| Blob Storage container to host the exported |ak| records.
Another useful configuration setting is ``azblob.block.size``. This setting controls the size of each block in the multiblock uploads used to upload a single chunk of |ak| records. .. codewithvars:: bash

   storage.class=io.confluent.connect.azure.blob.storage.AzureBlobStorage
   format.class=io.confluent.connect.azure.blob.format.json.JsonFormat
   schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

These class settings are required to specify the |az| Blob Storage interface, the output file format, which is currently ``io.confluent.connect.azure.blob.format.avro.AvroFormat``, ``io.confluent.connect.azure.blob.format.json.JsonFormat``, or ``io.confluent.connect.azure.blob.format.bytearray.ByteArrayFormat``, and the partitioner class along with its schema generator class. When using a format with no schema definition, it is sufficient to set the schema generator class to its default value. .. codewithvars:: bash schema.compatibility=NONE Finally, schema evolution is disabled in this example by setting ``schema.compatibility`` to ``NONE``, as explained above. For detailed descriptions of all the available configuration options of the |az| Blob Storage connector, go to :ref:`azure-blob-storage-configuration-options`. --------------------------- Maximum file size exception --------------------------- If a file is created that exceeds the maximum of 50000 blocks, an exception occurs with the error: ``org.apache.kafka.connect.errors.ConnectException: Due to the Azure constraints, you can only have a block blob of up to a maximum of 50000 blocks and you've exceeded that limit. To resolve the issue please either decrease your flush.size to make the files smaller or increase your azblob.block.size.`` As the error explains, this situation occurs when the file you are trying to create is larger than 50,000 multiplied by your ``azblob.block.size`` value. To resolve this issue, either decrease the ``flush.size`` or increase the ``azblob.block.size``. If you are interested in learning more about |az| Blob Storage's constraints, you can `read more here `_. ----------------------------------------------- Write raw message values into |az| Blob Storage ----------------------------------------------- It is possible to use the |az| Blob Storage connector to write out the unmodified original message values into newline-separated files in |az| Blob Storage. To accomplish this, configure |kconnect-long| so it does not deserialize any of the messages, and configure the |az| Blob Storage connector to store the message values in a binary format in |az| Blob Storage. The first part of the |az| Blob Storage connector configuration is similar to other examples. .. codewithvars:: bash

   name=blob-raw-sink
   connector.class=io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector
   tasks.max=1
   topics=blob_topic
   flush.size=3

The ``topics`` setting specifies the topics you want to export data from, which is ``blob_topic`` in the example. The property ``flush.size`` specifies the number of records per partition the connector needs to write before completing a multiblock upload to |az| Blob Storage. Next, configure container name, block size, and compression type. ..
codewithvars:: bash

   azblob.container.name=confluent-kafka-connect-azure-blob-storage-testing
   azblob.block.size=5242880
   azblob.compression.type=gzip

The ``azblob.container.name`` is mandatory and names your |az| Blob Storage container where the exported |ak| records should be written. Since the |az| Blob Storage connector uses `multiblock uploads `_, you can use the ``azblob.block.size`` setting to control the size of each of the blocks used to upload |ak| records into a single |az| Blob Storage object. The block size affects throughput and latency, as an |az| Blob Storage object is visible and available only after all of its blocks are uploaded. The ``azblob.compression.type`` specifies that the |az| Blob Storage connector should compress all |az| Blob Storage objects using GZIP compression, adding the ``.gz`` extension to any files (see below). So far this example configuration is relatively typical of most |az| Blob Storage connectors. Now configure the connector to read the raw message values and write them in binary format: .. codewithvars:: bash

   value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
   format.class=io.confluent.connect.azure.blob.format.bytearray.ByteArrayFormat
   storage.class=io.confluent.connect.azure.blob.storage.AzureBlobStorage
   schema.compatibility=NONE

The ``value.converter`` setting overrides the connector default in the |kconnect| worker configuration. ``ByteArrayConverter`` is used to instruct |kconnect| to skip deserializing the message values and instead give the connector the message values in their raw binary form. The ``format.class`` setting is used to instruct the |az| Blob Storage connector to write these binary message values as-is into |az| Blob Storage objects. By default, the message values written to the same |az| Blob Storage object are separated by a newline character sequence, but you can control this with the ``format.bytearray.separator`` setting, and you may want to consider this if your messages might contain newlines. Also, by default the files written to |az| Blob Storage have an extension of ``.bin`` (before compression, if enabled), or you can use the ``format.bytearray.extension`` setting to change the pre-compression filename extension. Next, you need to decide how you want to partition the consumed messages in |az| Blob Storage objects. You have a few options, including the default partitioner that preserves the same partitions as in |ak|: .. codewithvars:: bash partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner Or, you could partition using the timestamp of the |ak| messages: .. codewithvars:: bash

   partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
   timestamp.extractor=Record

Or, you can use the time at which the |az| Blob Storage connector processes each message: .. codewithvars:: bash

   partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
   timestamp.extractor=Wallclock

Custom partitioners are always an option, too. Just be aware that since the record value is an opaque binary value, |kconnect| cannot extract timestamps from fields using the ``RecordField`` option. The |az| Blob Storage connector configuration outlined above results in newline-delimited gzipped objects in |az| Blob Storage with a ``.bin.gz`` extension. Additional Documentation ------------------------ .. toctree::
   :maxdepth: 1

   configuration_options
   changelog