Azure Blob Storage Sink Connector for Confluent Platform
You can use the Azure Blob Storage connector to export data from Apache Kafka® topics
to Azure Blob Storage objects in Avro, JSON, Bytes, or Parquet formats.
Depending on your environment, the Azure Blob Storage connector can export data
with exactly-once delivery semantics to consumers of the Azure Blob Storage
objects it produces.
The Azure Blob Storage Sink connector periodically polls data from Kafka and then
uploads the data to Azure Blob Storage. A partitioner is used to split the data
of every Kafka partition into chunks. Each chunk of data is represented as an
Azure Blob Storage object. The key name encodes the topic, the Kafka partition,
and the start offset of this data chunk.
If no partitioner is specified in the configuration, the default partitioner,
which preserves Kafka partitioning, is used. The size of each data chunk is
determined by the number of records written to Azure Blob Storage and by schema
compatibility.
Features
The Azure Blob Storage Sink Connector for Confluent Platform includes the following features:
Exactly once delivery
Records that are exported using a deterministic partitioner are delivered with
exactly-once semantics.
Multiple tasks
The Azure Blob Storage Sink Connector supports running one or more tasks. You can specify
the number of tasks with the tasks.max configuration parameter. Running multiple
tasks can lead to significant performance gains when the connector exports data
from many topic partitions.
Pluggable partitioner
The connector comes out of the box with partitioners that support default
partitioning based on Apache Kafka® partitions, field partitioning, and
time-based partitioning in days or hours. You may implement your own
partitioners by extending the Partitioner class. Additionally, you can customize
time-based partitioning by extending the TimeBasedPartitioner class.
- Schema Evolution: Schema evolution only works if the records are generated with the default naming strategy, which is TopicNameStrategy. An error may occur if other naming strategies are used, because records are not compatible with each other. Set schema.compatibility to NONE if other naming strategies are used. This may result in small object files, because the sink connector creates a new file every time the schema ID changes between records. See Subject Name Strategy for more information about naming strategies.
Install the Azure Blob Storage Connector
You can install this connector by using the Confluent Hub client installation
instructions or by
manually downloading the ZIP file.
Prerequisites
Note
You must install the connector on every machine where Connect will run.
An installation of the Confluent Hub Client.
Note
This is installed by default with Confluent Enterprise.
An installation of the latest (latest) connector version.
To install the latest connector version, navigate to your Confluent Platform
installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-azure-blob-storage:latest
You can install a specific version by replacing latest
with a version
number as shown in the following example:
confluent-hub install confluentinc/kafka-connect-azure-blob-storage:1.1.1
Caution
You can’t mix schema and schemaless records in storage using
kafka-connect-storage-common. Attempting
this causes a runtime exception. If you are using the self-managed version of
this connector, this issue will be evident when you review the log
files (only available for the self-managed connector).
Mapping records to Azure Blob Storage Objects
The Azure Blob Storage connector consumes records from the specified topics,
organizes them into different partitions, writes batches of records in each
partition to a file, and then uploads those files to the Azure Blob Storage
bucket. It uses Azure Blob Storage object paths that include the Kafka topic and
partition, the computed partition, and the filename. The Azure Blob Storage
connector offers several ways to customize this behavior, including:
Azure Blob Storage object names
The Azure Blob Storage data model is a flat structure: each bucket stores
objects, and the name of each Azure Blob Storage object serves as the unique
key. However, a logical hierarchy can be inferred when the Azure Blob Storage
object names use directory delimiters, such as /. The Azure Blob Storage
connector allows you to customize the names of the Azure Blob Storage objects it
uploads to the Azure Blob Storage bucket.
In general, the names of the Azure Blob Storage objects uploaded by the Azure
Blob Storage connector follow this format:
<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
where:
<prefix> is specified with the connector’s topics.dir configuration property,
which defaults to the literal value topics and helps create uniquely named Azure
Blob Storage objects that don’t clash with existing Azure Blob Storage objects
in the same bucket.
<topic> corresponds to the name of the Kafka topic from which the records in
this Azure Blob Storage object were read.
<encodedPartition> is generated by the Azure Blob Storage connector’s
partitioner (see Partitioning records into Azure Blob Storage objects).
<kafkaPartition> is the Kafka partition number from which the records in this
Azure Blob Storage object were read.
<startOffset> is the Kafka offset of the first record written to this Azure Blob
Storage object.
<format> is the extension identifying the format in which the records are
serialized in this Azure Blob Storage object.
If desired, the / and + characters can be changed using the connector’s
directory.delim and file.delim configuration properties.
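As a minimal sketch of this customization, the following hypothetical settings change the prefix and the file delimiter; based on the format above, the resulting object names would look like kafka-connect-data/<topic>/<encodedPartition>/<topic>_<kafkaPartition>_<startOffset>.<format>:
topics.dir=kafka-connect-data
file.delim=_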
Partitioning records into Azure Blob Storage objects
The Azure Blob Storage connector’s partitioner determines how records read from
a Kafka topic are partitioned into Azure Blob Storage objects. The partitioner
determines the <encodedPartition>
portion of the Azure Blob Storage object
names (see Azure Blob Storage object names).
The partitioner is specified in the connector configuration with the
partitioner.class configuration property. The Azure Blob Storage connector comes
with the following partitioners:
- Default Kafka Partitioner: The io.confluent.connect.storage.partitioner.DefaultPartitioner
preserves the same topic partitions as in Kafka, and records from each topic
partition ultimately end up in Azure Blob Storage objects with names that
include the Kafka topic and Kafka partition. The <encodedPartition> is always
<topicName>/partition=<kafkaPartition>, resulting in Azure Blob Storage object
names such as
<prefix>/<topic>/partition=<kafkaPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>.
- Field Partitioner: The io.confluent.connect.storage.partitioner.FieldPartitioner
determines the partition from the field within each record identified by the
connector’s partition.field.name configuration property, which has no default.
This partitioner requires STRUCT record type values. The <encodedPartition> is
always <topicName>/<fieldName>=<fieldValue>, resulting in Azure Blob Storage
object names of the form
<prefix>/<topic>/<fieldName>=<fieldValue>/<topic>+<kafkaPartition>+<startOffset>.<format>.
- Time Based Partitioner: The io.confluent.connect.storage.partitioner.TimeBasedPartitioner
determines the partition from the year, month, day, hour, minutes, and/or
seconds. This partitioner requires the following connector configuration
properties (see the example configuration after this list):
- The path.format configuration property specifies the pattern used for the
<encodedPartition> portion of the Azure Blob Storage object name. For example,
when path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH, Azure Blob Storage
object names will have the form
<prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>.
- The partition.duration.ms configuration property defines the maximum
granularity of the Azure Blob Storage objects within a single encoded partition
directory. For example, setting partition.duration.ms=600000 (10 minutes) will
result in each Azure Blob Storage object in that directory having no more than
10 minutes of records.
- The locale configuration property specifies the JDK’s locale used for
formatting dates and times. For example, use en-US for US English, en-GB for UK
English, and fr-FR for French (in France). These may vary by Java version; see
the available locales.
- The timezone configuration property specifies the current timezone in which
the dates and times will be treated. Use standard short names for timezones such
as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names
such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary
by Java version; see the available timezones within each locale, such as those
within the “en_US” locale.
- The timestamp.extractor configuration property determines how to obtain a
timestamp from each record. Values can include Wallclock (the default) to use
the system time when the record is processed, Record to use the timestamp of the
Kafka record denoting when it was produced or stored by the broker, or
RecordField to extract the timestamp from one of the fields in the record’s
value as specified by the timestamp.field configuration property.
- Daily Partitioner: The io.confluent.connect.storage.partitioner.DailyPartitioner
is equivalent to the TimeBasedPartitioner with
path.format='year'=YYYY/'month'=MM/'day'=dd and partition.duration.ms=86400000
(one day, for one Azure Blob Storage object in each daily directory). This
partitioner always results in Azure Blob Storage object names of the form
<prefix>/<topic>/year=YYYY/month=MM/day=dd/<topic>+<kafkaPartition>+<startOffset>.<format>.
This partitioner requires the following connector configuration properties:
- The locale configuration property specifies the JDK’s locale used for
formatting dates and times. For example, use en-US for US English, en-GB for UK
English, and fr-FR for French (in France). These may vary by Java version; see
the available locales.
- The timezone configuration property specifies the current timezone in which
the dates and times will be treated. Use standard short names for timezones such
as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names
such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary
by Java version; see the available timezones within each locale, such as those
within the “en_US” locale.
- The timestamp.extractor configuration property determines how to obtain a
timestamp from each record. Values can include Wallclock (the default) to use
the system time when the record is processed, Record to use the timestamp of the
Kafka record denoting when it was produced or stored by the broker, or
RecordField to extract the timestamp from one of the fields in the record’s
value as specified by the timestamp.field configuration property.
- Hourly Partitioner: The io.confluent.connect.storage.partitioner.HourlyPartitioner
is equivalent to the TimeBasedPartitioner with
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH and
partition.duration.ms=3600000 (one hour, for one Azure Blob Storage object in
each hourly directory). This partitioner always results in Azure Blob Storage
object names of the form
<prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>.
This partitioner requires the following connector configuration properties:
- The locale configuration property specifies the JDK’s locale used for
formatting dates and times. For example, use en-US for US English, en-GB for UK
English, and fr-FR for French (in France). These may vary by Java version; see
the available locales.
- The timezone configuration property specifies the current timezone in which
the dates and times will be treated. Use standard short names for timezones such
as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names
such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary
by Java version; see the available timezones within each locale, such as those
within the “en_US” locale.
- The timestamp.extractor configuration property determines how to obtain a
timestamp from each record. Values can include Wallclock (the default) to use
the system time when the record is processed, Record to use the timestamp of the
Kafka record denoting when it was produced or stored by the broker, or
RecordField to extract the timestamp from one of the fields in the record’s
value as specified by the timestamp.field configuration property.
As noted below, the choice of timestamp.extractor affects whether the Azure Blob
Storage connector can support exactly-once delivery.
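For reference, a minimal sketch of a time-based partitioning configuration, assuming hourly partitions and record timestamps; the values shown are illustrative, not defaults:
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
partition.duration.ms=3600000
locale=en-US
timezone=UTC
timestamp.extractor=Record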
You can also choose to use a custom partitioner by implementing the
io.confluent.connect.storage.partitioner.Partitioner interface, packaging your
implementation into a JAR file, and then:
- Place the JAR file into the share/java/kafka-connect-azure_blob_storage
directory of your Confluent Platform installation on each worker node.
- Restart all of the Connect worker nodes.
- Configure Azure Blob Storage connectors to use your fully-qualified partitioner class name.
Azure Blob Storage object uploads
As the Azure Blob Storage connector processes each record, it uses the
partitioner to determine the encoded partition to which the record should be
written. This continues for each partition until the connector determines that a
partition has enough records and should be flushed and uploaded to the Azure
Blob Storage bucket using the Azure Blob Storage object name for that partition.
This technique of knowing when to flush a partition file and upload it to Azure
Blob Storage is called the rotation strategy, and there are a number of ways to
control this behavior.
Maximum number of records: The connector’s flush.size configuration property
specifies the maximum number of records that should be written to a single Azure
Blob Storage object. There is no default for this setting.
Important
Rotation strategy logic: In the following rotation strategies, the logic to
flush files to storage is triggered when a new record arrives, after the defined
interval or scheduled interval time. Flushing files is also triggered
periodically by the offset.flush.interval.ms setting defined in the Connect
worker configuration. The offset.flush.interval.ms setting defaults to 60000 ms
(60 seconds). If you enable the properties rotate.interval.ms or
rotate.schedule.interval.ms and ingestion rate is low, you should set
offset.flush.interval.ms to a smaller value so that records flush at the
rotation interval (or close to the interval). Leaving offset.flush.interval.ms
set to the default 60 seconds may cause records to stay in an open file for
longer than expected, if no new records get processed that trigger rotation.
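As a minimal sketch, assuming a low-throughput topic and a rotation interval of a few minutes, you might lower the flush interval in the Connect worker configuration (for example, connect-distributed.properties) so that offset commits track the rotation interval more closely; the value shown is illustrative:
# Connect worker configuration (illustrative value)
offset.flush.interval.ms=10000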
Maximum span of record time: In this rotation strategy, the connector’s
rotate.interval.ms property specifies the maximum timespan in milliseconds a
file can remain open and ready for additional records. The timestamp for each
file starts with the record timestamp of the first record written to the file,
as determined by the partitioner’s timestamp.extractor. As long as the next
record’s timestamp fits within the timespan specified by the rotate.interval.ms
property, the record is written to the file. If a record’s timestamp does not
fit within the timespan of rotate.interval.ms, the connector flushes the file,
uploads it to Azure Blob Storage, and commits the offsets of the records in that
file. After this, the connector creates a new file with a timespan that starts
with the first record, and writes the first record to the file.
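A minimal sketch of this rotation strategy, assuming record timestamps and a 10-minute span; the values are illustrative:
flush.size=1000
rotate.interval.ms=600000
timestamp.extractor=Record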
Scheduled rotation: In this rotation strategy, the connector’s
rotate.schedule.interval.ms specifies the maximum timespan in milliseconds a
file can remain open and ready for additional records. Unlike
rotate.interval.ms, with scheduled rotation the timestamp for each file starts
with the system time that the first record is written to the file. You must have
the partitioner parameter timezone configured (it defaults to an empty string)
when using this configuration property, otherwise the connector fails with an
exception.
As long as a record is processed within the timespan specified by
rotate.schedule.interval.ms, the record will be written to the file. As soon as
a new record is processed after the timespan for the current file, the file is
flushed, uploaded to Azure Blob Storage, and the offsets of the records in the
file are committed. A new file is created with a timespan that starts with the
current system time, and the new record is written to the file. This
configuration is useful when you have to commit your data based on current
server time, for example at the beginning of every hour. The default value -1
means that this feature is disabled.
Scheduled rotation uses rotate.schedule.interval.ms to close the file and upload
to Azure Blob Storage on a regular basis using the current time, rather than the
record time. Even if the connector has no more records to process, Connect will
still call the connector at least every offset.flush.interval.ms, as defined in
the Connect worker’s configuration file. And every time this occurs, the
connector uses the current time to determine if the currently opened file should
be closed and uploaded to Azure Blob Storage.
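A minimal sketch of scheduled rotation, assuming hourly uploads aligned to server time; the values are illustrative, and note that timezone must be set for this strategy, as described above:
rotate.schedule.interval.ms=3600000
timezone=UTC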
These strategies can be combined as needed. However, when using either of the
two rotation strategies described above, the connector only closes and uploads a
file to Azure Blob Storage when the next record does not belong in the file’s
timespan based upon its timestamp. In other words, if the connector has no more
records to process, the connector may keep the file open for a significant
period of time, until the connector can process another record.
Note
Not all rotation strategies are compatible with the Azure Blob Storage
connector’s ability to deliver Azure Blob Storage objects exactly once with
eventual consistency. See the Exactly Once section below for details.
The Azure Blob Storage objects uploaded by the connector can be quite large. The
connector supports using a multi-part upload mechanism. The
azure_blob_storage.part.size configuration property defaults to 26214400 bytes
(25MB) and specifies the maximum size of each Azure Blob Storage object part
used to upload a single Azure Blob Storage object.
Additionally, the schema.compatibility setting (see Schema Evolution) will also
affect when one file is closed and uploaded to Azure Blob Storage. If a record
cannot be written to one file because its schema has changed relative to the
records already in the file, the connector rotates by closing the file,
uploading it to Azure Blob Storage, committing offsets for the records in the
file, creating a new file, and writing the new record.
Exactly-once delivery on top of eventual consistency
The Azure Blob Storage connector is able to provide exactly-once semantics to
consumers of the objects it exports to Azure Blob Storage, under the condition
that the connector is supplied with a deterministic partitioner.
Currently, out of the available partitioners, the default and field partitioners
are always deterministic. TimeBasedPartitioner can be deterministic with some
configurations, discussed below. This implies that, when any of these
partitioners is used, splitting of files always happens at the same offsets for
a given set of Kafka records. These partitioners take into account flush.size
and schema.compatibility to decide when to roll and save a new file to Azure
Blob Storage. The connector always delivers files in Azure Blob Storage that
contain the same records, even under the presence of failures. If a connector
task fails before an upload completes, the file does not become visible to Azure
Blob Storage. If, on the other hand, a failure occurs after the upload has
completed but before the corresponding offset is committed to Kafka by the
connector, then a re-upload will take place. However, such a re-upload is
transparent to the user of the Azure Blob Storage bucket, who at any time will
have access to the same records made eventually available by successful uploads
to Azure Blob Storage.
To guarantee exactly-once semantics with the TimeBasedPartitioner, the connector
must be configured to use a deterministic implementation of TimestampExtractor
and a deterministic rotation strategy. The deterministic timestamp extractors
are Kafka record timestamps (timestamp.extractor=Record) or record fields
(timestamp.extractor=RecordField). The deterministic rotation strategy
configuration is rotate.interval.ms (setting rotate.schedule.interval.ms is
nondeterministic and will invalidate exactly-once guarantees).
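A minimal sketch of a configuration intended to preserve exactly-once delivery with the TimeBasedPartitioner, following the constraints above; the interval value is illustrative:
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Record
rotate.interval.ms=600000
# Do not set rotate.schedule.interval.ms; it invalidates exactly-once guarantees.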
Schema Evolution
The Azure Blob Storage connector supports schema evolution and reacts to schema
changes of data according to the schema.compatibility configuration. This
section describes how the connector reacts to schema evolution under different
values of schema.compatibility. The schema.compatibility property can be set to
NONE, BACKWARD, FORWARD, or FULL, which mean NO compatibility, BACKWARD
compatibility, FORWARD compatibility, and FULL compatibility, respectively.
NO Compatibility: By default, the schema.compatibility is set to NONE. In this
case, the connector ensures that each file written to Azure Blob Storage has the
proper schema. When the connector observes a schema change in data, it commits
the current set of files for the affected topic partitions and writes the data
with the new schema in new files.
BACKWARD Compatibility: If a schema is evolved in a backward compatible way, we
can always use the latest schema to query all the data uniformly. For example,
removing fields is a backward compatible change to a schema, since when we
encounter records written with the old schema that contain these fields we can
just ignore them. Adding a field with a default value is also backward
compatible.
If BACKWARD is specified in the schema.compatibility, the connector keeps track
of the latest schema used in writing data to Azure Blob Storage, and if a data
record with a schema version larger than the current latest schema arrives, the
connector commits the current set of files and writes the data record with the
new schema to new files. For data records arriving at a later time with a schema
of an earlier version, the connector projects the data record to the latest
schema before writing to the same set of files in Azure Blob Storage.
FORWARD Compatibility: If a schema is evolved in a forward compatible way, we
can always use the oldest schema to query all the data uniformly. Removing a
field that had a default value is forward compatible, since the old schema will
use the default value when the field is missing.
If FORWARD is specified in the schema.compatibility, the connector projects the
data to the oldest schema before writing to the same set of files in Azure Blob
Storage.
FULL Compatibility: Full compatibility means that old data can be read
with the new schema and new data can also be read with the old schema.
If FULL is specified in the schema.compatibility, the connector performs the
same action as BACKWARD.
Schema evolution in the Azure Blob Storage connector works in the same way as in
the HDFS connector and Amazon
S3 Sink Connector for Confluent Platform.
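For example, a minimal sketch enabling backward schema compatibility for Avro output, assuming records use the default TopicNameStrategy as noted in the Features section; the values are illustrative:
format.class=io.confluent.connect.azure.blob.format.avro.AvroFormat
schema.compatibility=BACKWARD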
Automatic Retries
The Azure Blob Storage connector may experience problems writing to the Azure
Blob Storage bucket due to network partitions, interruptions, or service
throttling limits. In many cases, the connector will retry the request a number
of times before failing. To prevent further overloading the network or Azure
Blob Storage service, the connector uses an exponential backoff technique to
give the network and/or service time to recover. The technique adds randomness,
called jitter, to the calculated backoff times to prevent a thundering herd,
where large numbers of requests from many tasks are submitted concurrently and
overwhelm the service. Randomness spreads out the retries from many tasks and
should reduce the overall time required to complete all outstanding requests
compared to simple exponential backoff. The goal is to spread out the requests
to Azure Blob Storage as much as possible.
The maximum number of retry attempts is dictated by the azblob.part.retries
Azure Blob Storage connector configuration property, which defaults to three
attempts. The delay for retries is dependent upon the connector’s
azblob.retry.backoff.ms configuration property, which defaults to 200
milliseconds. The actual delay is randomized, but the maximum delay can be
calculated as a function of the number of retry attempts with
${azblob.retry.backoff.ms} * 2 ^ (retry-1), where retry is the number of
attempts taken so far in the current iteration. In order to keep the maximum
delay within a reasonable duration, it is capped at 24 hours. For example, the
following table shows the possible wait times before submitting each of the
three retry attempts.
Range of backoff times for each retry using the default configuration
Retry | Minimum Backoff (sec) | Maximum Backoff (sec) | Total Potential Delay from First Attempt (sec)
1     | 0.0                   | 0.2                   | 0.2
2     | 0.0                   | 0.4                   | 0.6
3     | 0.0                   | 0.8                   | 1.4
Increasing the maximum number of retries adds more backoff:
Range of backoff times for additional retries
Retry | Minimum Backoff (sec) | Maximum Backoff (sec) | Total Potential Delay from First Attempt (sec)
4     | 0.0                   | 1.6                   | 3.0
5     | 0.0                   | 3.2                   | 6.2
6     | 0.0                   | 6.4                   | 12.6
7     | 0.0                   | 12.8                  | 25.4
8     | 0.0                   | 25.6                  | 51.0
9     | 0.0                   | 51.2                  | 102.2
10    | 0.0                   | 102.4                 | 204.6
At some point, the maximum backoff time reaches saturation and is capped at 24
hours. In the following example, all attempts starting with attempt 20 have a
maximum backoff time of 24 hours.
Range of backoff times when reaching the cap of 24 hours
Retry | Minimum Backoff (sec) | Maximum Backoff (sec) | Total Potential Delay from First Attempt (sec)
15    | 0.0                   | 3276.8                | 6553.4
16    | 0.0                   | 6553.6                | 13107.0
17    | 0.0                   | 13107.2               | 26214.2
18    | 0.0                   | 26214.4               | 52428.6
19    | 0.0                   | 52428.8               | 104857.4
20    | 0.0                   | 86400.0               | 191257.4
21    | 0.0                   | 86400.0               | 277657.4
It’s not advised to set azblob.part.retries too high, since making more attempts
after reaching the cap of 24 hours isn’t practical. You can adjust both the
azblob.part.retries and azblob.retry.backoff.ms connector configuration
properties to achieve the desired retry and backoff characteristics.
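As a minimal sketch, using the property names described in this section, you might cap retries and keep the default base backoff as follows; the values are illustrative, not recommendations:
azblob.part.retries=5
azblob.retry.backoff.ms=200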
Quick Start
In this quick start, the Azure Blob Storage connector is used to export data
produced by the Avro console producer to Azure Blob Storage.
Before you begin, create an Azure Blob Storage destination container and grant
write access to the user or IAM role completing these procedures. See Create a
block blob storage account for additional information. Also, if you’re using
Shared Key access instead of account keys, see Configure Azure Storage
connection strings for additional information.
By default, all resources in Azure Storage are secured and only available to the
account owner.
Install the connector through the Confluent Hub
Client.
# run from your Confluent Platform installation directory
confluent-hub install confluentinc/kafka-connect-azure-blob-storage:latest
Tip
By default, the plugin is installed into share/confluent-hub-components and the
directory is added to the plugin path. If this is the first connector you have
installed, you may need to restart the Connect worker for the plugin path change
to take effect. Also see Azure Blob Storage CLI for information about setting up
and using the CLI.
Start the services using the confluent local commands.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local. For example, the syntax for
confluent start is now confluent local services start. For more information, see
confluent local.
confluent local services start
Each service starts in order, printing a message with its status.
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
Note
Make sure the Azure Blob Storage connector has write access to the Azure Blob
Storage container shown in azblob.container.name
and can deploy
credentials successfully.
To import a few records with a simple schema in Kafka, start the Avro console
producer as follows:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic blob_topic \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Then, in the console producer, enter the following:
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
{"f1": "value4"}
{"f1": "value5"}
{"f1": "value6"}
{"f1": "value7"}
{"f1": "value8"}
{"f1": "value9"}
The nine records entered are published to the Kafka topic blob_topic
in Avro format.
Create a blob.properties
file with the following contents:
name=blob-sink
connector.class=io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector
tasks.max=1
topics=blob_topic
flush.size=3
azblob.account.name=your-account
azblob.account.key=your-key
azblob.container.name=confluent-kafka-connect-azure-blob-storage-testing
format.class=io.confluent.connect.azure.blob.format.avro.AvroFormat
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Before starting the connector, make sure that the configurations in
blob.properties are properly set for your Azure Blob Storage environment. For
this example, make sure that azblob.container.name points to your container,
azblob.account.name is set to your account, and azblob.account.key is set to
your key. Then start the Azure Blob Storage connector by loading its
configuration with the following command.
Caution
You must include a double dash (--
) between the topic name and your flag. For more information,
see this post.
confluent local services connect connector load blob-sink --config blob.properties
{
  "name": "blob-sink",
  "config": {
    "name": "blob-sink",
    "connector.class": "io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector",
    "tasks.max": "1",
    "topics": "blob_topic",
    "flush.size": "3",
    "azblob.account.name": "your-account",
    "azblob.account.key": "your-key",
    "azblob.container.name": "confluent-kafka-connect-azure-blob-storage-testing",
    "format.class": "io.confluent.connect.azure.blob.format.avro.AvroFormat",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  },
  "tasks": []
}
Check that the connector started successfully. Review the Connect worker’s log
by entering the following:
confluent local services connect log
Towards the end of the log you should see that the connector starts, logs a few
messages, and then uploads data from Kafka to Azure Blob Storage.
Once the connector has ingested some records, check that the data is available
in Azure Blob Storage. Use the following Azure CLI command:
az storage blob list --container-name confluent-kafka-connect-azure-blob-storage-testing --output table
You should see three objects with the following keys:
topics/blob_topic/partition=0/blob_topic+0+0000000000.avro
topics/blob_topic/partition=0/blob_topic+0+0000000003.avro
topics/blob_topic/partition=0/blob_topic+0+0000000006.avro
Each file is encoded as <topic>+<kafkaPartition>+<startOffset>.<format>.
To verify the contents, copy each file from Azure Blob Storage to your local
filesystem. Use the following Azure CLI command:
az storage blob download --container-name confluent-kafka-connect-azure-blob-storage-testing --name topics/blob_topic/partition=0/blob_topic+0+0000000000.avro --file ~/blob_topic+0+0000000000.avro
To print the records, use avro-tools-1.8.2.jar
(available in the Apache Archives):
java -jar avro-tools-1.8.2.jar tojson blob_topic+0+0000000000.avro
For the file above, you should see the following output:
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
The rest of the records are contained in the other two files.
Finally, stop the Connect worker and all other Confluent services by running:
confluent local services stop
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
You can stop all services and remove any data generated during this quick start
by entering the following command:
confluent local destroy
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE
Automatic Retries
The Azure Blob Storage connector may experience intermittent problems writing to
the Azure Blob Storage container because of network partitions, interruptions,
or throttling limits. In many cases, the connector retries the request a number
of times before failing. To prevent overloading the network or Azure Blob
Storage service, the connector uses an exponential backoff technique to give the
network and service time to recover. Retries occur for requests that return an
HTTP status code greater than 500, excluding 501 and 505.
The maximum number of retry attempts is dictated by the azblob.retry.retries
Azure Blob Storage connector configuration property. This property defaults to
three retry attempts (one initial attempt and three retry attempts). The delay
for retries is dependent upon the connector’s azblob.retry.backoff.ms
configuration property, which defaults to 4000 milliseconds. The maximum delay
between retries is capped by azblob.retry.max.backoff.ms, which defaults to
120000 milliseconds. The actual delay is calculated by taking the lower value
between ${azblob.retry.backoff.ms} * (-1 + 2 ^ (attempt-1)) and
azblob.retry.max.backoff.ms, where attempt is the number of attempts taken so
far in the current iteration. For example, the following table shows the
possible wait times before submitting each of the three retries:
Range of backoff times for each retry using the default configuration
Attempt | Calculated Backoff (sec) | Actual Backoff (sec)
1       | 0.0                      | 0.0
2       | 4.0                      | 4.0
3       | 12.0                     | 12.0
4       | 28.0                     | 28.0
The maximum backoff time eventually reaches saturation and is capped at
azblob.retry.max.backoff.ms. In the following example, all attempts starting
with attempt six have the maximum backoff time of 120 seconds:
Range of backoff times when reaching the default cap of two minutes
Attempt | Calculated Backoff (sec) | Actual Backoff (sec)
4       | 28.0                     | 28.0
5       | 60.0                     | 60.0
6       | 124.0                    | 120.0
7       | 252.0                    | 120.0
8       | 508.0                    | 120.0
9       | 1020.0                   | 120.0
10      | 2044.0                   | 120.0
If a secondary host is provided, the behavior is slightly different. After a
failed attempt against your normal store.url, the connector delays by
(.1 second * random(0.8, 1.2)) before calling the azblob.retry.secondary.host.
It’s not advised to set azblob.retry.max.backoff.ms too high, since making
additional attempts after very long delays isn’t practical. You can adjust the
azblob.retry.retries, azblob.retry.backoff.ms, azblob.retry.max.backoff.ms, and
azblob.retry.secondary.host connector configuration properties to achieve the
desired retry and backoff characteristics.
Also, the timeout of a single HTTP request can be configured by editing
azblob.connection.timeout.ms.
Note
When transferring large amounts of data, the default TryTimeout is probably not
sufficient. You should override this value based on the bandwidth available to
the host machine and proximity to the Storage service. A good starting point is
roughly 60 seconds per MB of anticipated payload size.
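As a minimal sketch, using the retry and timeout properties described in this section; the values are illustrative, not recommendations:
azblob.retry.retries=5
azblob.retry.backoff.ms=4000
azblob.retry.max.backoff.ms=120000
# Per-request timeout; increase for large transfers (roughly 60 s per MB of payload).
azblob.connection.timeout.ms=300000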
Write JSON message values into Azure Blob Storage
The example settings file is shown below:
name=blob-sink
connector.class=io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector
tasks.max=1
topics=blob_topic
flush.size=100
# Required configuration
azblob.account.name=account
azblob.account.key=accountkey
# The following define the information used to validate the license stored in Kafka
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
The first few settings are common to most connectors. topics specifies the
topics to export data from, in this case blob_topic. The flush.size property
specifies the number of records per partition the connector needs to write
before completing a multiblock upload to Azure Blob Storage.
The azblob.account.name and azblob.account.key properties are your required
Azure credentials. To create a storage account, see Create a storage account.
For information about license properties, see Confluent Platform license.
azblob.container.name=confluent-kafka-connect-azure-blob-storage-testing
azblob.block.size=5242880
The next settings are specific to Azure Blob Storage. A mandatory setting is the
name of your Azure Blob Storage container to host the exported Kafka records.
Another useful configuration setting is azblob.block.size. This setting controls
the size of each block in the multiblock uploads used to upload a single chunk
of Kafka records.
storage.class=io.confluent.connect.azure.blob.storage.AzureBlobStorage
format.class=io.confluent.connect.azure.blob.format.json.JsonFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
These class settings are required to specify the Azure Blob Storage interface,
the output file format, which is currently
io.confluent.connect.azure.blob.format.avro.AvroFormat,
io.confluent.connect.azure.blob.format.json.JsonFormat, or
io.confluent.connect.azure.blob.format.bytearray.ByteArrayFormat, and the
partitioner class along with its schema generator class. When using a format
with no schema definition, it is sufficient to set the schema generator class to
its default value.
schema.compatibility=NONE
Finally, schema evolution is disabled in this example by setting
schema.compatibility to NONE, as explained above.
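For convenience, the fragments above assemble into a single settings file along the following lines (a sketch based on the values shown in this example; substitute your own account, key, and container):
name=blob-sink
connector.class=io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector
tasks.max=1
topics=blob_topic
flush.size=100
azblob.account.name=account
azblob.account.key=accountkey
azblob.container.name=confluent-kafka-connect-azure-blob-storage-testing
azblob.block.size=5242880
storage.class=io.confluent.connect.azure.blob.storage.AzureBlobStorage
format.class=io.confluent.connect.azure.blob.format.json.JsonFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092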
For detailed descriptions of all the available configuration options of the
Azure Blob Storage connector, see
Azure Blob Storage Sink Connector Configuration Properties.
Maximum file size exception
If a file is created that exceeds the maximum of 50000 blocks, an exception
occurs with the following error:
org.apache.kafka.connect.errors.ConnectException: Due to the Azure
constraints, you can only have a block blob of up to a maximum of 50000
blocks and you've exceeded that limit. To resolve the issue please either
decrease your flush.size to make the files smaller or increase your
azblob.block.size.
This error indicates that the file you are trying to create is greater than
50000 multiplied by your azblob.block.size value. To resolve this issue, either
decrease the flush.size property or increase the azblob.block.size property. To
learn more about Azure Blob Storage’s constraints, see Scalability and
performance targets for standard storage accounts.
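For example, with the azblob.block.size=5242880 (5 MiB) value used elsewhere on this page, the 50000-block limit corresponds to a maximum object size of roughly 50000 × 5 MiB ≈ 244 GiB; a lower flush.size keeps individual objects well below that ceiling.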
Write raw message values into Azure Blob Storage
It is possible to use the Azure Blob Storage connector to write out the
unmodified original message values into newline-separated files in Azure Blob
Storage. To accomplish this, configure Kafka Connect so it does not deserialize
any of the messages, and configure the Azure Blob Storage connector to store the
message values in a binary format in Azure Blob Storage.
The first part of the Azure Blob Storage connector configuration is similar to
other examples.
name=blob-raw-sink
connector.class=io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector
tasks.max=1
topics=blob_topic
flush.size=3
The topics setting specifies the topics you want to export data from, which is
blob_topic in the example. The flush.size property specifies the number of
records per partition the connector needs to write before completing a multipart
upload to Azure Blob Storage.
Next, configure container name, block size, and compression type.
azblob.container.name=confluent-kafka-connect-azure-blob-storage-testing
azblob.block.size=5242880
azblob.compression.type=gzip
The azblob.container.name setting is mandatory and names the Azure Blob Storage
container where the exported Kafka records should be written. Because the Azure
Blob Storage connector uses multiblock uploads, you can use azblob.block.size to
control the size of each of these continuous parts used to upload Kafka records
into a single Azure Blob Storage object. The part size affects throughput and
latency, as an Azure Blob Storage object is visible/available only after all
parts are uploaded. The azblob.compression.type setting specifies that the Azure
Blob Storage connector should compress all Azure Blob Storage objects using GZIP
compression, adding the .gz extension to any files (see below).
So far this example configuration is relatively typical of most Azure Blob
Storage connectors. Now configure the connector to read the raw message values
and write them in binary format:
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.azure.blob.format.bytearray.ByteArrayFormat
storage.class=io.confluent.connect.azure.blob.storage.AzureBlobStorage
schema.compatibility=NONE
The value.converter setting overrides the connector default in the Connect
worker configuration. ByteArrayConverter is used to instruct Connect to skip
deserializing the message values and instead give the connector the message
values in their raw binary form. The format.class setting is used to instruct
the Azure Blob Storage connector to write these binary message values as-is into
Azure Blob Storage objects. By default the message values written to the same
Azure Blob Storage object are separated by a newline character sequence, but you
can control this with the format.bytearray.separator setting, and you may want
to consider this if your messages might contain newlines. Also, by default the
files written to Azure Blob Storage have an extension of .bin (before
compression, if enabled), or you can use the format.bytearray.extension setting
to change the pre-compression filename extension.
Next, you need to decide how you want to partition the consumed messages in Azure
Blob Storage objects. You have a few options, including the default partitioner
that preserves the same partitions as in Kafka:
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
Or, you could partition using the timestamp of the Kafka messages.
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Record
Or, you can use the timestamp that the Azure Blob Storage connector processes each message.
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Wallclock
Custom partitioners are always an option, too. Just be aware that since the
record value is an opaque binary value, Connect cannot extract timestamps
from fields using the RecordField
option.
The Azure Blob Storage connector configuration outlined above results in
newline-delimited gzipped objects in Azure Blob Storage with a .bin.gz
extension.
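Putting the fragments above together, a complete settings file for this raw-bytes example might look like the following sketch (using the default partitioner; substitute your own account, key, and container, which are required as in the earlier examples):
name=blob-raw-sink
connector.class=io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector
tasks.max=1
topics=blob_topic
flush.size=3
azblob.account.name=your-account
azblob.account.key=your-key
azblob.container.name=confluent-kafka-connect-azure-blob-storage-testing
azblob.block.size=5242880
azblob.compression.type=gzip
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
format.class=io.confluent.connect.azure.blob.format.bytearray.ByteArrayFormat
storage.class=io.confluent.connect.azure.blob.storage.AzureBlobStorage
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1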