Backup and Restore Azure Blob Storage Source Connector for Confluent Platform¶
The Kafka Connect Backup and Restore Azure Blob Storage Source connector provides the capability to read data exported to Azure Blob Storage by the Kafka Connect Azure Blob Storage Sink connector and publish it back to a Kafka topic. Depending on the format and partitioner used to write the data to Azure Blob Storage, this connector can write to the destination topic using the same partitions as the original messages exported to Azure Blob Storage and maintain the same message order. The connector selects folders based on the partitioner configuration and reads each folder's Azure Blob Storage objects in alphabetical order. Each record is read based on the format selected. The configuration is designed to mirror that of the Kafka Connect Azure Blob Storage Sink connector, so it should be possible to create source connector configurations with only minor changes to the original sink configuration.
Important
The recommended practice is to create topics manually in the destination Kafka cluster with the correct number of partitions before running the source connector. If the topics don't exist, Connect relies on Auto Topic Creation, and the number of partitions is based upon the Kafka broker defaults. If there are more partitions in the destination cluster, the extra partitions aren't used. If there are fewer partitions in the destination cluster, the connector task continuously checks for the presence of the required partition; it does not produce records but remains in the running state.
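For example, a hedged sketch of pre-creating the destination topic with the standard kafka-topics CLI; the topic name, partition count, replication factor, and bootstrap address below are placeholders for your environment:

```bash
# Pre-create the destination topic so its partition count matches the
# partitions of the data exported to Azure Blob Storage (values are placeholders).
kafka-topics --create \
  --topic copy_of_blob_topic \
  --partitions 3 \
  --replication-factor 3 \
  --bootstrap-server localhost:9092
```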
Be aware of the following connector actions:
- The connector ignores any Azure Blob Storage object with a name that doesn't start with the configured topics directory. This name is "/topics/" by default.
- The connector ignores any Azure Blob Storage object that is below the topics directory but has an extension that doesn't match the configured format. For example, a JSON file is ignored when format.class is set for Avro files.
- The connector stops and fails if the Azure Blob Storage object's name doesn't match the expected format or is in an unexpected location.
Avoid the following configuration issues:
- A file with the correct extension and a valid name format like <topic>+<partition>+<offset>.<extension>, placed in a folder of a different topic, is read normally and written to the topic defined by its filename (see the example layout after this list).
- If a field partitioner is incorrectly configured to match the expected folder, it can break the ordering guarantees of an Azure Blob Storage sink that used a deterministic sink partitioner.
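For reference, a sketch of the object layout the connector expects under the default /topics/ directory when the data was written with the default partitioner; the topic name, partition numbers, and offsets are illustrative:

```text
topics/blob_topic/partition=0/blob_topic+0+0000000000.avro
topics/blob_topic/partition=0/blob_topic+0+0000001000.avro
topics/blob_topic/partition=1/blob_topic+1+0000000000.avro
```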
Features¶
The Backup and Restore Azure Blob Storage Source connector includes the following features:
- Pluggable Data Format with or without Schema: Out of the box, the connector supports reading data from Azure Blob Storage in Avro and JSON format. Besides records with schema, the connector supports importing plain JSON records without schema in text files, one record per line. In general, the connector may accept any format that provides an implementation of the Format interface.
- At Least Once Delivery: In the event of a task failure, the connector guarantees that no messages are lost, although the last few messages may be processed again.
- Matching Source Partitioning: Messages are put back on the same Kafka partition of the topic they were originally written to.
- Source Partition Ordering: If the DefaultPartitioner or a TimeBasedPartitioner is used, the connector reads records back in time order in each topic-source partition. If a FieldPartitioner is used, it isn't possible to guarantee the order of these messages.
- Pluggable Partitioner: The connector comes out of the box with partitioners that support default partitioning based on Kafka partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class. Additionally, you can customize time-based partitioning by extending the TimeBasedPartitioner class.
Important
- All partitioners will notice new topic folders with the built-in task reconfiguration thread. The DefaultPartitioner detects new partition folders. The FieldPartitioner notices new folders for the fields specified. However, the TimeBasedPartitioner doesn't detect new files for a new time period.
- Be careful when both the Azure Blob Storage Sink connector and the Azure Blob Storage Source connector use the same Kafka cluster, since this results in the source connector writing to the same topic being consumed by the sink connector. This causes a continuous feedback loop that creates an ever-increasing number of duplicate Kafka records and Azure Blob Storage objects. You can avoid this feedback loop by writing to a different topic than the one being consumed by the sink connector. Use the RegexRouter SMT with the source connector to change the names of the topics where the records are written, or use the ExtractTopic SMT with the source connector to change the topic name based upon a field in each message.
Limitations¶
For the TimeBasedPartitioner, the capacity to scale the connector across various time ranges is limited in Backup and Restore mode. Currently, the connector does not support processing data that spans several years.
At least once delivery¶
In the event of a task failure, the connector guarantees that no messages are lost, although the last few messages may be processed again.
Multiple tasks¶
The Backup and Restore Azure Blob Storage Source connector supports running one
or more tasks. You can specify the number of tasks in the tasks.max
configuration parameter. Multiple tasks may improve performance when moving a
large amount of data.
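For example, a minimal configuration fragment that raises the task count; the value shown is only an illustration and should be tuned to your Connect cluster:

```properties
# Run up to four tasks in parallel for higher read throughput.
tasks.max=4
```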
Pluggable data format with or without schema¶
Out of the box, the connector supports reading data from Azure Blob Storage in Avro and JSON format. Besides records with schema, the connector supports importing plain JSON records without schema in text files, one record per line. In general, the connector may accept any format that provides an implementation of the Format interface.
Matching source partitioning¶
Messages are put back on the same Kafka partition of the topic they were originally written to.
Source partition ordering¶
If the DefaultPartitioner or a TimeBasedPartitioner is used, the connector reads records back in time order in each topic-source partition. If a FieldPartitioner is used, it isn't possible to guarantee the order of these messages.
Pluggable partitioner¶
The connector comes out of the box with partitioners that support default partitioning based on Kafka partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class. Additionally, you can customize time-based partitioning by extending the TimeBasedPartitioner class.
All partitioners will notice new topic folders with the built-in task reconfiguration thread. The DefaultPartitioner detects new partition folders. The FieldPartitioner notices new folders for the fields specified. However, the TimeBasedPartitioner doesn't detect new files for a new time period.
Be careful when both the Connect Azure Blob Storage Sink connector and the Backup and Restore Azure Blob Storage Source connector use the same Kafka cluster, since this results in the source connector writing to the same topic being consumed by the sink connector. This causes a continuous feedback loop that creates an ever-increasing number of duplicate Kafka records and Azure Blob Storage objects. You can avoid this feedback loop by writing to a different topic than the one being consumed by the sink connector. You can use either the RegexRouter with the source connector to change the names of the topics where the records are written or the ExtractTopic SMT with the source connector to change the topic name based upon a field in each message.
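As an alternative to the RegexRouter example shown in the quick start, the following is a hedged sketch of the ExtractTopic SMT; the class name io.confluent.connect.transforms.ExtractTopic$Value, the field property, and the target_topic field name are assumptions to verify against your Confluent Platform version and your data:

```properties
# Route each record to the topic named by a field in the record's value.
transforms=TopicFromField
transforms.TopicFromField.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.TopicFromField.field=target_topic
```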
Tip
By default, connectors inherit the partitioner used for the Kafka topic. You
can create a custom partitioner for a connector which you must place in the
connector’s /lib
folder.
You can also put partitioners in a common location of choice. If you choose
this option, you must add a symlink to the location from each connector’s
/lib
folder. For example, you would place a custom partitioner in the
path share/confluent-hub-components/partitioners
and then add the symlink
share/confluent-hub-components/kafka-connect-s3/lib/partitioners -> ../../partitioners
.
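A minimal shell sketch of that layout, assuming a custom partitioner JAR named my-partitioner.jar (a placeholder) and the example paths above, run from the Confluent Platform installation directory:

```bash
# Keep shared partitioner JARs in a common directory...
mkdir -p share/confluent-hub-components/partitioners
cp my-partitioner.jar share/confluent-hub-components/partitioners/
# ...and symlink that directory into the connector's /lib folder.
ln -s ../../partitioners share/confluent-hub-components/kafka-connect-s3/lib/partitioners
```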
License¶
You can use this connector for a 30-day trial period without a license key.
After 30 days, you must purchase a connector subscription, which includes Confluent enterprise license keys for subscribers, along with enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, you can contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and Confluent license properties for information about the license topic.
Configuration Properties¶
For a complete list of configuration properties for this connector, see Configuration Reference for Azure Blob Storage Source Connector for Confluent Platform.
Install the Backup and Restore Azure Blob Storage Source Connector¶
You can install this connector by using the confluent connect plugin install command, or by manually downloading the ZIP file.
Prerequisites¶
- You must install the connector on every machine where Connect will run.
- Kafka Broker: Confluent Platform 3.3.0 or later, or Kafka 0.11.0 or later.
- Connect: Confluent Platform 4.0.0 or later, or Kafka 1.0.0 or later.
- Java 1.8.
- An installation of the latest (latest) connector version. To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

  ```bash
  confluent connect plugin install confluentinc/kafka-connect-azure-blob-storage-source:latest
  ```

  You can install a specific version by replacing latest with a version number as shown in the following example:

  ```bash
  confluent connect plugin install confluentinc/kafka-connect-azure-blob-storage-source:2.6.0
  ```
Install the connector manually¶
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
Quick Start¶
The following quick start uses the AzureBlobStorageSinkConnector to write an Avro file from the Kafka topic named blob_topic to Azure Blob Storage. The AzureBlobStorageSinkConnector should be completely stopped before starting the AzureBlobStorageSourceConnector to avoid creating a source/sink cycle. The AzureBlobStorageSourceConnector then loads that Avro file from Azure Blob Storage to the Kafka topic named copy_of_blob_topic.
For an example of how to get Kafka Connect connected to Confluent Cloud, see Connect Self-Managed Kafka Connect to Confluent Cloud.
Follow the instructions from the Connect Azure Blob Storage Sink connector quick start (https://docs.confluent.io/kafka-connect-azure-blob-storage/current/source/index.html#quick-start) to set up the data to use below.
Install the connector through the Confluent Hub Client.
```bash
# run from your Confluent Platform installation directory
confluent connect plugin install confluentinc/kafka-connect-azure-blob-storage-source:latest
```
Tip
By default, the plugin is installed into the share/confluent-hub-components directory and that directory is added to the plugin path. For the plugin path change to take effect, you must restart the Connect worker.
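If you manage the worker configuration yourself, the plugin path is set with the standard Connect plugin.path worker property; the paths below are illustrative:

```properties
# In the Connect worker configuration (for example, connect-distributed.properties)
plugin.path=/usr/share/java,share/confluent-hub-components
```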
Property-based example¶
Create a quickstart-azureblobstoragesource.properties file with the following contents. This file should be placed under the Confluent Platform installation directory. This configuration is typically used with standalone workers.

```properties
name=azure-blob-storage-source
tasks.max=1
connector.class=io.confluent.connect.azure.blob.storage.AzureBlobStorageSourceConnector
# enter your Azure Blob Storage account, key, and container name here
azblob.account.name=<your-account>
azblob.account.key=<your-key>
azblob.container.name=<container-name>
format.class=io.confluent.connect.azure.blob.storage.format.avro.AvroFormat
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
```
Tip
The confluent.topic properties define the Confluent license stored in Kafka, so the Kafka bootstrap addresses are required. The replication factor may not be larger than the number of Kafka brokers in the destination cluster, so it is set to 1 here for demonstration purposes. Always use at least 3 in production configurations.

Edit the quickstart-azureblobstoragesource.properties file to add the following properties:

```properties
transforms=AddPrefix
transforms.AddPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.AddPrefix.regex=.*
transforms.AddPrefix.replacement=copy_of_$0
```
Important
Adding this renames the output topic of the messages to copy_of_blob_topic. This prevents a continuous feedback loop of messages.

Load the Backup and Restore Azure Blob Storage Source connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.

```bash
confluent local services connect connector load azblobstorage-source --config quickstart-azureblobstoragesource.properties
```
Important
Don’t use the Confluent CLI in production environments.
Confirm that the connector is in a RUNNING state.

```bash
confluent local services connect connector status azblobstorage-source
```
Confirm that the messages are being sent to Kafka.
```bash
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic copy_of_blob_topic \
  --from-beginning | jq '.'
```
The response should be 9 records as follows.
{"f1": "value1"} {"f1": "value2"} {"f1": "value3"} {"f1": "value4"} {"f1": "value5"} {"f1": "value6"} {"f1": "value7"} {"f1": "value8"} {"f1": "value9"}
REST-based example¶
Use this setting with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the following command to post the configuration to one of the distributed Connect workers. See the Kafka Connect REST Interface documentation for more information.
{ "name" : "AzureBlobStorageSourceConnector", "config" : { "connector.class" : "io.confluent.connect.azure.blob.storage.AzureBlobStorageSourceConnector", "tasks.max" : "1", "azblob.account.name" : "your-account", "azblob.account.key" : "your-key", "azblob.container.name" : "confluent-kafka-connect-azBlobStorage-testing", "format.class" : "io.confluent.connect.azure.blob.storage.format.avro.AvroFormat", "confluent.topic.bootstrap.servers" : "localhost:9092", "confluent.topic.replication.factor" : "1", "transforms" : "AddPrefix", "transforms.AddPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter", "transforms.AddPrefix.regex" : ".*", "transforms.AddPrefix.replacement" : "copy_of_$0" } }
Note
Change the confluent.topic.bootstrap.servers property to include your broker address(es), and change the confluent.topic.replication.factor to 3 for staging or production use.

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

```bash
curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
```
Use the following command to update the configuration of the existing connector.

```bash
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/AzureBlobStorageSourceConnector/config
```
To consume records written by the connector to the configured Kafka topic, run the following command:

```bash
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic copy_of_blob_topic --from-beginning
```
Backup and Restore Azure Blob Storage Source Connector Partitions¶
The Backup and Restore Azure Blob Storage Source connector’s partitioner determines how records read from Azure Blob Storage objects are pushed into a Kafka topic.
The partitioner is specified in the connector configuration with the partitioner.class
configuration property.
The Backup and Restore Azure Blob Storage Source connector comes with the following partitioners:
- Default Partitioner: The io.confluent.connect.storage.partitioner.DefaultPartitioner reads records from each Azure Blob Storage object whose name includes the Kafka topic and pushes them to the same topic partition as in Kafka. The <encodedPartition> is always <topicName>/partition=<kafkaPartition>, resulting in Azure Blob Storage object names such as <prefix>/<topic>/partition=<kafkaPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>.
- Field Partitioner: The io.confluent.connect.storage.partitioner.FieldPartitioner determines the partition from the field within each record identified by the connector's partition.field.name configuration property, which has no default. This partitioner requires STRUCT record type values. The <encodedPartition> is always <topicName>/<fieldName>=<fieldValue>, resulting in Azure Blob Storage object names of the form <prefix>/<topic>/<fieldName>=<fieldValue>/<topic>+<kafkaPartition>+<startOffset>.<format>.
- Time Based Partitioner: The io.confluent.connect.storage.partitioner.TimeBasedPartitioner determines the partition from the year, month, day, hour, minutes, and/or seconds. This partitioner requires the following connector configuration properties (a configuration example follows this list):
  - The path.format configuration property specifies the pattern used for the <encodedPartition> path of the Azure Blob Storage object. For example, when path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH, Azure Blob Storage object names have the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>.
  - The partition.duration.ms configuration property defines the maximum granularity of the Azure Blob Storage objects within a single encoded partition directory. For example, setting partition.duration.ms=600000 (10 minutes) results in each Azure Blob Storage object in that directory containing no more than 10 minutes of records.
  - The locale configuration property specifies the JDK's locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, and fr-FR for French (in France). These may vary by Java version; see the available locales.
  - The timezone configuration property specifies the timezone in which the dates and times are treated. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the "en_US" locale.
  - The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, and RecordField to extract the timestamp from one of the fields in the record's value as specified by the timestamp.field configuration property.
- Daily Partitioner: The io.confluent.connect.storage.partitioner.DailyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd and partition.duration.ms=86400000 (one day, for one Azure Blob Storage object in each daily directory). This partitioner always reads from Azure Blob Storage object names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/<topic>+<kafkaPartition>+<startOffset>.<format>. This partitioner requires the following connector configuration properties:
  - The locale configuration property specifies the JDK's locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, and fr-FR for French (in France). These may vary by Java version; see the available locales.
  - The timezone configuration property specifies the timezone in which the dates and times are treated. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the "en_US" locale.
  - The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, and RecordField to extract the timestamp from one of the fields in the record's value as specified by the timestamp.field configuration property.
- Hourly Partitioner: The io.confluent.connect.storage.partitioner.HourlyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH and partition.duration.ms=3600000 (one hour, for one Azure Blob Storage object in each hourly directory). This partitioner always reads from Azure Blob Storage object names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>. This partitioner requires the following connector configuration properties:
  - The locale configuration property specifies the JDK's locale used for formatting dates and times. For example, use en-US for US English, en-GB for UK English, and fr-FR for French (in France). These may vary by Java version; see the available locales.
  - The timezone configuration property specifies the timezone in which the dates and times are treated. Use standard short names for timezones such as UTC or (without daylight savings) PST, EST, and ECT, or longer standard names such as America/Los_Angeles, America/New_York, and Europe/Paris. These may vary by Java version; see the available timezones within each locale, such as those within the "en_US" locale.
  - The timestamp.extractor configuration property determines how to obtain a timestamp from each record. Values can include Wallclock (the default) to use the system time when the record is processed, Record to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, and RecordField to extract the timestamp from one of the fields in the record's value as specified by the timestamp.field configuration property.
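As referenced in the Time Based Partitioner item above, the following is a hedged example that combines the time-based partitioner properties described in this section; the values are illustrative:

```properties
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
partition.duration.ms=3600000
locale=en-US
timezone=UTC
timestamp.extractor=Record
```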
You can also choose to use a custom partitioner by implementing the io.confluent.connect.storage.partitioner.Partitioner interface, packaging your implementation into a JAR file, and then:
- Place the JAR file into the share/java/kafka-connect-azure-blob-storage directory of your Confluent Platform installation on each worker node.
- Restart all of the Connect worker nodes.
- Configure Backup and Restore Azure Blob Storage Source connectors to use your fully-qualified partitioner class name, as shown in the sketch after this list.
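For example, a sketch of pointing the connector at a custom implementation; com.example.MyCustomPartitioner is a hypothetical class name:

```properties
# Fully-qualified class name of your custom partitioner (hypothetical).
partitioner.class=com.example.MyCustomPartitioner
```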
Backup and Restore Azure Blob Storage Source Connector Data Formats¶
The Backup and Restore Azure Blob Storage Source connector supports the following data formats (a configuration example follows this list):
- Avro Format: To read Avro-formatted data, configure format.class=io.confluent.connect.azure.blob.storage.format.avro.AvroFormat.
- JSON Format: To read JSON-formatted data, configure format.class=io.confluent.connect.azure.blob.storage.format.json.JsonFormat.
- Raw Bytes Format: To read raw bytes, configure format.class=io.confluent.connect.azure.blob.storage.format.bytearray.ByteArrayFormat.
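The configured format should correspond to the format the sink connector used to write the data; objects with a different extension are ignored, as described in the connector actions above. For example, a minimal fragment for restoring data that was written as JSON:

```properties
format.class=io.confluent.connect.azure.blob.storage.format.json.JsonFormat
```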
Troubleshooting Connector and Task Failures¶
Stack Trace¶
You can use the Kafka Connect REST Interface to check the status of the connectors and tasks. If a task or connector has failed, the trace field will include a reason and a stack trace.
Error Handling¶
The behavior.on.error configuration property sets how the connector handles errors (a configuration example follows this list):
- fail: The connector stops processing when an error occurs. The full batch of records will not be sent to Kafka if any record in the batch is corrupted.
- ignore: The corrupted record is ignored and the connector continues processing the next record. For Avro, the connector ignores the file containing a corrupted record and continues processing records for the next file.
- log: The connector logs an error message and continues processing the next record. For Avro, the connector ignores the file containing a corrupted record and continues processing records for the next file.
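For example, a minimal fragment, as referenced above, that logs and skips corrupted records instead of failing the task:

```properties
# Allowed values described above: fail, ignore, log
behavior.on.error=log
```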
Note
The connector always ignores a file which is not in the <topic>+<partition>+<offset>.<extension> format.