Kafka Connect Azure Blob Storage Sink Connector

Note

If you are using Confluent Cloud, see Azure Blob Storage Sink Connector for Confluent Cloud for the cloud Quick Start.

You can use the Azure Blob Storage connector to export data from Apache Kafka® topics to Azure Blob Storage objects in either Avro, JSON, or Bytes formats. Depending on your environment, the Azure Blob Storage connector can export data by guaranteeing exactly-once delivery semantics to consumers of the Azure Blob Storage objects it produces.

The Azure Blob Storage sink connector periodically polls data from Kafka and then uploads the data to Azure Blob Storage. A partitioner is used to split the data of every Kafka partition into chunks. Each chunk of data is represented as an Azure Blob Storage object. The key name encodes the topic, the Kafka partition, and the start offset of this data chunk.

If no partitioner is specified in the configuration, the default partitioner which preserves Kafka partitioning is used. The size of each data chunk is determined by the number of records written to Azure Blob Storage and by schema compatibility.

Features

The Microsoft Azure Blob Storage connector offers a variety of features:

  • Exactly Once Delivery: Records that are exported using a deterministic partitioner are delivered with exactly-once semantics.
  • Pluggable Data Format with or without Schema: Out of the box, the connector supports writing data to Azure Blob Storage in Avro and JSON format. Besides records with schema, the connector supports exporting plain JSON and as byte array records without schema in text files, one record per-line. In general, the connector may accept any format that provides an implementation of the Format interface.
  • Schema Evolution: When schemas are used, the connector supports schema evolution based on schema compatibility modes. The available modes are: NONE, BACKWARD, FORWARD and FULL and a selection can be made by setting the property schema.compatibility in the connector’s configuration. When the connector observes a schema change, it decides whether to roll the file or project the record to the proper schema according to the schema.compatibility configuration in use.
  • Pluggable Partitioner: The connector comes out of the box with partitioners that support default partitioning based on Apache Kafka® partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class. Additionally, you can customize time based partitioning by extending the TimeBasedPartitioner class.

Install Azure Blob Storage Connector

Install the connector using Confluent Hub

Prerequisite
Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run this command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will be run.

confluent-hub install confluentinc/kafka-connect-azure-blob-storage:latest

You can install a specific version by replacing latest with a version number. For example:

confluent-hub install confluentinc/kafka-connect-azure-blob-storage:1.1.1

Install Connector Manually

Download and extract the ZIP file for your connector and follow the manual connector installation instructions.

License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Mapping Records to Azure Blob Storage Objects

The Azure Blob Storage connector consumes records from the specified topics, organizes them into different partitions, writes batches of records in each partition to an file, and then uploads those files to the Azure Blob Storage bucket. It uses Azure Blob Storage object paths that include the Kafka topic and partition, the computed partition, and the filename. The Azure Blob Storage connector offers several ways to customize this behavior, including:

Azure Blob Storage Object Names

The Azure Blob Storage data model is a flat structure: each bucket stores objects, and the name of each Azure Blob Storage object serves as the unique key. However, a logical hierarchy can be inferred when the Azure Blob Storage object names uses directory delimiters, such as /. The Azure Blob Storage connector allows you to customize the names of the Azure Blob Storage objects it uploads to the Azure Blob Storage bucket.

In general, the names of the Azure Blob Storage object uploaded by the Azure Blob Storage connector follow this format:

<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>

where:

  • <prefix> is specified with the connector’s topics.dir configuration property, which defaults to the literal value topics and helps create uniquely named Azure Blob Storage objects that don’t clash with existing Azure Blob Storage objects in the same bucket.
  • <topic> corresponds to the name of the Kafka topic from which the records in this Azure Blob Storage object were read.
  • <encodedPartition> is generated by the Azure Blob Storage connector’s partitioner (see Partitioning Records into Azure Blob Storage Objects).
  • <kafkaPartition> is the Kafka partition number from which the records in this Azure Blob Storage object were read.
  • <startOffset> is the Kafka offset of the first record written to this Azure Blob Storage object.
  • <format> is the extension identifing the format in which the records are serialized in this Azure Blob Storage object.

If desired, the / and + characters can be changed using the connector’s directory.delim and file.delim configuration properties.

Partitioning Records into Azure Blob Storage Objects

The Azure Blob Storage connector’s partitioner determines how records read from a Kafka topic are partitioned into Azure Blob Storage objects. The partitioner determines the <encodedPartition> portion of the Azure Blob Storage object names (see Azure Blob Storage Object Names).

The partitioner is specified in the connector configuration with the partitioner.class configuration property. The Azure Blob Storage connector comes with the following partitioners:

  • Default (|ak|) Partitioner: The io.confluent.connect.storage.partitioner.DefaultPartitioner preserves the same topic partitions as in Kafka, and records from each topic partition ultimately end up in Azure Blob Storage objects with names that include the Kafka topic and Kafka partitions. The <encodedPartition> is always <topicName>/partition=<kafkaPartition>, resulting in Azure Blob Storage object names such as <prefix>/<topic>/partition=<kafkaPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>.
  • Field Partitioner: The io.confluent.connect.storage.partitioner.FieldPartitioner determines the partition from the field within each each record identified by the connector’s partition.field.name configuration property, which has no default. This partitioner requires STRUCT record type values. The <encodedPartition> is always <topicName>/<fieldName>=<fieldValue>, resulting in Azure Blob Storage object names of the form <prefix>/<topic>/<fieldName>=<fieldValue>/<topic>+<kafkaPartition>+<startOffset>.<format>.
  • Time Based Partitioner: The io.confluent.connect.storage.partitioner.TimeBasedPartitioner determines the partition from the year, month, day, hour, minutes, and/or seconds. This partitioner requires the following connector configuration properties:
    • The path.format configuration property specifies the pattern used for the <encodedPartition> portion of the Azure Blob Storage object name. For example, when path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH, Azure Blob Storage object names will have the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>.
    • The partition.duration.ms configuration property defines the maximum granularity of the Azure Blob Storage objects within a single encoded partition directory. For example, setting partition.duration.ms=600000 (10 minutes) will result in each Azure Blob Storage object in that directory having no more than 10 minutes of records.
    • The locale configuration property specifies the JDK’s locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, and fr-FR for French (in France). These may vary by Java version; see the available locales.
    • The timezone configuration property specifies the current timezone in which the dates and times will be treated. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the “en_US” locale.
    • The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, RecordField to extract the timestamp from one of the fields in the record’s value as specified by the timestamp.field configuration property.
  • Daily Partitioner: The io.confluent.connect.storage.partitioner.DailyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd and partition.duration.ms=86400000 (one day, for one Azure Blob Storage object in each daily directory). This partitioner always results in Azure Blob Storage object names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/<topic>+<kafkaPartition>+<startOffset>.<format>. This partitioner requires the following connector configuration properties:
    • The locale configuration property specifies the JDK’s locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, and fr-FR for French (in France). These may vary by Java version; see the available locales.
    • The timezone configuration property specifies the current timezone in which the dates and times will be treated. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the “en_US” locale.
    • The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, RecordField to extract the timestamp from one of the fields in the record’s value as specified by the timestamp.field configuration property.
  • Hourly Partitioner: The io.confluent.connect.storage.partitioner.HourlyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH and partition.duration.ms=3600000 (one hour, for one Azure Blob Storage object in each hourly directory). This partitioner always results in Azure Blob Storage object names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>. This partitioner requires the following connector configuration properties:
    • The locale configuration property specifies the JDK’s locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, fr-FR for French (in France). These may vary by Java version; see the available locales.
    • The timezone configuration property specifies the current timezone in which the dates and times will be treated. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the “en_US” locale.
    • The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, RecordField to extract the timestamp from one of the fields in the record’s value as specified by the timestamp.field configuration property.

As noted below, the choice of timestamp.extractor affects whether the Azure Blob Storage connector can support exactly once delivery.

You can also choose to use a custom partitioner by implementing the io.confluent.connect.storage.partitioner.Partitioner interface, packaging your implementation into a JAR file, and then:

  1. Place the JAR file into the share/java/kafka-connect-azure_blob_storage directory of your Confluent Platform installation on each worker node.
  2. Restart all of the Connect worker nodes.
  3. Configure Azure Blob Storage connectors to use your fully-qualified partitioner class name.

Azure Blob Storage Object Formats

The Azure Blob Storage connector can serialize multiple records into each Azure Blob Storage object using a number of formats. The connector’s format.class configuration property identifies the name of the Java class that implements the io.confluent.connect.storage.format.Format interface, and the Azure Blob Storage connector comes with several implementations:

  • Avro: Use format.class=io.confluent.connect.azure.blob.format.avro.AvroFormat to write the Azure Blob Storage object as an Avro container file and will include the Avro schema in the container file followed by one or more records. The connector’s avro.codec configuration property specifies the Avro compression code, and values can be null (the default) for no Avro compression, deflate to use the deflate algorithm as specified in RFC 1951, snappy to use Google’s Snappy compression library, and bzip2 for BZip2 compression. Optionally set enhanced.avro.schema.support=true to enable enum symbol preservation and package name awareness.
  • JSON: Use format.class=io.confluent.connect.azure.blob.format.json.JsonFormat to write the Azure Blob Storage object as a single JSON array containing a JSON object for each record. The connector’s azblob.compression.type configuration property can be set to none (the default) for no compression or gzip for GZip compression.
  • Raw Bytes: Use format.class=io.confluent.connect.azure.blob.format.bytearray.ByteArrayFormat to write the raw serialized record values delimited with the JDK’s line separator to the Azure Blob Storage object. This requires using the value.converter=org.apache.kafka.connect.converters.ByteArrayConverter with the connector. Use a different delimiter by specifying the connect’s format.bytearray.separator configuration property.

You can also choose to use a custom partitioner by implementing the io.confluent.connect.storage.format.Format interface, packaging your implementation into a JAR file, and then:

  1. Place the JAR file into the share/java/kafka-connect-azure_blob_storage directory of your Confluent Platform installation on each worker node.
  2. Restart all of the Connect worker nodes.
  3. Configure Azure Blob Storage connectors with format.class set to the fully-qualified class name of your format implementation.

Azure Blob Storage Object Uploads

As the Azure Blob Storage connector processes each record, it uses the partitioner to determine into which encoded partition that record should be written. This continues for each partition until the connector determines that a partition has enough records and should be uploaded to the Azure Blob Storage bucket using the Azure Blob Storage object name for that partition. This technique of knowing when to flush a partition file and upload it to Azure Blob Storage is called the rotation strategy, and there are a number of ways to control this behavior:

  • Maximum number of records: The connector’s flush.size configuration property specifies the maximum number of records that should be written to a single Azure Blob Storage object. There is no default for this setting.
  • Maximum span of record time: The connector’s rotate.interval.ms specifies the maximum timespan in milliseconds a file can remain open and ready for additional records. The timestamp for each file starts with the record timestamp of the first record written to the file, as determined by the partitioner’s timestamp.extractor. As long as the next record’s timestamp fits within the timespan specified by the rotate.interval.ms, the record will be written to the file; if a record’s timestamp does not fit within the timespan of the file, the connector will flush the file, uploaded it to Azure Blob Storage, commit the offsets of the records in that file, and then create a new file with a timespan that starts with the first record and writes the first record to the file.
  • Scheduled rotation: The connector’s rotate.scheduled.interval.ms specifies the maximum timespan in milliseconds a file can remain open and ready for additional records. Unlike with rotate.interval.ms, with scheduled rotation the timestamp for each file starts with the system time that the first record is written to the file. As long as a record is processed within the timespan specified by rotate.scheduled.interval.ms, the record will be written to the file. As soon as a record is processed after the timespan for the current file, the file is flushed, uploaded to Azure Blob Storage, and the offset of the records in the file are committed. A new file is created with a timespan that starts with the current system time, and the record is written to the file. The commit will be performed at the scheduled time, regardless of the previous commit time or number of messages. This configuration is useful when you have to commit your data based on current server time, for example at the beginning of every hour. The default value -1 means that this feature is disabled.

These strategies can be combined as needed, and rotation occurs whenever any of the strategies signals a rotation.

The first strategy will cause a rotation as soon as enough records have been written to the file, and can be calculated after each record has been written to the file. In other words, the file can be closed and uploaded to Azure Blob Storage as soon as it is full.

When using rotate.interval.ms, the connector only closes and uploads a file to Azure Blob Storage when the next file does not belong based upon that record’s timestamp. In other words, if the connector has no more records to process, the connector may keep the file open for a significant period of time – until the connector can process another record.

Scheduled rotation uses rotate.schedule.interval.ms to close the file and upload to Azure Blob Storage on a regular basis using the current time, rather than the record time. Even if the connector has no more records to process, Connect will still call the connector at least every offset.flush.interval.ms as defined in the Connect worker’s configuration file. And every time this occurs, the connector uses the current time to determine if the currently opened file should be closed and uploaded to Azure Blob Storage.

Note

Not all rotation strategy are compatible with the Azure Blob Storage connector’s ability to deliver Azure Blob Storage objects exactly once with eventual consistency. See the Exactly Once section below for details.

The Azure Blob Storage object uploaded by the connector can be quite large, and the connector supports using a multi-part upload mechanism. The azure_blob_storage.part.size configuration property defaults to 26214400 bytes (25MB), and specifies the maximum size of each Azure Blob Storage object part used to upload a single Azure Blob Storage object.

Additionally, the schema.compatibility setting (see Schema Evolution) will also affect when one file is closed and uploaded to an Azure Blob Storage object. If a record cannot be written to one file because its schema has changed relative to the records already in the file, the connector will rotate by closing the file, uploading it to Azure Blob Storage, committing offsets for the records in the file, creating a new file and writing the new record.

Exactly-once delivery on top of eventual consistency

The Azure Blob Storage connector is able to provide exactly-once semantics to consumers of the objects it exports to Azure Blob Storage, under the condition that the connector is supplied with a deterministic partitioner.

Currently, out of the available partitioners, the default and field partitioners are always deterministic. TimeBasedPartitioner can be deterministic with some configurations, discussed below. This implies that, when any of these partitioners is used, splitting of files always happens at the same offsets for a given set of Kafka records. These partitioners take into account flush.size and schema.compatibility to decide when to roll and save a new file to Azure Blob Storage. The connector always delivers files in Azure Blob Storage that contain the same records, even under the presence of failures. If a connector task fails before an upload completes, the file does not become visible to Azure Blob Storage. If, on the other hand, a failure occurs after the upload has completed but before the corresponding offset is committed to Kafka by the connector, then a re-upload will take place. However, such a re-upload is transparent to the user of the Azure Blob Storage bucket, who at any time will have access to the same records made eventually available by successful uploads to Azure Blob Storage.

To guarantee exactly-once semantics with the TimeBasedPartitioner, the connector must be configured to use a deterministic implementation of TimestampExtractor and a deterministic rotation strategy. The deterministic timestamp extractors are Kafka records (timestamp.extractor=Record) or record fields (timestamp.extractor=RecordField). The deterministic rotation strategy configuration is rotate.interval.ms (setting rotate.schedule.interval.ms is nondeterministic and will invalidate exactly-once guarantees).

Schema Evolution

The Azure Blob Storage connector supports schema evolution and reacts to schema changes of data according to the schema.compatibility configuration. This section describes how the connector reacts to schema evolution under different values of schema.compatibility. The schema.compatibility can be set to NONE, BACKWARD, FORWARD and FULL, which means NO compatibility, BACKWARD compatibility, FORWARD compatibility and FULL compatibility respectively.

  • NO Compatibility: By default, the schema.compatibility is set to NONE. In this case, the connector ensures that each file written to Azure Blob Storage has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with new schema in new files.

  • BACKWARD Compatibility: If a schema is evolved in a backward compatible way, we can always use the latest schema to query all the data uniformly. For example, removing fields is backward compatible change to a schema, since when we encounter records written with the old schema that contain these fields we can just ignore them. Adding a field with a default value is also backward compatible.

    If BACKWARD is specified in the schema.compatibility, the connector keeps track of the latest schema used in writing data to Azure Blob Storage, and if a data record with a schema version larger than current latest schema arrives, the connector commits the current set of files and writes the data record with new schema to new files. For data records arriving at a later time with schema of an earlier version, the connector projects the data record to the latest schema before writing to the same set of files in Azure Blob Storage.

  • FORWARD Compatibility: If a schema is evolved in a forward compatible way, we can always use the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema will use the default value when the field is missing.

    If FORWARD is specified in the schema.compatibility, the connector projects the data to the oldest schema before writing to the same set of files in Azure Blob Storage.

  • FULL Compatibility: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema.

    If FULL is specified in the schema.compatibility, the connector performs the same action as BACKWARD.

Schema evolution in the Azure Blob Storage connector works in the same way as in the HDFS connector and S3 connector.

Automatic Retries

The Azure Blob Storage connector may experience problems writing to the Azure Blob Storage bucket, due to network partitions, interruptions, or even AWS throttling limits. In many cases, the connector will retry the request a number of times before failing. To prevent from further overloading the network or Azure Blob Storage service, the connector uses an exponential backoff technique to give the network and/or service time to recover. The technique adds randomness, called jitter, to the calculated backoff times to prevent a thundering herd, where large numbers of requests from many tasks are submitted concurrently and overwhelm the service. Randomness spreads out the retries from many tasks and should reduce the overall time required to complete all outstanding requests compared to simple exponential backoff. The goal is to spread out the requests to Azure Blob Storage as much as possible.

The maximum number of retry attempts is dictated by the azblob.part.retries Azure Blob Storage connector configuration property, which defaults to three attempts. The delay for retries is dependent upon the connector’s azblob.retry.backoff.ms configuration property, which defaults to 200 milliseconds. The actual delay is randomized, but the maximum delay can be calculated as a function of the number of retry attempts with ${azblob.retry.backoff.ms} * 2 ^ (retry-1), where retry is the number of attempts taken so far in the current iteration. In order to keep the maximum delay within a reasonable duration, it is capped at 24 hours. For example, the following table shows the possible wait times before submitting each of the three retry attempts.

Range of backoff times for each retry using the default configuration
Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
1 0.0 0.2 0.2
2 0.0 0.4 0.6
3 0.0 0.8 1.4

Increasing the maximum number of retries adds more backoff:

Range of backoff times for additional retries
Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
4 0.0 1.6 3.0
5 0.0 3.2 6.2
6 0.0 6.4 12.6
7 0.0 12.8 25.4
8 0.0 25.6 51.0
9 0.0 51.2 102.2
10 0.0 102.4 204.6

At some point, maximum backoff time will reach saturation and will be capped at 24 hours. From the example below, all attempts starting with 20 will have maximum backoff time as 24 hours.

Range of backoff times when reaching the cap of 24 hours
Retry Minimum Backoff (sec) Maximum Backoff (sec) Total Potential Delay from First Attempt (sec)
15 0.0 3276.8 6553.4
16 0.0 6553.6 13107.0
17 0.0 13107.2 26214.2
18 0.0 26214.4 52428.6
19 0.0 52428.8 104857.4
20 0.0 86400.0 191257.4
21 0.0 86400.0 277657.4

It’s not advised to set azblob.part.retries too high since making more attempts after reaching a cap of 24 hours isn’t practical. You can adjust both the azblob.part.retries and azblob.retry.backoff.ms connector configuration properties to achieve the desired retry and backoff characteristics.

Quick Start

In this quick start, the Azure Blob Storage connector is used to export data produced by the Avro console producer to Azure Blob Storage.

Before you begin, create an Azure Blob Storage destination container and grant write access to the user or IAM role completing these procedures. See Create a block blob storage account for additional information. Also if you’re using Shared Key access instead of account keys: see Configure Azure Storage connection strings for additional information.

By default, all resources in Azure Storage are secured and only available to the account owner.

Install the connector through the Confluent Hub Client.

# run from your Confluent Platform installation directory
confluent-hub install confluentinc/kafka-connect-azure-blob-storage:latest

Tip

By default, it will install the plugin into share/confluent-hub-components and add the directory to the plugin path. If this is the first connector you have installed, you may need to restart the connect server for the plugin path change to take effect. Also see Azure Blob Storage CLI for setup and using the CLI.

Start the services using the confluent local.

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local start. For more information, see confluent local.

confluent local start

Every service start in order, printing a message with its status.

Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

Note

Make sure the Azure Blob Storage connector has write access to the Azure Blob Storage container shown in azblob.container.name and can deploy credentials successfully.

To import a few records with a simple schema in Kafka, start the Avro console producer as follows:

  ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic blob_topic \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Then, in the console producer, enter the following:

{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
{"f1": "value4"}
{"f1": "value5"}
{"f1": "value6"}
{"f1": "value7"}
{"f1": "value8"}
{"f1": "value9"}

The nine records entered are published to the Kafka topic blob_topic in Avro format.

Create a blob.properties file with the following contents:

name=blob-sink
connector.class=io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector
tasks.max=1
topics=blob_topic
flush.size=3
azblob.account.name=your-account
azblob.account.key=your-key
azblob.container.name=confluent-kafka-connect-azure-blob-storage-testing
format.class=io.confluent.connect.azure.blob.format.avro.AvroFormat
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

Before starting the connector, make sure that the configurations in blob.properties are properly set to your configurations of Azure Blob Storage. For this example, make sure that azblob.container.name points to your container, azblob.account.name is set to your account, and azblob.account.key is set to your key. Then start the Azure Blob Storage connector by loading its configuration with the following command.

Caution

You must include a double dash (--) between the topic name and your flag. For more information, see this post.

confluent local load blob-sink -- -d blob.properties
{
 "name": "blob-sink",
 "config": {
   "name": "blob-sink"
   "connector.class": "io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector",
   "tasks.max": "1",
   "topics": "blob_topic",
   "flush.size": "3",
   "azblob.account.name": "your-account",
   "azblob.account.key": "your-key",
   "azblob.container.name": "confluent-kafka-connect-azure-blob-storage-testing",
   "format.class": "io.confluent.connect.azure.blob.format.avro.AvroFormat",
   "confluent.topic.bootstrap.servers": "localhost:9092",
   "confluent.topic.replication.factor": "1",
 },
  "tasks": []
}

Check that the connector started successfully. Review the Connect worker’s log by entering the following:

confluent local log connect

Towards the end of the log you should see that the connector starts, logs a few messages, and then uploads data from Kafka to Azure Blob Storage.

Once the connector has ingested some records, check that the data is available in Azure Blob Storage. Use the following Azure CLI command:

az storage blob list --container-name confluent-kafka-connect-azure-blob-storage-testing --output table

You should see three objects with keys.

topics/blob_topic/partition=0/blob_topic+0+0000000000.avro
topics/blob_topic/partition=0/blob_topic+0+0000000003.avro
topics/blob_topic/partition=0/blob_topic+0+0000000006.avro

Each file is encoded as <topic>+<kafkaPartition>+<startOffset>.<format>.

To verify the contents, copy each file from Azure Blob Storage to your local filesystem. Use the following Azure CLI command:

az storage blob download --container-name confluent-kafka-connect-azure-blob-storage-testing --name topics/blob_topic/partition=0/blob_topic+0+0000000000.avro --file ~/blob_topic+0+0000000000.avro

Use avro-tools-1.8.2.jar (available in Apache mirrors) to print the records.

java -jar avro-tools-1.8.2.jar tojson blob_topic+0+0000000000.avro

For the file above, you should see the following output:

{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}

The rest of the records are contained in the other two files.

Finally, stop the Connect worker and all other Confluent services by running:

confluent local stop

Your output should resemble:

Stopping control-center
control-center is [DOWN]
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]

You can stop all services and remove any data generated during this quick start by entering the following command:

confluent local destroy

Your output should resemble:

Stopping control-center
control-center is [DOWN]
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE

Exactly-once delivery

The Azure Blob Storage connector is able to provide exactly-once semantics to consumers of the objects it exports to Azure Blob Storage, under the condition that the connector is supplied with a deterministic partitioner.

Currently, out of the available partitioners, the default and field partitioners are always deterministic. TimeBasedPartitioner can be deterministic with some configurations, discussed below. This implies that, when any of these partitioners is used, splitting of files always happens at the same offsets for a given set of Kafka records. These partitioners take into account flush.size and schema.compatibility to decide when to roll and save a new file to Azure Blob Storage. The connector always delivers files in Azure Blob Storage that contain the same records, even under the presence of failures. If a connector task fails before an upload completes, the file does not become visible to Azure Blob Storage. If, on the other hand, a failure occurs after the upload has completed but before the corresponding offset is committed to Kafka by the connector, then a re-upload will take place. However, such a re-upload is transparent to the user of the Azure Blob Storage container, who at any time will have access to the same records made eventually available by successful uploads to Azure Blob Storage.

To guarantee exactly-once semantics with the TimeBasedPartitioner, the connector must be configured to use a deterministic implementation of TimestampExtractor and a deterministic rotation strategy. The deterministic timestamp extractors are Kafka records (timestamp.extractor=Record) or record fields (timestamp.extractor=RecordField). The deterministic rotation strategy configuration is rotate.interval.ms (setting rotate.schedule.interval.ms is nondeterministic and will invalidate exactly-once guarantees).

../../_images/connect-s3-eos.png

Schema Evolution

The Azure Blob Storage connector supports schema evolution and reacts to schema changes of data according to the schema.compatibility configuration. This section, will explain how the connector reacts to schema evolution under different values of schema.compatibility. The schema.compatibility can be set to NONE, BACKWARD, FORWARD and FULL, which means NO compatibility, BACKWARD compatibility, FORWARD compatibility and FULL compatibility respectively.

  • NO Compatibility: By default, the schema.compatibility is set to NONE. In this case, the connector ensures that each file written to Azure Blob Storage has the proper schema. When the connector observes a schema change in data, it commits the current set of files for the affected topic partitions and writes the data with new schema in new files.

  • BACKWARD Compatibility: If a schema is evolved in a backward compatible way, the connector can always use the latest schema to query all the data uniformly. For example, removing fields is backward compatible change to a schema, since when the connector encounters records written with the old schema that contain these fields the connector can just ignore them. Adding a field with a default value is also backward compatible.

    If BACKWARD is specified in the schema.compatibility, the connector keeps track of the latest schema used in writing data to Azure Blob Storage, and if a data record with a schema version larger than current latest schema arrives, the connector commits the current set of files and writes the data record with new schema to new files. For data records arriving at a later time with schema of an earlier version, the connector projects the data record to the latest schema before writing to the same set of files in Azure Blob Storage.

  • FORWARD Compatibility: If a schema is evolved in a forward compatible way, the connector can always use the oldest schema to query all the data uniformly. Removing a field that had a default value is forward compatible, since the old schema will use the default value when the field is missing.

    If FORWARD is specified in the schema.compatibility, the connector projects the data to the oldest schema before writing to the same set of files in Azure Blob Storage.

  • Full Compatibility: Full compatibility means that old data can be read with the new schema and new data can also be read with the old schema.

    If FULL is specified in the schema.compatibility, the connector performs the same action as BACKWARD.

Schema evolution in the Azure Blob Storage connector works the same way as the Schema Evolution.

Automatic Retries

The Azure Blob Storage connector may experience intermittent problems writing to the Azure Blob Storage container. This is because of network partitioning, interruptions, or throttling limits. In many cases, the connector retries the request a number of times before failing. To prevent overloading the network or Azure Blob Storage service, the connector uses an exponential backoff technique to give the network and service time to recover. The retries occur on requests which return HTTP status codes of greater than 500 excluding 501 and 505.

The maximum number of retry attempts is dictated by the azblob.retry.retries Azure Blob Storage connector configuration property. This property defaults to three retry attempts (one initial attempt and three retry attempts). The delay for retries is dependent upon the connector’s azblob.retry.backoff.ms configuration property, which defaults to 4000 milliseconds. The maximum delay between retries is capped by azblob.retry.max.backoff.ms, which defaults to 120000 milliseconds. The actual delay is calculated by taking the lower value between ${azblob.retry.backoff.ms} * (-1 + 2 ^ (attempt-1)) and azblob.retry.max.backoff.ms, where attempt is the number of attempts taken so far in the current iteration. For example, the following table shows the possible wait times before submitting each of the three retries:

Range of backoff times for each retry using the default configuration
Attempt Calculated Backoff (sec) Actual Backoff (sec)
1 0.0 0.0
2 4.0 4.0
3 12.0 12.0
4 28.0 28.0

The maximum backoff time eventually reaches saturation and is capped at azblob.retry.max.backoff.ms. From the example below, all attempts starting with attempt six will have the maximum backoff time of 120 seconds:

Range of backoff times when reaching the default cap of two minutes
Attempt Calculated Backoff (sec) Actual Backoff (sec)
4 28.0 28.0
5 60.0 60.0
6 124.0 120.0
7 252.0 120.0
8 508.0 120.0
9 1020.0 120.0
10 2044.0 120.0

In the case of a secondary host being provided the behaviour is a little different. After a failed attempt to your normal store.url it will then delay by (.1 second * random(0.8, 1.2)) before calling the azblob.retry.secondary.host.

It’s not advised to set azblob.retry.max.backoff.ms too high since making more attempts after reaching 24 hours isn’t practical. You can adjust both the azblob.retry.retries, azblob.retry.backoff.ms, azblob.retry.max.backoff.ms and azblob.retry.secondary.host connector configuration properties to achieve the desired retry and backoff characteristics.

Also the timeout of a single HTTP request can be configured by editing azblob.connection.timeout.ms.

Note

When transferring large amounts of data, the default TryTimeout is probably not sufficient. You should override this value based on the bandwidth available to the host machine and proximity to the Storage service. A good starting point may be something like (60 seconds per MB of anticipated payload size).

Write JSON message values into Azure Blob Storage

The example settings file is shown below:

name=blob-sink
connector.class=io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector
tasks.max=1
topics=blob_topic
flush.size=100

# Required configuration
azblob.account.name=account
azblob.account.key=accountkey

# The following define the information used to validate the license stored in Kafka
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092

The first few settings are common to most connectors. topics specifies the topics to export data from, in this case blob_topic. The property flush.size specifies the number of records per partition the connector needs to write to before completing a multiblock upload to Azure Blob Storage.

The azblob.account.name and azblob.account.key are your required Azure credentials. This is a licensed Confluent connector. Enter the following for testing purposes. multiblock uploads For more on the this look at the Azure Blob Storage Licensing section.

azblob.container.name=confluent-kafka-connect-azure-blob-storage-testing
azblob.block.size=5242880

The next settings are specific to Azure Blob Storage. A mandatory setting is the name of your Azure Blob Storage container to host the exported Kafka records. Another useful configuration setting is azblob.block.size. This setting controls the size of each block in the multiblock uploads used to upload a single chunk of Kafka records.

storage.class=io.confluent.connect.azure.blob.storage.AzureBlobStorage
format.class=io.confluent.connect.azure.blob.format.json.JsonFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

These class settings are required to specify the Azure Block Storage interface, the output file format, which is currently io.confluent.connect.azure.blob.format.avro.AvroFormat, io.confluent.connect.azure.blob.format.json.JsonFormat or io.confluent.connect.azure.blob.format.bytearray.ByteArrayFormat, and the partitioner class along with its schema generator class. When using a format with no schema definition, it is sufficient to set the schema generator class to its default value.

schema.compatibility=NONE

Finally, schema evolution is disabled in this example by setting schema.compatibility to NONE, as explained above.

For detailed descriptions for all the available configuration options of the Azure Blob Storage connector go to Configuration Properties.

Maximum file size exception

If a file is created that exceeds the maximum of 50000 blocks, an exception occurs with the error: org.apache.kafka.connect.errors.ConnectException: Due to the Azure constraints, you can only have a block blob of up to a maximum of 50000 blocks and you've exceeded that limit. To resolve the issue please either decrease your flush.size to make the files smaller or increase your azblob.block.size. As the error explains a situation can occur when the file you are trying to create is greater than 50,000 multiplied by your azblob.block.size value. To resolve this issue either decrease the flush.size or increase the azblob.block.size. If you are interested in learning more about Azure Blob Storage’s constraints you can read more here.

Write raw message values into Azure Blob Storage

It is possible to use the Azure Blob Storage connector to write out the unmodified original message values into newline-separated files in Azure Blob Storage. To accomplish this configure Kafka Connect so it does not deserialize any of the messages, and configure the Azure Blob Storage connector to store the message values in a binary format in Azure Blob Storage.

The first part of the Azure Blob Storage connector configuration is similar to other examples.

name=blob-raw-sink
connector.class=io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector
tasks.max=1
topics=blob_topic
flush.size=3

The topics setting specifies the topics you want to export data from, which is blob_topic in the example. The property flush.size specifies the number of records per partition the connector needs to write before completing a multipart upload to Azure Blob Storage.

Next, configure container name, block size, and compression type.

azblob.container.name=confluent-kafka-connect-azure-blob-storage-testing
azblob.block.size=5242880
azblob.compression.type=gzip

The azblob.container.name is mandatory and names your Azure Blob Storage container where the exported Kafka records should be written. And since the Azure Blob Storage connector uses multiblock uploads, you can use the azblob.block.size to control the size of each of these continuous parts used to upload Kafka records into a single Azure Blob Storage object. The part size affects throughput and latency, as an Azure Blob Storage object is visible/available only after all parts are uploaded. The azblob.compression.type specifies that the Azure Blob Storage connector should compress all Azure Blob Storage objects using GZIP compression, adding the .gz extension to any files (see below).

So far this example configuration is relatively typical of most Azure Blob Storage connectors. Now configure the connector to read the raw message values and write them in binary format:

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.azure.blob.format.bytearray.ByteArrayFormat
storage.class=io.confluent.connect.azure.blob.storage.AzureBlobStorage
schema.compatibility=NONE

The value.converter setting overrides the connector default in the Connect worker configuration. ByteArrayConverter is used to instruct Connect to skip deserializing the message values and instead give the connector the message values in their raw binary form. The format.class setting is used to instruct the Azure Blob Storage connector to write these binary message values as-is into Azure Blob Storage objects. By default the message values written to the same Azure Blob Storage object are separated by a newline character sequence, but you can control this with the format.bytearray.separator setting, and you may want to consider this if your messages might contain newlines. Also, by default the files written to Azure Blob Storage have an extension of .bin (before compression, if enabled), or you can use the format.bytearray.extension setting to change the pre-compression filename extension.

Next, you need to decide how you want to partition the consumed messages in Azure Blob Storage objects. You have a few options, including the default partitioner that preserves the same partitions as in Kafka:

partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

Or, you could partition using the timestamp of the Kafka messages.

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Record

Or, you can use the timestamp that the Azure Blob Storage connector processes each message.

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Wallclock

Custom partitioners are always an option, too. Just be aware that since the record value is an opaque binary value, Connect cannot extract timestamps from fields using the RecordField option.

The Azure Blob Storage connector configuration outlined above results in newline-delimited gzipped objects in Azure Blob Storage with .bin.gz.

Additional Documentation